Coverage for rfpy/api/endpoints/auth.py: 95%

133 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-24 10:52 +0000

1""" 

2Users, roles and authentication 

3""" 

4 

5from typing import List, Optional 

6from uuid import uuid4 

7 

8from sqlalchemy.orm import Session 

9 

10from rfpy.suxint import http 

11from rfpy.auth import AuthorizationFailure, ROLES, perms 

12from rfpy.web import serial 

13from rfpy.api import fetch, validate 

14from rfpy.model.humans import ( 

15 BuyerOrganisation, 

16 ConsultantOrganisation, 

17 User, 

18 CustomRole, 

19 UserRole, 

20 Organisation, 

21 OrganisationType, 

22) 

23from rfpy.model.audit import AuditEvent 

24from rfpy.model.exc import DuplicateDataProvided 

25 

26 

@http
def get_role(session: Session, role_id: str) -> List[str]:
    """
    Return the sorted permissions granted by the role named ``role_id``.

    Names present in ROLES are resolved from that mapping. Any other name is
    looked up as a CustomRole by its ``name`` column; ``Query.one()`` raises if
    the lookup does not match exactly one row.
    """
    if role_id not in ROLES:
        role_query = session.query(CustomRole).filter(CustomRole.name == role_id)
        custom_role = role_query.one()
        return sorted(perm.permission_id for perm in custom_role.permissions)
    return sorted(ROLES[role_id])

36 

37 

@http
def get_roles(session: Session, user: User) -> List[str]:
    """
    Return the names of every role available to the current user.

    The keys of ROLES come first, followed by the names of the CustomRole rows
    whose ``org_id`` matches the current user's ``org_id``.
    """
    role_names = list(ROLES.keys())
    org_custom_roles = session.query(CustomRole).filter(
        CustomRole.org_id == user.org_id
    )
    role_names.extend(custom_role.name for custom_role in org_custom_roles)
    return role_names

47 

48 

@http
def get_users(
    session: Session, user: User, q_org_id: Optional[str] = None
) -> List[serial.BaseUser]:
    """
    List the users of an organisation.

    When ``q_org_id`` is omitted, or equals the current user's ``org_id``, the
    current user's own organisation is used without a lookup; otherwise the
    organisation is fetched by ID. Each user is serialised with ``serial.User``.
    """
    wants_own_org = q_org_id is None or q_org_id == user.org_id
    if wants_own_org:
        org = user.organisation
    else:
        org = fetch.organisation(session, q_org_id)
    # NOTE(review): the MANAGE_USERS check is applied to both branches here,
    # matching put_organisation/get_organisations - confirm against the
    # original layout.
    validate.check(user, perms.MANAGE_USERS, target_org=org)
    return [serial.User.model_validate(member) for member in org.users]

63 

64 

@http
def get_user(session: Session, user: User, user_id: Optional[str]) -> serial.FullUser:
    """
    Return the full details of a user.

    A falsy ``user_id`` selects the current user. The MANAGE_USERS check is only
    made when the selected user's ``org_id`` differs from the current user's.
    """
    target_user = fetch.user(session, user_id) if user_id else user
    in_other_org = target_user.org_id != user.org_id
    if in_other_org:
        validate.check(user, perms.MANAGE_USERS, target_user=target_user)
    return serial.FullUser.model_validate(target_user)

77 

78 

@http
def put_user(session: Session, user: User, user_doc: serial.EditableUser):
    """
    Update the existing user account identified by ``user_doc.id``.

    The MANAGE_USERS check is skipped when the current user edits their own
    account. A "USER_UPDATED" audit event records each changed value of
    fullname, email and type, plus every role listed in ``user_doc.roles``.
    """
    target_user = fetch.user(session, user_doc.id)
    editing_self = user is target_user
    if not editing_self:
        validate.check(user, action=perms.MANAGE_USERS, target_user=target_user)

    audit_fields = {
        "object_id": target_user.id,
        "user_id": user.id,
        "org_id": user.org_id,
    }
    evt = AuditEvent.create(session, "USER_UPDATED", **audit_fields)
    session.add(evt)

    for field_name in ("fullname", "email", "type"):
        incoming = getattr(user_doc, field_name)
        current = getattr(target_user, field_name)
        if current != incoming:
            evt.add_change(field_name, current, incoming)
            setattr(target_user, field_name, incoming)

    # Roles are replaced wholesale: the existing UserRole rows are deleted and
    # flushed before the roles from the document are added back.
    existing_roles = session.query(UserRole).filter(UserRole.user == target_user)
    existing_roles.delete()
    session.flush()
    for role_name in user_doc.roles:
        evt.add_change("role assigned", "", role_name)
        target_user.add_role(role_name)

