Coverage for rfpy/api/update.py: 100%

298 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-31 16:00 +0000

1''' 

2Functions that perform SQL Update queries 

3''' 

4 

5from typing import List, Tuple 

6from rfpy.model.graph import Edge, RelationshipType 

7from rfpy.model.tags import Tag 

8from sqlalchemy.sql.elements import literal 

9from sqlalchemy.sql.expression import insert, select 

10from rfpy.model.questionnaire import WeightingSet 

11from copy import deepcopy 

12from datetime import datetime 

13 

14from sqlalchemy import func 

15from sqlalchemy.orm import object_session, lazyload, Session, make_transient 

16from sqlalchemy.orm.exc import NoResultFound 

17 

18from rfpy.model import (Participant, ProjectPermission, SectionPermission, 

19 User, Organisation, Issue, IssueAttachment, Project, 

20 ProjectNote, ProjectAttachment, CustomRole, Category, QElement, 

21 AuditEvent, EventOrgACL, EmailNotification, ScoreComment, 

22 Weighting, QuestionInstance, Section, Score, Answer, 

23 QuestionDefinition, ResponseStatus, QuestionResponseState, ImportType) 

24from rfpy.model.audit import evt_types, Status as EventStatus 

25from rfpy.model.notify import WebhookSubscription 

26 

27from rfpy.model.exc import CosmeticQuestionEditViolation 

28from rfpy.auth import AuthorizationFailure 

29 

30 

31def grant_project_permission(session, project, user): 

32 ''' 

33 Grants a (restricted) User permissions for the given 

34 project. Returns the newly created ProjectPermission instance 

35 ''' 

36 if not user.is_restricted: 

37 raise ValueError('Assigning ProjectPermission to a ' + 

38 'non-restricted user has no effect') 

39 try: 

40 participant = session.query(Participant)\ 

41 .filter_by(project_id=project.id, organisation=user.organisation)\ 

42 .one() 

43 pp = ProjectPermission() 

44 pp.participant = participant 

45 pp.user = user 

46 session.add(pp) 

47 except NoResultFound: 

48 m = "User {user} is not a Participant in project {project}" 

49 raise AuthorizationFailure(message=m) 

50 return pp 

51 

52 

53def grant_section_permissions(session, project, user, section_id_list): 

54 ''' 

55 Grants the given user access to sections in the given project. 

56 A ProjectPermission for this user/project is created if it doesn't exist 

57 ''' 

58 try: 

59 project_permission = project.permissions.filter_by(user_id=user.id).one() 

60 except NoResultFound: 

61 project_permission = grant_project_permission(session, project, user) 

62 

63 for sec_id in section_id_list: 

64 

65 sp = SectionPermission(section_id=sec_id, 

66 user=user, 

67 project_permission=project_permission) 

68 session.add(sp) 

69 

70 

71def merge_organisations(session, redundant_org_id, correct_org_id, delete_org=False): 

72 

73 session.query(Organisation).filter(Organisation.id == redundant_org_id).one() 

74 session.query(Organisation).filter(Organisation.id == correct_org_id).one() 

75 

76 session.commit() 

77 

78 pairs = ( 

79 (Issue, Issue.respondent_id), 

80 (IssueAttachment, IssueAttachment.org_id), 

81 (User, User.org_id), 

82 (Project, Project.org_id), 

83 (Participant, Participant.org_id), 

84 (ProjectNote, ProjectNote.org_id), 

85 (ProjectNote, ProjectNote.target_org_id), 

86 (ProjectAttachment, ProjectAttachment.org_id), 

87 (CustomRole, CustomRole.org_id), 

88 (Category, Category.org_id), 

89 (EventOrgACL, EventOrgACL.org_id), 

90 (EmailNotification, EmailNotification.org_id), 

91 (WebhookSubscription, WebhookSubscription.org_id), 

92 (Tag, Tag.org_id), 

93 (Edge, Edge.to_org_id), 

94 (Edge, Edge.from_org_id), 

95 (RelationshipType, RelationshipType.org_id), 

96 (AuditEvent, AuditEvent.org_id) 

97 ) 

98 

99 for klass, org_column in pairs: 

