Coverage for rfpy/api/endpoints/auth.py: 95%

130 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-31 16:00 +0000

1''' 

2Users, roles and authentication 

3''' 

4from typing import List 

5from uuid import uuid4 

6 

7from sqlalchemy.orm import Session 

8 

9from rfpy.suxint import http 

10from rfpy.auth import AuthorizationFailure, ROLES, perms 

11from rfpy.web import serial 

12from rfpy.api import fetch, validate 

13from rfpy.model.humans import ( 

14 BuyerOrganisation, User, CustomRole, UserRole, Organisation, OrganisationType 

15) 

16from rfpy.model.audit import AuditEvent 

17from rfpy.model.exc import DuplicateDataProvided 

18 

19 

@http
def get_role(session: Session, role_id):
    '''
    Return the sorted permissions granted by the Role named by role_id.

    Names found in ROLES are answered directly from that mapping. Any other
    name is looked up as a CustomRole by name; the query uses .one(), so it
    raises unless exactly one CustomRole matches.
    '''
    if role_id not in ROLES:
        matching_roles = session.query(CustomRole).filter(CustomRole.name == role_id)
        return sorted(perm.permission_id for perm in matching_roles.one().permissions)
    return sorted(ROLES[role_id])

29 

30 

@http
def get_roles(session, user):
    '''
    Return the names of every role available to the current user.

    The built-in role names (the keys of ROLES) come first, followed by the
    names of the CustomRoles whose org_id matches the current user's org_id.
    '''
    org_custom_roles = session.query(CustomRole).filter(CustomRole.org_id == user.org_id)
    return [*ROLES.keys(), *(custom.name for custom in org_custom_roles)]

42 

43 

@http
def get_users(session, user, q_org_id=None) -> List[serial.BaseUser]:
    '''
    List the users of one Organisation.

    q_org_id selects the organisation. When it is omitted, or equals the
    current user's org_id, the current user's own organisation is used and
    no extra lookup is made. The caller must pass the MANAGE_USERS check
    against the selected organisation.
    '''
    wants_own_org = q_org_id is None or q_org_id == user.org_id
    org = user.organisation if wants_own_org else fetch.organisation(session, q_org_id)
    # NOTE(review): the permission check is taken to apply to both branches
    # (own organisation and fetched organisation) - confirm against the module.
    validate.check(user, perms.MANAGE_USERS, target_org=org)
    return list(map(serial.User.from_orm, org.users))

56 

57 

@http
def get_user(session, user, user_id) -> serial.FullUser:
    '''
    Fetch a single User and return it serialised as a FullUser.

    A falsy user_id means "the current user". Otherwise the user is fetched
    by ID, and the MANAGE_USERS check is applied only when that user belongs
    to a different organisation from the current user.
    '''
    target_user = fetch.user(session, user_id) if user_id else user
    if target_user.org_id != user.org_id:
        validate.check(user, perms.MANAGE_USERS, target_user=target_user)
    return serial.FullUser.from_orm(target_user)

70 

71 

@http
def put_user(session, user, user_doc):
    '''
    Update an existing user account from user_doc.

    user_doc must carry 'id', 'fullname', 'email', 'type' and 'roles'.
    Users may update themselves; updating anybody else requires the
    MANAGE_USERS check against the target user. Each changed field and each
    assigned role is recorded on a USER_UPDATED audit event.
    '''
    target_user = fetch.user(session, user_doc['id'])
    if user is not target_user:
        validate.check(user, action=perms.MANAGE_USERS, target_user=target_user)

    audit_event = AuditEvent.create(
        "USER_UPDATED",
        object_id=target_user.id,
        user_id=user.id,
        org_id=user.org_id
    )
    session.add(audit_event)

    # Copy across only the fields whose value actually differs.
    for field in ('fullname', 'email', 'type'):
        previous, updated = getattr(target_user, field), user_doc[field]
        if previous != updated:
            audit_event.add_change(field, previous, updated)
            setattr(target_user, field, updated)

    # Roles are replaced wholesale: delete the existing rows and flush before
    # re-adding the roles listed in the document.
    session.query(UserRole).filter(UserRole.user == target_user).delete()
    session.flush()
    for role_name in user_doc['roles']:
        audit_event.add_change('role assigned', '', role_name)
        target_user.add_role(role_name)

