Coverage for rfpy/api/domain_permissions.py: 100%

37 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-31 16:00 +0000

1from sqlalchemy.orm.exc import NoResultFound 

2from sqlalchemy.orm import object_session 

3 

4from rfpy.model import (ProjectPermission, SectionPermission, Section, User, 

5 AuditEvent) 

6 

7 

8def save_project_permissions(user, project, perm_doc): 

9 ''' 

10 Set permissions for a user to access sections of a Project. perm_doc should be a dict: 

11 { 

12 user: id of user to grant permission to, 

13 permissions : list of section ids to grant permission for 

14 } 

15 ''' 

16 session = object_session(project) 

17 

18 grant_user_id = perm_doc['user'] 

19 grant_user = session.query(User).get(grant_user_id) 

20 

21 # will fail unless the grant user's org is a participant of the given 

22 # project_id 

23 participant = project.participants.get(grant_user.organisation.id) 

24 

25 # section permissions only make sense if project permissions are in place 

26 # so there is a foreign key and cascade delete 

27 ppq = session.query(ProjectPermission)\ 

28 .filter(ProjectPermission.participant == participant, 

29 ProjectPermission.user == grant_user) 

30 try: 

31 pp = ppq.one() 

32 except NoResultFound: 

33 pp = ProjectPermission(user=grant_user, participant=participant) 

34 session.add(pp) 

35 

36 current_perm_query = session.query(Section)\ 

37 .join(SectionPermission)\ 

38 .filter(SectionPermission.project_permission == pp) 

39 

40 current_perms = [sec.number for sec in current_perm_query] 

41 

42 # somewhat nuclear option.... 

43 session.query(SectionPermission)\ 

44 .filter(SectionPermission.project_permission == pp)\ 

45 .delete() 

46 

47 section_numbers_granted = [] 

48 

49 if perm_doc['permissions']: 

50 

51 # Skip unless at least one section id given 

52 # possible that there is a use case to grant a project permission 

53 # without any section permissions (e.g. to answer messages..) 

54 

55 sections_to_grant = session.query(Section)\ 

56 .filter(Section.project_id == project.id, 

57 Section.id.in_(perm_doc['permissions'])) 

58 

59 for sec in sections_to_grant: 

60 sec_perm = SectionPermission(section=sec, 

61 user=grant_user, 

62 project_permission=pp) 

63 session.add(sec_perm) 

64 section_numbers_granted.append(sec.number.dotted) 

65 

66 evt = AuditEvent.create('SECTION_ACCESS_UPDATED', 

67 project=project, 

68 user_id=user.id, 

69 org_id=user.organisation.id, 

70 object_id=project.id, 

71 private=True) 

72 

73 evt.add_change('Granted To', '', grant_user.id) 

74 evt.add_change('Granted By', '', user.id) 

75 

76 all_secs = sorted(set(current_perms + section_numbers_granted)) 

77 rows = [(sec, (sec in current_perms), (sec in section_numbers_granted)) 

78 for sec in all_secs] 

79 

80 for section, old, new in rows: 

81 previous_status = 'Granted' if old else 'Denied' 

82 new_status = 'Granted' if new else 'Denied' 

83 if section in ('', None): 

84 sec = 'Root Section' 

85 else: 

86 sec = 'Section %s' % section 

87 evt.add_change(sec, previous_status, new_status) 

88 

89 session.add(evt)