100 session.query(klass)\ 

101 .filter(org_column == redundant_org_id)\ 

102 .update({org_column: correct_org_id}, synchronize_session=False) 

103 

104 if delete_org: 

105 session.query(Organisation).filter(Organisation.id == redundant_org_id).delete() 

106 

107 

108def save_weightset_weightings(session, weightset, weights_doc): 

109 

110 q_lookup = {} 

111 sec_lookup = {} 

112 

113 for weighting in weightset.weightings: 

114 if weighting.section_id: 

115 sec_lookup[weighting.section_id] = weighting 

116 else: 

117 q_lookup[weighting.question_instance_id] = weighting 

118 

119 save = weightset.weightings.append # reference to bound method 

120 

121 q = session.query(QuestionInstance.id)\ 

122 .filter(QuestionInstance.project_id == weightset.project_id) 

123 qid_set = {qi.id for qi in q} 

124 project_id = weightset.project_id 

125 

126 for r in weights_doc['questions']: 

127 

128 question_id = r['question_id'] 

129 qweight = r['weight'] 

130 

131 if question_id in q_lookup: 

132 q_lookup[question_id].value = qweight 

133 elif question_id in qid_set: 

134 save(Weighting(question_instance_id=question_id, value=qweight)) 

135 else: 

136 m = f'Question ID {question_id} does not belong to project {project_id}' 

137 raise ValueError(m) 

138 

139 sq = session.query(Section).filter_by(project_id=project_id) 

140 sec_id_set = {s.id for s in sq} 

141 

142 for r in weights_doc['sections']: 

143 

144 section_id = r['section_id'] 

145 sec_weight = r['weight'] 

146 

147 if section_id in sec_lookup: 

148 sec_lookup[section_id].value = sec_weight 

149 elif section_id in sec_id_set: 

150 save(Weighting(section_id=section_id, value=sec_weight)) 

151 else: 

152 raise ValueError(f"Section '{section_id}' does not belong to project '{project_id}'") 

153 

154 

155def save_default_weightings(session, project, weights_doc): 

156 if weights_doc['questions']: 

157 q_ids = {r['question_id'] for r in weights_doc['questions']} 

158 qq = project.questions\ 

159 .options(lazyload('question_def.elements'))\ 

160 .filter(QuestionInstance.id.in_(q_ids)) 

161 

162 q_lookup = {qi.id: qi for qi in qq} 

163 for r in weights_doc['questions']: 

164 qid = r['question_id'] 

165 if qid in q_lookup: 

166 q_lookup[qid].weight = r['weight'] 

167 else: 

168 raise ValueError(f'Question ID {qid} does not belong to Project # {project.id}') 

169 

170 if weights_doc['sections']: 

171 sec_ids = {r['section_id'] for r in weights_doc['sections']} 

172 sq = project.sections\ 

173 .options(lazyload('questions'))\ 

174 .filter(Section.id.in_(sec_ids)) 

175 

176 sec_lookup = {sec.id: sec for sec in sq} 

177 for r in weights_doc['sections']: 

178 sec_id = r['section_id'] 

179 if sec_id in sec_lookup: 

180 sec_lookup[sec_id].weight = r['weight'] 

181 else: 

182 raise ValueError(f'Section ID {sec_id} does not belong to Project # {project.id}') 

183 

184 

185def set_initial_weightings(weighting_set: WeightingSet, initial_value): 

186 ''' 

187 Create Weighting records for each QuestionInstance and Section for weighting_set.project_id 

188 and weighting_set.weighting_set_id 

189 ''' 

190 session = object_session(weighting_set) 

191 

192 qs = (select([literal(weighting_set.id), QuestionInstance.id, literal(initial_value)]) 

193 .where(QuestionInstance.project_id == weighting_set.project_id)) 

194 qi = insert(Weighting).from_select(['weighting_set_id', 'question_instance_id', 'value'], qs) 

195 

196 session.execute(qi) 

197 

198 ss = (select([literal(weighting_set.id), Section.id, literal(initial_value)]) 

199 .where(Section.project_id == weighting_set.project_id)) 

