Coverage for rfpy/api/update.py: 100%
300 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
1"""
2Functions that perform SQL Update queries
3"""
5from typing import List, Tuple, Optional
6from rfpy.model.graph import Edge, RelationshipType
7from rfpy.model.tags import Tag
8from sqlalchemy.sql.elements import literal
9from sqlalchemy.sql.expression import insert, select
10from rfpy.model.questionnaire import WeightingSet
11from copy import deepcopy
12from datetime import datetime
14from sqlalchemy import func, text
15from sqlalchemy.orm import (
16 object_session,
17 lazyload,
18 Session,
19 joinedload,
20 noload,
21)
22from sqlalchemy.orm.exc import NoResultFound
24from rfpy.model import (
25 Participant,
26 ProjectPermission,
27 SectionPermission,
28 User,
29 Organisation,
30 Issue,
31 IssueAttachment,
32 Project,
33 ProjectNote,
34 ProjectAttachment,
35 CustomRole,
36 Category,
37 QElement,
38 AuditEvent,
39 EventOrgACL,
40 EmailNotification,
41 ScoreComment,
42 Weighting,
43 QuestionInstance,
44 Section,
45 Score,
46 Answer,
47 QuestionDefinition,
48 ResponseStatus,
49 QuestionResponseState,
50 ImportType,
51 LiveProject,
52)
53from rfpy.model.audit import evt_types, Status as EventStatus
54from rfpy.model.notify import WebhookSubscription
56from rfpy.model.exc import CosmeticQuestionEditViolation
57from rfpy.auth import AuthorizationFailure
58from rfpy.web import serial
61def grant_project_permission(session, project, user):
62 """
63 Grants a (restricted) User permissions for the given
64 project. Returns the newly created ProjectPermission instance
65 """
66 if not user.is_restricted:
67 raise ValueError(
68 "Assigning ProjectPermission to a " + "non-restricted user has no effect"
69 )
70 try:
71 participant = (
72 session.query(Participant)
73 .filter_by(project_id=project.id, organisation=user.organisation)
74 .one()
75 )
76 pp = ProjectPermission()
77 pp.participant = participant
78 pp.user = user
79 session.add(pp)
80 except NoResultFound:
81 m = "User {user} is not a Participant in project {project}"
82 raise AuthorizationFailure(message=m)
83 return pp
86def grant_section_permissions(session, project, user, section_id_list):
87 """
88 Grants the given user access to sections in the given project.
89 A ProjectPermission for this user/project is created if it doesn't exist
90 """
91 try:
92 project_permission = project.permissions.filter_by(user_id=user.id).one()
93 except NoResultFound:
94 project_permission = grant_project_permission(session, project, user)
96 for sec_id in section_id_list:
97 sp = SectionPermission(
98 section_id=sec_id, user=user, project_permission=project_permission
99 )
100 session.add(sp)
103def merge_organisations(session, redundant_org_id, correct_org_id, delete_org=False):
104 session.query(Organisation).filter(Organisation.id == redundant_org_id).one()
105 session.query(Organisation).filter(Organisation.id == correct_org_id).one()
107 session.commit()
109 pairs = (
110 (Issue, Issue.respondent_id),
111 (IssueAttachment, IssueAttachment.org_id),
112 (User, User.org_id),
113 (Project, Project.org_id),
114 (Participant, Participant.org_id),
115 (ProjectNote, ProjectNote.org_id),
116 (ProjectNote, ProjectNote.target_org_id),
117 (ProjectAttachment, ProjectAttachment.org_id),
118 (CustomRole, CustomRole.org_id),
119 (Category, Category.org_id),
120 (EventOrgACL, EventOrgACL.org_id),
121 (EmailNotification, EmailNotification.org_id),
122 (WebhookSubscription, WebhookSubscription.org_id),
123 (Tag, Tag.org_id),
124 (Edge, Edge.to_org_id),
125 (Edge, Edge.from_org_id),
126 (RelationshipType, RelationshipType.org_id),
127 (AuditEvent, AuditEvent.org_id),
128 )
130 for klass, org_column in pairs:
131 session.query(klass).filter(org_column == redundant_org_id).update(
132 {org_column: correct_org_id}, synchronize_session=False
133 )
135 if delete_org:
136 session.query(Organisation).filter(Organisation.id == redundant_org_id).delete()
139def save_weightset_weightings(session, weightset, weights_doc: serial.WeightingsDoc):
140 q_lookup = {}
141 sec_lookup = {}
143 for weighting in weightset.weightings:
144 if weighting.section_id:
145 sec_lookup[weighting.section_id] = weighting
146 else:
147 q_lookup[weighting.question_instance_id] = weighting
149 save = weightset.weightings.append # reference to bound method
151 q = session.query(QuestionInstance.id).filter(
152 QuestionInstance.project_id == weightset.project_id
153 )
154 qid_set = {qi.id for qi in q}
155 project_id = weightset.project_id
157 for question_weight in weights_doc.questions:
158 question_id = question_weight.question_id
159 qweight = question_weight.weight
161 if question_id in q_lookup:
162 q_lookup[question_id].value = qweight
163 elif question_id in qid_set:
164 save(Weighting(question_instance_id=question_id, value=qweight))
165 else:
166 m = f"Question ID {question_id} does not belong to project {project_id}"
167 raise ValueError(m)
169 sq = session.query(Section).filter_by(project_id=project_id)
170 sec_id_set = {s.id for s in sq}
172 for sw in weights_doc.sections:
173 section_id = sw.section_id
174 sec_weight = sw.weight
176 if section_id in sec_lookup:
177 sec_lookup[section_id].value = sec_weight
178 elif section_id in sec_id_set:
179 save(Weighting(section_id=section_id, value=sec_weight))
180 else:
181 raise ValueError(
182 f"Section '{section_id}' does not belong to project '{project_id}'"
183 )
186def save_default_weightings(
187 session, project: Project, weights_doc: serial.WeightingsDoc
188):
189 if weights_doc.questions:
190 q_ids = {r.question_id for r in weights_doc.questions}
191 qq = (
192 project.questions.join(QuestionInstance.question_def)
193 .filter(QuestionInstance.id.in_(q_ids))
194 .options(
195 joinedload(QuestionInstance.question_def).noload(
196 QuestionDefinition.elements
197 )
198 )
199 )
201 q_lookup = {qi.id: qi for qi in qq}
202 for r in weights_doc.questions:
203 qid = r.question_id
204 if qid in q_lookup:
205 q_lookup[qid].weight = r.weight
206 else:
207 raise ValueError(
208 f"Question ID {qid} does not belong to Project # {project.id}"
209 )
211 if weights_doc.sections:
212 sec_ids = {r.section_id for r in weights_doc.sections}
213 sq = project.sections.options(lazyload(Section.questions)).filter(
214 Section.id.in_(sec_ids)
215 )
217 sec_lookup = {sec.id: sec for sec in sq}
218 for sw in weights_doc.sections:
219 sec_id = sw.section_id
220 if sec_id in sec_lookup:
221 sec_lookup[sec_id].weight = sw.weight
222 else:
223 raise ValueError(
224 f"Section ID {sec_id} does not belong to Project # {project.id}"
225 )
228def set_initial_weightings(weighting_set: WeightingSet, initial_value):
229 """
230 Create Weighting records for each QuestionInstance and Section for weighting_set.project_id
231 and weighting_set.weighting_set_id
232 """
233 session = object_session(weighting_set)
234 assert session is not None
236 qs = select(
237 literal(weighting_set.id), QuestionInstance.id, literal(initial_value)
238 ).where(QuestionInstance.project_id == weighting_set.project_id)
239 qi = insert(Weighting).from_select(
240 ["weighting_set_id", "question_instance_id", "value"], qs
241 )
243 session.execute(qi)
245 ss = select(literal(weighting_set.id), Section.id, literal(initial_value)).where(
246 Section.project_id == weighting_set.project_id
247 )
248 si = insert(Weighting).from_select(["weighting_set_id", "section_id", "value"], ss)
250 session.execute(si)
253def copy_weightings(
254 source_weighting_set: WeightingSet, destination_weighting_set: WeightingSet
255):
256 """
257 Copy Weighting values from source weighting set to destination
258 """
259 session = object_session(destination_weighting_set)
260 s = select(
261 literal(destination_weighting_set.id),
262 Weighting.section_id,
263 Weighting.question_instance_id,
264 Weighting.value,
265 ).where(Weighting.weighting_set_id == source_weighting_set.id)
267 i = insert(Weighting).from_select(
268 ["weighting_set_id", "section_id", "question_instance_id", "value"], s
269 )
270 assert session is not None
271 session.execute(i)
274def reset_scores(session, project: LiveProject, user: User):
275 """
276 Deletes all scores for the given project.
277 Recreates Autoscores for multiple choice questions
278 """
279 issue_map = {}
280 for issue in project.scoreable_issues:
281 del_count = issue.scores.delete()
282 issue_map[issue.id] = dict(respondent_id=issue.respondent_id, deleted=del_count)
283 new_counts: dict[int, int] = {}
285 for ascore in project.generate_autoscores(session, user).values():
286 issue_id = ascore["issue_id"]
287 score = Score(
288 question_instance_id=ascore["question_id"],
289 issue_id=issue_id,
290 score=ascore["score"],
291 )
292 if issue_id in new_counts: # pragma: no cover
293 new_counts[issue_id] = new_counts[issue_id] + 1
294 else:
295 new_counts[issue_id] = 1
296 session.add(score)
298 for issue_id, new_count in new_counts.items():
299 issue_map[issue_id]["added"] = new_count
300 return issue_map
303def save_answers(
304 session,
305 user: User,
306 question: QuestionInstance,
307 answer_lookup: dict,
308 issue: Issue,
309 imported: bool = False,
310 set_done: bool = False,
311):
312 res = question.validate_and_save_answers(answer_lookup, issue)
313 if res.change_list is None or len(res.change_list) == 0:
314 return False
316 response_state: QuestionResponseState = issue.response_state_for_q(question.id)
317 response_state.date_updated = datetime.now()
318 response_state.updated_by = user.id
320 if res.unanswered_mandatory:
321 response_state.status = ResponseStatus.NOT_ANSWERED
322 else:
323 response_state.status = ResponseStatus.ANSWERED
325 evt_type = evt_types.ANSWER_CREATED if res.is_new else evt_types.ANSWER_UPDATED
326 if imported:
327 evt_type = evt_types.ANSWER_IMPORTED
329 evt = AuditEvent.create(
330 session,
331 evt_type,
332 object_id=response_state.id,
333 user=user,
334 project_id=issue.project_id,
335 issue=issue,
336 question_id=question.id,
337 )
338 if set_done:
339 evt.status = EventStatus.done
340 if imported:
341 evt.add_change("Import Source", issue.project.title, "")
342 for old_value, new_value in res.change_list:
343 evt.add_change("Answer", old_value, new_value)
344 session.add(evt)
346 return True
349def import_section(
350 session: Session, src_sec: Section, des_sec: Section, type: ImportType
351) -> Tuple[List[Section], List[QuestionInstance]]:
352 """Import Section from another Project"""
353 imp_secs: list[Section] = []
354 imp_qis: list[QuestionInstance] = []
355 new_sec = Section(
356 title=src_sec.title,
357 description=src_sec.description,
358 weight=src_sec.weight,
359 project_id=des_sec.project_id,
360 )
362 des_sec.subsections.append(new_sec)
363 imp_secs.append(new_sec)
364 for qi in src_sec.questions:
365 new_qi = import_q_instance(qi, new_sec, type)
366 imp_qis.append(new_qi)
368 for sec in src_sec.subsections:
369 secs, ques = import_section(session, sec, new_sec, type)
370 imp_secs += secs
371 imp_qis += ques
373 return imp_secs, imp_qis
376def import_q_instance(
377 src_qi: QuestionInstance, des_sec: Section, type: ImportType
378) -> QuestionInstance:
379 """Import question instances from another Project"""
380 src_qdef = src_qi.question_def
381 des_qdef = src_qi.question_def
382 des_sec_quesions = des_sec.questions
384 if type == ImportType.COPY:
385 des_qdef = QuestionDefinition(
386 title=src_qdef.title,
387 weight=src_qdef.weight,
388 refcount=1,
389 parent_id=src_qdef.id,
390 )
392 for src_el in src_qdef.elements:
393 des_element = QElement(
394 row=src_el.row,
395 col=src_el.col,
396 colspan=src_el.colspan,
397 rowspan=src_el.rowspan,
398 el_type=src_el.el_type,
399 label=src_el.label,
400 mandatory=src_el.mandatory,
401 width=src_el.width,
402 height=src_el.height,
403 multitopic=src_el.multitopic,
404 regexp=src_el.regexp,
405 choices=src_el.choices,
406 )
407 des_qdef.elements.append(des_element)
408 elif type == ImportType.SHARE:
409 des_qdef.refcount += 1
411 des_qi: QuestionInstance = QuestionInstance(
412 project_id=des_sec.project_id,
413 section_id=des_sec.id,
414 question_def=des_qdef,
415 weight=src_qi.weight,
416 )
417 des_sec_quesions.append(des_qi)
418 return des_qi
421def copy_q_definition(original_qdef: QuestionDefinition, session: Session):
422 """Make a copy of the QuestionDefinition and all associated question elements"""
424 new_qdef = QuestionDefinition()
425 new_qdef.title = original_qdef.title
426 new_qdef.weight = original_qdef.weight
427 new_qdef.refcount = 1
428 new_qdef.parent_id = original_qdef.id
430 for el in original_qdef.elements:
431 new_el = QElement(
432 row=el.row,
433 col=el.col,
434 colspan=el.colspan,
435 rowspan=el.rowspan,
436 el_type=el.el_type,
437 label=el.label,
438 mandatory=el.mandatory,
439 width=el.width,
440 height=el.height,
441 multitopic=el.multitopic,
442 regexp=el.regexp,
443 choices=el.choices,
444 )
445 new_qdef.elements.append(new_el)
447 return new_qdef
450def delete_qinstances_update_def_refcounts(
451 session: Session, project_id, section_id: Optional[int] = None
452):
453 """
454 Delete question instances, orphan question definitions and update refcount
455 on remaining question definitions for the given project, optionally filtering
456 by section_id
457 """
459 def qf(q):
460 q = q.filter(QuestionInstance.project_id == project_id)
461 if section_id is not None:
462 q = q.filter(QuestionInstance.section_id == section_id)
463 return q
465 qiq = qf(session.query(QuestionInstance.question_def_id.label("id")))
467 qi_set = {r.id for r in qiq}
468 # Can't delete question defs if question instances remain, so delete
469 # instances first, having saved the relevant IDs
470 qf(session.query(QuestionInstance)).delete(synchronize_session=False)
472 # Delete question defs in this project with a refcount of one - not shared
473 session.query(QuestionDefinition).filter(QuestionDefinition.id.in_(qi_set)).filter(
474 QuestionDefinition.refcount == 1
475 ).delete(synchronize_session=False)
477 # Decrement refcount for all remaining question definitions
478 session.query(QuestionDefinition).filter(QuestionDefinition.id.in_(qi_set)).filter(
479 QuestionDefinition.refcount > 1
480 ).update({"refcount": QuestionDefinition.refcount - 1}, synchronize_session=False)
483def log_score_event(
484 session: Session, score, initial_score_value, is_new, project, user, autoscore=False
485):
486 event_class = "SCORE_CREATED" if is_new else "SCORE_UPDATED"
488 evt = AuditEvent.create(
489 session,
490 event_class,
491 project=project,
492 issue_id=score.issue_id,
493 user_id=user.id,
494 org_id=user.organisation.id,
495 object_id=score.id,
496 private=True,
497 question_id=score.question_id,
498 )
499 evt.add_change("Score", initial_score_value, score.score)
501 session.add(evt)
503 if autoscore:
504 msg = "Autoscore Calculated for Multiple Choice Question"
505 comment = ScoreComment(
506 score=score, user=user, comment_time=datetime.now(), comment_text=msg
507 )
508 session.add(comment)
509 session.flush()
511 kw = dict(
512 project=project,
513 issue_id=score.issue_id,
514 user_id=user.id,
515 org_id=user.organisation.id,
516 object_id=comment.id,
517 private=True,
518 question_id=score.question_id,
519 )
520 cmnt_evt = AuditEvent.create(session, "SCORE_COMMENT_ADDED", **kw)
521 cmnt_evt.add_change("Comment", "", msg)
522 session.add(cmnt_evt)
525def label_text(
526 project: Project, search_term: str, replace_term: str = "", dry_run=True
527):
528 q = (
529 project.qelements.filter(QElement.el_type.in_(("LB", "CB")))
530 .filter(QElement.label.collate("utf8mb4_bin").like(f"%{search_term}%"))
531 .add_columns(QuestionInstance.number)
532 )
534 for label, qnum in q:
535 old_label = label.label
536 new_label = label.label.replace(search_term, replace_term)
537 if not dry_run:
538 label.label = new_label
539 yield dict(
540 change_type="label",
541 question_number=qnum.dotted,
542 new=new_label,
543 old=old_label,
544 )
547def question_titles(
548 project: Project, search_term: str, replace_term: str = "", dry_run=True
549):
550 session = object_session(project)
551 assert session is not None
553 q = (
554 session.query(QuestionInstance)
555 .join(QuestionDefinition)
556 .options(noload(QuestionInstance.question_def, QuestionDefinition.elements))
557 .filter(QuestionInstance.project_id == project.id)
558 .filter(
559 QuestionDefinition.title.collate("utf8mb4_bin").like(f"%{search_term}%")
560 )
561 )
563 for qi in q:
564 qdef = qi.question_def
565 old_title = qdef.title
566 new_title = qdef.title.replace(search_term, replace_term)
567 if not dry_run:
568 qdef.title = new_title
569 yield dict(
570 change_type="title",
571 question_number=qi.number.dotted,
572 new=new_title,
573 old=old_title,
574 )
577def choices_text(
578 project: Project, search_term: str, replace_term: str = "", dry_run=True
579):
580 q = (
581 project.qelements.filter(QElement.el_type.in_(("CR", "CC")))
582 .filter(func.json_search(QElement.choices, "one", search_term) != None) # noqa: E711
583 .add_columns(QuestionInstance.number)
584 )
586 for el, qnum in q:
587 new_choices = deepcopy(el.choices)
588 old_labels = [c["label"] for c in el.choices]
589 for choice in new_choices:
590 choice["label"] = choice["label"].replace(search_term, replace_term)
591 new_labels = [c["label"] for c in new_choices]
593 if not dry_run:
594 el.choices = new_choices
596 yield dict(
597 change_type="choice",
598 question_number=qnum.dotted,
599 old=old_labels,
600 new=new_labels,
601 )
604def pretty_choices(choices: List[dict]) -> str:
605 if not isinstance(choices, list):
606 return str(choices)
607 txt = ""
608 for c in choices:
609 auto = (f" <{c['autoscore']}>") if c.get("autoscore", False) else ""
610 txt += f" - {c['label']}{auto}\n"
611 return txt
614def update_create_qdef(
615 qdef: QuestionDefinition, evt: AuditEvent, el_map: dict, el_dict: dict
616):
617 """
618 Update question elements where ID values are provided; add new elements if no id provided
620 Updated ids are removed from el_map - thus any removing elements are to be deleted
621 """
622 mutable_attrs = {
623 "colspan",
624 "rowspan",
625 "label",
626 "mandatory",
627 "regexp",
628 "height",
629 "width",
630 "row",
631 "col",
632 "choices",
633 }
634 if el_dict.get("id", None) is not None:
635 # Update existing element
636 old_el = el_map.pop(el_dict["id"])
637 for attr in mutable_attrs & el_dict.keys():
638 new_val = el_dict[attr]
639 old_val = getattr(old_el, attr)
640 if new_val != old_val:
641 setattr(old_el, attr, new_val)
642 evt_name = f"{old_el.__class__.__name__} #{old_el.id}, {attr.title()}"
643 if attr == "choices" and el_dict["el_type"] in ("CR", "CC"):
644 old_val = pretty_choices(old_val)
645 new_val = pretty_choices(new_val)
646 evt.add_change(evt_name, old_val, new_val)
647 else:
648 # A new element
649 el_type = el_dict.pop("el_type")
650 new_el = qdef.add_element(el_type, **el_dict)
651 evt_name = f"{new_el.__class__.__name__} Added"
652 evt.add_change(evt_name, None, new_el.summary)
655def check_for_saved_answers(session, qdef, el_map):
656 """
657 If there are elements to delete with associated answers raise
658 @raises CosmeticQuestionEditViolation
659 """
660 if (not qdef.is_shared) or (len(el_map) == 0):
661 return
662 answer_count = (
663 session.query(Answer).filter(Answer.element_id.in_(el_map.keys())).count()
664 )
665 if answer_count > 0:
666 del_ids = ", ".join(str(el_id) for el_id in el_map.keys())
667 m = f"Cannot delete question elements that have associated answers. Element IDs: {del_ids}"
668 raise CosmeticQuestionEditViolation(m)
671def change_org_id(session, old_org_id, new_org_id):
672 session.execute(
673 text("UPDATE audit_events SET org_id = :new_org_id WHERE org_id = :old_org_id"),
674 {"new_org_id": new_org_id, "old_org_id": old_org_id},
675 )
676 session.execute(
677 text("UPDATE organisations SET id=:new_org_id WHERE id=:old_org_id"),
678 {"new_org_id": new_org_id, "old_org_id": old_org_id},
679 )
682def delete_project_section(
683 session: Session, user: User, project: Project, section: Section
684):
685 """
686 Delete the given Section and all questions and subsections contained within that
687 section.
688 """
690 for descendant_section in section.descendants:
691 delete_qinstances_update_def_refcounts(
692 session, project.id, section_id=descendant_section.id
693 )
694 session.delete(descendant_section)
696 delete_qinstances_update_def_refcounts(session, project.id, section_id=section.id)
697 session.delete(section)
699 evt = AuditEvent.create(
700 session,
701 evt_types.SECTION_DELETED,
702 project=project,
703 user_id=user.id,
704 org_id=user.organisation.id,
705 object_id=section.id,
706 private=True,
707 )
708 session.add(evt)
711def delete_project_section_question(
712 session: Session,
713 user: User,
714 project: Project,
715 section: Section,
716 qi: QuestionInstance,
717):
718 """
719 Delete the Question with the given ID
721 The return value is an array of remaining instances of the same question that may exist
722 in other projects
724 """
726 evt = AuditEvent.create(
727 session,
728 "QUESTION_DELETED",
729 project=project,
730 user_id=user.id,
731 org_id=user.organisation.id,
732 object_id=qi.id,
733 private=True,
734 question_id=qi.id,
735 )
736 evt.add_change("Title", qi.title, None)
737 evt.add_change("Number", qi.number.dotted, None)
738 session.add(evt)
740 instances_remaining = []
741 with session.no_autoflush:
742 qdef = qi.question_def
743 session.delete(qi)
745 qdef.refcount -= 1
746 if qdef.refcount == 0:
747 session.delete(qdef)
748 else:
749 instances_remaining = [
750 dict(project_id=project.id, number=qi.number.dotted, id=qi.id)
751 for qi in qdef.instances
752 ]
753 section.renumber(section.number.dotted)
754 return instances_remaining