Coverage for rfpy/api/domain_permissions.py: 100%

40 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-24 10:52 +0000

1from sqlalchemy.orm.exc import NoResultFound 

2from sqlalchemy.orm import object_session 

3 

4from rfpy.model import ( 

5 ProjectPermission, 

6 SectionPermission, 

7 Section, 

8 User, 

9 AuditEvent, 

10 Project, 

11) 

12from rfpy.web import serial 

13from rfpy.api import fetch 

14 

15 

16def save_project_permissions( 

17 user: User, project: Project, perm_doc: serial.ProjectPermission 

18) -> None: 

19 """ 

20 Set permissions for a user to access sections of a Project. perm_doc should be a dict: 

21 { 

22 user: id of user to grant permission to, 

23 permissions : list of section ids to grant permission for 

24 } 

25 """ 

26 session = object_session(project) 

27 assert session is not None 

28 

29 grant_user_id = perm_doc.user 

30 grant_user = fetch.user(session, grant_user_id) 

31 

32 # will fail unless the grant user's org is a participant of the given 

33 # project_id 

34 participant = project.participants.get(grant_user.organisation.id) 

35 

36 # section permissions only make sense if project permissions are in place 

37 # so there is a foreign key and cascade delete 

38 ppq = session.query(ProjectPermission).filter( 

39 ProjectPermission.participant == participant, 

40 ProjectPermission.user == grant_user, 

41 ) 

42 try: 

43 pp = ppq.one() 

44 except NoResultFound: 

45 pp = ProjectPermission(user=grant_user, participant=participant) 

46 session.add(pp) 

47 

48 current_perm_query = ( 

49 session.query(Section) 

50 .join(SectionPermission) 

51 .filter(SectionPermission.project_permission == pp) 

52 ) 

53 

54 current_perms = [sec.number for sec in current_perm_query] 

55 

56 # somewhat nuclear option.... 

57 session.query(SectionPermission).filter( 

58 SectionPermission.project_permission == pp 

59 ).delete() 

60 

61 section_numbers_granted = [] 

62 

63 if perm_doc.permissions: 

64 # Skip unless at least one section id given 

65 # possible that there is a use case to grant a project permission 

66 # without any section permissions (e.g. to answer messages..) 

67 

68 sections_to_grant = session.query(Section).filter( 

69 Section.project_id == project.id, Section.id.in_(perm_doc.permissions) 

70 ) 

71 

72 for sec in sections_to_grant: 

73 sec_perm = SectionPermission( 

74 section=sec, user=grant_user, project_permission=pp 

75 ) 

76 session.add(sec_perm) 

77 section_numbers_granted.append(sec.number.dotted) 

78 

79 evt = AuditEvent.create( 

80 session, 

81 "SECTION_ACCESS_UPDATED", 

82 project=project, 

83 user_id=user.id, 

84 org_id=user.organisation.id, 

85 object_id=project.id, 

86 private=True, 

87 ) 

88 

89 evt.add_change("Granted To", "", grant_user.id) 

90 evt.add_change("Granted By", "", user.id) 

91 

92 all_secs = sorted(set(current_perms + section_numbers_granted)) 

93 rows = [ 

94 (sec, (sec in current_perms), (sec in section_numbers_granted)) 

95 for sec in all_secs 

96 ] 

97 

98 for section, old, new in rows: 

99 previous_status = "Granted" if old else "Denied" 

100 new_status = "Granted" if new else "Denied" 

101 if section in ("", None): 

102 sec_name = "Root Section" 

103 else: 

104 sec_name = "Section %s" % section 

105 evt.add_change(sec_name, previous_status, new_status) 

106 

107 session.add(evt)