Coverage for rfpy/jobs/events/action.py: 98%

262 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-24 10:52 +0000

1from typing import Iterable, List, Type, Optional, Self 

2from contextlib import contextmanager 

3import logging 

4 

5import orjson 

6from sqlalchemy.orm import Session 

7from sqlalchemy.orm.exc import NoResultFound 

8from overrides import overrides, final, EnforceOverrides 

9 

10 

11from rfpy.model.humans import User 

12from rfpy import conf 

13from rfpy.mail.schemas import EvtSchema, build_template_model, ProjNote, ScoreSchema 

14from rfpy.jobs.events.webhooks import ping_webhook 

15from rfpy.model.notify import EmailNotification 

16from rfpy.model.audit.event import AuditEvent, Status as EvtStatus 

17from rfpy.model.audit import evt_types as e 

18from rfpy.model.issue import Issue 

19from rfpy.utils import json_default 

20from rfpy.api import fetch 

21from rfpy.jobs.events import fanout 

22 

23 

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Maps an event type string to the EventProcessor subclass that handles it.
# Entries are added by the `handles` class decorator defined in this module.
registry: dict[str, Type["EventProcessor"]] = {}

27 

28 

def debug_model(model, event_id):
    """
    Log the mail data model as indented JSON, but only at DEBUG level.

    Nothing is serialised unless the module logger is enabled for DEBUG.
    The model's "To" entry is logged as the recipient, with a placeholder
    text when it is absent.
    """
    if not log.isEnabledFor(logging.DEBUG):
        return

    payload = orjson.dumps(model, default=json_default, option=orjson.OPT_INDENT_2)
    recipient = model.get("To", "no email address given")
    log.debug(
        "Data model for user %s, event %s: \n %s",
        recipient,
        event_id,
        payload.decode("utf8"),
    )

39 

40 

def webhook_evt_types() -> List[str]:
    """Return the registered event types whose processor class enables webhooks."""
    webhook_types = []
    for evt_type, processor_cls in registry.items():
        if processor_cls.webhooks:
            webhook_types.append(evt_type)
    return webhook_types

44 

45 

class handles:
    """
    Class decorator that registers an EventProcessor subclass in ``registry``
    under each of the given event types.

    Raises ValueError as soon as one of the event types is found to be
    registered already.
    """

    def __init__(self, *evt_types: str):
        self.evt_types = evt_types

    def __call__(self, cls):
        for name in self.evt_types:
            if name not in registry:
                registry[name] = cls
            else:
                raise ValueError(f"{name} is already assigned")
        return cls

60 

61 

def handle_event(event: AuditEvent, session, mailer=None):
    """
    Run email notifications, webhooks and any additional tasks for one event.

    The work is delegated to the EventProcessor subclass registered for the
    event's type. When no mailer is given, ``rfpy.mail.stub`` is used.

    Raises ValueError if no handler is registered for the event type.
    Returns whatever the handler's ``process()`` returns.
    """
    if mailer is None:
        # Imported lazily, only when the caller did not supply a mailer
        from rfpy.mail import stub as mailer

    evt_type = event.event_type
    handler_cls = registry.get(evt_type)
    if handler_cls is None:
        raise ValueError(f"No Handler found for AuditEvent {evt_type}")

    return handler_cls(event, session, mailer).process()

77 

78 

def handler_exists_for(event: AuditEvent) -> bool:
    """Report whether a processor class is registered for the event's type."""
    evt_type = event.event_type
    return evt_type in registry

81 

82 

def run_event_handlers(session: Session):
    """Invoke handle_event on every AuditEvent whose status is pending."""
    pending_events = session.query(AuditEvent).filter(
        AuditEvent.status == EvtStatus.pending
    )
    for pending in pending_events:
        handle_event(pending, session)

87 

88 

