Coverage for rfpy/api/endpoints/auth.py: 95%
133 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
1"""
2Users, roles and authentication
3"""
5from typing import List, Optional
6from uuid import uuid4
8from sqlalchemy.orm import Session
10from rfpy.suxint import http
11from rfpy.auth import AuthorizationFailure, ROLES, perms
12from rfpy.web import serial
13from rfpy.api import fetch, validate
14from rfpy.model.humans import (
15 BuyerOrganisation,
16 ConsultantOrganisation,
17 User,
18 CustomRole,
19 UserRole,
20 Organisation,
21 OrganisationType,
22)
23from rfpy.model.audit import AuditEvent
24from rfpy.model.exc import DuplicateDataProvided
@http
def get_role(session: Session, role_id: str) -> List[str]:
    """
    Return the sorted permissions granted by the Role named `role_id`.

    Built-in roles are read from ROLES. Any other name is looked up as a
    CustomRole by name using `.one()`, which raises unless exactly one row
    matches.
    """
    if role_id not in ROLES:
        # Not a built-in role: resolve it as a custom role by name.
        role_query = session.query(CustomRole).filter(CustomRole.name == role_id)
        return sorted(perm.permission_id for perm in role_query.one().permissions)
    return sorted(ROLES[role_id])
@http
def get_roles(session: Session, user: User) -> List[str]:
    """
    Return the names of all roles available to the current user.

    Built-in role names (the keys of ROLES) come first, followed by the names
    of custom roles whose org_id matches the current user's org_id.
    """
    org_roles = session.query(CustomRole).filter(CustomRole.org_id == user.org_id)
    return [*ROLES.keys(), *(custom.name for custom in org_roles)]
@http
def get_users(
    session: Session, user: User, q_org_id: Optional[str] = None
) -> List[serial.BaseUser]:
    """
    List the users of the Organisation identified by orgId.

    When orgId is omitted, or equals the current user's org_id, the current
    user's own organisation is used. The caller must hold MANAGE_USERS for the
    selected organisation.
    """
    wants_own_org = q_org_id is None or q_org_id == user.org_id
    org = user.organisation if wants_own_org else fetch.organisation(session, q_org_id)
    validate.check(user, perms.MANAGE_USERS, target_org=org)
    return list(map(serial.User.model_validate, org.users))
@http
def get_user(session: Session, user: User, user_id: Optional[str]) -> serial.FullUser:
    """
    Fetch a User by ID, or the current user when no ID is supplied.

    A MANAGE_USERS check is applied only when the requested user belongs to
    a different organisation from the current user.
    """
    if not user_id:
        # No ID given: the caller is asking about themselves.
        return serial.FullUser.model_validate(user)

    target_user = fetch.user(session, user_id)
    in_same_org = target_user.org_id == user.org_id
    if not in_same_org:
        validate.check(user, perms.MANAGE_USERS, target_user=target_user)
    return serial.FullUser.model_validate(target_user)
@http
def put_user(session: Session, user: User, user_doc: serial.EditableUser):
    """
    Update an existing user account from the supplied document.

    The fullname, email and type fields are copied when they differ from the
    stored values, and each difference is recorded on a USER_UPDATED audit
    event. The target's existing UserRole rows are then deleted and the roles
    listed in the document are re-added, so every role in the document is
    logged as "role assigned" whether or not it was previously held.
    """
    target_user = fetch.user(session, user_doc.id)
    editing_self = user is target_user
    if not editing_self:
        validate.check(user, action=perms.MANAGE_USERS, target_user=target_user)
    # NOTE(review): when a user edits their own account no MANAGE_USERS check
    # runs, yet the roles from user_doc are still applied below - confirm that
    # self-assignment of roles is intended.

    audit = AuditEvent.create(
        session,
        "USER_UPDATED",
        object_id=target_user.id,
        user_id=user.id,
        org_id=user.org_id,
    )
    session.add(audit)

    for field in ("fullname", "email", "type"):
        incoming = getattr(user_doc, field)
        current = getattr(target_user, field)
        if current == incoming:
            continue
        audit.add_change(field, current, incoming)
        setattr(target_user, field, incoming)

    # Clear the stored role rows and flush the delete before re-adding roles.
    session.query(UserRole).filter(UserRole.user == target_user).delete()
    session.flush()
    for role_name in user_doc.roles:
        audit.add_change("role assigned", "", role_name)
        target_user.add_role(role_name)
