Coverage for rfpy/model/audit/event.py: 99%
155 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
1import logging
2from enum import Enum
3from typing import Optional, TYPE_CHECKING
4from datetime import datetime
5from xml.etree import ElementTree as ET
8from sqlalchemy import Boolean, DateTime, Integer, ForeignKey, event
9from sqlalchemy import Index, text as sa_text
10from sqlalchemy.orm import (
11 Mapped,
12 mapped_column,
13 relationship,
14 validates,
15 Session,
16 DynamicMapped,
17)
18from sqlalchemy.dialects.mysql import VARCHAR, LONGTEXT, TINYINT
19import sqlalchemy.types
22from rfpy.model.meta import Base
23from rfpy.model.audit import visible
25if TYPE_CHECKING:
26 from rfpy.model.project import Project
27 from rfpy.model.issue import Issue
28 from rfpy.model.humans import Organisation, User
29 from rfpy.model.notify import EmailNotification
31log = logging.getLogger(__name__)
class EventKlass:
    """Namespace of string constants naming the classes of audit event.

    Each constant's value is the same text as its attribute name.
    """

    # Listed alphabetically; attribute order has no effect on lookups.
    ANSWER = "ANSWER"
    BASE = "BASE"
    ISSUE = "ISSUE"
    ORGANISATION = "ORGANISATION"
    PROJECT = "PROJECT"
    QUESTION = "QUESTION"
    QUESTIONNAIRE = "QUESTIONNAIRE"
    ROLE = "ROLE"
    SCORE = "SCORE"
    SCORE_COMMENT = "SCORE_COMMENT"
    SECTION = "SECTION"
    USER = "USER"
# Subset of EventKlass values grouped as project-related (going by the name).
# NOTE(review): the consumer of this set is not visible in this module.
PROJECT_KLASSES = {
    EventKlass.PROJECT,
    EventKlass.ISSUE,
    EventKlass.SECTION,
    EventKlass.QUESTION,
    EventKlass.QUESTIONNAIRE,
    EventKlass.SCORE,
    EventKlass.SCORE_COMMENT,
}
class PropertyChanges:
    """Collects property changes in an in-memory XML tree.

    The tree has the shape::

        <d>
          <c name="prop"><old>...</old><new>...</new></c>
          ...
        </d>
    """

    def __init__(self):
        # Root element under which every change record is appended.
        self.root = ET.Element("d")

    def add(self, property_name, old_value, new_value):
        """Append one ``<c>`` record for ``property_name``.

        Both values are stored as ``str(value)``, so ``None`` is recorded
        as the text ``"None"``.
        """
        record = ET.SubElement(self.root, "c", {"name": property_name})
        for tag, value in (("old", old_value), ("new", new_value)):
            ET.SubElement(record, tag).text = str(value)

    def as_string(self):
        """Return the whole tree serialised as a ``str`` of XML."""
        return ET.tostring(self.root, encoding="unicode")
def change_to_json(change):
    """Convert one change element into a plain dict.

    The result holds the element's ``name`` attribute under the key
    ``"name"``, followed by one ``tag -> text`` entry per child element
    (a later child with the same tag overwrites an earlier one).
    """
    record = {"name": change.get("name")}
    record.update((child.tag, child.text) for child in change)
    return record
class Status(Enum):
    """Processing state of an audit event.

    The integer values are what gets written to the database, so they
    must not be renumbered.
    """

    pending = 0
    done = 1
    error = 2
