Coverage for rfpy/jobs/events/action.py: 97%

260 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-31 16:00 +0000

1from typing import Iterable, List 

2from contextlib import contextmanager 

3import logging 

4 

5import orjson 

6from sqlalchemy.orm import Session 

7from sqlalchemy.orm.exc import NoResultFound 

8from overrides import overrides, final, EnforceOverrides 

9 

10 

11from rfpy.mail.schemas import EvtSchema, build_template_model, ProjNote, ScoreSchema 

12from rfpy.jobs.events.webhooks import ping_webhook 

13from rfpy.model.notify import EmailNotification 

14from rfpy.model.audit.event import AuditEvent, Status as EvtStatus 

15from rfpy.model.audit import evt_types as e 

16from rfpy.model.issue import Issue 

17from rfpy.utils import json_default 

18from rfpy.model.humans import User 

19from rfpy.api import fetch 

20from rfpy import conf 

21from . import fanout 

22 

23 

# Module-level logger shared by every handler in this file
log = logging.getLogger(__name__)

# Maps an event type name to the handler class registered for it
# (populated by the `handles` decorator)
registry = {}

27 

28 

def debug_model(model, event_id):
    '''
    Write an indented JSON dump of the email data model to the debug log.

    Nothing is serialised unless DEBUG logging is enabled for this module's
    logger, so the encoding cost is only paid when the output will be used.
    '''
    if not log.isEnabledFor(logging.DEBUG):
        return

    pretty_json = orjson.dumps(model, default=json_default, option=orjson.OPT_INDENT_2)
    recipient = model.get('To', 'no email address given')
    log.debug(
        "Data model for user %s, event %s: \n %s",
        recipient, event_id, pretty_json.decode('utf8')
    )

41 

42 

def webhook_evt_types() -> List[str]:
    '''List the registered event types whose handler class has webhooks enabled'''
    hookable = []
    for evt_type, handler_cls in registry.items():
        if handler_cls.webhooks:
            hookable.append(evt_type)
    return hookable

46 

47 

class handles:
    '''
    Class decorator that records the decorated class in the module registry
    under each of the given event type names.
    '''

    def __init__(self, *evt_types: str):
        # Event type names the decorated class will be registered for
        self.evt_types = evt_types

    def __call__(self, cls):
        '''
        Register cls for every configured event type and return it unchanged.

        Raises ValueError if one of the event types already has a handler.
        '''
        for name in self.evt_types:
            if name not in registry:
                registry[name] = cls
                continue
            raise ValueError(f"{name} is already assigned")
        return cls

61 

62 

def handle_event(event: AuditEvent, session, mailer=None):
    '''
    Run email notifications, webhooks and any extra tasks for a single event.

    The work is delegated to the EventProcessor subclass registered for the
    event's type. When no mailer is supplied, rfpy.mail.stub is used.

    Raises ValueError if no handler is registered for the event type.
    Returns whatever the handler's process() method returns.
    '''
    if mailer is None:
        # Imported lazily, only when the caller did not provide a mailer
        from rfpy.mail import stub as mailer

    try:
        handler_cls = registry[event.event_type]
    except KeyError:
        raise ValueError(f"No Handler found for AuditEvent {event.event_type}") from None

    return handler_cls(event, session, mailer).process()

77 

78 

def handler_exists_for(event: AuditEvent) -> bool:
    '''Report whether a handler class is registered for the event's type'''
    evt_type = event.event_type
    return evt_type in registry

81 

82 

def run_event_handlers(session: Session):
    '''Run handle_event for every AuditEvent whose status is pending'''
    pending_events = session.query(AuditEvent).filter(
        AuditEvent.status == EvtStatus.pending
    )
    for pending in pending_events:
        handle_event(pending, session)

87 

88 

def logging_context(event: AuditEvent):
    '''
    Build a context manager factory bound to the given event.

    The returned callable takes an action label. On a clean exit of the
    wrapped block a completion message is logged. If the block raises an
    Exception, the event's status is set to error, the traceback is logged
    and the exception is NOT re-raised, so the caller carries on.
    '''

    @contextmanager
    def _logged(action):
        try:
            yield
            # Still inside the try: only reached when the wrapped block succeeded
            log.info(
                "%s completed for event %s # %s",
                action, event.event_type, event.id
            )
        except Exception:
            event.status = EvtStatus.error
            log.exception(
                "%s failure for event %s # %s",
                action, event.event_type, event.id
            )

    return _logged

101 

102 