def logging_context(event: AuditEvent):
    """
    Build a context manager factory bound to the given event.

    The returned callable takes an action label. Code run inside the resulting
    ``with`` block has its completion logged at INFO level; any ``Exception``
    raised is logged with traceback, marks the event's status as error and is
    not re-raised.
    """

    @contextmanager
    def _logged_step(action):
        try:
            yield
            completed_msg = "%s completed for event %s # %s"
            log.info(completed_msg, action, event.event_type, event.id)
        except Exception:
            event.status = EvtStatus.error
            failed_msg = "%s failure for event %s # %s"
            log.exception(failed_msg, action, event.event_type, event.id)

    return _logged_step

104 

105 

class EventProcessor(EnforceOverrides):
    """
    Base class for the per-event-type processors held in ``registry``.

    ``process()`` drives the pipeline. Subclasses adjust it through the class
    flags below and by overriding the hook methods (``moderate_event_acl``,
    ``recipients``, ``assign_notifications``, ``augment_model``,
    ``fanout_data`` and ``extra``).
    """

    # When True, recipients() leaves out the user attached to the event
    exclude_initiator = True
    # When True, watchers of the event's issue are candidate email recipients
    issue_watchers = False
    # When True, the project's participant watchers are candidate recipients
    project_watchers = False
    # When truthy, ping_webhooks() runs; also read by webhook_evt_types()
    webhooks = False
    # When truthy (and a fanout realm is configured), publish_to_fanout() runs
    fanout = False

    def __init__(self, event: AuditEvent, session, mailer):
        self.session = session
        self.mailer = mailer
        self.event = event
        # Populated by generate_evt_model(), called at the start of process()
        self.evt_model: Optional[EvtSchema] = None
        # Returned by process(); nothing in this base class assigns it
        self.result = None

    @final
    def process(self):
        """Process an audit event through the notification pipeline.

        Executes the following steps in order:
        1. Generates event model from audit data
        2. Moderates event access control list
        3. Sends email notifications if configured
        4. Triggers webhooks if enabled
        5. Publishes to fanout stream if active
        6. Runs any extra actions defined by subclasses

        Steps 3 to 6 are each wrapped in an error logging context: a failure
        in one of them is logged and does not prevent subsequent steps from
        executing. Steps 1 and 2 are not wrapped, so an exception raised there
        propagates to the caller.

        Side Effects:
            - Updates event.status to EvtStatus.done on success
            - Updates event.status to EvtStatus.error if a wrapped step failed
            - Logs completion/failure state

        Returns:
            The value of ``self.result`` (None unless a subclass assigns it)
        """
        self.generate_evt_model()
        self.moderate_event_acl()

        errors_logged = logging_context(self.event)

        with errors_logged("Email notifications"):
            self.send_emails()

        with errors_logged("Webhooks"):
            self.ping_webhooks()

        with errors_logged("Fanout Update Stream"):
            self.publish_to_fanout()

        with errors_logged("Extra actions"):
            self.extra()

        # The logging contexts flag the event as error if any step raised
        if self.event.status != EvtStatus.error:
            self.event.status = EvtStatus.done
            log.info("Event processing completed for %s", self.event)
        else:
            log.warning("Event processing failed for %s", self.event)

        return self.result

    def generate_evt_model(self):
        """Build ``self.evt_model`` by validating the event against EvtSchema."""
        # Extracted from __init__ method to permit easier testing
        self.evt_model = EvtSchema.model_validate(self.event)

    def moderate_event_acl(self):
        """Allow subclass handlers to customise the EventOrgACL"""
        pass

    def _iter_watchers(self) -> Iterable[User]:
        """
        Yield issue watchers and/or project participant watchers, depending on
        the ``issue_watchers`` and ``project_watchers`` flags. A user watching
        both may be yielded twice.
        """
        # presumably (re)attaches the event to the session so that the
        # relationships below can be loaded - TODO confirm
        self.session.add(self.event)
        if self.issue_watchers:
            yield from self.event.issue.watchers
        if self.project_watchers:
            yield from self.event.project.participant_watchers

    def recipients(self) -> Iterable[User]:
        """
        An iterable of users watching the related Project or Issue and which belong
        to an organisation assigned the Event's ACL. If self.exclude_initiator is
        True then the event's user is excluded.

        Each user is yielded at most once, even when they watch both the issue
        and the project, so that only one notification per user is created.
        """
        acl_org_ids = {acl.org_id for acl in self.event.acl}
        seen_user_ids = set()

        for watcher in self._iter_watchers():
            if watcher.org_id not in acl_org_ids:
                continue
            if self.exclude_initiator and watcher is self.event.user:
                continue
            if watcher.id in seen_user_ids:
                continue
            seen_user_ids.add(watcher.id)
            yield watcher

    def assign_notifications(self) -> List[EmailNotification]:
        """
        Create an EmailNotification for every recipient, add them to the
        session, commit, and return them as a list.
        """
        en_list = []
        for user in self.recipients():
            en = EmailNotification(
                user_id=user.id,
                org_id=user.org_id,
                organisation=user.organisation,
                email=user.email,
                event_id=self.event.id,
            )
            self.session.add(en)
            en_list.append(en)
        self.session.commit()
        return en_list

    @final
    def get_model(self, user: User, email: str) -> dict:
        """
        Build the mail data package for one recipient as a plain dict.

        The template model's event section is passed through augment_model()
        before dumping, so subclasses can add data to it.
        """
        assert self.evt_model is not None
        pydantic_model = build_template_model(self.evt_model, user, email)
        self.augment_model(pydantic_model.TemplateModel.event)
        model = pydantic_model.model_dump()
        debug_model(model, self.event.id)
        return model

    def augment_model(self, model: EvtSchema):
        """
        Subclasses can override this to add data to the 'TemplateModel' of the mail
        data package
        """
        pass

    def send_emails(self):
        """
        Send one email per assigned notification and record the outcome.

        A failure for one notification is logged and recorded on that
        notification; the remaining notifications are still attempted.
        """
        for n in self.assign_notifications():
            try:
                email_model = self.get_model(n.user, n.email)
                n.message_id = self.mailer.send_email(email_model)
                log.info(
                    "Email notification sent to %s for event ID %s - %s",
                    n.email,
                    self.event.id,
                    self.event.event_type,
                )
                n.set_sent()
            except Exception as exc:
                n.set_failed()
                # On failure the message_id field carries the (truncated) error text
                n.message_id = str(exc)[:255]
                log.exception(
                    "Failed to send message to %s for event #%s", n.email, self.event.id
                )
        self.session.commit()

    def ping_webhooks(self):
        """
        Send the serialised event model to every webhook fetched for this
        event. Does nothing unless the ``webhooks`` flag is set.
        """
        # Truthiness test, matching how webhook_evt_types() reads the flag
        if not self.webhooks:
            return
        ev_model_bytes = orjson.dumps(self.evt_model.model_dump(), default=json_default)
        for wh in fetch.webhooks_for_event(self.event):
            ping_webhook(wh, ev_model_bytes)
        self.session.commit()

    @final
    def publish_to_fanout(self):
        """
        Send an update for this event to the fanout stream, once per ACL entry
        using the entry's org_id as channel. Skipped unless the ``fanout`` flag
        is set and ``conf.CONF.fanout_realm_id`` is configured.
        """
        if self.fanout and conf.CONF.fanout_realm_id is not None:
            data = self.fanout_data()
            for acl in self.event.acl:
                fanout.send_update(
                    self.event.id, self.event.event_type, data, channel=acl.org_id
                )

    def fanout_data(self):
        """
        Subclasses can customise the data sent to the Fanout.io update stream. For
        security, only object IDs should be sent (requiring the client to re-fetch
        if necessary)
        """
        return dict(event_id=self.event.id)

    def extra(self):
        """Custom actions that may be defined by subclasses"""
        pass

