Coverage for rfpy/api/update.py: 100%
298 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-31 16:00 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-31 16:00 +0000
1'''
2Functions that perform SQL Update queries
3'''
5from typing import List, Tuple
6from rfpy.model.graph import Edge, RelationshipType
7from rfpy.model.tags import Tag
8from sqlalchemy.sql.elements import literal
9from sqlalchemy.sql.expression import insert, select
10from rfpy.model.questionnaire import WeightingSet
11from copy import deepcopy
12from datetime import datetime
14from sqlalchemy import func
15from sqlalchemy.orm import object_session, lazyload, Session, make_transient
16from sqlalchemy.orm.exc import NoResultFound
18from rfpy.model import (Participant, ProjectPermission, SectionPermission,
19 User, Organisation, Issue, IssueAttachment, Project,
20 ProjectNote, ProjectAttachment, CustomRole, Category, QElement,
21 AuditEvent, EventOrgACL, EmailNotification, ScoreComment,
22 Weighting, QuestionInstance, Section, Score, Answer,
23 QuestionDefinition, ResponseStatus, QuestionResponseState, ImportType)
24from rfpy.model.audit import evt_types, Status as EventStatus
25from rfpy.model.notify import WebhookSubscription
27from rfpy.model.exc import CosmeticQuestionEditViolation
28from rfpy.auth import AuthorizationFailure
31def grant_project_permission(session, project, user):
32 '''
33 Grants a (restricted) User permissions for the given
34 project. Returns the newly created ProjectPermission instance
35 '''
36 if not user.is_restricted:
37 raise ValueError('Assigning ProjectPermission to a ' +
38 'non-restricted user has no effect')
39 try:
40 participant = session.query(Participant)\
41 .filter_by(project_id=project.id, organisation=user.organisation)\
42 .one()
43 pp = ProjectPermission()
44 pp.participant = participant
45 pp.user = user
46 session.add(pp)
47 except NoResultFound:
48 m = "User {user} is not a Participant in project {project}"
49 raise AuthorizationFailure(message=m)
50 return pp
53def grant_section_permissions(session, project, user, section_id_list):
54 '''
55 Grants the given user access to sections in the given project.
56 A ProjectPermission for this user/project is created if it doesn't exist
57 '''
58 try:
59 project_permission = project.permissions.filter_by(user_id=user.id).one()
60 except NoResultFound:
61 project_permission = grant_project_permission(session, project, user)
63 for sec_id in section_id_list:
65 sp = SectionPermission(section_id=sec_id,
66 user=user,
67 project_permission=project_permission)
68 session.add(sp)
71def merge_organisations(session, redundant_org_id, correct_org_id, delete_org=False):
73 session.query(Organisation).filter(Organisation.id == redundant_org_id).one()
74 session.query(Organisation).filter(Organisation.id == correct_org_id).one()
76 session.commit()
78 pairs = (
79 (Issue, Issue.respondent_id),
80 (IssueAttachment, IssueAttachment.org_id),
81 (User, User.org_id),
82 (Project, Project.org_id),
83 (Participant, Participant.org_id),
84 (ProjectNote, ProjectNote.org_id),
85 (ProjectNote, ProjectNote.target_org_id),
86 (ProjectAttachment, ProjectAttachment.org_id),
87 (CustomRole, CustomRole.org_id),
88 (Category, Category.org_id),
89 (EventOrgACL, EventOrgACL.org_id),
90 (EmailNotification, EmailNotification.org_id),
91 (WebhookSubscription, WebhookSubscription.org_id),
92 (Tag, Tag.org_id),
93 (Edge, Edge.to_org_id),
94 (Edge, Edge.from_org_id),
95 (RelationshipType, RelationshipType.org_id),
96 (AuditEvent, AuditEvent.org_id)
97 )
99 for klass, org_column in pairs:
100 session.query(klass)\
101 .filter(org_column == redundant_org_id)\
102 .update({org_column: correct_org_id}, synchronize_session=False)
104 if delete_org:
105 session.query(Organisation).filter(Organisation.id == redundant_org_id).delete()
108def save_weightset_weightings(session, weightset, weights_doc):
110 q_lookup = {}
111 sec_lookup = {}
113 for weighting in weightset.weightings:
114 if weighting.section_id:
115 sec_lookup[weighting.section_id] = weighting
116 else:
117 q_lookup[weighting.question_instance_id] = weighting
119 save = weightset.weightings.append # reference to bound method
121 q = session.query(QuestionInstance.id)\
122 .filter(QuestionInstance.project_id == weightset.project_id)
123 qid_set = {qi.id for qi in q}
124 project_id = weightset.project_id
126 for r in weights_doc['questions']:
128 question_id = r['question_id']
129 qweight = r['weight']
131 if question_id in q_lookup:
132 q_lookup[question_id].value = qweight
133 elif question_id in qid_set:
134 save(Weighting(question_instance_id=question_id, value=qweight))
135 else:
136 m = f'Question ID {question_id} does not belong to project {project_id}'
137 raise ValueError(m)
139 sq = session.query(Section).filter_by(project_id=project_id)
140 sec_id_set = {s.id for s in sq}
142 for r in weights_doc['sections']:
144 section_id = r['section_id']
145 sec_weight = r['weight']
147 if section_id in sec_lookup:
148 sec_lookup[section_id].value = sec_weight
149 elif section_id in sec_id_set:
150 save(Weighting(section_id=section_id, value=sec_weight))
151 else:
152 raise ValueError(f"Section '{section_id}' does not belong to project '{project_id}'")
155def save_default_weightings(session, project, weights_doc):
156 if weights_doc['questions']:
157 q_ids = {r['question_id'] for r in weights_doc['questions']}
158 qq = project.questions\
159 .options(lazyload('question_def.elements'))\
160 .filter(QuestionInstance.id.in_(q_ids))
162 q_lookup = {qi.id: qi for qi in qq}
163 for r in weights_doc['questions']:
164 qid = r['question_id']
165 if qid in q_lookup:
166 q_lookup[qid].weight = r['weight']
167 else:
168 raise ValueError(f'Question ID {qid} does not belong to Project # {project.id}')
170 if weights_doc['sections']:
171 sec_ids = {r['section_id'] for r in weights_doc['sections']}
172 sq = project.sections\
173 .options(lazyload('questions'))\
174 .filter(Section.id.in_(sec_ids))
176 sec_lookup = {sec.id: sec for sec in sq}
177 for r in weights_doc['sections']:
178 sec_id = r['section_id']
179 if sec_id in sec_lookup:
180 sec_lookup[sec_id].weight = r['weight']
181 else:
182 raise ValueError(f'Section ID {sec_id} does not belong to Project # {project.id}')
185def set_initial_weightings(weighting_set: WeightingSet, initial_value):
186 '''
187 Create Weighting records for each QuestionInstance and Section for weighting_set.project_id
188 and weighting_set.weighting_set_id
189 '''
190 session = object_session(weighting_set)
192 qs = (select([literal(weighting_set.id), QuestionInstance.id, literal(initial_value)])
193 .where(QuestionInstance.project_id == weighting_set.project_id))
194 qi = insert(Weighting).from_select(['weighting_set_id', 'question_instance_id', 'value'], qs)
196 session.execute(qi)
198 ss = (select([literal(weighting_set.id), Section.id, literal(initial_value)])
199 .where(Section.project_id == weighting_set.project_id))
200 si = insert(Weighting).from_select(['weighting_set_id', 'section_id', 'value'], ss)
202 session.execute(si)
205def copy_weightings(source_weighting_set: WeightingSet, destination_weighting_set: WeightingSet):
206 '''
207 Copy Weighting values from source weighting set to destination
208 '''
209 session = object_session(destination_weighting_set)
210 s = (select([
211 literal(destination_weighting_set.id),
212 Weighting.section_id,
213 Weighting.question_instance_id,
214 Weighting.value
215 ]).where(Weighting.weighting_set_id == source_weighting_set.id))
217 i = (insert(Weighting)
218 .from_select(['weighting_set_id', 'section_id', 'question_instance_id', 'value'], s))
220 session.execute(i)
223def reset_scores(session, project: Project, user: User):
224 '''
225 Deletes all scores for the given project.
226 Recreates Autoscores for multiple choice questions
227 '''
228 issue_map = {}
229 for issue in project.scoreable_issues:
230 del_count = issue.scores.delete()
231 issue_map[issue.id] = dict(respondent_id=issue.respondent_id, deleted=del_count)
232 new_counts = {}
234 for ascore in project.generate_autoscores(session, user).values():
235 issue_id = ascore['issue_id']
236 score = Score(question_instance_id=ascore['question_id'],
237 issue_id=issue_id,
238 score=ascore['score'])
239 if issue_id in new_counts: # pragma: no cover
240 new_counts[issue_id] = new_counts[issue_id] + 1
241 else:
242 new_counts[issue_id] = 1
243 session.add(score)
245 for issue_id, new_count in new_counts.items():
246 issue_map[issue_id]['added'] = new_count
247 return issue_map
250def save_answers(session, user: User, question: QuestionInstance,
251 answer_lookup: dict, issue: Issue,
252 imported: bool = False, set_done: bool = False):
254 res = question.validate_and_save_answers(answer_lookup, issue)
255 if res.change_list is None or len(res.change_list) == 0:
256 return False
258 response_state: QuestionResponseState = issue.response_state_for_q(question.id)
259 response_state.date_updated = datetime.now()
260 response_state.updated_by = user.id
262 if res.unanswered_mandatory:
263 response_state.status = ResponseStatus.NOT_ANSWERED
264 else:
265 response_state.status = ResponseStatus.ANSWERED
267 evt_type = evt_types.ANSWER_CREATED if res.is_new else evt_types.ANSWER_UPDATED
268 if imported:
269 evt_type = evt_types.ANSWER_IMPORTED
271 evt = AuditEvent.create(
272 evt_type,
273 object_id=response_state.id,
274 user=user,
275 project_id=issue.project_id,
276 issue=issue,
277 question_id=question.id
278 )
279 if set_done:
280 evt.status = EventStatus.done
281 if imported:
282 evt.add_change('Import Source', issue.project.title, '')
283 for old_value, new_value in res.change_list:
284 evt.add_change("Answer", old_value, new_value)
285 session.add(evt)
287 return True
290def import_section(
291 session: Session, src_sec: Section, des_sec: Section, type: ImportType) -> Tuple[
292 List[Section],
293 List[QuestionInstance]]:
294 ''' Import Section from another Project'''
295 imp_secs = []
296 imp_qis = []
297 new_sec = Section(
298 title=src_sec.title,
299 description=src_sec.description,
300 weight=src_sec.weight,
301 project_id=des_sec.project_id,
302 )
304 des_sec.subsections.append(new_sec)
305 imp_secs.append(new_sec)
306 for qi in src_sec.questions:
307 questions: List[QuestionInstance] = import_q_instance(qi, new_sec, type)
308 imp_qis.append(questions)
310 for sec in src_sec.subsections:
311 secs, ques = import_section(session, sec, new_sec, type)
312 imp_secs += secs
313 imp_qis += ques
315 return imp_secs, imp_qis
318def import_q_instance(
319 src_qi: QuestionInstance, des_sec: Section, type: ImportType) -> QuestionInstance:
320 '''Import question instances from another Project'''
321 src_qdef = src_qi.question_def
322 des_qdef = src_qi.question_def
323 des_sec_quesions = des_sec.questions
325 if type == ImportType.COPY:
326 des_qdef = QuestionDefinition(
327 title=src_qdef.title,
328 weight=src_qdef.weight,
329 refcount=1,
330 parent_id=src_qdef.id)
332 for src_el in src_qdef.elements:
333 des_element = QElement(
334 row=src_el.row,
335 col=src_el.col,
336 colspan=src_el.colspan,
337 rowspan=src_el.rowspan,
338 el_type=src_el.el_type,
339 label=src_el.label,
340 mandatory=src_el.mandatory,
341 width=src_el.width,
342 height=src_el.height,
343 multitopic=src_el.multitopic,
344 regexp=src_el.regexp,
345 choices=src_el.choices)
346 des_qdef.elements.append(des_element)
347 elif type == ImportType.SHARE:
348 des_qdef.refcount += 1
350 des_qi: QuestionInstance = QuestionInstance(
351 project_id=des_sec.project_id,
352 section_id=des_sec.id,
353 question_def=des_qdef,
354 weight=src_qi.weight)
355 des_sec_quesions.append(des_qi)
356 return des_qi
359def copy_q_definition(original_qdef: QuestionDefinition, session: Session):
360 '''Make a copy of the QuestionDefinition and all associated question elements'''
362 new_qdef = QuestionDefinition()
363 new_qdef.title = original_qdef.title
364 new_qdef.weight = original_qdef.weight
365 new_qdef.refcount = 1
366 new_qdef.parent_id = original_qdef.id
368 for el in original_qdef.elements:
369 session.expunge(el) # Allows us to make a copy
370 make_transient(el)
371 el.id = None # make_transient doesn't deleted the primary key
372 new_qdef.elements.append(el)
374 return new_qdef
377def delete_qinstances_update_def_refcounts(
378 session: Session,
379 project_id,
380 section_id: int = None):
381 '''
382 Delete question instances, orphan question definitions and update refcount
383 on remaining question definitions for the given project, optionally filtering
384 by section_id
385 '''
386 def qf(q):
387 q = q.filter(QuestionInstance.project_id == project_id)
388 if section_id is not None:
389 q = q.filter(QuestionInstance.section_id == section_id)
390 return q
392 qiq = qf(session.query(QuestionInstance.question_def_id.label('id')))
394 qi_set = {r.id for r in qiq}
395 # Can't delete question defs if question instances remain, so delete
396 # instances first, having saved the relevant IDs
397 qf(session.query(QuestionInstance)).delete(synchronize_session=False)
399 # Delete question defs in this project with a refcount of one - not shared
400 session.query(QuestionDefinition)\
401 .filter(QuestionDefinition.id.in_(qi_set))\
402 .filter(QuestionDefinition.refcount == 1)\
403 .delete(synchronize_session=False)
405 # Decrement refcount for all remaining question definitions
406 session.query(QuestionDefinition)\
407 .filter(QuestionDefinition.id.in_(qi_set))\
408 .filter(QuestionDefinition.refcount > 1)\
409 .update({'refcount': QuestionDefinition.refcount-1}, synchronize_session=False)
412def log_score_event(session: Session, score, initial_score_value, is_new,
413 project, user, autoscore=False):
415 event_class = 'SCORE_CREATED' if is_new else 'SCORE_UPDATED'
417 evt = AuditEvent.create(event_class,
418 project=project,
419 issue_id=score.issue_id,
420 user_id=user.id,
421 org_id=user.organisation.id,
422 object_id=score.id,
423 private=True,
424 question_id=score.question_id)
425 evt.add_change('Score', initial_score_value, score.score)
427 session.add(evt)
429 if autoscore:
430 msg = "Autoscore Calculated for Multiple Choice Question"
431 comment = ScoreComment(score=score, user=user, comment_time=datetime.utcnow(),
432 comment_text=msg)
433 session.add(comment)
434 session.flush()
436 kw = dict(project=project, issue_id=score.issue_id,
437 user_id=user.id, org_id=user.organisation.id,
438 object_id=comment.id, private=True, question_id=score.question_id)
439 cmnt_evt = AuditEvent.create('SCORE_COMMENT_ADDED', **kw)
440 cmnt_evt.add_change('Comment', '', msg)
441 session.add(cmnt_evt)
444def label_text(project: Project, search_term: str, replace_term: str = '', dry_run=True):
446 q = (project.qelements
447 .filter(QElement.el_type.in_(('LB', 'CB')))
448 .filter(QElement.label.collate('utf8mb4_bin').like(f'%{search_term}%'))
449 .add_columns(QuestionInstance.number))
451 for label, qnum in q:
452 old_label = label.label
453 new_label = label.label.replace(search_term, replace_term)
454 if not dry_run:
455 label.label = new_label
456 yield dict(change_type='label', question_number=qnum.dotted, new=new_label, old=old_label)
459def question_titles(project: Project, search_term: str, replace_term: str = '', dry_run=True):
460 session = object_session(project)
462 q = (session.query(QuestionInstance)
463 .join(QuestionDefinition)
464 .options(lazyload('question_def.elements'))
465 .filter(QuestionInstance.project_id == project.id)
466 .filter(QuestionDefinition.title.collate('utf8mb4_bin').like(f'%{search_term}%')))
468 for qi in q:
469 qdef = qi.question_def
470 old_title = qdef.title
471 new_title = qdef.title.replace(search_term, replace_term)
472 if not dry_run:
473 qdef.title = new_title
474 yield dict(change_type='title', question_number=qi.number.dotted,
475 new=new_title, old=old_title)
478def choices_text(project: Project, search_term: str, replace_term: str = '', dry_run=True):
479 q = (project.qelements
480 .filter(QElement.el_type.in_(('CR', 'CC')))
481 .filter(func.json_search(QElement.choices, 'one', search_term) != None)
482 .add_columns(QuestionInstance.number))
484 for el, qnum in q:
485 new_choices = deepcopy(el.choices)
486 old_labels = [c['label'] for c in el.choices]
487 for choice in new_choices:
488 choice['label'] = choice['label'].replace(search_term, replace_term)
489 new_labels = [c['label'] for c in new_choices]
491 if not dry_run:
492 el.choices = new_choices
494 yield dict(change_type='choice',
495 question_number=qnum.dotted, old=old_labels, new=new_labels)
498def pretty_choices(choices: List[dict]) -> str:
499 if not isinstance(choices, list):
500 return str(choices)
501 txt = ""
502 for c in choices:
503 auto = (f" <{c['autoscore']}>") if c.get('autoscore', False) else ""
504 txt += f" - {c['label']}{auto}\n"
505 return txt
508def update_create_qdef(qdef: QuestionDefinition,
509 evt: AuditEvent,
510 el_map: dict,
511 el_dict: dict):
512 '''
513 Update question elements where ID values are provided; add new elements if no id provided
515 Updated ids are removed from el_map - thus any removing elements are to be deleted
516 '''
517 mutable_attrs = {
518 "colspan", "rowspan", "label", "mandatory",
519 "regexp", "height", "width", "row", "col", "choices"
520 }
521 if el_dict.get('id', None) is not None:
522 # Update existing element
523 old_el = el_map.pop(el_dict['id'])
524 for attr in mutable_attrs & el_dict.keys():
525 new_val = el_dict[attr]
526 old_val = getattr(old_el, attr)
527 if new_val != old_val:
528 setattr(old_el, attr, new_val)
529 evt_name = f"{old_el.__class__.__name__} #{old_el.id}, {attr.title()}"
530 if attr == 'choices' and el_dict['el_type'] in ('CR', 'CC'):
531 old_val = pretty_choices(old_val)
532 new_val = pretty_choices(new_val)
533 evt.add_change(evt_name, old_val, new_val)
534 else:
535 # A new element
536 el_type = el_dict.pop('el_type')
537 new_el = qdef.add_element(el_type, **el_dict)
538 evt_name = f"{new_el.__class__.__name__} Added"
539 evt.add_change(evt_name, None, new_el.summary)
542def check_for_saved_answers(session, qdef, el_map):
543 '''
544 If there are elements to delete with associated answers raise
545 @raises CosmeticQuestionEditViolation
546 '''
547 if (not qdef.is_shared) or (len(el_map) == 0):
548 return
549 answer_count = session.query(Answer).filter(Answer.element_id.in_(el_map.keys())).count()
550 if answer_count > 0:
551 del_ids = ", ".join(str(el_id) for el_id in el_map.keys())
552 m = f"Cannot delete question elements that have associated answers. Element IDs: {del_ids}"
553 raise CosmeticQuestionEditViolation(m)
556def change_org_id(session, old_org_id, new_org_id):
558 session.execute(
559 "UPDATE audit_events SET org_id = :new_org_id WHERE org_id = :old_org_id",
560 {'new_org_id': new_org_id, 'old_org_id': old_org_id}
561 )
562 session.execute(
563 "UPDATE organisations SET id=:new_org_id WHERE id=:old_org_id",
564 {'new_org_id': new_org_id, 'old_org_id': old_org_id}
565 )
568def delete_project_section(session: Session, user: User, project: Project, section: Section):
569 '''
570 Delete the given Section and all questions and subsections contained within that
571 section.
572 '''
574 for descendant_section in section.descendants:
575 delete_qinstances_update_def_refcounts(session, project.id,
576 section_id=descendant_section.id)
577 session.delete(descendant_section)
579 delete_qinstances_update_def_refcounts(session, project.id, section_id=section.id)
580 session.delete(section)
582 evt = AuditEvent.create(evt_types.SECTION_DELETED, project=project, user_id=user.id,
583 org_id=user.organisation.id, object_id=section.id, private=True)
584 session.add(evt)
587def delete_project_section_question(
588 session: Session, user: User, project: Project, section: Section, qi: QuestionInstance):
589 '''
590 Delete the Question with the given ID
592 The return value is an array of remaining instances of the same question that may exist
593 in other projects
595 '''
597 evt = AuditEvent.create('QUESTION_DELETED',
598 project=project,
599 user_id=user.id,
600 org_id=user.organisation.id,
601 object_id=qi.id,
602 private=True,
603 question_id=qi.id)
604 evt.add_change('Title', qi.title, None)
605 evt.add_change('Number', qi.number.dotted, None)
606 session.add(evt)
608 instances_remaining = []
609 with session.no_autoflush:
610 qdef = qi.question_def
611 session.delete(qi)
613 qdef.refcount -= 1
614 if qdef.refcount == 0:
615 session.delete(qdef)
616 else:
617 instances_remaining = [dict(project_id=project.id, number=qi.number.dotted, id=qi.id)
618 for qi in qdef.instances]
619 section.renumber(section.number.dotted)
620 return instances_remaining