Coverage for rfpy/api/endpoints/audit.py: 98%
58 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
1"""
2View and query Audit Event Logs
3"""
5from typing import List, TYPE_CHECKING
7from sqlalchemy.orm import Session
8from sqlalchemy.orm.exc import NoResultFound
10from rfpy.suxint import http
11from rfpy.model import (
12 QuestionInstance,
13 Issue,
14 AuditEvent,
15 EventOrgACL,
16 Organisation,
17 Project,
18 User,
19)
20from rfpy.model.audit import evt_types
21from rfpy.auth import perms
22from rfpy.web import serial
24from .. import fetch, validate
26if TYPE_CHECKING:
27 from rfpy.model import User
28 from rfpy.model import Project
@http
def get_project_events(
    session: Session, user: User, project_id: int, event_type: str, pager
) -> List[serial.SummaryEvent]:
    """
    Return one page of summary events recorded in the context of a project,
    optionally filtered by event_type.

    The project is loaded by project_id and the user must pass the
    PROJECT_ACCESS check for it. The page window is taken from
    pager.startfrom and pager.goto.
    """
    target_project = fetch.project(session, project_id)
    validate.check(user, perms.PROJECT_ACCESS, project=target_project)

    project_events = fetch.project_audit_events(
        user.organisation, target_project, event_type
    )

    # Restrict the result rows to these four columns.
    summary_columns = (
        AuditEvent.id,
        AuditEvent.timestamp,
        AuditEvent.user_id,
        AuditEvent.event_type,
    )
    page = project_events.slice(pager.startfrom, pager.goto).with_entities(
        *summary_columns
    )
    return list(map(serial.SummaryEvent.model_validate, page))
@http
def get_events(
    session: Session, user: User, event_type, pager
) -> List[serial.SummaryEvent]:
    """
    Return one page of summary events visible to the current user across all
    projects, optionally filtered by event_type.

    The user must pass the MANAGE_ORGANISATION check against their own
    organisation. The page window is taken from pager.startfrom and
    pager.goto.
    """
    validate.check(user, perms.MANAGE_ORGANISATION, target_org=user.organisation)

    # Restrict the result rows to these four columns.
    summary_columns = (
        AuditEvent.id,
        AuditEvent.timestamp,
        AuditEvent.user_id,
        AuditEvent.event_type,
    )
    page = (
        fetch.audit_events(user.organisation, event_type=event_type)
        .slice(pager.startfrom, pager.goto)
        .with_entities(*summary_columns)
    )
    return list(map(serial.SummaryEvent.model_validate, page))
@http
def get_event(session: Session, user: User, event_id) -> serial.AuditEvent:
    """
    Fetch a single audit event, enriched with details of the related issue,
    question, user and project where the event refers to them.

    The user must pass the MANAGE_ORGANISATION check against their own
    organisation, and the event is only returned when an EventOrgACL row
    links event_id to that organisation.

    Raises NoResultFound if no such ACL row exists for event_id.
    """
    validate.check(user, perms.MANAGE_ORGANISATION, target_org=user.organisation)
    acl = (
        session.query(EventOrgACL)
        .filter(
            EventOrgACL.event_id == event_id,
            EventOrgACL.organisation == user.organisation,
        )
        .first()
    )
    if not acl:
        raise NoResultFound(f"No Event found for ID {event_id}")

    event = acl.event
    event_dict = event.as_dict()

    issue_id = event_dict.get("issue_id")
    if issue_id:
        try:
            issue = (
                session.query(Issue.label, Organisation.id.label("respondent_id"))
                .join(Organisation)
                .filter(Issue.id == issue_id)
                .one()
                ._asdict()
            )
        except NoResultFound:  # Issue might have been deleted
            issue = None

        event_dict["issue"] = issue

    question_id = event_dict.get("question_id")
    if question_id:
        try:
            q = (
                session.query(QuestionInstance.number, QuestionInstance.section_id)
                .filter(QuestionInstance.id == question_id)
                .one()
            )
        except NoResultFound:
            # Question might have been deleted. Treat it like the issue lookup
            # above: still return the event, just without question details
            # (the same shape as an event that has no question_id).
            q = None
        if q is not None:
            event_dict.update(
                {"question_number": q.number.dotted, "section_id": q.section_id}
            )

    if event.user:
        event_dict["user"] = serial.User.model_validate(event.user)

    if event.project_id:
        # scalar() yields None when no matching project row exists.
        event_dict["project_title"] = (
            session.query(Project.title)
            .filter(Project.id == event.project_id)
            .scalar()
        )

    return serial.AuditEvent(**event_dict)
@http
def get_event_types(session) -> List[str]:
    """
    List the attribute names of evt_types, leaving out any name that begins
    with a double underscore.
    """
    type_names = []
    for attr_name in dir(evt_types):
        if attr_name.startswith("__"):
            continue
        type_names.append(attr_name)
    return type_names
@http
def get_question_events(
    session: Session, user: User, question_id
) -> List[serial.FullEvent]:
    """
    Return the full audit events associated with a question, ordered by
    ascending event id.

    The question instance is loaded by question_id (the .one() call raises if
    it does not exist) and the user must pass the PROJECT_VIEW_QUESTIONNAIRE
    check for its project and section. Events are matched on the question
    definition, so events for any question instance sharing the same
    question_def_id are included, limited to the "QUESTION" and "SECTION"
    event classes.
    """
    instance_query = session.query(QuestionInstance).filter(
        QuestionInstance.id == question_id
    )
    question = instance_query.one()
    owning_project = question.project
    validate.check(
        user,
        perms.PROJECT_VIEW_QUESTIONNAIRE,
        project=owning_project,
        section_id=question.section_id,
    )

    same_definition = QuestionInstance.question_def_id == question.question_def_id
    relevant_class = AuditEvent.event_class.in_(("QUESTION", "SECTION"))
    history = (
        session.query(AuditEvent)
        .join(QuestionInstance, AuditEvent.question_id == QuestionInstance.id)
        .filter(same_definition, relevant_class)
        .order_by(AuditEvent.id.asc())
    )

    return list(map(serial.FullEvent.model_validate, history))