98 

99 

@http
def post_user(session, user: User, user_doc) -> serial.Id:
    '''
    Create a new user account.

    User ID must be unique across the system. An HTTP 409 response is returned
    if the user ID provided already exists in the database.

    user_doc must carry 'id', 'fullname', 'type', 'email' and 'roles'. An
    optional 'org_id' selects the organisation the account is created in;
    without it the current user's own organisation is used. The current user
    must pass the MANAGE_USERS check against that organisation.

    Returns a dict holding the 'id' of the new user.
    Raises DuplicateDataProvided if a User with the given ID already exists.
    '''
    target_org_id = user_doc.get('org_id', None)
    if target_org_id is None:
        organisation = user.organisation
    else:
        organisation = fetch.organisation(session, target_org_id)

    validate.check(user, action=perms.MANAGE_USERS, target_org=organisation)

    new_user_id = user_doc['id']
    if session.query(User).get(new_user_id) is not None:
        raise DuplicateDataProvided(f"User '{new_user_id}' already exists")

    # Keep the new account in its own name: rebinding `user` here would make
    # the audit event below name the new account as the acting user.
    new_user = User(new_user_id)
    new_user.fullname = user_doc['fullname']
    new_user.type = user_doc['type']
    new_user.email = user_doc['email']
    new_user.org_id = organisation.id

    # NOTE(review): the event name "USER_UPDATED" is kept as-is although this
    # is a creation - confirm whether a dedicated event type should be used.
    # NOTE(review): org_id stays the new account's organisation, as before;
    # put_user/delete_user record the acting user's organisation - confirm.
    evt = AuditEvent.create(
        "USER_UPDATED",
        object_id=new_user.id,
        user_id=user.id,
        org_id=new_user.org_id,
    )
    session.add(evt)
    evt.add_change('fullname', '', new_user.fullname)
    evt.add_change('email', '', new_user.email)
    evt.add_change('org_id', '', organisation.id)
    evt.add_change('type', '', new_user.type)
    for role in user_doc['roles']:
        evt.add_change('role', '', role)
        new_user.add_role(role)
    session.add(new_user)
    return {'id': new_user.id}

143 

144 

@http
def delete_user(session, user, user_id_doc):
    '''
    Delete the user account given by 'id' in the body document.

    If the user ID is that of the current user an HTTP 400 Error response is
    returned - a user cannot delete themselves via this method. Deleting
    another user requires the MANAGE_USERS check against that user, and the
    deletion is recorded as a USER_DELETED audit event.
    '''
    doomed_id = user_id_doc['id']
    target_user = fetch.user(session, doomed_id)
    if user is target_user:
        raise ValueError('A user cannot delete their own account')
    validate.check(user, action=perms.MANAGE_USERS, target_user=target_user)

    session.delete(target_user)
    deletion_event = AuditEvent.create(
        "USER_DELETED",
        object_id=target_user.id,
        user_id=user.id,
        org_id=user.org_id
    )
    deletion_event.add_change('User', doomed_id, None)
    session.add(deletion_event)

168 

169 

