Coverage for rfpy/api/endpoints/auth.py: 95%
130 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-31 16:00 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-31 16:00 +0000
1'''
2Users, roles and authentication
3'''
4from typing import List
5from uuid import uuid4
7from sqlalchemy.orm import Session
9from rfpy.suxint import http
10from rfpy.auth import AuthorizationFailure, ROLES, perms
11from rfpy.web import serial
12from rfpy.api import fetch, validate
13from rfpy.model.humans import (
14 BuyerOrganisation, User, CustomRole, UserRole, Organisation, OrganisationType
15)
16from rfpy.model.audit import AuditEvent
17from rfpy.model.exc import DuplicateDataProvided
@http
def get_role(session: Session, role_id):
    '''
    Return the sorted permissions granted by the role named role_id.

    Built-in roles are resolved from ROLES. Any other name is looked up as a
    CustomRole by name; the query uses .one(), so a missing (or duplicated)
    custom role name raises from SQLAlchemy rather than returning an empty list.
    '''
    if role_id not in ROLES:
        # Not a built-in role: fall back to the custom roles table.
        matching_role = (
            session.query(CustomRole)
            .filter(CustomRole.name == role_id)
            .one()
        )
        return sorted(perm.permission_id for perm in matching_role.permissions)
    return sorted(ROLES[role_id])
@http
def get_roles(session, user):
    '''
    Return the names of every role available to the current user.

    The result starts with the built-in role names (the keys of ROLES) followed
    by the names of the CustomRole rows whose org_id matches the user's org_id.
    '''
    role_names = [*ROLES.keys()]
    org_custom_roles = session.query(CustomRole).filter(
        CustomRole.org_id == user.org_id
    )
    role_names.extend(custom.name for custom in org_custom_roles)
    return role_names
@http
def get_users(session, user, q_org_id=None) -> List[serial.BaseUser]:
    '''
    List the users of an organisation.

    When q_org_id is omitted, or equals the current user's org_id, the current
    user's own organisation is used; otherwise the organisation is fetched by
    q_org_id. Each member is serialised with serial.User.
    '''
    wants_own_org = q_org_id is None or q_org_id == user.org_id
    org = user.organisation if wants_own_org else fetch.organisation(session, q_org_id)
    # NOTE(review): indentation was lost in the extract this was rebuilt from;
    # the MANAGE_USERS check is assumed to apply to both branches - confirm.
    validate.check(user, perms.MANAGE_USERS, target_org=org)
    return [serial.User.from_orm(member) for member in org.users]
@http
def get_user(session, user, user_id) -> serial.FullUser:
    '''
    Return full details for a single user.

    A falsy user_id selects the current user. The MANAGE_USERS check is only
    made when the selected user belongs to a different organisation from the
    current user.
    '''
    target_user = fetch.user(session, user_id) if user_id else user
    in_other_org = target_user.org_id != user.org_id
    if in_other_org:
        validate.check(user, perms.MANAGE_USERS, target_user=target_user)
    return serial.FullUser.from_orm(target_user)
@http
def put_user(session, user, user_doc):
    '''
    Update an existing user account from user_doc.

    The account is looked up by user_doc['id']. Any of fullname, email and type
    whose value differs from the document is overwritten and the change is
    recorded on a USER_UPDATED audit event. The account's roles are then
    replaced wholesale by user_doc['roles'].

    The MANAGE_USERS check is skipped when the current user is updating their
    own account.
    '''
    target_user = fetch.user(session, user_doc['id'])
    editing_self = user is target_user
    if not editing_self:
        validate.check(user, action=perms.MANAGE_USERS, target_user=target_user)
    # NOTE(review): on the self-update path no permission check is made, yet
    # the roles below are still replaced from the document - confirm intended.

    audit = AuditEvent.create(
        "USER_UPDATED",
        object_id=target_user.id,
        user_id=user.id,
        org_id=user.org_id
    )
    session.add(audit)

    for field in ('fullname', 'email', 'type'):
        current_value = getattr(target_user, field)
        proposed_value = user_doc[field]
        if current_value != proposed_value:
            audit.add_change(field, current_value, proposed_value)
            setattr(target_user, field, proposed_value)

    # Existing role rows are deleted and flushed before the new set is added.
    session.query(UserRole).filter(UserRole.user == target_user).delete()
    session.flush()
    for role_name in user_doc['roles']:
        audit.add_change('role assigned', '', role_name)
        target_user.add_role(role_name)
@http
def post_user(session, user: User, user_doc) -> serial.Id:
    '''
    Create a new user account.

    The account is created in the organisation given by user_doc['org_id'], or
    in the current user's organisation when that key is absent or None. The
    current user must pass the MANAGE_USERS check for the target organisation.

    User ID must be unique across the system. DuplicateDataProvided is raised
    (an HTTP 409 response) if the user ID provided already exists in the
    database.

    Returns a dict holding the new account's 'id'.
    '''
    target_org_id = user_doc.get('org_id', None)
    if target_org_id is None:
        organisation = user.organisation
    else:
        organisation = fetch.organisation(session, target_org_id)

    validate.check(user, action=perms.MANAGE_USERS, target_org=organisation)

    new_user_id = user_doc['id']
    existing_user = session.query(User).get(new_user_id)
    if existing_user is not None:
        raise DuplicateDataProvided(f"User '{new_user_id}' already exists")

    # Keep the new account in its own name: rebinding `user` here would make
    # the audit event below record the new account as the actor.
    new_user = User(new_user_id)
    new_user.fullname = user_doc['fullname']
    new_user.type = user_doc['type']
    new_user.email = user_doc['email']
    new_user.org_id = organisation.id

    # As in put_user / delete_user, user_id and org_id identify the acting
    # user; object_id identifies the account being created.
    # NOTE(review): the event type is "USER_UPDATED" although this is a
    # creation - left unchanged because no other event name is visible here.
    evt = AuditEvent.create(
        "USER_UPDATED",
        object_id=new_user.id,
        user_id=user.id,
        org_id=user.org_id,
    )
    session.add(evt)
    evt.add_change('fullname', '', new_user.fullname)
    evt.add_change('email', '', new_user.email)
    evt.add_change('org_id', '', organisation.id)
    evt.add_change('type', '', new_user.type)
    for role in user_doc['roles']:
        evt.add_change('role', '', role)
        new_user.add_role(role)
    session.add(new_user)
    return {'id': new_user.id}