276 

277 

@handles(
    e.ISSUE_ACCEPTED, e.ISSUE_DECLINED, e.ISSUE_REVERTED_TO_ACCEPTED, e.ISSUE_SUBMITTED
)
class VendorStatusAction(EventProcessor):
    """
    Processor for the issue accepted / declined / reverted-to-accepted /
    submitted events. Issue watchers and project watchers are notified and
    webhooks are pinged.
    """

    issue_watchers = True
    project_watchers = True
    webhooks = True

    @overrides
    def extra(self):
        """For ISSUE_ACCEPTED events only, add the event's user to the issue's watchers."""
        if self.event.event_type != e.ISSUE_ACCEPTED:
            return
        self.assign_accepting_admin_to_watchlist()

    def assign_accepting_admin_to_watchlist(self):
        """Look up the event's issue by ID and register the event's user as a watcher."""
        issue_query = self.session.query(Issue).filter_by(id=self.event.issue_id)
        accepted_issue = issue_query.one()
        accepted_issue.add_watcher(self.event.user)

294 

295 

@handles(e.ISSUE_RELEASED, e.ISSUE_RELEASED_UPDATEABLE)
class IssueReleased(EventProcessor):
    """
    Processor for issue released events. Notifications go to the respondent
    side rather than to watchers, and webhooks are pinged.
    """

    webhooks = True

    @overrides
    def assign_notifications(self) -> List[EmailNotification]:
        """
        Create notifications for the respondent of the released issue.

        If the issue has a respondent organisation, one notification is created
        for each of its users that is an administrator. Otherwise, if the issue
        has a respondent_email, a single notification is created for that
        address. An error is logged when no notification could be created.

        All notifications are added to the session and committed together.
        """
        issue: Issue = self.event.issue
        en_list: List[EmailNotification] = []

        if issue.respondent is not None:
            # NOTE(review): when the respondent has no administrator users the
            # issue's respondent_email is not tried as a fallback, although the
            # error message below reads as if both had been checked - confirm
            # whether that is intended.
            for user in issue.respondent.users:
                if not user.is_administrator():
                    continue
                en = EmailNotification(
                    user_id=user.id,
                    org_id=user.org_id,
                    organisation=user.organisation,
                    email=user.email,
                    event_id=self.event.id,
                )
                self.session.add(en)
                en_list.append(en)
        elif issue.respondent_email is not None:
            en = EmailNotification(email=issue.respondent_email, event_id=self.event.id)
            self.session.add(en)
            en_list.append(en)

        if en_list:
            # Single commit for the whole batch instead of one per notification
            self.session.commit()
        else:
            msg = (
                f"Event #{self.event.id} - cannot send email notifications for Issue Released: "
                "respondent_email not set and no respondent user Administrators found"
            )
            log.error(msg)
        return en_list