@http
def get_organisations(session, user: User, org_type) -> List[serial.BaseOrganisation]:
    '''
    Get an array of organisations related to the current user's organisation.
    The kind of organisation returned depends on the query parameter 'orgType':
    - 'vendors' - vendor (supplier) organisations, of type RESPONDENT, drawn
      from the suppliers of the current user's organisation.
    - 'clients' - buyer organisations that are clients of the current user's
      organisation. This option is only valid for Consultant organisations;
      anyone else gets an AuthorizationFailure.

    The current user must pass the MANAGE_ORGANISATION check against their
    own organisation.
    '''
    own_org = user.organisation
    validate.check(user, action=perms.MANAGE_ORGANISATION, target_org=own_org)
    to_serial = serial.BaseOrganisation.from_orm

    if org_type == 'vendors':
        respondents = own_org.suppliers.filter(
            Organisation.type == OrganisationType.RESPONDENT)
        return [to_serial(vendor) for vendor in respondents]

    if org_type == 'clients':
        if own_org.is_consultant:
            return [to_serial(client) for client in own_org.clients]
        raise AuthorizationFailure('Action only permitted for Consultant Users')

    # NOTE(review): any other org_type (including a missing one) falls through
    # and yields None; no default value is applied in this function - confirm
    # whether the HTTP layer supplies 'vendors' when the parameter is absent.
    return None

190 

191 

@http
def get_organisation(session, user: User, q_org_id) -> serial.OrgWithUsers:
    '''
    Fetch organisation details together with an array of users.

    A falsy q_org_id selects the current user's own organisation; otherwise
    the organisation with that ID is fetched (the "orgId" query parameter
    lets Consultant users look at a client organisation). The current user
    must pass the MANAGE_ORGANISATION check against the selected organisation.
    '''
    org = fetch.organisation(session, q_org_id) if q_org_id else user.organisation
    # NOTE(review): the permission check is taken to apply to both branches
    # (own organisation and fetched organisation) - confirm against the module.
    validate.check(user, action=perms.MANAGE_ORGANISATION, target_org=org)
    return serial.OrgWithUsers.from_orm(org)

206 

207 

@http
def post_client(session,
                user: User,
                new_client_doc: serial.NewClient) -> serial.OrgWithUsers:
    '''
    Create a new organisation. This operation is only valid for Consultant users creating
    Buyer organisations for their clients.

    The new BuyerOrganisation gets a freshly generated UUID as its ID and a
    single user, built from the administrator name and email in
    new_client_doc, holding the 'Administrator' role. The organisation is
    appended to the clients of the current user's organisation.

    Returns the new organisation as produced by get_organisation.
    Raises AuthorizationFailure if the current user's organisation is not a
    Consultant organisation.
    '''
    consultancy = user.organisation
    validate.check(user, action=perms.MANAGE_ORGANISATION, target_org=consultancy)
    # Enforce the documented restriction before building anything, using the
    # same guard and message as the 'clients' branch of get_organisations.
    if not consultancy.is_consultant:
        raise AuthorizationFailure('Action only permitted for Consultant Users')

    org = BuyerOrganisation(str(uuid4()))
    org.name = new_client_doc.org_name
    org.domain_name = new_client_doc.domain_name

    # The administrator's email address doubles as the new user's ID.
    client_user = User(
        new_client_doc.administrator_email,
        fullname=new_client_doc.administrator_name,
        email=new_client_doc.administrator_email
    )
    client_user.add_role('Administrator')
    org.users.append(client_user)
    consultancy.clients.append(org)
    return get_organisation(session, user, q_org_id=org.id)

229 

230 

@http
def put_organisation(session,
                     user: User,
                     org_doc: serial.Organisation) -> serial.BaseOrganisation:
    '''
    Update the name, domain_name, password_expiry and public fields of the
    organisation identified by org_doc.id, and return it as a BaseOrganisation.

    The current user's own organisation is used directly when the IDs match;
    any other organisation is fetched by ID. The current user must pass the
    MANAGE_ORGANISATION check against the organisation being updated.
    '''
    updating_own_org = org_doc.id == user.org_id
    org = user.organisation if updating_own_org else fetch.organisation(session, org_doc.id)
    # NOTE(review): the permission check is taken to apply to both branches
    # (own organisation and fetched organisation) - confirm against the module.
    validate.check(user, action=perms.MANAGE_ORGANISATION, target_org=org)
    for field in ('name', 'domain_name', 'password_expiry', 'public'):
        setattr(org, field, getattr(org_doc, field))
    return serial.BaseOrganisation.from_orm(org)