Coverage for rfpy/jobs/events/action.py: 98%
262 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
1from typing import Iterable, List, Type, Optional, Self
2from contextlib import contextmanager
3import logging
5import orjson
6from sqlalchemy.orm import Session
7from sqlalchemy.orm.exc import NoResultFound
8from overrides import overrides, final, EnforceOverrides
11from rfpy.model.humans import User
12from rfpy import conf
13from rfpy.mail.schemas import EvtSchema, build_template_model, ProjNote, ScoreSchema
14from rfpy.jobs.events.webhooks import ping_webhook
15from rfpy.model.notify import EmailNotification
16from rfpy.model.audit.event import AuditEvent, Status as EvtStatus
17from rfpy.model.audit import evt_types as e
18from rfpy.model.issue import Issue
19from rfpy.utils import json_default
20from rfpy.api import fetch
21from rfpy.jobs.events import fanout
24log = logging.getLogger(__name__)
26registry: dict[str, Type["EventProcessor"]] = {}
29def debug_model(model, event_id):
30 if log.isEnabledFor(logging.DEBUG):
31 mod_str = orjson.dumps(model, default=json_default, option=orjson.OPT_INDENT_2)
33 log.debug(
34 "Data model for user %s, event %s: \n %s",
35 model.get("To", "no email address given"),
36 event_id,
37 mod_str.decode("utf8"),
38 )
41def webhook_evt_types() -> List[str]:
42 """Get a list of event types which accept webhook subscriptions"""
43 return [k for k, v in registry.items() if v.webhooks]
46class handles:
47 """
48 Decorator for associating an EventProcessor subclass with an event type.
49 """
51 def __init__(self, *evt_types: str):
52 self.evt_types = evt_types
54 def __call__(self, cls):
55 for evt_type in self.evt_types:
56 if evt_type in registry:
57 raise ValueError(f"{evt_type} is already assigned")
58 registry[evt_type] = cls
59 return cls
62def handle_event(event: AuditEvent, session, mailer=None):
63 """
64 Process email notifications, webhooks and any additional tasks for the given event.
66 Delegates to a subclass of EventProcessor
67 """
68 if mailer is None:
69 from rfpy.mail import stub
71 mailer = stub
72 if event.event_type not in registry:
73 raise ValueError(f"No Handler found for AuditEvent {event.event_type}")
74 handler_cls = registry[event.event_type]
75 handler = handler_cls(event, session, mailer)
76 return handler.process()
79def handler_exists_for(event: AuditEvent) -> bool:
80 return event.event_type in registry
83def run_event_handlers(session: Session):
84 """Call handle_event for all events at status pending"""
85 for evt in session.query(AuditEvent).filter(AuditEvent.status == EvtStatus.pending):
86 handle_event(evt, session)
89def logging_context(event: AuditEvent):
90 @contextmanager
91 def _cm(action):
92 try:
93 yield
94 log.info(
95 "%s completed for event %s # %s", action, event.event_type, event.id
96 )
97 except Exception:
98 event.status = EvtStatus.error
99 log.exception(
100 "%s failure for event %s # %s", action, event.event_type, event.id
101 )
103 return _cm
106class EventProcessor(EnforceOverrides):
107 exclude_initiator = True
108 issue_watchers = False
109 project_watchers = False
110 webhooks = False
111 fanout = False
113 def __init__(self, event: AuditEvent, session, mailer):
114 self.session = session
115 self.mailer = mailer
116 self.event = event
117 self.evt_model: Optional[EvtSchema] = None
118 self.result = None
120 @final
121 def process(self):
122 """Process an audit event through notification pipeline.
124 Executes the following steps in order:
125 1. Generates event model from audit data
126 2. Moderates event access control list
127 3. Sends email notifications if configured
128 4. Triggers webhooks if enabled
129 5. Publishes to fanout stream if active
130 6. Runs any extra actions defined by subclasses
132 All steps are wrapped in error logging contexts. Failures in
133 individual steps are logged but don't prevent subsequent steps
134 from executing.
136 Side Effects:
137 - Updates event.status to EvtStatus.done on success
138 - Updates event.status to EvtStatus.error on failure
139 - Logs completion/failure state
141 Returns:
142 None
143 """
144 self.generate_evt_model()
145 self.moderate_event_acl()
147 errors_logged = logging_context(self.event)
149 with errors_logged("Email notifications"):
150 self.send_emails()
152 with errors_logged("Webhooks"):
153 self.ping_webhooks()
155 with errors_logged("Fanout Update Stream"):
156 self.publish_to_fanout()
158 with errors_logged("Extra actions"):
159 self.extra()
161 if self.event.status != EvtStatus.error:
162 self.event.status = EvtStatus.done
163 log.info("Event processing completed for %s", self.event)
164 else:
165 log.warning("Event processing failed for %s", self.event)
167 return self.result
169 def generate_evt_model(self):
170 # Extracted from __init__ method to permit easier testing
171 self.evt_model = EvtSchema.model_validate(self.event)
173 def moderate_event_acl(self):
174 """Allow subclass handlers to customise the EventOrgACL"""
175 pass
177 def _iter_watchers(self) -> Iterable[User]:
178 self.session.add(self.event)
179 if self.issue_watchers:
180 yield from self.event.issue.watchers
181 if self.project_watchers:
182 yield from self.event.project.participant_watchers
184 def recipients(self) -> Iterable[User]:
185 """
186 An iterable of users watching the related Project or Issue and which belong
187 to an organisation assigned the Event's ACL. If self.initiator is False then
188 the event author is excluded
189 """
190 acl_org_ids = {acl.org_id for acl in self.event.acl}
192 for w in self._iter_watchers():
193 if w.org_id in acl_org_ids:
194 if self.exclude_initiator and w is self.event.user:
195 continue
196 yield w
198 def assign_notifications(self) -> List[EmailNotification]:
199 en_list = []
200 for user in self.recipients():
201 en = EmailNotification(
202 user_id=user.id,
203 org_id=user.org_id,
204 organisation=user.organisation,
205 email=user.email,
206 event_id=self.event.id,
207 )
208 self.session.add(en)
209 en_list.append(en)
210 self.session.commit()
211 return en_list
213 @final
214 def get_model(self, user: User, email: str) -> dict:
215 assert self.evt_model is not None
216 pydantic_model = build_template_model(self.evt_model, user, email)
217 self.augment_model(pydantic_model.TemplateModel.event)
218 model = pydantic_model.model_dump()
219 debug_model(model, self.event.id)
220 return model
222 def augment_model(self, model: EvtSchema):
223 """
224 Subclasses can override this to add data to the 'TemplateModel' of the mail
225 data package
226 """
227 pass
229 def send_emails(self):
230 for n in self.assign_notifications():
231 try:
232 email_model = self.get_model(n.user, n.email)
233 n.message_id = self.mailer.send_email(email_model)
234 log.info(
235 "Email notification sent to %s for event ID %s - %s",
236 n.email,
237 self.event.id,
238 self.event.event_type,
239 )
240 n.set_sent()
241 except Exception as exc:
242 n.set_failed()
243 n.message_id = str(exc)[:255]
244 log.exception(
245 "Failed to send message to %s for event #%s", n.email, self.event.id
246 )
247 self.session.commit()
249 def ping_webhooks(self):
250 if self.webhooks is False:
251 return
252 ev_model_bytes = orjson.dumps(self.evt_model.model_dump(), default=json_default)
253 for wh in fetch.webhooks_for_event(self.event):
254 ping_webhook(wh, ev_model_bytes)
255 self.session.commit()
257 @final
258 def publish_to_fanout(self):
259 if self.fanout and conf.CONF.fanout_realm_id is not None:
260 data = self.fanout_data()
261 for acl in self.event.acl:
262 fanout.send_update(
263 self.event.id, self.event.event_type, data, channel=acl.org_id
264 )
266 def fanout_data(self):
267 """
268 Subclasses can customise data to to Fanout.io update stream. For security, only
269 object IDs should be sent (requiring the client to re-fetch if necessary)
270 """
271 return dict(event_id=self.event.id)
273 def extra(self):
274 """Custom actions that may be defined by subclasses"""
275 pass
278@handles(
279 e.ISSUE_ACCEPTED, e.ISSUE_DECLINED, e.ISSUE_REVERTED_TO_ACCEPTED, e.ISSUE_SUBMITTED
280)
281class VendorStatusAction(EventProcessor):
282 issue_watchers = True
283 project_watchers = True
284 webhooks = True
286 @overrides
287 def extra(self):
288 if self.event.event_type == e.ISSUE_ACCEPTED:
289 self.assign_accepting_admin_to_watchlist()
291 def assign_accepting_admin_to_watchlist(self):
292 issue = self.session.query(Issue).filter_by(id=self.event.issue_id).one()
293 issue.add_watcher(self.event.user)
296@handles(e.ISSUE_RELEASED, e.ISSUE_RELEASED_UPDATEABLE)
297class IssueReleased(EventProcessor):
298 webhooks = True
300 @overrides
301 def assign_notifications(self) -> List[EmailNotification]:
302 issue: Issue = self.event.issue
303 en_list = []
304 if issue.respondent is not None:
305 for user in (u for u in issue.respondent.users if u.is_administrator()):
306 en = EmailNotification(
307 user_id=user.id,
308 org_id=user.org_id,
309 organisation=user.organisation,
310 email=user.email,
311 event_id=self.event.id,
312 )
313 self.session.add(en)
314 self.session.commit()
315 en_list.append(en)
317 elif issue.respondent_email is not None:
318 en = EmailNotification(email=issue.respondent_email, event_id=self.event.id)
319 self.session.add(en)
320 self.session.commit()
321 en_list.append(en)
323 if not en_list:
324 msg = (
325 f"Event #{self.event.id} - cannot send email notifications for Issue Released: "
326 "respondent_email not set and no respondent user Administrators found"
327 )
328 log.error(msg)
329 return en_list
332@handles(e.SECTION_ACCESS_UPDATED)
333class SectionAccess(EventProcessor):
334 @overrides
335 def recipients(self) -> Iterable[User]:
336 user_id_set = set()
337 for pc in self.event.changes:
338 if pc["name"] == "Granted To":
339 user_id = pc.get("new", "").strip()
340 if user_id in user_id_set:
341 continue
342 user_id_set.add(user_id)
343 grant_user = self.session.get(User, user_id)
344 if grant_user is not None:
345 yield grant_user
348@handles(e.PROJECT_NOTE_ADDED)
349class ProjectNoteHandler(EventProcessor):
350 webhooks = True
351 fanout = True
353 @overrides
354 def moderate_event_acl(self: Self):
355 from rfpy.model.notes import ProjectNote, Distribution
356 from rfpy.model.project import Project
358 event = self.event
359 note: ProjectNote = (
360 self.session.query(ProjectNote).filter_by(id=self.event.object_id).one()
361 )
362 project: Project = note.project
364 if note.distribution != Distribution.RESPONDENT_INTERNAL_MEMO:
365 for participant in project.participants:
366 if participant.org_id != note.org_id:
367 self.event.add_to_acl(participant.org_id)
369 if note.distribution == Distribution.BROADCAST_NOTICE:
370 for issue in project.published_issues:
371 if issue.respondent_id is not None:
372 event.add_to_acl(issue.respondent_id)
373 elif (
374 note.distribution == Distribution.TARGETED
375 and note.target_org_id is not None
376 ):
377 event.add_to_acl(note.target_org_id)
379 @overrides
380 def recipients(self) -> Iterable[User]:
381 acl_org_ids = {a.org_id for a in self.event.acl}
382 for user in set(self.event.project.iter_all_watchers()):
383 if user.org_id in acl_org_ids and user.id != self.event.user_id:
384 yield user
386 @overrides
387 def augment_model(self, model: EvtSchema):
388 from rfpy.model.notes import ProjectNote
390 note: ProjectNote = self.session.get(ProjectNote, self.event.object_id)
391 model.note = ProjNote.model_validate(note)
394@handles(e.SCORE_COMMENT_ADDED)
395class ScoreCommentHandler(EventProcessor):
396 webhooks = True
398 @overrides
399 def recipients(self) -> Iterable[User]:
400 from rfpy.model.issue import ScoreComment, Score
402 event = self.event
403 score_comment: ScoreComment = (
404 self.session.query(ScoreComment).filter_by(id=event.object_id).one()
405 )
406 score: Score = score_comment.score
407 users_to_notify = {sc.user for sc in score.comments}
408 users_to_notify = users_to_notify.union(set(event.project.participant_watchers))
409 try:
410 # Try to add the original scorer to the recipients list
411 score_created_event = (
412 self.session.query(AuditEvent)
413 .filter(AuditEvent.event_type == e.SCORE_CREATED)
414 .filter(AuditEvent.object_id == score.id)
415 .one()
416 )
417 users_to_notify.add(score_created_event.user)
418 except NoResultFound:
419 # Ignore - score could be created by autoscore or imported answer
420 pass
421 # AuditEvent.object_id is a bit vague. To prevent info leaking to the wrong people
422 # do a sanity check for all recipients
423 parti_orgs = {p.org_id for p in score.issue.project.participants}
424 for user in users_to_notify:
425 if user.org_id in parti_orgs and user is not event.user:
426 yield user
428 @overrides
429 def augment_model(self, model: EvtSchema):
430 from rfpy.model.issue import ScoreComment
432 sc: ScoreComment = self.session.get(ScoreComment, self.event.object_id)
433 score = sc.score
434 if score.score is None:
435 sv = None
436 else:
437 sv = score.score
439 model.score = ScoreSchema(
440 score_value=sv,
441 comment=sc.comment_text,
442 question_number=score.question.number.dotted,
443 respondent_name=score.issue.respondent.name,
444 issue_id=score.issue_id,
445 )
448if __name__ == "__main__": # pragma: no cover
449 for k, v in registry.items():
450 print(f"{k} : {v.__name__}")
451 print("Can be webhooked:")
452 for ev in webhook_evt_types():
453 print(ev)