Coverage for rfpy/vendor/api/users.py: 98%

110 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-24 10:52 +0000

1""" 

2Manage user accounts and issue watchlists 

3""" 

4 

5from rfpy.model.notify import IssueWatchList 

6from typing import List 

7 

8from sqlalchemy import select 

9 

10from rfpy.api import fetch 

11from rfpy.suxint import http 

12from rfpy.auth import perms, ROLES, AuthorizationFailure 

13from rfpy.model import AuditEvent, User, CustomRole, UserRole 

14from rfpy.web import serial 

15from rfpy.vendor.validation import validate 

16 

17 

@http
def get_users(session, effective_user) -> List[serial.BaseUser]:
    """
    List the users that share an organisation with effective_user,
    each serialised as a BaseUser
    """
    same_organisation = User.organisation == effective_user.organisation
    org_users = session.query(User).filter(same_organisation)
    return list(map(serial.BaseUser.model_validate, org_users))

22 

23 

@http
def get_whoami(user) -> serial.FullUser:
    """
    Serialise the given user as a FullUser
    (presumably the caller, going by the endpoint name - confirm against the
    http wrapper)
    """
    profile = serial.FullUser.model_validate(user)
    return profile

27 

28 

@http
def get_user(session, effective_user, target_user) -> serial.FullUser:
    """
    Return target_user serialised as a FullUser.

    Raises AuthorizationFailure when target_user and effective_user have
    different org_id values.
    """
    if target_user.org_id == effective_user.org_id:
        return serial.FullUser.model_validate(target_user)

    m = f"User {target_user.id} does not belong to {effective_user.org_id}"
    raise AuthorizationFailure(m)

36 

37 

@http
def put_user(session, effective_user, user_doc: serial.EditableUser):
    """
    Update fullname, email and roles of the user identified by user_doc.id.

    Editing any user other than yourself is checked against perms.MANAGE_USERS.
    The target user must have the same org_id as effective_user, otherwise
    AuthorizationFailure is raised. A "USER_UPDATED" audit event records each
    changed field and every role assigned.
    """
    editing_self = effective_user.id == user_doc.id
    if not editing_self:
        validate(effective_user, action=perms.MANAGE_USERS)
    # NOTE(review): a self-edit skips the MANAGE_USERS check above but still
    # replaces the roles below - confirm that users are meant to be able to
    # change their own roles.

    lookup = select(User).where(User.id == user_doc.id)
    target: User = session.scalars(lookup).one()

    if target.org_id != effective_user.org_id:
        raise AuthorizationFailure(
            "It is not possible to manage users from another organisation"
        )

    audit = AuditEvent.create(
        session,
        "USER_UPDATED",
        object_id=target.id,
        user_id=effective_user.id,
        org_id=target.org_id,
    )
    session.add(audit)

    # Only fields whose value actually differs are written and audited
    for field in ("fullname", "email"):
        incoming = getattr(user_doc, field)
        current = getattr(target, field)
        if current == incoming:
            continue
        audit.add_change(field, current, incoming)
        setattr(target, field, incoming)

    # Roles are replaced wholesale: existing UserRole rows are deleted and
    # flushed before the roles from the document are added back.
    session.query(UserRole).filter(UserRole.user == target).delete()
    session.flush()
    for role_name in user_doc.roles:
        audit.add_change("role assigned", "", role_name)
        target.add_role(role_name)

69 

70 

@http
def post_user(
    session, effective_user, user_doc: serial.EditableUser
) -> serial.StringId:
    """
    Create a new user in the organisation of effective_user.

    The caller is checked against perms.MANAGE_USERS. The new user takes its
    id, fullname, email and roles from user_doc. An audit event recording the
    initial field values and roles is added to the session along with the user.

    Returns a StringId carrying the new user's id.
    """
    validate(effective_user, action=perms.MANAGE_USERS)
    user = User(user_id=user_doc.id)
    user.organisation = effective_user.organisation
    user.fullname = user_doc.fullname
    user.email = user_doc.email

    # The new user has not been flushed yet, so its org_id foreign key may
    # still be unset even though the organisation relationship is assigned.
    # The organisation is by construction that of effective_user, so read the
    # id from there to make sure the audit trail records a real value.
    org_id = effective_user.org_id

    # NOTE(review): the event type is "USER_UPDATED" although a user is being
    # created - confirm whether a dedicated creation event type exists.
    evt = AuditEvent.create(
        session,
        "USER_UPDATED",
        object_id=user.id,
        user_id=effective_user.id,
        org_id=org_id,
    )
    session.add(evt)
    evt.add_change("fullname", "", user.fullname)
    evt.add_change("email", "", user.email)
    evt.add_change("org_id", "", org_id)
    for role in user_doc.roles:
        evt.add_change("role", "", role)
        user.add_role(role)
    session.add(user)

    return serial.StringId(id=user.id)