330 

331 

@handles(e.SECTION_ACCESS_UPDATED)
class SectionAccess(EventProcessor):
    """
    Processor for section access updates. The recipients are the users named
    in the event's "Granted To" change entries.
    """

    @overrides
    def recipients(self) -> Iterable[User]:
        """
        Yield each distinct user referenced by a "Granted To" change.

        The user ID is read from the change's "new" value. Entries whose value
        is missing, None or blank are skipped, as are IDs for which no User
        is found.
        """
        seen_user_ids = set()
        for change in self.event.changes:
            if change["name"] != "Granted To":
                continue
            # "new" may be present with a None value, so don't rely on the
            # .get() default to guarantee a string
            user_id = (change.get("new") or "").strip()
            if not user_id or user_id in seen_user_ids:
                continue
            seen_user_ids.add(user_id)
            grant_user = self.session.get(User, user_id)
            if grant_user is not None:
                yield grant_user

346 

347 

@handles(e.PROJECT_NOTE_ADDED)
class ProjectNoteHandler(EventProcessor):
    """
    Processor for project note events. It widens the event ACL according to
    the note's distribution, notifies project watchers and enables both
    webhooks and fanout.
    """

    webhooks = True
    fanout = True

    @overrides
    def moderate_event_acl(self: Self):
        """
        Extend the event's ACL based on the distribution of the note the
        event refers to (looked up through event.object_id).
        """
        from rfpy.model.notes import ProjectNote, Distribution
        from rfpy.model.project import Project

        event = self.event
        note_query = self.session.query(ProjectNote).filter_by(id=event.object_id)
        note: ProjectNote = note_query.one()
        project: Project = note.project
        distribution = note.distribution

        # Unless the note is a respondent internal memo, every participant
        # organisation other than the note's own one is granted access
        if distribution != Distribution.RESPONDENT_INTERNAL_MEMO:
            for participant in project.participants:
                if participant.org_id == note.org_id:
                    continue
                event.add_to_acl(participant.org_id)

        if distribution == Distribution.BROADCAST_NOTICE:
            # Broadcasts also reach the respondent of every published issue
            for issue in project.published_issues:
                if issue.respondent_id is None:
                    continue
                event.add_to_acl(issue.respondent_id)
        elif distribution == Distribution.TARGETED:
            if note.target_org_id is not None:
                event.add_to_acl(note.target_org_id)

    @overrides
    def recipients(self) -> Iterable[User]:
        """
        Yield each distinct watcher of the project whose organisation is in the
        event's ACL, leaving out the user who raised the event.
        """
        permitted_org_ids = {entry.org_id for entry in self.event.acl}
        unique_watchers = set(self.event.project.iter_all_watchers())
        for watcher in unique_watchers:
            if watcher.org_id not in permitted_org_ids:
                continue
            if watcher.id == self.event.user_id:
                continue
            yield watcher

    @overrides
    def augment_model(self, model: EvtSchema):
        """Attach the note the event refers to, validated as ProjNote, to the model."""
        from rfpy.model.notes import ProjectNote

        project_note: ProjectNote = self.session.get(ProjectNote, self.event.object_id)
        model.note = ProjNote.model_validate(project_note)

