Coverage for rfpy/jobs/events/action.py: 97%
260 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-31 16:00 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-31 16:00 +0000
1from typing import Iterable, List
2from contextlib import contextmanager
3import logging
5import orjson
6from sqlalchemy.orm import Session
7from sqlalchemy.orm.exc import NoResultFound
8from overrides import overrides, final, EnforceOverrides
11from rfpy.mail.schemas import EvtSchema, build_template_model, ProjNote, ScoreSchema
12from rfpy.jobs.events.webhooks import ping_webhook
13from rfpy.model.notify import EmailNotification
14from rfpy.model.audit.event import AuditEvent, Status as EvtStatus
15from rfpy.model.audit import evt_types as e
16from rfpy.model.issue import Issue
17from rfpy.utils import json_default
18from rfpy.model.humans import User
19from rfpy.api import fetch
20from rfpy import conf
21from . import fanout
24log = logging.getLogger(__name__)
26registry = {}
29def debug_model(model, event_id):
30 if log.isEnabledFor(logging.DEBUG):
31 mod_str = orjson.dumps(
32 model,
33 default=json_default,
34 option=orjson.OPT_INDENT_2
35 )
37 log.debug(
38 "Data model for user %s, event %s: \n %s",
39 model.get('To', 'no email address given'), event_id, mod_str.decode('utf8')
40 )
43def webhook_evt_types() -> List[str]:
44 '''Get a list of event types which accept webhook subscriptions'''
45 return [k for k, v in registry.items() if v.webhooks]
48class handles:
49 '''
50 Decorator for associating an EventProcessor subclass with an event type.
51 '''
52 def __init__(self, *evt_types: str):
53 self.evt_types = evt_types
55 def __call__(self, cls):
56 for evt_type in self.evt_types:
57 if evt_type in registry:
58 raise ValueError(f"{evt_type} is already assigned")
59 registry[evt_type] = cls
60 return cls
63def handle_event(event: AuditEvent, session, mailer=None):
64 '''
65 Process email notifications, webhooks and any additional tasks for the given event.
67 Delegates to a subclass of EventProcessor
68 '''
69 if mailer is None:
70 from rfpy.mail import stub
71 mailer = stub
72 if event.event_type not in registry:
73 raise ValueError(f"No Handler found for AuditEvent {event.event_type}")
74 handler_cls = registry[event.event_type]
75 handler = handler_cls(event, session, mailer)
76 return handler.process()
79def handler_exists_for(event: AuditEvent) -> bool:
80 return event.event_type in registry
83def run_event_handlers(session: Session):
84 '''Call handle_event for all events at status pending'''
85 for evt in session.query(AuditEvent).filter(AuditEvent.status == EvtStatus.pending):
86 handle_event(evt, session)
89def logging_context(event: AuditEvent):
91 @contextmanager
92 def _cm(action):
93 try:
94 yield
95 log.info("%s completed for event %s # %s", action, event.event_type, event.id)
96 except Exception:
97 event.status = EvtStatus.error
98 log.exception("%s failure for event %s # %s", action, event.event_type, event.id)
100 return _cm
103class EventProcessor(EnforceOverrides):
104 exclude_initiator = True
105 issue_watchers = False
106 project_watchers = False
107 webhooks = False
108 fanout = False
110 def __init__(self, event: AuditEvent, session, mailer):
111 self.session = session
112 self.mailer = mailer
113 self.event = event
114 self.evt_model = None
115 self.result = None
117 @final
118 def process(self):
119 self.generate_evt_model()
120 self.moderate_event_acl()
122 errors_logged = logging_context(self.event)
124 with errors_logged('Email notifications'):
125 self.send_emails()
127 with errors_logged('Webhooks'):
128 self.ping_webhooks()
130 with errors_logged('Fanout Update Stream'):
131 self.publish_to_fanout()
133 with errors_logged('Extra actions'):
134 self.extra()
136 if self.event.status != EvtStatus.error:
137 self.event.status = EvtStatus.done
138 log.info("Event processing completed for %s", self.event)
139 else:
140 log.warning("Event processing failed for %s", self.event)
142 return self.result
144 def generate_evt_model(self):
145 # Extracted from __init__ method to permit easier testing
146 self.evt_model = EvtSchema.from_orm(self.event)
148 def moderate_event_acl(self):
149 '''Allow subclass handlers to customise the EventOrgACL'''
150 pass
152 def _iter_watchers(self) -> Iterable[User]:
153 self.session.add(self.event)
154 if self.issue_watchers:
155 yield from self.event.issue.watchers
156 if self.project_watchers:
157 yield from self.event.project.participant_watchers
159 def recipients(self) -> Iterable[User]:
160 '''
161 An iterable of users watching the related Project or Issue and which belong
162 to an organisation assigned the Event's ACL. If self.initiator is False then
163 the event author is excluded
164 '''
165 acl_org_ids = {acl.org_id for acl in self.event.acl}
167 for w in self._iter_watchers():
168 if w.org_id in acl_org_ids:
169 if self.exclude_initiator and w is self.event.user:
170 continue
171 yield w
173 def assign_notifications(self) -> List[EmailNotification]:
174 en_list = []
175 for user in self.recipients():
176 en = EmailNotification(
177 user_id=user.id,
178 org_id=user.org_id,
179 organisation=user.organisation,
180 email=user.email,
181 event_id=self.event.id
182 )
183 self.session.add(en)
184 en_list.append(en)
185 self.session.commit()
186 return en_list
188 @final
189 def get_model(self, user: User, email: str) -> dict:
190 pydantic_model = build_template_model(self.evt_model, user, email)
191 self.augment_model(pydantic_model.TemplateModel.event)
192 model = pydantic_model.dict()
193 debug_model(model, self.event.id)
194 return model
196 def augment_model(self, model: EvtSchema):
197 '''
198 Subclasses can override this to add data to the 'TemplateModel' of the mail
199 data package
200 '''
201 pass
203 def send_emails(self):
204 for n in self.assign_notifications():
205 try:
206 email_model = self.get_model(n.user, n.email)
207 n.message_id = self.mailer.send_email(email_model)
208 log.info(
209 'Email notification sent to %s for event ID %s - %s',
210 n.email, self.event.id, self.event.event_type
211 )
212 n.set_sent()
213 except Exception as exc:
214 n.set_failed()
215 n.message_id = str(exc)[:255]
216 log.exception('Failed to send message to %s for event #%s', n.email, self.event.id)
217 self.session.commit()
219 def ping_webhooks(self):
220 if self.webhooks is False:
221 return
222 ev_model_bytes = orjson.dumps(self.evt_model.dict(), default=json_default)
223 for wh in fetch.webhooks_for_event(self.event):
224 ping_webhook(wh, ev_model_bytes)
225 self.session.commit()
227 @final
228 def publish_to_fanout(self):
229 if self.fanout and conf.CONF.fanout_realm_id is not None:
230 data = self.fanout_data()
231 for acl in self.event.acl:
232 fanout.send_update(self.event.id, self.event.event_type, data, channel=acl.org_id)
234 def fanout_data(self):
235 '''
236 Subclasses can customise data to to Fanout.io update stream. For security, only
237 object IDs should be sent (requiring the client to re-fetch if necessary)
238 '''
239 return dict(event_id=self.event.id)
241 def extra(self):
242 '''Custom actions that may be defined by subclasses'''
243 pass
246@handles(e.ISSUE_ACCEPTED, e.ISSUE_DECLINED, e.ISSUE_REVERTED_TO_ACCEPTED, e.ISSUE_SUBMITTED)
247class VendorStatusAction(EventProcessor):
248 issue_watchers = True
249 project_watchers = True
250 webhooks = True
252 @overrides
253 def extra(self):
254 if self.event.event_type == e.ISSUE_ACCEPTED:
255 self.assign_accepting_admin_to_watchlist()
257 def assign_accepting_admin_to_watchlist(self):
258 issue = self.session.query(Issue).filter_by(id=self.event.issue_id).one()
259 issue.add_watcher(self.event.user)
262@handles(e.ISSUE_RELEASED, e.ISSUE_RELEASED_UPDATEABLE)
263class IssueReleased(EventProcessor):
264 webhooks = True
266 @overrides
267 def assign_notifications(self) -> List[EmailNotification]:
268 issue: Issue = self.event.issue
269 rs: set
270 en_list = []
271 if issue.respondent is not None:
272 for user in (u for u in issue.respondent.users if u.is_administrator()):
273 en = EmailNotification(
274 user_id=user.id,
275 org_id=user.org_id,
276 organisation=user.organisation,
277 email=user.email,
278 event_id=self.event.id
279 )
280 self.session.add(en)
281 self.session.commit()
282 en_list.append(en)
284 elif issue.respondent_email is not None:
285 en = EmailNotification(
286 email=issue.respondent_email,
287 event_id=self.event.id
288 )
289 self.session.add(en)
290 self.session.commit()
291 en_list.append(en)
293 if not en_list:
294 msg = (f"Event #{self.event.id} - cannot send email notifications for Issue Released: "
295 "respondent_email not set and no respondent user Administrators found")
296 log.error(msg)
297 return en_list
300@handles(e.SECTION_ACCESS_UPDATED)
301class SectionAccess(EventProcessor):
303 @overrides
304 def recipients(self) -> Iterable[User]:
305 user_id_set = set()
306 for pc in self.event.changes:
307 if pc['name'] == 'Granted To':
308 user_id = pc.get('new', '').strip()
309 if user_id in user_id_set:
310 continue
311 user_id_set.add(user_id)
312 grant_user = self.session.query(User).get(user_id)
313 if grant_user is not None:
314 yield grant_user
317@handles(e.PROJECT_NOTE_ADDED)
318class ProjectNoteHandler(EventProcessor):
319 webhooks = True
320 fanout = True
322 @overrides
323 def moderate_event_acl(self) -> List[EmailNotification]:
324 from rfpy.model.notes import ProjectNote, Distribution
325 from rfpy.model.project import Project
327 event = self.event
328 note: ProjectNote = (
329 self.session.query(ProjectNote).filter_by(id=self.event.object_id).one()
330 )
331 project: Project = note.project
333 if note.distribution != Distribution.RESPONDENT_INTERNAL_MEMO:
334 for participant in project.participants:
335 if participant.org_id != note.org_id:
336 self.event.add_to_acl(participant.org_id)
338 if note.distribution == Distribution.BROADCAST_NOTICE:
339 for issue in project.published_issues:
340 event.add_to_acl(issue.respondent_id)
341 elif note.distribution == Distribution.TARGETED:
342 event.add_to_acl(note.target_org_id)
344 @overrides
345 def recipients(self) -> Iterable[User]:
346 acl_org_ids = {a.org_id for a in self.event.acl}
347 for user in set(self.event.project.iter_all_watchers()):
348 if user.org_id in acl_org_ids and user.id != self.event.user_id:
349 yield user
351 @overrides
352 def augment_model(self, model: EvtSchema):
353 from rfpy.model.notes import ProjectNote
354 note: ProjectNote = self.session.query(ProjectNote).get(self.event.object_id)
355 model.note = ProjNote.from_orm(note)
358@handles(e.SCORE_COMMENT_ADDED)
359class ScoreCommentHandler(EventProcessor):
360 webhooks = True
362 @overrides
363 def recipients(self) -> Iterable[User]:
364 from rfpy.model.issue import ScoreComment, Score
366 event = self.event
367 score_comment: ScoreComment = (
368 self.session.query(ScoreComment).filter_by(id=event.object_id).one()
369 )
370 score: Score = score_comment.score
371 users_to_notify = {sc.user for sc in score.comments}
372 users_to_notify = users_to_notify.union(set(event.project.participant_watchers))
373 try:
374 # Try to add the original scorer to the recipients list
375 score_created_event = (
376 self.session.query(AuditEvent)
377 .filter(AuditEvent.event_type == e.SCORE_CREATED)
378 .filter(AuditEvent.object_id == score.id)
379 .one()
380 )
381 users_to_notify.add(score_created_event.user)
382 except NoResultFound:
383 # Ignore - score could be created by autoscore or imported answer
384 pass
385 # AuditEvent.object_id is a bit vague. To prevent info leaking to the wrong people
386 # do a sanity check for all recipients
387 parti_orgs = {p.org_id for p in score.issue.project.participants}
388 for user in users_to_notify:
389 if user.org_id in parti_orgs and user is not event.user:
390 yield user
392 @overrides
393 def augment_model(self, model: EvtSchema):
394 from rfpy.model.issue import ScoreComment
395 sc: ScoreComment = self.session.query(ScoreComment).get(self.event.object_id)
396 score = sc.score
397 if score.score is None:
398 sv = None
399 else:
400 sv = float(score.score)
402 model.score = ScoreSchema(
403 score_value=sv,
404 comment=sc.comment_text,
405 question_number=score.question.number.dotted,
406 respondent_name=score.issue.respondent.name,
407 issue_id=score.issue_id
408 )
411if __name__ == '__main__':
412 for k, v in registry.items():
413 print(f"{k} : {v.__name__}")
414 print('Can be webhooked:')
415 for ev in webhook_evt_types():
416 print(ev)