Coverage for rfpy/api/endpoints/audit.py: 98%

57 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-31 16:00 +0000

1''' 

2View and query Audit Event Logs 

3''' 

4from typing import List 

5 

6from sqlalchemy.orm.exc import NoResultFound 

7 

8from rfpy.suxint import http 

9from rfpy.model import (QuestionInstance, Issue, AuditEvent, EventOrgACL, Organisation, Project) 

10from rfpy.model.audit import evt_types 

11from rfpy.auth import perms 

12from rfpy.web import serial 

13 

14from .. import fetch, validate 

15 

16 

17@http 

18def get_project_events(session, user, project_id, event_type, pager) -> List[serial.SummaryEvent]: 

19 ''' 

20 List summary events that have occurred within the context of the given project, 

21 optionally filtering by event_type 

22 ''' 

23 project = fetch.project(session, project_id) 

24 validate.check(user, perms.PROJECT_ACCESS, project=project) 

25 

26 ev_query = fetch.project_audit_events(user.organisation, project, event_type) 

27 

28 cols = (AuditEvent.id, AuditEvent.timestamp, 

29 AuditEvent.user_id, AuditEvent.event_type) 

30 

31 return [ae._asdict() for ae in 

32 ev_query.slice(pager.startfrom, pager.goto).with_entities(*cols)] 

33 

34 

35@http 

36def get_events(session, user, event_type, pager) -> List[serial.SummaryEvent]: 

37 ''' 

38 List all events visible to the current user, from all projects, 

39 optionally filtering by event_type. 

40 ''' 

41 validate.check(user, perms.MANAGE_ORGANISATION, target_org=user.organisation) 

42 q = fetch.audit_events(user.organisation, event_type=event_type) 

43 cols = (AuditEvent.id, AuditEvent.timestamp, 

44 AuditEvent.user_id, AuditEvent.event_type) 

45 return [e._asdict() for e in 

46 q.slice(pager.startfrom, pager.goto).with_entities(*cols)] 

47 

48 

49@http 

50def get_event(session, user, event_id) -> serial.AuditEvent: 

51 validate.check(user, perms.MANAGE_ORGANISATION, target_org=user.organisation) 

52 acl = session.query(EventOrgACL)\ 

53 .filter( 

54 EventOrgACL.event_id == event_id, 

55 EventOrgACL.organisation == user.organisation)\ 

56 .first() 

57 if not acl: 

58 raise NoResultFound(f"No Event found for ID {event_id}") 

59 

60 event_dict = acl.event.as_dict() 

61 issue_id = event_dict.get('issue_id', False) 

62 if issue_id and issue_id is not None: 

63 try: 

64 issue = session.query(Issue.label, 

65 Organisation.id.label('respondent_id'))\ 

66 .join(Organisation).filter(Issue.id == issue_id)\ 

67 .one()._asdict() 

68 except NoResultFound: # Issue might have been deleted 

69 issue = None 

70 

71 event_dict['issue'] = issue 

72 

73 question_id = event_dict.get('question_id', False) 

74 if question_id: 

75 q = (session.query(QuestionInstance.number, 

76 QuestionInstance.section_id) 

77 .filter(QuestionInstance.id == question_id) 

78 .one()) 

79 event_dict.update({ 

80 'question_number': q.number.dotted, 

81 'section_id': q.section_id 

82 }) 

83 if acl.event.user: 

84 ev_user = acl.event.user 

85 event_dict['user'] = serial.User.from_orm(ev_user) 

86 

87 if acl.event.project_id: 

88 pid = acl.event.project_id 

89 pt = session.query(Project.title).filter(Project.id == pid).scalar() 

90 event_dict['project_title'] = pt 

91 

92 return serial.AuditEvent(**event_dict).dict() 

93 

94 

95@http 

96def get_event_types(session) -> List[str]: 

97 return [a for a in dir(evt_types) if not a.startswith('__')] 

98 

99 

100@http 

101def get_question_events(session, user, question_id) -> List[serial.FullEvent]: 

102 

103 qi = session.query(QuestionInstance).filter(QuestionInstance.id == question_id).one() 

104 project = qi.project 

105 validate.check(user, perms.PROJECT_VIEW_QUESTIONNAIRE, 

106 project=project, 

107 section_id=qi.section_id) 

108 

109 events = session.query(AuditEvent)\ 

110 .join(QuestionInstance, AuditEvent.question_id == QuestionInstance.id)\ 

111 .filter( 

112 QuestionInstance.question_def_id == qi.question_def_id, 

113 AuditEvent.event_class.in_( 

114 ('QUESTION', 'SECTION')))\ 

115 .order_by(AuditEvent.id.asc()) 

116 

117 return [serial.FullEvent.from_orm(e) for e in events]