@http
def delete_user(session, user, user_id_doc):
    '''
    Delete the user account named by 'id' in the body document.

    A user cannot delete their own account through this endpoint: ValueError is
    raised in that case (an HTTP 400 error response). For any other account the
    current user must pass the MANAGE_USERS check. A USER_DELETED audit event
    is recorded against the current user.
    '''
    doomed_id = user_id_doc['id']
    target_user = fetch.user(session, doomed_id)
    if target_user is user:
        raise ValueError('A user cannot delete their own account')
    validate.check(user, action=perms.MANAGE_USERS, target_user=target_user)

    session.delete(target_user)
    audit = AuditEvent.create(
        "USER_DELETED",
        object_id=target_user.id,
        user_id=user.id,
        org_id=user.org_id
    )
    audit.add_change('User', doomed_id, None)
    session.add(audit)
@http
def get_organisations(session, user: User, org_type) -> List[serial.BaseOrganisation]:
    '''
    Return an array of organisations related to the current user's organisation.

    The current user must pass the MANAGE_ORGANISATION check for their own
    organisation. What is returned depends on org_type (query parameter
    'orgType'):
    - 'vendors' - supplier organisations of type RESPONDENT linked to the
      current user's organisation. Described as the default value, although no
      default is applied inside this function.
    - 'clients' - client organisations of the current user's organisation.
      AuthorizationFailure is raised unless that organisation is a consultant.

    Any other org_type falls through and yields None.
    '''
    validate.check(user, action=perms.MANAGE_ORGANISATION, target_org=user.organisation)
    to_model = serial.BaseOrganisation.from_orm

    if org_type == 'vendors':
        respondents = user.organisation.suppliers.filter(
            Organisation.type == OrganisationType.RESPONDENT
        )
        return [to_model(vendor) for vendor in respondents]

    # NOTE(review): indentation was lost in the extract this was rebuilt from;
    # the clients list is assumed to be returned only for 'clients' - confirm.
    if org_type == 'clients':
        if user.organisation.is_consultant:
            return [to_model(client) for client in user.organisation.clients]
        raise AuthorizationFailure('Action only permitted for Consultant Users')

    return None
@http
def get_organisation(session, user: User, q_org_id) -> serial.OrgWithUsers:
    '''
    Return organisation details together with an array of its users.

    A falsy q_org_id selects the current user's own organisation. Consultant
    users can provide an "orgId" query parameter to fetch details for a client
    organisation, which is then fetched by that ID.
    '''
    org = fetch.organisation(session, q_org_id) if q_org_id else user.organisation
    # NOTE(review): indentation was lost in the extract this was rebuilt from;
    # the MANAGE_ORGANISATION check is assumed to apply to both cases - confirm.
    validate.check(user, action=perms.MANAGE_ORGANISATION, target_org=org)
    return serial.OrgWithUsers.from_orm(org)
@http
def post_client(session,
                user: User,
                new_client_doc: serial.NewClient) -> serial.OrgWithUsers:
    '''
    Create a new Buyer organisation as a client of the current user's
    organisation. This operation is intended for Consultant users creating
    organisations for their clients.

    The new organisation gets a random UUID as its ID and a single user, built
    from the administrator name and email in the document, holding the
    'Administrator' role. The result is the new organisation as returned by
    get_organisation.
    '''
    validate.check(user, action=perms.MANAGE_ORGANISATION, target_org=user.organisation)

    client_org = BuyerOrganisation(str(uuid4()))
    client_org.name = new_client_doc.org_name
    client_org.domain_name = new_client_doc.domain_name

    # The administrator's email address doubles as the new user's ID.
    admin_email = new_client_doc.administrator_email
    admin = User(
        admin_email,
        fullname=new_client_doc.administrator_name,
        email=new_client_doc.administrator_email
    )
    admin.add_role('Administrator')

    client_org.users.append(admin)
    user.organisation.clients.append(client_org)
    return get_organisation(session, user, q_org_id=client_org.id)
@http
def put_organisation(session,
                     user: User,
                     org_doc: serial.Organisation) -> serial.BaseOrganisation:
    '''
    Update the name, domain_name, password_expiry and public fields of the
    organisation identified by org_doc.id.

    The current user's own organisation is used directly when the IDs match;
    any other organisation is fetched by ID. The updated organisation is
    returned as a serial.BaseOrganisation.
    '''
    is_own_org = org_doc.id == user.org_id
    org = user.organisation if is_own_org else fetch.organisation(session, org_doc.id)
    # NOTE(review): indentation was lost in the extract this was rebuilt from;
    # the MANAGE_ORGANISATION check is assumed to apply to both cases - confirm.
    validate.check(user, action=perms.MANAGE_ORGANISATION, target_org=org)

    for field in ('name', 'domain_name', 'password_expiry', 'public'):
        setattr(org, field, getattr(org_doc, field))
    return serial.BaseOrganisation.from_orm(org)