class EventStatusType(sqlalchemy.types.TypeDecorator):
    """Column type that persists a ``Status`` as a MySQL ``TINYINT(1)``.

    Values going to the database are converted to the member's integer
    value. Values coming from the database are returned as the member
    *name* (a ``str`` such as ``"pending"``), not as a ``Status`` member.
    """

    impl = TINYINT(1)

    # Marks this type as safe to use in SQLAlchemy's statement cache key.
    cache_ok = True

    def process_bind_param(self, status, dialect):
        """Convert an outgoing value to the integer stored in the column.

        Accepts a ``Status`` member, a member name (the form produced by
        ``process_result_value``, so loaded values can be bound back), or
        a raw integer value. ``None`` is passed through unchanged.

        Raises:
            KeyError: ``status`` is a string that is not a member name.
            ValueError: ``status`` is not a valid ``Status`` value.
        """
        if status is None:
            # SQLAlchemy may hand None to this hook (e.g. NULL comparisons).
            return None
        if isinstance(status, Status):
            return status.value
        if isinstance(status, str):
            return Status[status].value
        return Status(status).value

    def process_result_value(self, int_value, dialect):
        """Convert a stored integer to the matching ``Status`` member name.

        ``None`` (a NULL result, e.g. from an outer join) is passed through
        unchanged instead of raising ``ValueError``.
        """
        if int_value is None:
            return None
        return Status(int_value).name
