Coverage for rfpy/api/domain_permissions.py: 100%
40 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
1from sqlalchemy.orm.exc import NoResultFound
2from sqlalchemy.orm import object_session
4from rfpy.model import (
5 ProjectPermission,
6 SectionPermission,
7 Section,
8 User,
9 AuditEvent,
10 Project,
11)
12from rfpy.web import serial
13from rfpy.api import fetch
def _dotted_number(section: Section) -> str:
    """Return the dotted display number of *section*, or "" if it has none."""
    number = section.number
    return "" if number is None else number.dotted


def save_project_permissions(
    user: User, project: Project, perm_doc: serial.ProjectPermission
) -> None:
    """
    Replace a user's section permissions for a Project and audit the change.

    perm_doc is read by attribute:
        perm_doc.user        - id of the user to grant permission to
        perm_doc.permissions - list of section ids to grant permission for

    Any existing SectionPermission rows for the grant user's ProjectPermission
    are deleted and then recreated from perm_doc.permissions. Only sections
    whose project_id matches the given project are granted. A private
    SECTION_ACCESS_UPDATED AuditEvent is added to the session recording the
    previous and new status of every section involved.

    `user` is the user performing the grant; it is only used for the audit
    event. Nothing is committed here - the caller owns the transaction.
    """
    session = object_session(project)
    # Narrow the Optional return of object_session; a detached project
    # cannot be processed.
    assert session is not None

    grant_user_id = perm_doc.user
    grant_user = fetch.user(session, grant_user_id)

    # will fail unless the grant user's org is a participant of the given
    # project_id
    participant = project.participants.get(grant_user.organisation.id)

    # section permissions only make sense if project permissions are in place
    # so there is a foreign key and cascade delete
    ppq = session.query(ProjectPermission).filter(
        ProjectPermission.participant == participant,
        ProjectPermission.user == grant_user,
    )
    try:
        pp = ppq.one()
    except NoResultFound:
        pp = ProjectPermission(user=grant_user, participant=participant)
        session.add(pp)

    current_perm_query = (
        session.query(Section)
        .join(SectionPermission)
        .filter(SectionPermission.project_permission == pp)
    )

    # Record previous grants in the same (dotted) representation used for the
    # new grants below, so the audit diff compares like with like.
    current_perms = [_dotted_number(sec) for sec in current_perm_query]

    # somewhat nuclear option....
    session.query(SectionPermission).filter(
        SectionPermission.project_permission == pp
    ).delete()

    section_numbers_granted = []

    if perm_doc.permissions:
        # Skip unless at least one section id given
        # possible that there is a use case to grant a project permission
        # without any section permissions (e.g. to answer messages..)

        sections_to_grant = session.query(Section).filter(
            Section.project_id == project.id, Section.id.in_(perm_doc.permissions)
        )

        for sec in sections_to_grant:
            sec_perm = SectionPermission(
                section=sec, user=grant_user, project_permission=pp
            )
            session.add(sec_perm)
            section_numbers_granted.append(_dotted_number(sec))

    evt = AuditEvent.create(
        session,
        "SECTION_ACCESS_UPDATED",
        project=project,
        user_id=user.id,
        org_id=user.organisation.id,
        object_id=project.id,
        private=True,
    )

    evt.add_change("Granted To", "", grant_user.id)
    evt.add_change("Granted By", "", user.id)

    # Sets give O(1) membership tests and de-duplicate the section numbers.
    previously_granted = set(current_perms)
    newly_granted = set(section_numbers_granted)

    for section in sorted(previously_granted | newly_granted):
        previous_status = "Granted" if section in previously_granted else "Denied"
        new_status = "Granted" if section in newly_granted else "Denied"
        if section in ("", None):
            sec_name = "Root Section"
        else:
            sec_name = f"Section {section}"
        evt.add_change(sec_name, previous_status, new_status)

    session.add(evt)