107 

108 

@http
def post_user(
    session: Session, user: User, user_doc: serial.EditableUser
) -> serial.StringId:
    """
    Create a new user account.

    User ID must be unique across the system. DuplicateDataProvided is raised
    if the user ID provided already exists in the database (documented as an
    HTTP 409 response).

    The account is created in the organisation given by ``user_doc.org_id``, or
    in the current user's organisation when that is None. The current user must
    pass the MANAGE_USERS check for that organisation.

    The audit event is attributed to the acting user (``user_id`` and ``org_id``
    of the caller), with the new account as ``object_id`` - the same convention
    used when updating or deleting a user.
    """
    target_org_id = user_doc.org_id
    if target_org_id is None:
        organisation = user.organisation
    else:
        organisation = fetch.organisation(session, target_org_id)

    validate.check(user, action=perms.MANAGE_USERS, target_org=organisation)

    new_user_id = user_doc.id
    if session.get(User, new_user_id) is not None:
        raise DuplicateDataProvided(f"User '{new_user_id}' already exits")

    # Keep the new account under its own name. Rebinding ``user`` here would
    # make the audit event below record the new account as its own creator.
    new_user = User(new_user_id)
    new_user.fullname = user_doc.fullname
    new_user.type = user_doc.type
    new_user.email = user_doc.email
    new_user.org_id = organisation.id

    # NOTE(review): the event type is "USER_UPDATED" although this is a
    # creation; left unchanged because the set of valid event types is not
    # visible here.
    evt = AuditEvent.create(
        session,
        "USER_UPDATED",
        object_id=new_user.id,
        user_id=user.id,
        org_id=user.org_id,
    )
    session.add(evt)
    evt.add_change("fullname", "", new_user.fullname)
    evt.add_change("email", "", new_user.email)
    evt.add_change("org_id", "", organisation.id)
    evt.add_change("type", "", new_user.type)
    for role in user_doc.roles:
        evt.add_change("role", "", role)
        new_user.add_role(role)
    session.add(new_user)
    return serial.StringId(id=new_user.id)

155 

156 

@http
def delete_user(session: Session, user: User, user_id_doc: serial.UserId):
    """
    Delete the user account given by 'id' in the body document.

    A user cannot delete their own account through this method: a ValueError is
    raised in that case (documented as an HTTP 400 error response). Any other
    deletion requires the MANAGE_USERS check to pass for the target user, and is
    recorded in a "USER_DELETED" audit event.
    """
    target_id = user_id_doc.id
    target_user = fetch.user(session, target_id)
    if target_user is user:
        raise ValueError("A user cannot delete their own account")
    validate.check(user, action=perms.MANAGE_USERS, target_user=target_user)

    session.delete(target_user)
    audit_fields = {
        "object_id": target_user.id,
        "user_id": user.id,
        "org_id": user.org_id,
    }
    deletion_event = AuditEvent.create(session, "USER_DELETED", **audit_fields)
    deletion_event.add_change("User", target_id, None)
    session.add(deletion_event)

181 

182 

