Coverage for rfpy/model/audit/event.py: 99%

155 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-24 10:52 +0000

1import logging 

2from enum import Enum 

3from typing import Optional, TYPE_CHECKING 

4from datetime import datetime 

5from xml.etree import ElementTree as ET 

6 

7 

8from sqlalchemy import Boolean, DateTime, Integer, ForeignKey, event 

9from sqlalchemy import Index, text as sa_text 

10from sqlalchemy.orm import ( 

11 Mapped, 

12 mapped_column, 

13 relationship, 

14 validates, 

15 Session, 

16 DynamicMapped, 

17) 

18from sqlalchemy.dialects.mysql import VARCHAR, LONGTEXT, TINYINT 

19import sqlalchemy.types 

20 

21 

22from rfpy.model.meta import Base 

23from rfpy.model.audit import visible 

24 

25if TYPE_CHECKING: 

26 from rfpy.model.project import Project 

27 from rfpy.model.issue import Issue 

28 from rfpy.model.humans import Organisation, User 

29 from rfpy.model.notify import EmailNotification 

30 

31log = logging.getLogger(__name__) 

32 

33 

34class EventKlass: 

35 ANSWER = "ANSWER" 

36 BASE = "BASE" 

37 ISSUE = "ISSUE" 

38 ORGANISATION = "ORGANISATION" 

39 PROJECT = "PROJECT" 

40 QUESTION = "QUESTION" 

41 QUESTIONNAIRE = "QUESTIONNAIRE" 

42 ROLE = "ROLE" 

43 SCORE = "SCORE" 

44 SCORE_COMMENT = "SCORE_COMMENT" 

45 USER = "USER" 

46 SECTION = "SECTION" 

47 

48 

49PROJECT_KLASSES = { 

50 EventKlass.ISSUE, 

51 EventKlass.PROJECT, 

52 EventKlass.QUESTION, 

53 EventKlass.QUESTIONNAIRE, 

54 EventKlass.SCORE, 

55 EventKlass.SCORE_COMMENT, 

56 EventKlass.SECTION, 

57} 

58 

59 

60class PropertyChanges(object): 

61 def __init__(self): 

62 self.root = ET.Element("d") 

63 

64 def add(self, property_name, old_value, new_value): 

65 change = ET.SubElement(self.root, "c", name=property_name) 

66 ET.SubElement(change, "old").text = str(old_value) 

67 ET.SubElement(change, "new").text = str(new_value) 

68 

69 def as_string(self): 

70 return ET.tostring(self.root, encoding="unicode") 

71 

72 

73def change_to_json(change): 

74 js = dict(name=change.get("name")) 

75 for vals in change: 

76 js[vals.tag] = vals.text 

77 return js 

78 

79 

80class Status(Enum): 

81 pending = 0 

82 done = 1 

83 error = 2 

84 

85 

86class EventStatusType(sqlalchemy.types.TypeDecorator): 

87 impl = TINYINT(1) 

88 

89 cache_ok = True 

90 

91 def process_bind_param(self, status, dialect): 

92 return status.value 

93 

94 def process_result_value(self, int_value, dialect): 

95 return Status(int_value).name 

96 

97 

98class AuditEvent(Base): 

99 """@DynamicAttrs""" 

100 

101 __tablename__ = "audit_events" 

102 

103 public_attrs = ( 

104 "id", 

105 "event_class", 

106 "event_type", 

107 "timestamp", 

108 "question_id", 

109 "project_id", 

110 "issue_id", 

111 "user_id", 

112 "org_id", 

113 "object_id", 

114 "changes", 

115 "user", 

116 ) 

117 

118 __table_args__ = ( 

119 Index("event_object_id", "object_id"), 

120 Index("audit_events_user", "user_id"), 

121 Index("audit_events_type", "type"), 

122 Index("audit_events_timestamp", "timestamp", "id"), 

123 Index("audit_events_question", "question_id"), 

124 Index("audit_events_project", "project_id"), 

125 Index("audit_events_ord", "org_id"), 

126 Index("audit_events_issue", "issue_id"), 

127 Index("audit_events_enqueued", "enqueued"), 

128 ) + Base.__table_args__ 

129 