98 

99 

@http
def delete_user(
    session, effective_user, user_doc: serial.EditableUser
) -> serial.StringId:
    """
    Delete the user identified by user_doc.id and record a "USER_DELETED"
    audit event.

    The caller is checked against perms.MANAGE_USERS. AuthorizationFailure is
    raised if the target user has a different org_id to effective_user.

    Returns a StringId carrying the id of the deleted user.
    """
    validate(effective_user, action=perms.MANAGE_USERS)

    matching = session.query(User).filter(User.id == user_doc.id)
    doomed = matching.one()
    if doomed.org_id != effective_user.org_id:
        raise AuthorizationFailure("Cannot delete users from a different organisation")

    # Mark for deletion first, then build the audit record from the same object
    session.delete(doomed)
    audit = AuditEvent.create(
        session,
        "USER_DELETED",
        object_id=doomed.id,
        user_id=effective_user.id,
        org_id=doomed.org_id,
    )
    session.add(audit)

    return serial.StringId(id=doomed.id)

119 

120 

@http
def get_roles(session, effective_user):
    """
    Return the names of all available roles: the built-in ROLES first,
    followed by the custom roles defined by the current user's organisation
    """
    org_roles = session.query(CustomRole).filter(
        CustomRole.org_id == effective_user.org_id
    )
    return [*ROLES.keys(), *(custom.name for custom in org_roles)]

132 

133 

@http
def get_issue_watchlist(session, effective_user, issue_id) -> serial.IssueWatchList:
    """
    Return the watch list of the given issue.

    The issue is fetched by id and the caller is checked against
    perms.ISSUE_VIEW_ANSWERS for that issue before the watchers are read.
    """
    issue = fetch.issue(session, issue_id)
    validate(effective_user, issue, action=perms.ISSUE_VIEW_ANSWERS)

    to_watcher = serial.Watcher.model_validate
    watchers = list(map(to_watcher, fetch.issue_watchers(session, issue)))
    return serial.IssueWatchList(issue_id=int(issue.id), watchlist=watchers)

142 

143 

def auth_wl_change(session, effective_user, issue_id, watchers_doc):
    """
    Authorise a change to the watch list of an issue.

    Collects the "targetUser" value of every entry in watchers_doc. When the
    only target is effective_user the check is perms.ISSUE_VIEW_ANSWERS on the
    issue; in every other case (including an empty document) it is
    perms.MANAGE_USERS.

    Returns a tuple of (set of target user ids, issue).
    """
    issue = fetch.issue(session, issue_id)

    user_ids = set()
    for entry in watchers_doc:
        user_ids.add(entry["targetUser"])

    if len(user_ids) != 1 or effective_user.id not in user_ids:
        validate(effective_user, issue=None, action=perms.MANAGE_USERS)
    else:
        # Any user can update their own watch list
        validate(effective_user, issue=issue, action=perms.ISSUE_VIEW_ANSWERS)

    return (user_ids, issue)

153 

154 

@http
def post_issue_watchers(session, effective_user, issue_id, watchers_doc) -> List[int]:
    """
    Add the users named in watchers_doc to the watch list of an issue.

    Only users found in issue.respondent.users are considered, and users who
    are already in issue.watchers are skipped.

    Returns the ids of the users that were actually added.
    """
    user_ids, issue = auth_wl_change(session, effective_user, issue_id, watchers_doc)

    added_ids = []
    for candidate in issue.respondent.users:
        if candidate.id not in user_ids or candidate in issue.watchers:
            continue
        issue.watch_list.append(IssueWatchList(user=candidate))
        added_ids.append(candidate.id)
    return added_ids

164 

165 

@http
def delete_issue_watchers(session, effective_user, issue_id, watchers_doc) -> List[int]:
    """
    Remove the users named in watchers_doc from the watch list of an issue.

    Only users found in issue.respondent.users are considered. A requested
    user who is not currently on the watch list is skipped rather than
    aborting the whole request, mirroring how post_issue_watchers skips users
    who are already watching.

    Returns the ids of the users that were actually removed.
    """
    user_ids, issue = auth_wl_change(session, effective_user, issue_id, watchers_doc)
    removed_ids = []
    for user in issue.respondent.users:
        if user.id not in user_ids:
            continue
        # one_or_none() still raises if duplicate rows exist, but returns None
        # instead of raising when the user is simply not watching this issue.
        wl = issue.watch_list.filter_by(user_id=user.id).one_or_none()
        if wl is None:
            continue
        session.delete(wl)
        removed_ids.append(user.id)
    return removed_ids

175 return removed_ids