200 si = insert(Weighting).from_select(['weighting_set_id', 'section_id', 'value'], ss) 

201 

202 session.execute(si) 

203 

204 

205def copy_weightings(source_weighting_set: WeightingSet, destination_weighting_set: WeightingSet): 

206 ''' 

207 Copy Weighting values from source weighting set to destination 

208 ''' 

209 session = object_session(destination_weighting_set) 

210 s = (select([ 

211 literal(destination_weighting_set.id), 

212 Weighting.section_id, 

213 Weighting.question_instance_id, 

214 Weighting.value 

215 ]).where(Weighting.weighting_set_id == source_weighting_set.id)) 

216 

217 i = (insert(Weighting) 

218 .from_select(['weighting_set_id', 'section_id', 'question_instance_id', 'value'], s)) 

219 

220 session.execute(i) 

221 

222 

223def reset_scores(session, project: Project, user: User): 

224 ''' 

225 Deletes all scores for the given project. 

226 Recreates Autoscores for multiple choice questions 

227 ''' 

228 issue_map = {} 

229 for issue in project.scoreable_issues: 

230 del_count = issue.scores.delete() 

231 issue_map[issue.id] = dict(respondent_id=issue.respondent_id, deleted=del_count) 

232 new_counts = {} 

233 

234 for ascore in project.generate_autoscores(session, user).values(): 

235 issue_id = ascore['issue_id'] 

236 score = Score(question_instance_id=ascore['question_id'], 

237 issue_id=issue_id, 

238 score=ascore['score']) 

239 if issue_id in new_counts: # pragma: no cover 

240 new_counts[issue_id] = new_counts[issue_id] + 1 

241 else: 

242 new_counts[issue_id] = 1 

243 session.add(score) 

244 

245 for issue_id, new_count in new_counts.items(): 

246 issue_map[issue_id]['added'] = new_count 

247 return issue_map 

248 

249 

250def save_answers(session, user: User, question: QuestionInstance, 

251 answer_lookup: dict, issue: Issue, 

252 imported: bool = False, set_done: bool = False): 

253 

254 res = question.validate_and_save_answers(answer_lookup, issue) 

255 if res.change_list is None or len(res.change_list) == 0: 

256 return False 

257 

258 response_state: QuestionResponseState = issue.response_state_for_q(question.id) 

259 response_state.date_updated = datetime.now() 

260 response_state.updated_by = user.id 

261 

262 if res.unanswered_mandatory: 

263 response_state.status = ResponseStatus.NOT_ANSWERED 

264 else: 

265 response_state.status = ResponseStatus.ANSWERED 

266 

267 evt_type = evt_types.ANSWER_CREATED if res.is_new else evt_types.ANSWER_UPDATED 

268 if imported: 

269 evt_type = evt_types.ANSWER_IMPORTED 

270 

271 evt = AuditEvent.create( 

272 evt_type, 

273 object_id=response_state.id, 

274 user=user, 

275 project_id=issue.project_id, 

276 issue=issue, 

277 question_id=question.id 

278 ) 

279 if set_done: 

280 evt.status = EventStatus.done 

281 if imported: 

282 evt.add_change('Import Source', issue.project.title, '') 

283 for old_value, new_value in res.change_list: 

284 evt.add_change("Answer", old_value, new_value) 

285 session.add(evt) 

286 

287 return True 

288 

289 

290def import_section( 

291 session: Session, src_sec: Section, des_sec: Section, type: ImportType) -> Tuple[ 

292 List[Section], 

293 List[QuestionInstance]]: 

294 ''' Import Section from another Project''' 

295 imp_secs = [] 

296 imp_qis = [] 

297 new_sec = Section( 

298 title=src_sec.title, 

299 description=src_sec.description, 

300 weight=src_sec.weight, 

301 project_id=des_sec.project_id, 

302 ) 

303 

304 des_sec.subsections.append(new_sec) 

305 imp_secs.append(new_sec) 

306 for qi in src_sec.questions: 

307 questions: List[QuestionInstance] = import_q_instance(qi, new_sec, type) 

308 imp_qis.append(questions) 

309 

310 for sec in src_sec.subsections: 

311 secs, ques = import_section(session, sec, new_sec, type) 

