Coverage for rfpy/api/domain_permissions.py: 100%
37 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-31 16:00 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-31 16:00 +0000
1from sqlalchemy.orm.exc import NoResultFound
2from sqlalchemy.orm import object_session
4from rfpy.model import (ProjectPermission, SectionPermission, Section, User,
5 AuditEvent)
def save_project_permissions(user, project, perm_doc):
    '''
    Replace a user's section permissions for a Project and audit the change.

    perm_doc should be a dict:
    {
        user: id of user to grant permission to,
        permissions : list of section ids to grant permission for
    }

    All existing SectionPermission rows attached to the grant user's
    ProjectPermission are deleted and then recreated from
    perm_doc['permissions'], so the given list is the complete new set of
    sections, not an increment. Only section ids whose Section belongs to
    `project` are granted; other ids are silently ignored by the query filter.

    A private SECTION_ACCESS_UPDATED AuditEvent is added to the session with
    one change row per section that was granted before and/or is granted now.

    `user` is the user performing the grant; `project` must be attached to a
    session (the session is obtained from it). Nothing is committed here and
    nothing is returned.

    Raises KeyError if perm_doc lacks the 'user' or 'permissions' key.
    '''
    session = object_session(project)

    grant_user_id = perm_doc['user']
    grant_user = session.query(User).get(grant_user_id)

    # will fail unless the grant user's org is a participant of the given
    # project_id
    participant = project.participants.get(grant_user.organisation.id)

    # section permissions only make sense if project permissions are in place
    # so there is a foreign key and cascade delete
    ppq = session.query(ProjectPermission)\
        .filter(ProjectPermission.participant == participant,
                ProjectPermission.user == grant_user)
    try:
        pp = ppq.one()
    except NoResultFound:
        pp = ProjectPermission(user=grant_user, participant=participant)
        session.add(pp)

    # Snapshot what is currently granted *before* wiping it, so the audit
    # event can report a before/after status for each section.
    current_perm_query = session.query(Section)\
        .join(SectionPermission)\
        .filter(SectionPermission.project_permission == pp)

    # Use the dotted form here because section_numbers_granted (below) is
    # built from the dotted form too; comparing raw numbers against dotted
    # numbers would report unchanged sections as both revoked and granted.
    current_perms = [sec.number.dotted for sec in current_perm_query]

    # somewhat nuclear option....
    session.query(SectionPermission)\
        .filter(SectionPermission.project_permission == pp)\
        .delete()

    section_numbers_granted = []

    if perm_doc['permissions']:

        # Skip unless at least one section id given
        # possible that there is a use case to grant a project permission
        # without any section permissions (e.g. to answer messages..)

        sections_to_grant = session.query(Section)\
            .filter(Section.project_id == project.id,
                    Section.id.in_(perm_doc['permissions']))

        for sec in sections_to_grant:
            sec_perm = SectionPermission(section=sec,
                                         user=grant_user,
                                         project_permission=pp)
            session.add(sec_perm)
            section_numbers_granted.append(sec.number.dotted)

    evt = AuditEvent.create('SECTION_ACCESS_UPDATED',
                            project=project,
                            user_id=user.id,
                            org_id=user.organisation.id,
                            object_id=project.id,
                            private=True)

    evt.add_change('Granted To', '', grant_user.id)
    evt.add_change('Granted By', '', user.id)

    # Sets give constant-time membership tests for the before/after lookup.
    previously_granted = set(current_perms)
    newly_granted = set(section_numbers_granted)

    for section in sorted(previously_granted | newly_granted):
        previous_status = 'Granted' if section in previously_granted else 'Denied'
        new_status = 'Granted' if section in newly_granted else 'Denied'
        if section in ('', None):
            # A section with no number is labelled as the root section
            sec = 'Root Section'
        else:
            sec = 'Section %s' % section
        evt.add_change(sec, previous_status, new_status)

    session.add(evt)