class AuditEvent(Base):
    """Audit trail record mapped to the ``audit_events`` table.

    An event records its type and class, the ids of the objects it relates
    to, and optionally an XML description of property changes held in
    ``text``.

    @DynamicAttrs
    """

    __tablename__ = "audit_events"

    # Attribute names published for this model.
    # NOTE(review): the consumer of this tuple is not visible in this module.
    public_attrs = (
        "id",
        "event_class",
        "event_type",
        "timestamp",
        "question_id",
        "project_id",
        "issue_id",
        "user_id",
        "org_id",
        "object_id",
        "changes",
        "user",
    )

    # Index definitions refer to database column names, hence "type" and
    # "enqueued" rather than the Python attribute names.
    __table_args__ = (
        Index("event_object_id", "object_id"),
        Index("audit_events_user", "user_id"),
        Index("audit_events_type", "type"),
        Index("audit_events_timestamp", "timestamp", "id"),
        Index("audit_events_question", "question_id"),
        Index("audit_events_project", "project_id"),
        Index("audit_events_ord", "org_id"),
        Index("audit_events_issue", "issue_id"),
        Index("audit_events_enqueued", "enqueued"),
    ) + Base.__table_args__

    project_id: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
    issue_id: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
    question_id: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
    # Stored in the "class" / "type" columns; renamed on the Python side.
    event_class: Mapped[Optional[str]] = mapped_column(
        "class", VARCHAR(length=20), nullable=True
    )
    event_type: Mapped[Optional[str]] = mapped_column(
        "type", VARCHAR(length=50), nullable=True
    )
    object_id: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=150), nullable=True
    )
    user_id: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=150), nullable=True
    )
    org_id: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=150), nullable=True
    )
    private: Mapped[bool] = mapped_column(Boolean, nullable=False, default=0)
    timestamp: Mapped[datetime] = mapped_column(
        DateTime, nullable=False, server_default=sa_text("CURRENT_TIMESTAMP")
    )
    # XML produced by PropertyChanges; parsed back by the ``changes`` property.
    text: Mapped[Optional[str]] = mapped_column(LONGTEXT())
    # Persisted in the "enqueued" column through EventStatusType.
    status: Mapped[Status] = mapped_column(
        "enqueued",
        EventStatusType,
        default=Status.pending,
        nullable=False,
        server_default=sa_text("'0'"),
    )
    session_id: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=50), nullable=True
    )

    # The id columns above carry no ForeignKey, so each join is spelled out
    # with foreign() to tell SQLAlchemy which side is the referencing one.
    project: Mapped["Project"] = relationship(
        "Project",
        back_populates="events",
        primaryjoin="foreign(AuditEvent.project_id)==Project.id",
    )

    issue: Mapped["Issue"] = relationship(
        "Issue",
        back_populates="events",
        primaryjoin="foreign(AuditEvent.issue_id)==Issue.id",
    )

    organisation: Mapped["Organisation"] = relationship(
        "Organisation",
        primaryjoin="foreign(AuditEvent.org_id)==Organisation.id",
    )
    user: Mapped["User"] = relationship(
        "User",
        primaryjoin="foreign(AuditEvent.user_id)==User.id",
        back_populates="events",
    )

    # passive_deletes=True leaves removal of child rows to the database.
    acl: Mapped[list["EventOrgACL"]] = relationship(
        "EventOrgACL",
        back_populates="event",
        passive_deletes=True,
    )

    notifications: DynamicMapped["EmailNotification"] = relationship(
        "EmailNotification",
        back_populates="event",
        lazy="dynamic",
        passive_deletes=True,
    )

    def __init__(self, *args, **kwargs) -> None:
        """Initialise the event, deriving ``event_class`` when it is omitted.

        If no ``event_class`` was supplied but ``event_type`` was, the class
        is taken to be the text before the first underscore of the type.
        """
        super().__init__(*args, **kwargs)
        if self.event_class is None and self.event_type:
            self.event_class = self.event_type.split("_")[0]

    def __repr__(self) -> str:
        return f"<AuditEvent #{self.id}, Status: {self.status}>"

    def set_text(self, text) -> None:  # pragma: no cover
        """Store ``str(text)`` in the ``text`` column."""
        # subclasses to specialise
        self.text = str(text)

    @validates("object_id")
    def check_object_id_unicode(self, key, object_id) -> Optional[str]:
        """Coerce an assigned ``object_id`` to ``str``.

        ``None`` is passed through so the nullable column stores NULL
        rather than the literal text ``"None"``.
        """
        if object_id is None:
            return None
        return str(object_id)

    @classmethod
    def create(cls, session: Session, event_type_name: str, **kwargs) -> "AuditEvent":
        """Create an event of the given type, add it to ``session`` and build its ACL.

        Args:
            session: session the new event is added to.
            event_type_name: value for the event's ``event_type``.
            **kwargs: passed to the constructor, except for two special keys:
                ``user`` - removed; its ``id`` and ``org_id`` are used as
                ``user_id`` / ``org_id`` unless those keys are given.
                ``change_list`` - removed; an iterable of
                ``(prop_name, old_value, new_value)`` tuples recorded via
                ``add_change``.

        Returns:
            The newly created event.
        """
        if "user" in kwargs:
            user = kwargs.pop("user")
            # Explicitly supplied ids take precedence over the user's own.
            kwargs.setdefault("user_id", user.id)
            kwargs.setdefault("org_id", user.org_id)

        change_list = kwargs.pop("change_list", None)

        evt = cls(event_type=event_type_name, **kwargs)
        if change_list:
            for prop_name, old_value, new_value in change_list:
                evt.add_change(prop_name, old_value, new_value)

        # The event is added to the session before the ACL is built.
        session.add(evt)
        evt.build_acl()
        return evt

    @property
    def changes(self) -> list[dict]:
        """
        Parses the xml saved in the text column and returns a json
        representation of the property changes associated with this event.

        Returns an empty list when there is no text or when parsing fails
        (the failure is logged, not raised).
        """
        if not self.text:
            return []
        try:
            changes_xml = ET.fromstring(self.text)
            return [change_to_json(c) for c in changes_xml]
        except Exception:
            # Deliberate best effort: unreadable text must not break readers.
            log.exception("Error attemping to parse AuditEvent xml text")
            return []

    def set_changes(self, property_changes: PropertyChanges) -> None:
        """Replace ``text`` with the serialised ``property_changes``."""
        self.text = property_changes.as_string()

    def add_change(self, prop_name, old_value, new_value) -> None:
        """Record one property change and refresh ``text`` to include it.

        Changes accumulate in a ``PropertyChanges`` kept on ``self._pc``,
        which is created on first use.
        """
        if getattr(self, "_pc", None) is None:
            self._pc = PropertyChanges()
        self._pc.add(prop_name, old_value, new_value)
        self.set_changes(self._pc)

    def build_acl(self) -> set[str]:
        """Create EventOrgACL records for this event.

        Which organisations are included depends on the membership of
        ``event_type`` in the ``visible`` module's collections:
        project participants, the issue's respondent, and the event's
        own organisation.

        Returns:
            The set of organisation ids that were added to the ACL.
        """
        event_type = self.event_type
        acl_orgid_set: set[str] = set()

        if event_type in visible.to_participants:
            # Guarded like the issue/org branches below: an event without a
            # project contributes no participants instead of raising.
            project = self.project
            if project is not None:
                acl_orgid_set.update(
                    participant.org_id for participant in project.participants
                )

        if event_type in visible.to_respondent and self.issue is not None:
            if self.issue.respondent_id is not None:
                acl_orgid_set.add(self.issue.respondent_id)

        if event_type in visible.to_initiator and self.org_id is not None:
            acl_orgid_set.add(self.org_id)

        for org_id in acl_orgid_set:
            self.add_to_acl(org_id)

        return acl_orgid_set

    def add_to_acl(self, org_id: str) -> None:
        """Append an ``EventOrgACL`` entry for ``org_id`` to this event."""
        self.acl.append(EventOrgACL(org_id=org_id))

    def set_done(self) -> None:
        """Mark the event's status as ``Status.done``."""
        self.status = Status.done
