Coverage for rfpy/api/endpoints/audit.py: 98%
57 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-31 16:00 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-31 16:00 +0000
1'''
2View and query Audit Event Logs
3'''
4from typing import List
6from sqlalchemy.orm.exc import NoResultFound
8from rfpy.suxint import http
9from rfpy.model import (QuestionInstance, Issue, AuditEvent, EventOrgACL, Organisation, Project)
10from rfpy.model.audit import evt_types
11from rfpy.auth import perms
12from rfpy.web import serial
14from .. import fetch, validate
17@http
18def get_project_events(session, user, project_id, event_type, pager) -> List[serial.SummaryEvent]:
19 '''
20 List summary events that have occurred within the context of the given project,
21 optionally filtering by event_type
22 '''
23 project = fetch.project(session, project_id)
24 validate.check(user, perms.PROJECT_ACCESS, project=project)
26 ev_query = fetch.project_audit_events(user.organisation, project, event_type)
28 cols = (AuditEvent.id, AuditEvent.timestamp,
29 AuditEvent.user_id, AuditEvent.event_type)
31 return [ae._asdict() for ae in
32 ev_query.slice(pager.startfrom, pager.goto).with_entities(*cols)]
35@http
36def get_events(session, user, event_type, pager) -> List[serial.SummaryEvent]:
37 '''
38 List all events visible to the current user, from all projects,
39 optionally filtering by event_type.
40 '''
41 validate.check(user, perms.MANAGE_ORGANISATION, target_org=user.organisation)
42 q = fetch.audit_events(user.organisation, event_type=event_type)
43 cols = (AuditEvent.id, AuditEvent.timestamp,
44 AuditEvent.user_id, AuditEvent.event_type)
45 return [e._asdict() for e in
46 q.slice(pager.startfrom, pager.goto).with_entities(*cols)]
49@http
50def get_event(session, user, event_id) -> serial.AuditEvent:
51 validate.check(user, perms.MANAGE_ORGANISATION, target_org=user.organisation)
52 acl = session.query(EventOrgACL)\
53 .filter(
54 EventOrgACL.event_id == event_id,
55 EventOrgACL.organisation == user.organisation)\
56 .first()
57 if not acl:
58 raise NoResultFound(f"No Event found for ID {event_id}")
60 event_dict = acl.event.as_dict()
61 issue_id = event_dict.get('issue_id', False)
62 if issue_id and issue_id is not None:
63 try:
64 issue = session.query(Issue.label,
65 Organisation.id.label('respondent_id'))\
66 .join(Organisation).filter(Issue.id == issue_id)\
67 .one()._asdict()
68 except NoResultFound: # Issue might have been deleted
69 issue = None
71 event_dict['issue'] = issue
73 question_id = event_dict.get('question_id', False)
74 if question_id:
75 q = (session.query(QuestionInstance.number,
76 QuestionInstance.section_id)
77 .filter(QuestionInstance.id == question_id)
78 .one())
79 event_dict.update({
80 'question_number': q.number.dotted,
81 'section_id': q.section_id
82 })
83 if acl.event.user:
84 ev_user = acl.event.user
85 event_dict['user'] = serial.User.from_orm(ev_user)
87 if acl.event.project_id:
88 pid = acl.event.project_id
89 pt = session.query(Project.title).filter(Project.id == pid).scalar()
90 event_dict['project_title'] = pt
92 return serial.AuditEvent(**event_dict).dict()
95@http
96def get_event_types(session) -> List[str]:
97 return [a for a in dir(evt_types) if not a.startswith('__')]
100@http
101def get_question_events(session, user, question_id) -> List[serial.FullEvent]:
103 qi = session.query(QuestionInstance).filter(QuestionInstance.id == question_id).one()
104 project = qi.project
105 validate.check(user, perms.PROJECT_VIEW_QUESTIONNAIRE,
106 project=project,
107 section_id=qi.section_id)
109 events = session.query(AuditEvent)\
110 .join(QuestionInstance, AuditEvent.question_id == QuestionInstance.id)\
111 .filter(
112 QuestionInstance.question_def_id == qi.question_def_id,
113 AuditEvent.event_class.in_(
114 ('QUESTION', 'SECTION')))\
115 .order_by(AuditEvent.id.asc())
117 return [serial.FullEvent.from_orm(e) for e in events]