392 

393 

@handles(e.SCORE_COMMENT_ADDED)
class ScoreCommentHandler(EventProcessor):
    """
    Processor for score comment events. Notifies the people involved with the
    commented score and pings webhooks.
    """

    webhooks = True

    @overrides
    def recipients(self) -> Iterable[User]:
        """
        Yield the users to notify about a new score comment.

        Candidates are the authors of all comments on the score, the project's
        participant watchers and, when a SCORE_CREATED event exists for the
        score, the user of that event. Only candidates belonging to a
        participant organisation of the score's project are yielded, and the
        user who raised this event is left out.
        """
        from rfpy.model.issue import ScoreComment, Score

        event = self.event
        score_comment: ScoreComment = (
            self.session.query(ScoreComment).filter_by(id=event.object_id).one()
        )
        score: Score = score_comment.score
        users_to_notify = {sc.user for sc in score.comments}
        users_to_notify.update(event.project.participant_watchers)
        try:
            # Try to add the original scorer to the recipients list
            score_created_event = (
                self.session.query(AuditEvent)
                .filter(AuditEvent.event_type == e.SCORE_CREATED)
                .filter(AuditEvent.object_id == score.id)
                .one()
            )
        except NoResultFound:
            # Ignore - score could be created by autoscore or imported answer
            pass
        else:
            users_to_notify.add(score_created_event.user)
        # AuditEvent.object_id is a bit vague. To prevent info leaking to the wrong people
        # do a sanity check for all recipients
        parti_orgs = {p.org_id for p in score.issue.project.participants}
        for user in users_to_notify:
            if user.org_id in parti_orgs and user is not event.user:
                yield user

    @overrides
    def augment_model(self, model: EvtSchema):
        """
        Attach a ScoreSchema describing the commented score to the model.

        The score value is passed through unchanged and may be None.
        """
        from rfpy.model.issue import ScoreComment

        sc: ScoreComment = self.session.get(ScoreComment, self.event.object_id)
        score = sc.score

        model.score = ScoreSchema(
            score_value=score.score,
            comment=sc.comment_text,
            question_number=score.question.number.dotted,
            respondent_name=score.issue.respondent.name,
            issue_id=score.issue_id,
        )

446 

447 

if __name__ == "__main__":  # pragma: no cover
    # Print the handler registry, then the event types open to webhooks
    for evt_type, processor_cls in registry.items():
        print(f"{evt_type} : {processor_cls.__name__}")
    print("Can be webhooked:")
    for webhook_type in webhook_evt_types():
        print(webhook_type)