312 imp_secs += secs 

313 imp_qis += ques 

314 

315 return imp_secs, imp_qis 

316 

317 

318def import_q_instance( 

319 src_qi: QuestionInstance, des_sec: Section, type: ImportType) -> QuestionInstance: 

320 '''Import question instances from another Project''' 

321 src_qdef = src_qi.question_def 

322 des_qdef = src_qi.question_def 

323 des_sec_quesions = des_sec.questions 

324 

325 if type == ImportType.COPY: 

326 des_qdef = QuestionDefinition( 

327 title=src_qdef.title, 

328 weight=src_qdef.weight, 

329 refcount=1, 

330 parent_id=src_qdef.id) 

331 

332 for src_el in src_qdef.elements: 

333 des_element = QElement( 

334 row=src_el.row, 

335 col=src_el.col, 

336 colspan=src_el.colspan, 

337 rowspan=src_el.rowspan, 

338 el_type=src_el.el_type, 

339 label=src_el.label, 

340 mandatory=src_el.mandatory, 

341 width=src_el.width, 

342 height=src_el.height, 

343 multitopic=src_el.multitopic, 

344 regexp=src_el.regexp, 

345 choices=src_el.choices) 

346 des_qdef.elements.append(des_element) 

347 elif type == ImportType.SHARE: 

348 des_qdef.refcount += 1 

349 

350 des_qi: QuestionInstance = QuestionInstance( 

351 project_id=des_sec.project_id, 

352 section_id=des_sec.id, 

353 question_def=des_qdef, 

354 weight=src_qi.weight) 

355 des_sec_quesions.append(des_qi) 

356 return des_qi 

357 

358 

359def copy_q_definition(original_qdef: QuestionDefinition, session: Session): 

360 '''Make a copy of the QuestionDefinition and all associated question elements''' 

361 

362 new_qdef = QuestionDefinition() 

363 new_qdef.title = original_qdef.title 

364 new_qdef.weight = original_qdef.weight 

365 new_qdef.refcount = 1 

366 new_qdef.parent_id = original_qdef.id 

367 

368 for el in original_qdef.elements: 

369 session.expunge(el) # Allows us to make a copy 

370 make_transient(el) 

371 el.id = None # make_transient doesn't deleted the primary key 

372 new_qdef.elements.append(el) 

373 

374 return new_qdef 

375 

376 

377def delete_qinstances_update_def_refcounts( 

378 session: Session, 

379 project_id, 

380 section_id: int = None): 

381 ''' 

382 Delete question instances, orphan question definitions and update refcount 

383 on remaining question definitions for the given project, optionally filtering 

384 by section_id 

385 ''' 

386 def qf(q): 

387 q = q.filter(QuestionInstance.project_id == project_id) 

388 if section_id is not None: 

389 q = q.filter(QuestionInstance.section_id == section_id) 

390 return q 

391 

392 qiq = qf(session.query(QuestionInstance.question_def_id.label('id'))) 

393 

394 qi_set = {r.id for r in qiq} 

395 # Can't delete question defs if question instances remain, so delete 

396 # instances first, having saved the relevant IDs 

397 qf(session.query(QuestionInstance)).delete(synchronize_session=False) 

398 

399 # Delete question defs in this project with a refcount of one - not shared 

400 session.query(QuestionDefinition)\ 

401 .filter(QuestionDefinition.id.in_(qi_set))\ 

402 .filter(QuestionDefinition.refcount == 1)\ 

403 .delete(synchronize_session=False) 

404 

405 # Decrement refcount for all remaining question definitions 

406 session.query(QuestionDefinition)\ 

407 .filter(QuestionDefinition.id.in_(qi_set))\ 

408 .filter(QuestionDefinition.refcount > 1)\ 

409 .update({'refcount': QuestionDefinition.refcount-1}, synchronize_session=False) 

410 

411 

412def log_score_event(session: Session, score, initial_score_value, is_new, 

413 project, user, autoscore=False): 

414 

415 event_class = 'SCORE_CREATED' if is_new else 'SCORE_UPDATED' 

416 