@http
def post_user(
    session: Session, user: User, user_doc: serial.EditableUser
) -> serial.StringId:
    """
    Create a new user account.

    User ID must be unique across the system. An HTTP 409 response is returned
    if the user ID provided already exists in the database.

    The account is created in the organisation given by the document's org_id,
    or in the current user's organisation when org_id is None. The caller must
    hold MANAGE_USERS for that organisation.

    The audit event records the *acting* user (and their organisation) as the
    author of the change and the new account as the object, matching the
    events written by put_user and delete_user.

    Raises DuplicateDataProvided if a User with the given ID already exists.
    """
    target_org_id = user_doc.org_id
    if target_org_id is None:
        organisation = user.organisation
    else:
        organisation = fetch.organisation(session, target_org_id)

    validate.check(user, action=perms.MANAGE_USERS, target_org=organisation)

    new_user_id = user_doc.id
    if session.get(User, new_user_id) is not None:
        raise DuplicateDataProvided(f"User '{new_user_id}' already exits")

    # Keep the new account in its own local; rebinding `user` would make the
    # audit event below attribute the creation to the account being created.
    new_user = User(new_user_id)
    new_user.fullname = user_doc.fullname
    new_user.type = user_doc.type
    new_user.email = user_doc.email
    new_user.org_id = organisation.id

    evt = AuditEvent.create(
        session,
        "USER_UPDATED",
        object_id=new_user.id,
        user_id=user.id,
        org_id=user.org_id,
    )
    session.add(evt)
    evt.add_change("fullname", "", new_user.fullname)
    evt.add_change("email", "", new_user.email)
    evt.add_change("org_id", "", organisation.id)
    evt.add_change("type", "", new_user.type)
    for role in user_doc.roles:
        evt.add_change("role", "", role)
        new_user.add_role(role)
    session.add(new_user)
    return serial.StringId(id=new_user.id)
@http
def delete_user(session: Session, user: User, user_id_doc: serial.UserId):
    """
    Delete the user account given by 'id' in the body document.

    If the user ID is that of the current user an HTTP 400 Error response is
    returned - a user cannot delete themselves via this method. Otherwise the
    caller must hold MANAGE_USERS for the target user, and a USER_DELETED
    audit event is recorded.
    """
    doomed_id = user_id_doc.id
    doomed = fetch.user(session, doomed_id)
    if user is doomed:
        raise ValueError("A user cannot delete their own account")
    validate.check(user, action=perms.MANAGE_USERS, target_user=doomed)

    session.delete(doomed)
    audit = AuditEvent.create(
        session,
        "USER_DELETED",
        object_id=doomed.id,
        user_id=user.id,
        org_id=user.org_id,
    )
    audit.add_change("User", doomed_id, None)
    session.add(audit)