class EventProcessor(EnforceOverrides):
    '''
    Base class for event handlers.

    A handler is built for a single AuditEvent and driven through process(),
    which sends email notifications, pings webhooks, publishes to the fanout
    update stream and finally runs any subclass-specific extra actions.
    Subclasses tune behaviour through the class flags below and by overriding
    the non-final hook methods.
    '''
    # Leave the user who raised the event out of the default recipients
    exclude_initiator = True
    # Include watchers of the event's issue in the default recipients
    issue_watchers = False
    # Include participant watchers of the event's project in the default recipients
    project_watchers = False
    # Ping webhooks subscribed to this event
    webhooks = False
    # Publish an update to the fanout stream for each org in the event's ACL
    fanout = False

    def __init__(self, event: AuditEvent, session, mailer):
        self.session = session
        self.mailer = mailer
        self.event = event
        # Populated by generate_evt_model()
        self.evt_model = None
        # Returned by process(); never set in this base class
        self.result = None

    @final
    def process(self):
        '''
        Run every processing stage for the event and return self.result.

        Each delivery stage runs inside its own logging context, so a failure
        in one stage is logged and flags the event as error without stopping
        the stages that follow. The event is marked done only if no stage
        flagged an error.
        '''
        self.generate_evt_model()
        self.moderate_event_acl()

        errors_logged = logging_context(self.event)

        with errors_logged('Email notifications'):
            self.send_emails()

        with errors_logged('Webhooks'):
            self.ping_webhooks()

        with errors_logged('Fanout Update Stream'):
            self.publish_to_fanout()

        with errors_logged('Extra actions'):
            self.extra()

        if self.event.status != EvtStatus.error:
            self.event.status = EvtStatus.done
            log.info("Event processing completed for %s", self.event)
        else:
            log.warning("Event processing failed for %s", self.event)

        return self.result

    def generate_evt_model(self):
        '''Build self.evt_model from the event'''
        # Extracted from __init__ method to permit easier testing
        self.evt_model = EvtSchema.from_orm(self.event)

    def moderate_event_acl(self):
        '''Allow subclass handlers to customise the EventOrgACL'''
        pass

    def _iter_watchers(self) -> Iterable[User]:
        '''
        Yield issue watchers and/or project participant watchers, depending on
        the issue_watchers and project_watchers flags. May yield the same user
        more than once.
        '''
        # Presumably re-attaches the event so its relationships can be loaded
        self.session.add(self.event)
        if self.issue_watchers:
            yield from self.event.issue.watchers
        if self.project_watchers:
            yield from self.event.project.participant_watchers

    def recipients(self) -> Iterable[User]:
        '''
        An iterable of users watching the related Project or Issue and which belong
        to an organisation assigned the Event's ACL. If exclude_initiator is set then
        the event author is excluded.

        A user who appears on more than one watch list is yielded only once so
        that they do not receive duplicate notifications.
        '''
        acl_org_ids = {acl.org_id for acl in self.event.acl}
        seen_user_ids = set()

        for watcher in self._iter_watchers():
            if watcher.org_id not in acl_org_ids:
                continue
            if self.exclude_initiator and watcher is self.event.user:
                continue
            if watcher.id in seen_user_ids:
                # Already yielded via another watch list
                continue
            seen_user_ids.add(watcher.id)
            yield watcher

    def assign_notifications(self) -> List[EmailNotification]:
        '''
        Create and commit one EmailNotification per recipient and return them.
        '''
        en_list = []
        for user in self.recipients():
            en = EmailNotification(
                user_id=user.id,
                org_id=user.org_id,
                organisation=user.organisation,
                email=user.email,
                event_id=self.event.id
            )
            self.session.add(en)
            en_list.append(en)
        self.session.commit()
        return en_list

    @final
    def get_model(self, user: User, email: str) -> dict:
        '''
        Build the mail data package for one recipient as a plain dict.

        The event part of the template model is passed through augment_model()
        before the package is converted to a dict.
        '''
        pydantic_model = build_template_model(self.evt_model, user, email)
        self.augment_model(pydantic_model.TemplateModel.event)
        model = pydantic_model.dict()
        debug_model(model, self.event.id)
        return model

    def augment_model(self, model: EvtSchema):
        '''
        Subclasses can override this to add data to the 'TemplateModel' of the mail
        data package
        '''
        pass

    def send_emails(self):
        '''
        Send an email for every assigned notification.

        A failure for one notification is logged and recorded on that
        notification; the remaining notifications are still attempted.
        '''
        for n in self.assign_notifications():
            try:
                email_model = self.get_model(n.user, n.email)
                n.message_id = self.mailer.send_email(email_model)
                log.info(
                    'Email notification sent to %s for event ID %s - %s',
                    n.email, self.event.id, self.event.event_type
                )
                n.set_sent()
            except Exception as exc:
                n.set_failed()
                # Store the error text in place of a message ID, truncated to 255 chars
                n.message_id = str(exc)[:255]
                log.exception('Failed to send message to %s for event #%s', n.email, self.event.id)
        self.session.commit()

    def ping_webhooks(self):
        '''Send the serialised event model to each webhook found for the event'''
        # Truthiness test, consistent with webhook_evt_types()
        if not self.webhooks:
            return
        ev_model_bytes = orjson.dumps(self.evt_model.dict(), default=json_default)
        for wh in fetch.webhooks_for_event(self.event):
            ping_webhook(wh, ev_model_bytes)
        self.session.commit()

    @final
    def publish_to_fanout(self):
        '''
        Send a fanout update to the channel of every org in the event's ACL.

        Does nothing unless the handler enables fanout and a fanout realm ID
        is configured.
        '''
        if self.fanout and conf.CONF.fanout_realm_id is not None:
            data = self.fanout_data()
            for acl in self.event.acl:
                fanout.send_update(self.event.id, self.event.event_type, data, channel=acl.org_id)

    def fanout_data(self):
        '''
        Subclasses can customise the data sent to the Fanout.io update stream. For
        security, only object IDs should be sent (requiring the client to re-fetch
        if necessary)
        '''
        return dict(event_id=self.event.id)

    def extra(self):
        '''Custom actions that may be defined by subclasses'''
        pass

