Coverage for rfpy/vendor/api/users.py: 98%
110 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
"""
Manage user accounts and issue watchlists
"""
5from rfpy.model.notify import IssueWatchList
6from typing import List
8from sqlalchemy import select
10from rfpy.api import fetch
11from rfpy.suxint import http
12from rfpy.auth import perms, ROLES, AuthorizationFailure
13from rfpy.model import AuditEvent, User, CustomRole, UserRole
14from rfpy.web import serial
15from rfpy.vendor.validation import validate
@http
def get_users(session, effective_user) -> List[serial.BaseUser]:
    """
    List the users that share the effective user's organisation.

    Queries User rows whose ``organisation`` matches
    ``effective_user.organisation`` and returns each row validated into a
    ``serial.BaseUser``.
    """
    same_organisation = User.organisation == effective_user.organisation
    org_users = session.query(User).filter(same_organisation)
    return [serial.BaseUser.model_validate(member) for member in org_users]
@http
def get_whoami(user) -> serial.FullUser:
    """
    Return the given user validated into a ``serial.FullUser``.
    """
    whoami = serial.FullUser.model_validate(user)
    return whoami
@http
def get_user(session, effective_user, target_user) -> serial.FullUser:
    """
    Return ``target_user`` validated into a ``serial.FullUser``.

    Raises AuthorizationFailure when ``target_user.org_id`` differs from
    ``effective_user.org_id``. ``session`` is accepted but not used here.
    """
    if target_user.org_id == effective_user.org_id:
        return serial.FullUser.model_validate(target_user)

    m = f"User {target_user.id} does not belong to {effective_user.org_id}"
    raise AuthorizationFailure(m)
@http
def put_user(session, effective_user, user_doc: serial.EditableUser):
    """
    Update an existing user's fullname, email and roles from ``user_doc``.

    Editing a user other than oneself requires the MANAGE_USERS permission.
    The target user is looked up by ``user_doc.id`` and must have the same
    ``org_id`` as ``effective_user``, otherwise AuthorizationFailure is
    raised. A "USER_UPDATED" audit event is added to the session and records
    each changed field and each assigned role.
    """
    editing_self = effective_user.id == user_doc.id
    if not editing_self:
        validate(effective_user, action=perms.MANAGE_USERS)

    lookup = select(User).where(User.id == user_doc.id)
    user: User = session.scalars(lookup).one()

    if user.org_id != effective_user.org_id:
        m = "It is not possible to manage users from another organisation"
        raise AuthorizationFailure(m)

    evt = AuditEvent.create(
        session,
        "USER_UPDATED",
        object_id=user.id,
        user_id=effective_user.id,
        org_id=user.org_id,
    )
    session.add(evt)

    # Only fields whose value actually differs are written and audited.
    for field in ("fullname", "email"):
        new_value = getattr(user_doc, field)
        old_value = getattr(user, field)
        if old_value != new_value:
            evt.add_change(field, old_value, new_value)
            setattr(user, field, new_value)

    # Roles are replaced wholesale: existing UserRole rows are deleted and
    # flushed before the roles from the document are assigned again.
    # NOTE(review): this also runs when a user edits their own record, in
    # which case MANAGE_USERS was not checked above - confirm that letting
    # users set their own roles is intended.
    session.query(UserRole).filter(UserRole.user == user).delete()
    session.flush()
    for assigned_role in user_doc.roles:
        evt.add_change("role assigned", "", assigned_role)
        user.add_role(assigned_role)
@http
def post_user(
    session, effective_user, user_doc: serial.EditableUser
) -> serial.StringId:
    """
    Create a new user in the effective user's organisation.

    Requires the MANAGE_USERS permission. The new User takes its id,
    fullname, email and roles from ``user_doc`` and its organisation from
    ``effective_user``. An audit event recording the initial values is added
    to the session together with the user.

    Returns a ``serial.StringId`` carrying the new user's id.
    """
    validate(effective_user, action=perms.MANAGE_USERS)
    user = User(user_id=user_doc.id)
    user.organisation = effective_user.organisation
    user.fullname = user_doc.fullname
    user.email = user_doc.email

    # The new user has only had its ``organisation`` relationship assigned
    # and has not been flushed yet, so ``user.org_id`` is presumably still
    # unset at this point (the ORM fills foreign-key columns in at flush
    # time). The organisation is by construction the effective user's, so
    # take the id from there to make sure the audit trail has a real value.
    org_id = effective_user.org_id

    # NOTE(review): creation is recorded under the "USER_UPDATED" event type;
    # confirm whether a dedicated creation event type should be used instead.
    evt = AuditEvent.create(
        session,
        "USER_UPDATED",
        object_id=user.id,
        user_id=effective_user.id,
        org_id=org_id,
    )
    session.add(evt)
    evt.add_change("fullname", "", user.fullname)
    evt.add_change("email", "", user.email)
    evt.add_change("org_id", "", org_id)
    for role in user_doc.roles:
        evt.add_change("role", "", role)
        user.add_role(role)
    session.add(user)

    return serial.StringId(id=user.id)