130 project_id: Mapped[Optional[int]] = mapped_column(Integer, nullable=True) 

131 issue_id: Mapped[Optional[int]] = mapped_column(Integer, nullable=True) 

132 question_id: Mapped[Optional[int]] = mapped_column(Integer, nullable=True) 

133 event_class: Mapped[Optional[str]] = mapped_column("class", VARCHAR(length=20), nullable=True) 

134 event_type: Mapped[Optional[str]] = mapped_column("type", VARCHAR(length=50), nullable=True) 

135 object_id: Mapped[Optional[str]] = mapped_column(VARCHAR(length=150), nullable=True) 

136 user_id: Mapped[Optional[str]] = mapped_column(VARCHAR(length=150), nullable=True) 

137 org_id: Mapped[Optional[str]] = mapped_column(VARCHAR(length=150), nullable=True) 

138 private: Mapped[bool] = mapped_column(Boolean, nullable=False, default=0) 

139 timestamp: Mapped[datetime] = mapped_column( 

140 DateTime, nullable=False, server_default=sa_text("CURRENT_TIMESTAMP") 

141 ) 

142 text: Mapped[Optional[str]] = mapped_column(LONGTEXT()) 

143 status: Mapped[Status] = mapped_column( 

144 "enqueued", 

145 EventStatusType, 

146 default=Status.pending, 

147 nullable=False, 

148 server_default=sa_text("'0'"), 

149 ) 

150 session_id: Mapped[Optional[str]] = mapped_column(VARCHAR(length=50), nullable=True) 

151 

152 project: Mapped["Project"] = relationship( 

153 "Project", 

154 back_populates="events", 

155 primaryjoin=("foreign(AuditEvent.project_id)" "==Project.id"), 

156 ) 

157 

158 issue: Mapped["Issue"] = relationship( 

159 "Issue", 

160 back_populates="events", 

161 primaryjoin="foreign(AuditEvent.issue_id)==Issue.id", 

162 ) 

163 

164 organisation: Mapped["Organisation"] = relationship( 

165 "Organisation", 

166 primaryjoin=("foreign(AuditEvent.org_id)" "==Organisation.id"), 

167 ) 

168 user: Mapped["User"] = relationship( 

169 "User", 

170 primaryjoin="foreign(AuditEvent.user_id)==User.id", 

171 back_populates="events", 

172 ) 

173 

174 acl: Mapped[list["EventOrgACL"]] = relationship( 

175 "EventOrgACL", 

176 back_populates="event", 

177 passive_deletes=True, 

178 ) 

179 

180 notifications: DynamicMapped["EmailNotification"] = relationship( 

181 "EmailNotification", 

182 back_populates="event", 

183 lazy="dynamic", 

184 passive_deletes=True, 

185 ) 

186 

187 def __init__(self, *args, **kwargs) -> None: 

188 super(AuditEvent, self).__init__(*args, **kwargs) 

189 if self.event_class is None and self.event_type: 

190 self.event_class = self.event_type.split("_")[0] 

191 

192 def __repr__(self) -> str: 

193 return f"<AuditEvent #{self.id}, Status: {self.status}>" 

194 

195 def set_text(self, text) -> None: # pragma: no cover 

196 # subclasses to specialise 

197 self.text = str(text) 

198 

199 @validates("object_id") 

200 def check_object_id_unicode(self, key, object_id) -> str: 

201 return str(object_id) 

202 

203 @classmethod 

204 def create(cls, session: Session, event_type_name: str, **kwargs) -> "AuditEvent": 

205 """Creates an instance of the given event type""" 

206 

207 if "user" in kwargs: 

208 user = kwargs.pop("user") 

209 kwargs["user_id"] = kwargs.get("user_id", user.id) 

210 kwargs["org_id"] = kwargs.get("org_id", user.org_id) 

211 

212 change_list = kwargs.pop("change_list", None) 

213 

214 evt = cls(event_type=event_type_name, **kwargs) 

215 if change_list: 

216 for prop_name, old_value, new_value in change_list: 

217 evt.add_change(prop_name, old_value, new_value) 

218 

219 session.add(evt) 

220 evt.build_acl() 

221 return evt 

222 