244 

245 

@handles(e.ISSUE_ACCEPTED, e.ISSUE_DECLINED, e.ISSUE_REVERTED_TO_ACCEPTED, e.ISSUE_SUBMITTED)
class VendorStatusAction(EventProcessor):
    '''
    Handler for the issue accepted / declined / reverted-to-accepted / submitted
    events. Notifies issue and project watchers and supports webhooks.
    '''
    issue_watchers = True
    project_watchers = True
    webhooks = True

    @overrides
    def extra(self):
        '''Only an ISSUE_ACCEPTED event triggers the watchlist update'''
        if self.event.event_type != e.ISSUE_ACCEPTED:
            return
        self.assign_accepting_admin_to_watchlist()

    def assign_accepting_admin_to_watchlist(self):
        '''Add the event's user as a watcher of the event's issue'''
        issue_query = self.session.query(Issue).filter_by(id=self.event.issue_id)
        accepted_issue = issue_query.one()
        accepted_issue.add_watcher(self.event.user)

260 

261 

@handles(e.ISSUE_RELEASED, e.ISSUE_RELEASED_UPDATEABLE)
class IssueReleased(EventProcessor):
    '''Handler for issue released events; supports webhooks'''
    webhooks = True

    @overrides
    def assign_notifications(self) -> List[EmailNotification]:
        '''
        Create notifications for the issue's respondent.

        If the issue has a respondent organisation, one notification is made
        for each of its administrator users. Otherwise, if a respondent email
        is set, a single notification is addressed to it. An error is logged
        when neither route produces a notification.
        '''
        issue: Issue = self.event.issue
        en_list = []

        def enqueue(**fields):
            # Each notification is added and committed on its own before being collected
            en = EmailNotification(event_id=self.event.id, **fields)
            self.session.add(en)
            self.session.commit()
            en_list.append(en)

        if issue.respondent is not None:
            for user in issue.respondent.users:
                if not user.is_administrator():
                    continue
                enqueue(
                    user_id=user.id,
                    org_id=user.org_id,
                    organisation=user.organisation,
                    email=user.email
                )
        elif issue.respondent_email is not None:
            enqueue(email=issue.respondent_email)

        if not en_list:
            msg = (f"Event #{self.event.id} - cannot send email notifications for Issue Released: "
                   "respondent_email not set and no respondent user Administrators found")
            log.error(msg)
        return en_list

298 

299 

@handles(e.SECTION_ACCESS_UPDATED)
class SectionAccess(EventProcessor):
    '''Handler for section access updates; notifies the users granted access'''

    @overrides
    def recipients(self) -> Iterable[User]:
        '''
        Yield each user named as the new value of a 'Granted To' change on the
        event. Each user ID is looked up only once, and IDs with no matching
        User are skipped.
        '''
        seen_ids = set()
        grants = (pc for pc in self.event.changes if pc['name'] == 'Granted To')
        for grant in grants:
            user_id = grant.get('new', '').strip()
            if user_id in seen_ids:
                continue
            seen_ids.add(user_id)
            grant_user = self.session.query(User).get(user_id)
            if grant_user is None:
                continue
            yield grant_user

315 

316 