@http
def delete_user(
    session, effective_user, user_doc: serial.EditableUser
) -> serial.StringId:
    """
    Delete the user identified by ``user_doc.id``.

    Requires the MANAGE_USERS permission. Raises AuthorizationFailure when
    the target user's ``org_id`` differs from ``effective_user.org_id``.
    The user is marked for deletion and a "USER_DELETED" audit event is
    added to the session.

    Returns a ``serial.StringId`` carrying the deleted user's id.
    """
    validate(effective_user, action=perms.MANAGE_USERS)

    matching = session.query(User).filter(User.id == user_doc.id)
    user = matching.one()
    if user.org_id != effective_user.org_id:
        raise AuthorizationFailure("Cannot delete users from a different organisation")

    session.delete(user)

    audit_fields = dict(
        object_id=user.id,
        user_id=effective_user.id,
        org_id=user.org_id,
    )
    evt = AuditEvent.create(session, "USER_DELETED", **audit_fields)
    session.add(evt)

    return serial.StringId(id=user.id)
@http
def get_roles(session, effective_user):
    """
    List all available role names: the built-in ones (keys of ROLES) followed
    by the custom roles defined for the current user's organisation.
    """
    org_custom_roles = session.query(CustomRole).filter(
        CustomRole.org_id == effective_user.org_id
    )
    return [*ROLES.keys(), *(custom.name for custom in org_custom_roles)]
@http
def get_issue_watchlist(session, effective_user, issue_id) -> serial.IssueWatchList:
    """
    Return the watch list for the issue identified by ``issue_id``.

    The effective user must pass validation for ISSUE_VIEW_ANSWERS on the
    issue. Each row from ``fetch.issue_watchers`` is validated into a
    ``serial.Watcher``.
    """
    issue = fetch.issue(session, issue_id)
    validate(effective_user, issue, action=perms.ISSUE_VIEW_ANSWERS)

    watchers = [
        serial.Watcher.model_validate(entry)
        for entry in fetch.issue_watchers(session, issue)
    ]
    return serial.IssueWatchList(issue_id=int(issue.id), watchlist=watchers)
def auth_wl_change(session, effective_user, issue_id, watchers_doc):
    """
    Authorise a change to an issue's watch list.

    Collects the "targetUser" value of every entry in ``watchers_doc``. When
    the only target is the effective user, ISSUE_VIEW_ANSWERS on the issue is
    validated; any other set of targets (including an empty one) is
    validated against MANAGE_USERS.

    Returns a ``(user_ids, issue)`` tuple: the set of targeted user ids and
    the fetched issue.
    """
    issue = fetch.issue(session, issue_id)
    user_ids = {entry["targetUser"] for entry in watchers_doc}

    only_self_targeted = user_ids == {effective_user.id}
    if only_self_targeted:
        # Any user can update their own watch list
        validate(effective_user, issue=issue, action=perms.ISSUE_VIEW_ANSWERS)
    else:
        validate(effective_user, issue=None, action=perms.MANAGE_USERS)

    return (user_ids, issue)
@http
def post_issue_watchers(session, effective_user, issue_id, watchers_doc) -> List[int]:
    """
    Add the targeted users to the issue's watch list.

    Authorisation is delegated to ``auth_wl_change``. Only users found in
    ``issue.respondent.users`` are considered; targeted ids outside that
    collection, and users already in ``issue.watchers``, are skipped.

    Returns the ids of the users that were added.
    """
    user_ids, issue = auth_wl_change(session, effective_user, issue_id, watchers_doc)

    added_ids = []
    for candidate in issue.respondent.users:
        if candidate.id not in user_ids:
            continue
        if candidate in issue.watchers:
            continue
        issue.watch_list.append(IssueWatchList(user=candidate))
        added_ids.append(candidate.id)
    return added_ids
@http
def delete_issue_watchers(session, effective_user, issue_id, watchers_doc) -> List[int]:
    """
    Remove the targeted users from the issue's watch list.

    Authorisation is delegated to ``auth_wl_change``. Only users found in
    ``issue.respondent.users`` are considered; targeted ids outside that
    collection are skipped.

    Returns the ids of the users whose watch-list entry was deleted.
    """
    user_ids, issue = auth_wl_change(session, effective_user, issue_id, watchers_doc)

    removed_ids = []
    for candidate in issue.respondent.users:
        if candidate.id not in user_ids:
            continue
        # NOTE(review): .one() presumably raises when the user has no single
        # matching watch-list entry, whereas the add path quietly skips users
        # that are already watching - confirm the asymmetry is intended.
        entry = issue.watch_list.filter_by(user_id=candidate.id).one()
        session.delete(entry)
        removed_ids.append(candidate.id)
    return removed_ids