Coverage for rfpy/api/endpoints/audit.py: 98%

58 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-24 10:52 +0000

1""" 

2View and query Audit Event Logs 

3""" 

4 

5from typing import List, TYPE_CHECKING 

6 

7from sqlalchemy.orm import Session 

8from sqlalchemy.orm.exc import NoResultFound 

9 

10from rfpy.suxint import http 

11from rfpy.model import ( 

12 QuestionInstance, 

13 Issue, 

14 AuditEvent, 

15 EventOrgACL, 

16 Organisation, 

17 Project, 

18 User, 

19) 

20from rfpy.model.audit import evt_types 

21from rfpy.auth import perms 

22from rfpy.web import serial 

23 

24from .. import fetch, validate 

25 

26if TYPE_CHECKING: 

27 from rfpy.model import User 

28 from rfpy.model import Project 

29 

30 

@http
def get_project_events(
    session: Session, user: User, project_id: int, event_type: str, pager
) -> List[serial.SummaryEvent]:
    """
    Return one page of summary audit events for a single project,
    optionally filtering by event_type.

    The project is loaded with ``fetch.project`` and the ``PROJECT_ACCESS``
    permission is checked for ``user`` before any events are queried. The
    page bounds are taken from ``pager.startfrom`` and ``pager.goto``.
    """
    project = fetch.project(session, project_id)
    validate.check(user, perms.PROJECT_ACCESS, project=project)

    events = fetch.project_audit_events(user.organisation, project, event_type)
    page = events.slice(pager.startfrom, pager.goto)

    # Select only the four columns that are passed on to SummaryEvent.
    summary_rows = page.with_entities(
        AuditEvent.id,
        AuditEvent.timestamp,
        AuditEvent.user_id,
        AuditEvent.event_type,
    )

    return [serial.SummaryEvent.model_validate(row) for row in summary_rows]

55 

56 

@http
def get_events(
    session: Session, user: User, event_type, pager
) -> List[serial.SummaryEvent]:
    """
    Return one page of summary audit events visible to the current user's
    organisation, from all projects, optionally filtering by event_type.

    The ``MANAGE_ORGANISATION`` permission is checked against the user's own
    organisation first. The page bounds are taken from ``pager.startfrom``
    and ``pager.goto``.
    """
    organisation = user.organisation
    validate.check(user, perms.MANAGE_ORGANISATION, target_org=organisation)

    events = fetch.audit_events(user.organisation, event_type=event_type)
    page = events.slice(pager.startfrom, pager.goto)

    # Select only the four columns that are passed on to SummaryEvent.
    summary_rows = page.with_entities(
        AuditEvent.id,
        AuditEvent.timestamp,
        AuditEvent.user_id,
        AuditEvent.event_type,
    )

    return [serial.SummaryEvent.model_validate(row) for row in summary_rows]

77 

78 

@http
def get_event(session: Session, user: User, event_id) -> serial.AuditEvent:
    """
    Fetch a single audit event and enrich it with related details.

    The ``MANAGE_ORGANISATION`` permission is checked against the user's own
    organisation. The event is reached through the ``EventOrgACL`` row that
    pairs ``event_id`` with that organisation.

    Starting from ``event.as_dict()``, the following keys are added when the
    corresponding data is present:

    - ``issue``: label and ``respondent_id`` of the linked issue, or ``None``
      when the issue row can no longer be found
    - ``question_number`` / ``section_id``: taken from the linked
      QuestionInstance
    - ``user``: the event's user, validated as ``serial.User``
    - ``project_title``: title of the event's project

    Raises ``NoResultFound`` when no ACL row matches the event ID and the
    user's organisation.
    """
    validate.check(user, perms.MANAGE_ORGANISATION, target_org=user.organisation)

    acl_query = session.query(EventOrgACL).filter(
        EventOrgACL.event_id == event_id,
        EventOrgACL.organisation == user.organisation,
    )
    acl = acl_query.first()
    if not acl:
        raise NoResultFound(f"No Event found for ID {event_id}")

    event = acl.event
    details = event.as_dict()

    issue_id = details.get("issue_id")
    if issue_id:
        issue_query = (
            session.query(Issue.label, Organisation.id.label("respondent_id"))
            .join(Organisation)
            .filter(Issue.id == issue_id)
        )
        try:
            details["issue"] = issue_query.one()._asdict()
        except NoResultFound:  # Issue might have been deleted
            details["issue"] = None

    question_id = details.get("question_id")
    if question_id:
        # NOTE(review): unlike the issue lookup, a missing question row is not
        # handled here, so .one() would propagate its error - confirm intended.
        question = (
            session.query(QuestionInstance.number, QuestionInstance.section_id)
            .filter(QuestionInstance.id == question_id)
            .one()
        )
        details["question_number"] = question.number.dotted
        details["section_id"] = question.section_id

    if event.user:
        details["user"] = serial.User.model_validate(event.user)

    project_id = event.project_id
    if project_id:
        title_query = session.query(Project.title).filter(Project.id == project_id)
        details["project_title"] = title_query.scalar()

    return serial.AuditEvent(**details)

129 

130 

@http
def get_event_types(session) -> List[str]:
    """
    List the attribute names found on ``evt_types``, leaving out any name
    that starts with a double underscore.
    """
    names = dir(evt_types)
    return [name for name in names if not name.startswith("__")]

134 

135 

@http
def get_question_events(
    session: Session, user: User, question_id
) -> List[serial.FullEvent]:
    """
    Return the QUESTION and SECTION class audit events for a question,
    ordered by ascending event ID.

    The QuestionInstance is loaded by ``question_id`` and the
    ``PROJECT_VIEW_QUESTIONNAIRE`` permission is checked for its project and
    section before events are queried.
    """
    question = (
        session.query(QuestionInstance)
        .filter(QuestionInstance.id == question_id)
        .one()
    )
    validate.check(
        user,
        perms.PROJECT_VIEW_QUESTIONNAIRE,
        project=question.project,
        section_id=question.section_id,
    )

    # Events are matched on the question definition rather than on this one
    # instance's ID.
    # NOTE(review): the permission check above covers a single project -
    # confirm that instances sharing a question_def_id cannot belong to
    # projects this user may not see.
    event_query = session.query(AuditEvent).join(
        QuestionInstance, AuditEvent.question_id == QuestionInstance.id
    )
    event_query = event_query.filter(
        QuestionInstance.question_def_id == question.question_def_id,
        AuditEvent.event_class.in_(("QUESTION", "SECTION")),
    )
    event_query = event_query.order_by(AuditEvent.id.asc())

    return [serial.FullEvent.model_validate(event) for event in event_query]