223 @property 

224 def changes(self) -> list[dict]: 

225 """ 

226 Parses the xml saved in the text column and returns a json 

227 representation of the property changes associated with this event 

228 """ 

229 

230 if not self.text: 

231 return [] 

232 try: 

233 changes_xml = ET.fromstring(self.text) 

234 return [change_to_json(c) for c in changes_xml] 

235 except Exception: 

236 log.exception("Error attemping to parse AuditEvent xml text") 

237 return [] 

238 

239 def set_changes(self, property_changes: PropertyChanges) -> None: 

240 self.text = property_changes.as_string() 

241 

242 def add_change(self, prop_name, old_value, new_value) -> None: 

243 if getattr(self, "_pc", None) is None: 

244 self._pc = PropertyChanges() 

245 self._pc.add(prop_name, old_value, new_value) 

246 self.set_changes(self._pc) 

247 

248 def build_acl(self) -> set[str]: 

249 """Create EventOrgACL records for this event""" 

250 

251 event_type = self.event_type 

252 

253 acl_orgid_set = set() 

254 

255 if event_type in visible.to_participants: 

256 for participant in self.project.participants: 

257 acl_orgid_set.add(participant.org_id) 

258 

259 if event_type in visible.to_respondent and self.issue is not None: 

260 if self.issue.respondent_id is not None: 

261 acl_orgid_set.add(self.issue.respondent_id) 

262 

263 if event_type in visible.to_initiator and self.org_id is not None: 

264 acl_orgid_set.add(self.org_id) 

265 

266 for org_id in acl_orgid_set: 

267 self.add_to_acl(org_id) 

268 

269 return acl_orgid_set 

270 

271 def add_to_acl(self, org_id: str) -> None: 

272 self.acl.append(EventOrgACL(org_id=org_id)) 

273 

274 def set_done(self) -> None: 

275 self.status = Status.done 

276 

277 

278class EventOrgACL(Base): 

279 """ 

280 Provides an Access Control List to restrict which organisations 

281 can view which events. 

282 """ 

283 

284 __tablename__ = "audit_event_orgs" 

285 __table_args__ = ( 

286 Index("audit_event_org_idx", "org_id", unique=False), 

287 ) + Base.__table_args__ 

288 

289 event_id: Mapped[Optional[int]] = mapped_column( 

290 Integer, 

291 ForeignKey( 

292 "audit_events.id", ondelete="CASCADE", name="audit_event_orgs_ibfk_1" 

293 ), 

294 nullable=True, 

295 ) 

296 

297 org_id: Mapped[Optional[str]] = mapped_column( 

298 VARCHAR(length=150), 

299 ForeignKey("organisations.id", ondelete="CASCADE", onupdate="CASCADE"), 

300 nullable=True, 

301 ) 

302 

303 event: Mapped[AuditEvent] = relationship( 

304 AuditEvent, lazy="joined", back_populates="acl" 

305 ) 

306 organisation: Mapped["Organisation"] = relationship("Organisation") 

307 

308 def __repr__(self) -> str: 

309 return f"<EventACL Event {self.event_id} Org {self.org_id}>" 

310 

311 

312@event.listens_for(AuditEvent, "before_insert", propagate=True) 

313def serialize_property_changes(mapper, connection, target): 

314 if hasattr(target, "_pc"): 

315 target.text = target._pc.as_string() 

316 

317 

318@event.listens_for(AuditEvent, "before_insert", propagate=True) 

319def set_status_done_if_no_handler(mapper, connection, target: AuditEvent): 

320 from rfpy.jobs.events.action import handler_exists_for 

321 

322 if not handler_exists_for(target): 

323 target.set_done() 

324 

325 

326@event.listens_for(AuditEvent, "after_insert") 

327def enqueue_event_job(mapper, connection, target: AuditEvent): 

328 from rfpy.jobs.events.action import handler_exists_for 

329 

330 if handler_exists_for(target): 

331 from rfpy.jobs.offload import spool_event_handler 

332 

333 evt_id = str(target.id).encode("utf8") 

334 spool_file = spool_event_handler({"event_id": evt_id}) 

335 log.info("Event %s spooled to file %s", target.id, spool_file)