@http
def get_organisations(
    session: Session, user: User, org_type: str
) -> List[serial.BaseOrganisation]:
    """
    List organisations related to the current user's organisation.

    The current user must pass the MANAGE_ORGANISATION check for their own
    organisation. The result depends on ``org_type``:
    - 'vendors': suppliers of the current user's organisation whose type is
      OrganisationType.RESPONDENT.
    - 'clients': clients of the current user's organisation. Raises
      AuthorizationFailure unless that organisation is a consultant.
    """
    own_org = user.organisation
    validate.check(user, action=perms.MANAGE_ORGANISATION, target_org=own_org)
    to_doc = serial.BaseOrganisation.model_validate

    if org_type == "vendors":
        respondents = own_org.suppliers.filter(
            Organisation.type == OrganisationType.RESPONDENT
        )
        return [to_doc(supplier) for supplier in respondents]

    if org_type == "clients":
        if not own_org.is_consultant:
            raise AuthorizationFailure("Action only permitted for Consultant Users")
        # Narrows the type for static checkers; the runtime guard is above.
        assert isinstance(own_org, ConsultantOrganisation)
        return [to_doc(client) for client in own_org.clients]

    # NOTE(review): any other org_type falls through and yields None, which does
    # not match the declared return type - confirm whether the HTTP layer
    # restricts org_type to the two values above.
    return None

209 

210 

@http
def get_organisation(
    session: Session, user: User, q_org_id: Optional[str]
) -> serial.OrgWithUsers:
    """
    Return organisation details together with its users.

    A falsy ``q_org_id`` selects the current user's own organisation; otherwise
    the organisation is fetched by ID (e.g. a consultant requesting a client
    organisation via the "orgId" query parameter).
    """
    org = fetch.organisation(session, q_org_id) if q_org_id else user.organisation
    # NOTE(review): the MANAGE_ORGANISATION check is applied to both branches
    # here, matching put_organisation - confirm against the original layout.
    validate.check(user, action=perms.MANAGE_ORGANISATION, target_org=org)
    return serial.OrgWithUsers.model_validate(org)

227 

228 

@http
def post_client(
    session: Session, user: User, new_client_doc: serial.NewClient
) -> serial.OrgWithUsers:
    """
    Create a new Buyer organisation as a client of the current user's organisation.

    This operation is only valid for Consultant users: the current user must
    pass the MANAGE_ORGANISATION check for their own organisation, and
    AuthorizationFailure is raised if that organisation is not a consultant.

    The new organisation gets a generated UUID as its ID and a single user,
    identified by the administrator email, holding the "Administrator" role.
    Returns the new organisation's details as produced by get_organisation.
    """
    own_org = user.organisation
    validate.check(user, action=perms.MANAGE_ORGANISATION, target_org=own_org)
    # Explicit guard, consistent with get_organisations: an assert alone is
    # stripped under ``python -O`` and otherwise raises a bare AssertionError.
    if not own_org.is_consultant:
        raise AuthorizationFailure("Action only permitted for Consultant Users")
    # Retained for static type narrowing only.
    assert isinstance(own_org, ConsultantOrganisation)

    org = BuyerOrganisation(str(uuid4()))
    org.name = new_client_doc.org_name
    org.domain_name = new_client_doc.domain_name

    # The administrator's email address doubles as the new user's ID.
    client_user = User(
        new_client_doc.administrator_email,
        fullname=new_client_doc.administrator_name,
        email=new_client_doc.administrator_email,
    )
    client_user.add_role("Administrator")
    org.users.append(client_user)
    own_org.clients.append(org)
    return get_organisation(session, user, q_org_id=org.id)

251 

252 

@http
def put_organisation(
    session: Session, user: User, org_doc: serial.Organisation
) -> serial.BaseOrganisation:
    """
    Update the name, domain_name, password_expiry and public fields of an organisation.

    The organisation is the current user's own when ``org_doc.id`` equals their
    ``org_id``; otherwise it is fetched by ID. The current user must pass the
    MANAGE_ORGANISATION check for the organisation being updated.
    """
    editing_own_org = org_doc.id == user.org_id
    if editing_own_org:
        org = user.organisation
    else:
        org = fetch.organisation(session, org_doc.id)
    validate.check(user, action=perms.MANAGE_ORGANISATION, target_org=org)

    for field_name in ("name", "domain_name", "password_expiry", "public"):
        setattr(org, field_name, getattr(org_doc, field_name))
    return serial.BaseOrganisation.model_validate(org)