class EventOrgACL(Base):
    """
    Access Control List entry: one row links an audit event to an
    organisation, restricting which organisations can view which events.
    """

    __tablename__ = "audit_event_orgs"
    __table_args__ = (
        Index("audit_event_org_idx", "org_id", unique=False),
    ) + Base.__table_args__

    # Entries are removed by the database when their event is deleted.
    event_id: Mapped[Optional[int]] = mapped_column(
        Integer,
        ForeignKey(
            "audit_events.id",
            ondelete="CASCADE",
            name="audit_event_orgs_ibfk_1",
        ),
        nullable=True,
    )

    # Follows the organisation row on both delete and id update.
    org_id: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=150),
        ForeignKey(
            "organisations.id",
            ondelete="CASCADE",
            onupdate="CASCADE",
        ),
        nullable=True,
    )

    # The owning event is fetched in the same query as the ACL row.
    event: Mapped[AuditEvent] = relationship(
        AuditEvent,
        lazy="joined",
        back_populates="acl",
    )
    organisation: Mapped["Organisation"] = relationship("Organisation")

    def __repr__(self) -> str:
        return f"<EventACL Event {self.event_id} Org {self.org_id}>"
@event.listens_for(AuditEvent, "before_insert", propagate=True)
def serialize_property_changes(mapper, connection, target):
    """Write any accumulated property changes to ``target.text`` before insert.

    Does nothing when the event has no ``_pc`` attribute or when it is
    ``None``; the same None-tolerant lookup is used by
    ``AuditEvent.add_change``.
    """
    pending_changes = getattr(target, "_pc", None)
    if pending_changes is not None:
        target.text = pending_changes.as_string()
@event.listens_for(AuditEvent, "before_insert", propagate=True)
def set_status_done_if_no_handler(mapper, connection, target: AuditEvent):
    """Mark the event as done before insert when no handler exists for it."""
    # Imported at call time rather than at module load.
    # NOTE(review): presumably to avoid a circular import - confirm.
    from rfpy.jobs.events.action import handler_exists_for

    if handler_exists_for(target):
        return
    target.set_done()
# NOTE(review): unlike the two before_insert listeners, this one is registered
# without propagate=True - confirm whether mapped subclasses should be spooled.
@event.listens_for(AuditEvent, "after_insert")
def enqueue_event_job(mapper, connection, target: AuditEvent):
    """Spool a job for the newly inserted event when a handler exists for it.

    The event id is passed to the spooler as UTF-8 encoded bytes under the
    key ``"event_id"``, and the resulting spool file is logged.
    """
    from rfpy.jobs.events.action import handler_exists_for

    if not handler_exists_for(target):
        return

    # Only imported once a job actually needs to be spooled.
    from rfpy.jobs.offload import spool_event_handler

    payload = {"event_id": str(target.id).encode("utf8")}
    spool_file = spool_event_handler(payload)
    log.info("Event %s spooled to file %s", target.id, spool_file)
335 log.info("Event %s spooled to file %s", target.id, spool_file)