@http
def get_organisations(
    session: Session, user: User, org_type: str
) -> List[serial.BaseOrganisation]:
    """
    Get an array of organisations. The type of organisation returned depends on the value of the
    query parameter 'orgType':
    - 'vendors', the default, provides an array of vendor(supplier) organisations that have been
      invited to respond to Projects issued by the current users' organisation.
    - 'clients' - get buyer organisations that are clients of current user's organisation. This
      option is only valid for Consultant organisations.

    The caller must hold MANAGE_ORGANISATION for their own organisation.

    Raises AuthorizationFailure if 'clients' is requested by a user whose organisation is not a
    consultant, and ValueError if org_type is neither 'vendors' nor 'clients' (previously this
    case fell through and returned None instead of a list).
    """
    validate.check(user, action=perms.MANAGE_ORGANISATION, target_org=user.organisation)
    fo = serial.BaseOrganisation.model_validate
    if org_type == "vendors":
        return [
            fo(org)
            for org in user.organisation.suppliers.filter(
                Organisation.type == OrganisationType.RESPONDENT
            )
        ]
    if org_type == "clients":
        if not user.organisation.is_consultant:
            raise AuthorizationFailure("Action only permitted for Consultant Users")
        # Narrow the type for static checkers; the runtime guard is the check above.
        assert isinstance(user.organisation, ConsultantOrganisation)
        return [fo(org) for org in user.organisation.clients]
    # Fail loudly rather than implicitly returning None for an unknown type.
    raise ValueError(
        f"Unsupported orgType '{org_type}': expected 'vendors' or 'clients'"
    )
@http
def get_organisation(
    session: Session, user: User, q_org_id: Optional[str]
) -> serial.OrgWithUsers:
    """
    Return an organisation's details together with its users.

    Without an "orgId" query parameter the current user's own organisation is
    returned. Consultant users can supply "orgId" to fetch a client
    organisation. The caller must hold MANAGE_ORGANISATION for the selected
    organisation.
    """
    org = fetch.organisation(session, q_org_id) if q_org_id else user.organisation
    validate.check(user, action=perms.MANAGE_ORGANISATION, target_org=org)
    return serial.OrgWithUsers.model_validate(org)
@http
def post_client(
    session: Session, user: User, new_client_doc: serial.NewClient
) -> serial.OrgWithUsers:
    """
    Create a new organisation. This operation is only valid for Consultant users creating
    Buyer organisations for their clients.

    A BuyerOrganisation with a freshly generated UUID is created, an administrator user (whose
    ID and email are both the supplied administrator email) is added to it with the
    "Administrator" role, and the organisation is appended to the consultant's clients.

    The caller must hold MANAGE_ORGANISATION for their own organisation.

    Raises AuthorizationFailure if the current user's organisation is not a consultant.
    """
    validate.check(user, action=perms.MANAGE_ORGANISATION, target_org=user.organisation)
    # Enforce the consultant-only rule with a real exception: a bare assert is
    # stripped under `python -O` and would otherwise surface as AssertionError.
    if not user.organisation.is_consultant:
        raise AuthorizationFailure("Action only permitted for Consultant Users")
    # Narrow the type for static checkers; the runtime guard is the check above.
    assert isinstance(user.organisation, ConsultantOrganisation)
    org = BuyerOrganisation(str(uuid4()))
    org.name = new_client_doc.org_name
    org.domain_name = new_client_doc.domain_name
    client_user = User(
        new_client_doc.administrator_email,
        fullname=new_client_doc.administrator_name,
        email=new_client_doc.administrator_email,
    )
    client_user.add_role("Administrator")
    org.users.append(client_user)
    user.organisation.clients.append(org)
    return get_organisation(session, user, q_org_id=org.id)
@http
def put_organisation(
    session: Session, user: User, org_doc: serial.Organisation
) -> serial.BaseOrganisation:
    """
    Update the name, domain_name, password_expiry and public fields of the
    given organisation and return its updated representation.

    The caller must hold MANAGE_ORGANISATION for the organisation being
    updated.
    """
    is_own_org = org_doc.id == user.org_id
    org = user.organisation if is_own_org else fetch.organisation(session, org_doc.id)
    validate.check(user, action=perms.MANAGE_ORGANISATION, target_org=org)
    # Copy the editable fields across in a fixed order.
    for field in ("name", "domain_name", "password_expiry", "public"):
        setattr(org, field, getattr(org_doc, field))
    return serial.BaseOrganisation.model_validate(org)