417 evt = AuditEvent.create(event_class, 

418 project=project, 

419 issue_id=score.issue_id, 

420 user_id=user.id, 

421 org_id=user.organisation.id, 

422 object_id=score.id, 

423 private=True, 

424 question_id=score.question_id) 

425 evt.add_change('Score', initial_score_value, score.score) 

426 

427 session.add(evt) 

428 

429 if autoscore: 

430 msg = "Autoscore Calculated for Multiple Choice Question" 

431 comment = ScoreComment(score=score, user=user, comment_time=datetime.utcnow(), 

432 comment_text=msg) 

433 session.add(comment) 

434 session.flush() 

435 

436 kw = dict(project=project, issue_id=score.issue_id, 

437 user_id=user.id, org_id=user.organisation.id, 

438 object_id=comment.id, private=True, question_id=score.question_id) 

439 cmnt_evt = AuditEvent.create('SCORE_COMMENT_ADDED', **kw) 

440 cmnt_evt.add_change('Comment', '', msg) 

441 session.add(cmnt_evt) 

442 

443 

444def label_text(project: Project, search_term: str, replace_term: str = '', dry_run=True): 

445 

446 q = (project.qelements 

447 .filter(QElement.el_type.in_(('LB', 'CB'))) 

448 .filter(QElement.label.collate('utf8mb4_bin').like(f'%{search_term}%')) 

449 .add_columns(QuestionInstance.number)) 

450 

451 for label, qnum in q: 

452 old_label = label.label 

453 new_label = label.label.replace(search_term, replace_term) 

454 if not dry_run: 

455 label.label = new_label 

456 yield dict(change_type='label', question_number=qnum.dotted, new=new_label, old=old_label) 

457 

458 

459def question_titles(project: Project, search_term: str, replace_term: str = '', dry_run=True): 

460 session = object_session(project) 

461 

462 q = (session.query(QuestionInstance) 

463 .join(QuestionDefinition) 

464 .options(lazyload('question_def.elements')) 

465 .filter(QuestionInstance.project_id == project.id) 

466 .filter(QuestionDefinition.title.collate('utf8mb4_bin').like(f'%{search_term}%'))) 

467 

468 for qi in q: 

469 qdef = qi.question_def 

470 old_title = qdef.title 

471 new_title = qdef.title.replace(search_term, replace_term) 

472 if not dry_run: 

473 qdef.title = new_title 

474 yield dict(change_type='title', question_number=qi.number.dotted, 

475 new=new_title, old=old_title) 

476 

477 

478def choices_text(project: Project, search_term: str, replace_term: str = '', dry_run=True): 

479 q = (project.qelements 

480 .filter(QElement.el_type.in_(('CR', 'CC'))) 

481 .filter(func.json_search(QElement.choices, 'one', search_term) != None) 

482 .add_columns(QuestionInstance.number)) 

483 

484 for el, qnum in q: 

485 new_choices = deepcopy(el.choices) 

486 old_labels = [c['label'] for c in el.choices] 

487 for choice in new_choices: 

488 choice['label'] = choice['label'].replace(search_term, replace_term) 

489 new_labels = [c['label'] for c in new_choices] 

490 

491 if not dry_run: 

492 el.choices = new_choices 

493 

494 yield dict(change_type='choice', 

495 question_number=qnum.dotted, old=old_labels, new=new_labels) 

496 

497 

498def pretty_choices(choices: List[dict]) -> str: 

499 if not isinstance(choices, list): 

500 return str(choices) 

501 txt = "" 

502 for c in choices: 

503 auto = (f" <{c['autoscore']}>") if c.get('autoscore', False) else "" 

504 txt += f" - {c['label']}{auto}\n" 

505 return txt 

506 

507 

508def update_create_qdef(qdef: QuestionDefinition, 

509 evt: AuditEvent, 

510 el_map: dict, 

511 el_dict: dict): 

512 ''' 

513 Update question elements where ID values are provided; add new elements if no id provided 

514 

515 Updated ids are removed from el_map - thus any removing elements are to be deleted 

516 ''' 

517 mutable_attrs = { 

518 "colspan", "rowspan", "label", "mandatory", 

519 "regexp", "height", "width", "row", "col", "choices" 

520 } 