@handles(e.PROJECT_NOTE_ADDED)
class ProjectNoteHandler(EventProcessor):
    '''
    Handler for project note events. Supports webhooks and fanout, and widens
    the event ACL according to the note's distribution before notifying.
    '''
    webhooks = True
    fanout = True

    @overrides
    def moderate_event_acl(self) -> None:
        '''
        Extend the event's ACL based on the note's distribution.

        Unless the note is a respondent internal memo, every project
        participant org other than the note's own org is added. In addition, a
        broadcast notice adds the respondent of every published issue and a
        targeted note adds the note's target org.
        '''
        from rfpy.model.notes import ProjectNote, Distribution
        from rfpy.model.project import Project

        event = self.event
        note: ProjectNote = (
            self.session.query(ProjectNote).filter_by(id=event.object_id).one()
        )
        project: Project = note.project

        if note.distribution != Distribution.RESPONDENT_INTERNAL_MEMO:
            for participant in project.participants:
                if participant.org_id != note.org_id:
                    event.add_to_acl(participant.org_id)

        if note.distribution == Distribution.BROADCAST_NOTICE:
            for issue in project.published_issues:
                # NOTE(review): respondent_id is passed as-is; confirm add_to_acl
                # copes with issues that have no respondent org
                event.add_to_acl(issue.respondent_id)
        elif note.distribution == Distribution.TARGETED:
            event.add_to_acl(note.target_org_id)

    @overrides
    def recipients(self) -> Iterable[User]:
        '''
        Yield each distinct project watcher whose org is in the event's ACL,
        leaving out the user who raised the event.
        '''
        acl_org_ids = {a.org_id for a in self.event.acl}
        for user in set(self.event.project.iter_all_watchers()):
            if user.org_id in acl_org_ids and user.id != self.event.user_id:
                yield user

    @overrides
    def augment_model(self, model: EvtSchema):
        '''Attach the note referenced by the event to the template model'''
        from rfpy.model.notes import ProjectNote
        note: ProjectNote = self.session.query(ProjectNote).get(self.event.object_id)
        model.note = ProjNote.from_orm(note)

356 

357 

@handles(e.SCORE_COMMENT_ADDED)
class ScoreCommentHandler(EventProcessor):
    '''Handler for score comment events; supports webhooks'''
    webhooks = True

    @overrides
    def recipients(self) -> Iterable[User]:
        '''
        Yield the users to notify about a new score comment.

        Candidates are everyone who has commented on the score, the project's
        participant watchers and, where a SCORE_CREATED event exists for the
        score, the user of that event. Only candidates from a project
        participant org are yielded, and the event's own user is left out.
        '''
        from rfpy.model.issue import ScoreComment, Score

        event = self.event
        comment_query = self.session.query(ScoreComment).filter_by(id=event.object_id)
        score_comment: ScoreComment = comment_query.one()
        score: Score = score_comment.score

        commenters = {sc.user for sc in score.comments}
        users_to_notify = commenters | set(event.project.participant_watchers)

        created_query = (
            self.session.query(AuditEvent)
            .filter(AuditEvent.event_type == e.SCORE_CREATED)
            .filter(AuditEvent.object_id == score.id)
        )
        try:
            # Try to add the original scorer to the recipients list
            users_to_notify.add(created_query.one().user)
        except NoResultFound:
            # Ignore - score could be created by autoscore or imported answer
            pass

        # AuditEvent.object_id is a bit vague. To prevent info leaking to the wrong people
        # do a sanity check for all recipients
        parti_orgs = {p.org_id for p in score.issue.project.participants}
        for user in users_to_notify:
            if user.org_id not in parti_orgs:
                continue
            if user is event.user:
                continue
            yield user

    @overrides
    def augment_model(self, model: EvtSchema):
        '''Attach score details for the commented score to the template model'''
        from rfpy.model.issue import ScoreComment

        sc: ScoreComment = self.session.query(ScoreComment).get(self.event.object_id)
        score = sc.score
        # A missing score value stays None rather than being coerced to a float
        score_value = None if score.score is None else float(score.score)

        model.score = ScoreSchema(
            score_value=score_value,
            comment=sc.comment_text,
            question_number=score.question.number.dotted,
            respondent_name=score.issue.respondent.name,
            issue_id=score.issue_id
        )

409 

410 

if __name__ == '__main__':
    # Print every registered event type with its handler class name
    for evt_type, handler_cls in registry.items():
        print(f"{evt_type} : {handler_cls.__name__}")

    # Then list the event types that accept webhook subscriptions
    print('Can be webhooked:')
    for hookable in webhook_evt_types():
        print(hookable)

416 print(ev)