521 if el_dict.get('id', None) is not None: 

522 # Update existing element 

523 old_el = el_map.pop(el_dict['id']) 

524 for attr in mutable_attrs & el_dict.keys(): 

525 new_val = el_dict[attr] 

526 old_val = getattr(old_el, attr) 

527 if new_val != old_val: 

528 setattr(old_el, attr, new_val) 

529 evt_name = f"{old_el.__class__.__name__} #{old_el.id}, {attr.title()}" 

530 if attr == 'choices' and el_dict['el_type'] in ('CR', 'CC'): 

531 old_val = pretty_choices(old_val) 

532 new_val = pretty_choices(new_val) 

533 evt.add_change(evt_name, old_val, new_val) 

534 else: 

535 # A new element 

536 el_type = el_dict.pop('el_type') 

537 new_el = qdef.add_element(el_type, **el_dict) 

538 evt_name = f"{new_el.__class__.__name__} Added" 

539 evt.add_change(evt_name, None, new_el.summary) 

540 

541 

542def check_for_saved_answers(session, qdef, el_map): 

543 ''' 

544 If there are elements to delete with associated answers raise 

545 @raises CosmeticQuestionEditViolation 

546 ''' 

547 if (not qdef.is_shared) or (len(el_map) == 0): 

548 return 

549 answer_count = session.query(Answer).filter(Answer.element_id.in_(el_map.keys())).count() 

550 if answer_count > 0: 

551 del_ids = ", ".join(str(el_id) for el_id in el_map.keys()) 

552 m = f"Cannot delete question elements that have associated answers. Element IDs: {del_ids}" 

553 raise CosmeticQuestionEditViolation(m) 

554 

555 

556def change_org_id(session, old_org_id, new_org_id): 

557 

558 session.execute( 

559 "UPDATE audit_events SET org_id = :new_org_id WHERE org_id = :old_org_id", 

560 {'new_org_id': new_org_id, 'old_org_id': old_org_id} 

561 ) 

562 session.execute( 

563 "UPDATE organisations SET id=:new_org_id WHERE id=:old_org_id", 

564 {'new_org_id': new_org_id, 'old_org_id': old_org_id} 

565 ) 

566 

567 

568def delete_project_section(session: Session, user: User, project: Project, section: Section): 

569 ''' 

570 Delete the given Section and all questions and subsections contained within that 

571 section. 

572 ''' 

573 

574 for descendant_section in section.descendants: 

575 delete_qinstances_update_def_refcounts(session, project.id, 

576 section_id=descendant_section.id) 

577 session.delete(descendant_section) 

578 

579 delete_qinstances_update_def_refcounts(session, project.id, section_id=section.id) 

580 session.delete(section) 

581 

582 evt = AuditEvent.create(evt_types.SECTION_DELETED, project=project, user_id=user.id, 

583 org_id=user.organisation.id, object_id=section.id, private=True) 

584 session.add(evt) 

585 

586 

587def delete_project_section_question( 

588 session: Session, user: User, project: Project, section: Section, qi: QuestionInstance): 

589 ''' 

590 Delete the Question with the given ID 

591 

592 The return value is an array of remaining instances of the same question that may exist 

593 in other projects 

594 

595 ''' 

596 

597 evt = AuditEvent.create('QUESTION_DELETED', 

598 project=project, 

599 user_id=user.id, 

600 org_id=user.organisation.id, 

601 object_id=qi.id, 

602 private=True, 

603 question_id=qi.id) 

604 evt.add_change('Title', qi.title, None) 

605 evt.add_change('Number', qi.number.dotted, None) 

606 session.add(evt) 

607 

608 instances_remaining = [] 

609 with session.no_autoflush: 

610 qdef = qi.question_def 

611 session.delete(qi) 

612 

613 qdef.refcount -= 1 

614 if qdef.refcount == 0: 

615 session.delete(qdef) 

616 else: 

617 instances_remaining = [dict(project_id=project.id, number=qi.number.dotted, id=qi.id) 

618 for qi in qdef.instances] 

619 section.renumber(section.number.dotted) 

620 return instances_remaining