Coverage for rfpy/model/questionnaire.py: 99%
646 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
1from collections import defaultdict
2from operator import attrgetter
3from typing import Optional, TYPE_CHECKING
4from datetime import datetime
5from decimal import Decimal
6from enum import Enum, IntEnum
7import logging
8import re
10from sqlalchemy import (
11 Unicode,
12 Boolean,
13 Integer,
14 JSON,
15 ForeignKey,
16 Index,
17 text,
18 DateTime,
19 and_,
20 func,
21 inspect,
22)
23from sqlalchemy.orm import (
24 Mapped,
25 DynamicMapped,
26 mapped_column,
27 relationship,
28 remote,
29 foreign,
30 validates,
31)
32import sqlalchemy.types as types
33from sqlalchemy.dialects import mysql
35from rfpy.model.meta import AttachmentMixin, Visitor, Base
36from .issue import Issue
38from .exc import (
39 ValidationFailure,
40 QuestionnaireStructureException,
41 WeightingsNotLoadedException,
42)
44if TYPE_CHECKING:
45 from rfpy.model.composite import QuestionMeta
46 from rfpy.model.project import Project
47 from rfpy.model.acl import SectionPermission
48 from rfpy.model.tags import Tag
50"""
51Note - default SQL JOIN / load strategy is defined by arguments
52to 'relationship'
54Lots of joins involved when loading questionnaires. Current strategy is:
56Section -> questions (QuestionInstance): 'dynamic' - doesn't join by default,
57 returns a query
59QuestionInstance -> QuestionDefinition: 'joined' - eager loading / inner join
60QuestionDefinition -> [QElement] : 'joined' - eager loading by inner join
62For loading large collections (e.g. full questionnaire), these loading
63strategies are overridden, e.g. loading question elements in a subquery load
64"""
# Module-level logger named after this module.
log = logging.getLogger(__name__)


# Reusable getters for the `el_type` and `cell` attributes of an object.
get_el_type = attrgetter("el_type")
get_cell = attrgetter("cell")
# Attribute names compared (in this order) by QElement.__eq__ and used to build
# the element sort key in QuestionDefinition.__eq__.
el_keys = [
    "col",
    "colspan",
    "row",
    "rowspan",
    "el_type",
    "label",
    "mandatory",
    "height",
    "width",
    "multitopic",
    "regexp",
    "choices",
]
def to_b36(string_number):
    """
    Encode a dotted position number as a base36 position string.

    Every dot-separated component is passed through base36encode and the
    two-character results are concatenated, e.g. "23.1.8" -> "0N0108".
    A falsy argument (None or "") is handed back unchanged.
    """
    if string_number:
        chunks = [base36encode(int(part)) for part in string_number.split(".")]
        return "".join(chunks)
    return string_number
# Memo of already-encoded integers -> two-character base36 strings.
_b36_cache: dict[int, str] = dict()


def base36encode(number: int):
    """
    Convert an integer into a base36 string exactly two characters long.

    @param number - an int in the range 1 - 1295 (inclusive)
    @raises TypeError if number is not an int
    @raises ValueError if number is outside 1 - 1295
    """
    # Validate the type *before* consulting the cache. 1.0 hashes and compares
    # equal to 1, so checking only on a cache miss made base36encode(1.0)
    # raise on a cold cache but succeed once 1 had been cached.
    if not isinstance(number, int):
        raise TypeError("number must be an integer")

    try:
        return _b36_cache[number]
    except KeyError:
        pass

    if not 0 < number < 1296:
        raise ValueError("number must in range 1 - 1295, received %s" % number)

    alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    # 1 <= number <= 1295 always fits in two base36 digits; a zero high digit
    # provides the left padding.
    high, low = divmod(number, 36)
    encoded = alphabet[high] + alphabet[low]
    _b36_cache[number] = encoded
    return encoded
# Memo used by from_b36: base36 position string -> dotted display number.
_number_cache: dict[str, str] = dict()

# Column type used for the "weight" columns declared in this module.
WEIGHTING_COL_TYPE = mysql.DECIMAL(precision=15, scale=4)
def from_b36(db_string):
    """
    Decode a base36 position string into its dotted, human readable form,
    e.g. '0N0108' -> '23.1.8'.

    The string is read two characters at a time; each pair is parsed as a
    base36 integer and the integers are joined with dots. Falsy input
    (None or "") is returned unchanged and is never cached.

    This runs for every row of a question or section query and was measured
    as surprisingly expensive (about half the runtime when fetching number
    and id for 1,500 questions), so results are memoised in _number_cache.
    """
    try:
        return _number_cache[db_string]
    except KeyError:
        pass

    if not db_string:
        return db_string

    pairs = (db_string[start : start + 2] for start in range(0, len(db_string), 2))
    dotted = ".".join("%s" % int(pair, 36) for pair in pairs)
    _number_cache[db_string] = dotted
    return dotted
class ImportType(IntEnum):
    """Integer codes distinguishing a SHARE import from a COPY import."""

    SHARE = 0
    COPY = 10
class NumberString(str):
    """
    A str holding a base36 position number, with helpers for converting to
    dotted form and deriving neighbouring numbers.
    """

    def __init__(self, raw_value):
        # Keep the constructor argument as given alongside the str value.
        self.raw_value = raw_value

    @property
    def dotted(self):
        """The dotted form of raw_value, as produced by from_b36."""
        return from_b36(self.raw_value)

    def next_increment(self):
        """Return a new NumberString with the last dotted component incremented by one."""
        *leading, last = self.dotted.split(".")
        bumped = leading + [str(int(last) + 1)]
        dotted = ".".join(part for part in bumped if part)
        return NumberString(to_b36(dotted))

    def first_child(self):
        """Return a new NumberString with a trailing ".1" component appended."""
        components = [part for part in self.dotted.split(".") if part]
        components.append("1")
        return NumberString(to_b36(".".join(components)))

    @classmethod
    def from_dotted(cls, value):
        """Alternate constructor taking a dotted number such as "23.1.8"."""
        encoded = to_b36(value)
        return cls(encoded)

    @classmethod
    def visible_relatives_regex(cls, section_number):
        """
        Build a regular expression matching the numbers needed to render a tree
        menu around `section_number` without loading the whole questionnaire.

        One alternative is produced per two-character prefix of
        `section_number`, each matching that prefix followed by exactly two
        more characters (i.e. the immediate offspring of that prefix). A final
        alternative matches numbers of at most two characters, because the
        root section has no number and so root plus its immediate children
        must be special-cased.
        """
        alternatives = [
            "^" + section_number[:end] + r".{2}$"
            for end in range(2, len(section_number) + 2, 2)
        ]
        alternatives.append(r"^.{0,2}$")
        return r"|".join(alternatives)
class SafeNumber:
    """
    A descriptor that returns either number.dotted
    or just number, depending on whether the instance's number
    attribute is an instance of NumberString or not.

    Assigning a NumberString stores it as-is; assigning any other str
    treats it as a dotted number and stores the base36-encoded
    NumberString. Values of any other type are ignored.
    """

    def __get__(self, obj, cls=None):
        # Accessed on the class rather than an instance: return the descriptor
        # itself (standard descriptor convention) instead of failing on
        # None.number.
        if obj is None:
            return self
        if isinstance(obj.number, NumberString):
            return obj.number.dotted
        return obj.number

    def __set__(self, obj, val):
        if isinstance(val, NumberString):
            obj.number = val
            return
        if isinstance(val, str):
            obj.number = NumberString(to_b36(val))
class PositionNumber(types.TypeDecorator):
    """
    A custom SQLAlchemy type that translates unicode strings
    between display numbers like 1.2.3 and
    supplierselect's internal database equivalent ('010203')

    Bound parameters are passed through untouched; values read from the
    database are wrapped in NumberString.
    """

    impl = types.Unicode(30)

    cache_ok = True

    def process_bind_param(self, value, dialect):
        return value

    def process_result_value(self, value, dialect):
        # NOTE(review): a NULL column value is wrapped too, giving a
        # NumberString whose raw_value is None - confirm this is intended.
        return NumberString(value)

    def copy(self, **kw):
        # Accept (and ignore) keyword arguments so the override matches the
        # TypeDecorator.copy(**kw) signature and cannot fail when a caller
        # forwards options.
        return PositionNumber(self.impl.length)
class Section(Base):
    """
    A section of a project's questionnaire. Sections nest via parent_id and
    hold both subsections and QuestionInstances, each ordered by position.
    """

    __tablename__ = "sections"

    id: Mapped[int] = mapped_column(mysql.INTEGER(10), primary_key=True)
    title: Mapped[str] = mapped_column(mysql.VARCHAR(length=255), nullable=False)
    number: Mapped[Optional[NumberString]] = mapped_column(
        PositionNumber, default="", nullable=True
    )
    description: Mapped[Optional[str]] = mapped_column(mysql.LONGTEXT())
    _weight: Mapped[Optional[Decimal]] = mapped_column(
        "weight", WEIGHTING_COL_TYPE, server_default=text("'1.0000'"), nullable=True
    )
    project_id: Mapped[int] = mapped_column(
        mysql.INTEGER(11), ForeignKey("projects.id"), nullable=False
    )
    parent_id: Mapped[Optional[int]] = mapped_column(
        mysql.INTEGER(10), ForeignKey("sections.id", ondelete="CASCADE")
    )
    position: Mapped[int] = mapped_column("pos", Integer, default=0, nullable=False)

    subsections: Mapped[list["Section"]] = relationship(
        "Section", order_by="Section.position"
    )

    project: Mapped["Project"] = relationship(
        "Project",
        primaryjoin="Section.project_id==Project.id",
        back_populates="sections",
    )

    questions: Mapped[list["QuestionInstance"]] = relationship(
        "QuestionInstance",
        order_by="QuestionInstance.position",
        back_populates="section",
    )

    parent: Mapped[Optional["Section"]] = relationship(
        "Section",
        remote_side=[id],
        back_populates="subsections",
        cascade_backrefs=False,
    )

    # Read-only, query-returning variant of `questions`.
    questions_query: DynamicMapped["QuestionInstance"] = relationship(
        "QuestionInstance",
        lazy="dynamic",
        order_by="QuestionInstance.position",
        viewonly=True,
    )

    perms: DynamicMapped["SectionPermission"] = relationship(
        "SectionPermission", lazy="dynamic"
    )

    # Other sections of the same project whose number starts with this
    # section's number.
    descendants: Mapped[list["Section"]] = relationship(
        "Section",
        viewonly=True,
        order_by=number,
        primaryjoin=and_(
            remote(foreign(number)).startswith(number),
            remote(foreign(project_id)) == project_id,
            remote(foreign(id)) != id,
        ),
    )

    safe_number = SafeNumber()

    # Other sections of the same project whose number is a leading prefix of
    # this section's number.
    ancestors: Mapped[list["Section"]] = relationship(
        "Section",
        viewonly=True,
        order_by=number,
        primaryjoin=and_(
            remote(foreign(number))
            == func.LEFT(number, func.LENGTH(remote(foreign(number)))),
            remote(foreign(project_id)) == project_id,
            remote(foreign(id)) != id,
        ),
    )

    def __repr__(self):
        return (
            f"<Sec id:{self.id} Num:{self.number} "
            f"Parent: {self.parent_id} - {self.title}>"
        )

    def as_dict(self):
        """Return a dict summary of this section with empty child lists for the caller to fill."""
        summary = dict(
            id=self.id,
            type="section",
            title=self.title,
            description=self.description,
            number=self.safe_number,
            number36=self.number,
            parent_id=self.parent_id,
        )
        summary["subsections"] = []
        summary["questions"] = []
        return summary

    def renumber(self, new_dotted_number=""):
        """
        Assign `new_dotted_number` to this section, then number its children.

        Subsections come first (recursively), followed by questions; both are
        taken in relationship order and share one running counter starting at 1.
        Each child's position is reset to its index within its own collection.

        Raises QuestionnaireStructureException if a subsection has this
        section's own id.
        """
        self.safe_number = new_dotted_number
        # Children of the unnumbered root get "1", "2"...; others "<parent>.<n>"
        template = "%s.%s" if new_dotted_number else "%s%s"
        counter = 0

        for index, child in enumerate(self.subsections):
            if child.id == self.id:
                raise QuestionnaireStructureException(
                    f"Section {self.id} cannot be its own parent"
                )
            counter += 1
            child_number = template % (new_dotted_number, counter)
            child.position = index
            child.renumber(child_number)

        for index, question in enumerate(self.questions):
            counter += 1
            question.safe_number = template % (new_dotted_number, counter)
            question.position = index

    def accept(self, visitor):
        """
        Walk this section with `visitor`: hello_section, then visit_question for
        each question, then recurse into subsections, then goodbye_section.
        """
        visitor.hello_section(self)

        # Questions are only iterated when visit_question is defined directly
        # on the visitor's own class (not merely inherited).
        overrides_visit_question = "visit_question" in vars(visitor.__class__)
        if overrides_visit_question:
            for question in self.questions:
                visitor.visit_question(question)

        for child in self.subsections:
            child.accept(visitor)

        visitor.goodbye_section(self)

    @property
    def is_top_level(self):
        """True when this section has no parent."""
        return self.parent_id is None

    @property
    def weight(self):
        """The _weightset_weight attribute if one has been set, else the stored weight."""
        if hasattr(self, "_weightset_weight"):
            return self._weightset_weight
        return self._weight

    @weight.setter
    def weight(self, weight):
        self._weight = weight

    @property
    def absolute_weight(self):
        """
        Decimal(1) for a top level section; otherwise normalised_weight times
        the parent's absolute_weight, cached on the instance after first use.

        Raises WeightingsNotLoadedException when normalised_weight is not set.
        """
        if self.is_top_level:
            return Decimal(1)

        if not hasattr(self, "_absolute_weight"):
            try:
                normalised = self.normalised_weight
            except AttributeError:
                m = "normalised weights not loaded - run HierarchicalWeightingVisitor"
                raise WeightingsNotLoadedException(m)
            self._absolute_weight = normalised * self.parent.absolute_weight

        return self._absolute_weight

    @absolute_weight.setter
    def absolute_weight(self, abs_weight):
        self._absolute_weight = abs_weight

    @validates("questions", "subsections")
    def check_position(self, attr_name, instance):
        """
        Validator for `questions` and `subsections`: fill in position, number
        and project_id on `instance` where they are None, based on the last
        item already in the collection (or on this section if it is empty).
        """
        siblings = getattr(self, attr_name)

        if instance.position is None:
            position = 0
            if siblings:
                last_position = siblings[-1].position
                if last_position is not None:
                    position = last_position + 1
            instance.position = position

        if instance.number is None:
            if siblings:
                last_number = siblings[-1].number
                if hasattr(last_number, "next_increment"):
                    instance.number = last_number.next_increment()
            elif hasattr(self.number, "first_child"):
                instance.number = self.number.first_child()

        if instance.project_id is None:
            instance.project_id = self.project_id

        return instance
class QuestionDefinition(Base):
    """
    The content of a question: a title plus a grid of QElements. One
    definition may be referenced by several QuestionInstances.
    """

    __tablename__ = "questions"

    title: Mapped[str] = mapped_column(mysql.MEDIUMTEXT(), nullable=False)
    weight: Mapped[Optional[Decimal]] = mapped_column(
        WEIGHTING_COL_TYPE, server_default=text("'1.0000'"), nullable=True
    )
    parent_id: Mapped[Optional[int]] = mapped_column(Integer, index=True)
    refcount: Mapped[int] = mapped_column(
        "referenceCount", Integer, nullable=False, server_default=text("'1'")
    )
    elements: Mapped[list["QElement"]] = relationship(
        "QElement",
        lazy="joined",
        order_by="QElement.row,QElement.col",
        cascade="all, delete-orphan",
        passive_deletes=True,
        back_populates="question_def",
    )

    # Read-only, query-returning variant of `elements`.
    _elements: DynamicMapped["QElement"] = relationship(
        "QElement", lazy="dynamic", viewonly=True
    )

    instances: DynamicMapped["QuestionInstance"] = relationship(
        "QuestionInstance", lazy="dynamic", back_populates="question_def"
    )

    meta: Mapped[Optional["QuestionMeta"]] = relationship(
        "QuestionMeta",
        uselist=False,
        back_populates="question_def",
        cascade="all, delete",
        passive_deletes=True,
    )

    def __repr__(self):
        return f'<QDef Id: {self.id}, Title: "{self.title}">'

    def __eq__(self, o: object) -> bool:
        """Equal when titles match and the elements, sorted by el_keys, compare equal."""
        if not isinstance(o, QuestionDefinition):
            return False

        def by_el_keys(element):
            return [getattr(element, key) for key in el_keys]

        mine = sorted(self.elements, key=by_el_keys)
        theirs = sorted(o.elements, key=by_el_keys)
        return self.title == o.title and mine == theirs

    def __hash__(self) -> int:
        return super().__hash__()

    @property
    def is_shared(self):
        """True when refcount is set and greater than one."""
        count = self.refcount
        return count is not None and count > 1

    def get_element(self, element_id):
        """Fetch the single element of this definition with the given id."""
        matching = self._elements.filter(QElement.id == element_id)
        return matching.one()

    def as_dict(self, answer_lookup=None, vendor_view=False):
        """
        Group this definition's elements into table rows.

        Returns a list of lists of element dicts; a new inner list is started
        whenever an element's row is greater than the current one. When
        `answer_lookup` is given, answerable elements gain an "answer" entry
        looked up by element id (None if absent). `vendor_view` is forwarded
        only to elements that contain choices.
        """
        rows = []
        highest_row = -1
        for element in self.elements:
            if element.row > highest_row:
                rows.append([])
                highest_row = element.row

            if element.contains_choices:
                element_dict = element.as_dict(vendor_view=vendor_view)
            else:
                element_dict = element.as_dict()

            if element.is_answerable and answer_lookup is not None:
                element_dict["answer"] = answer_lookup.get(element.id, None)

            rows[-1].append(element_dict)

        return rows

    @property
    def answerable_elements(self):
        """The elements whose is_answerable flag is truthy."""
        return [element for element in self.elements if element.is_answerable]

    def add_element(self, el_name, **kwargs):
        """Build an element of type `el_name`, append it to `elements` and return it."""
        element = QElement.build(el_name, **kwargs)
        self.elements.append(element)
        return element

    def is_tabular(self):
        """True unless column_count is exactly 1."""
        return self.column_count != 1

    @property
    def column_count(self):
        """The highest `col` value among the elements, or 0 with no elements."""
        return max(el.col for el in self.elements) if self.elements else 0

    @property
    def row_count(self):
        """The highest `row` value among the elements, or 0 with no elements."""
        return max(el.row for el in self.elements) if self.elements else 0

    @property
    def grid_area(self) -> int:
        """Number of grid cells implied by the row and col values of the elements"""
        return self.row_count * self.column_count

    @property
    def occupied_area(self) -> int:
        """
        Total cells covered by the elements according to their rowspan and
        colspan values. Equals grid_area when the spans are set correctly.
        """
        return sum(element.cell_area for element in self.elements)

    @property
    def is_table_valid(self):
        """True when the occupied area exactly fills the grid area."""
        return self.grid_area == self.occupied_area

    @classmethod
    def build(cls, qdict, strip_ids=False):
        """
        Create a QuestionDefinition, complete with QElements, from a dict
        datastructure as provided by serial.QuestionDef (non-dict input is
        converted with model_dump()).

        Row and column numbers (1-based) are written into each element dict
        from its place in the nested "elements" list; with `strip_ids` the
        element "id" is set to None before the element is built.
        """
        if not isinstance(qdict, dict):
            qdict = qdict.model_dump()

        definition = cls(title=qdict["title"])
        for row_number, row in enumerate(qdict["elements"], start=1):
            for col_number, element in enumerate(row, start=1):
                element["row"] = row_number
                element["col"] = col_number
                if strip_ids:
                    element["id"] = None
                definition.add_element(element["el_type"], **element)
        return definition
# Join condition string passed as `primaryjoin` for QuestionInstance.total_weightings.
qi_weight_join = "foreign(TotalWeighting.question_instance_id)==QuestionInstance.id"
class SaveAnswersResult:
    """Plain holder for the three values produced when saving answers."""

    def __init__(self, is_new, change_list, unanswered_mandatory):
        # Stored exactly as given; no validation or copying is performed.
        self.unanswered_mandatory = unanswered_mandatory
        self.change_list = change_list
        self.is_new = is_new
class QuestionInstance(Base):
    """
    The placement of a QuestionDefinition within a project's Section.

    @DynamicAttrs
    """

    __tablename__ = "question_instances"

    __table_args__ = (  # type: ignore
        Index(
            "unique_question_project",
            "project_id",
            "question_id",
            unique=True,
        ),
    ) + Base.__table_args__

    public_attrs = "id,title,position,section_id,number,url".split(",")

    section_id: Mapped[int] = mapped_column(
        "section_id", mysql.INTEGER(10), ForeignKey("sections.id"), nullable=False
    )
    question_def_id: Mapped[int] = mapped_column(
        "question_id",
        mysql.INTEGER(10),
        ForeignKey(QuestionDefinition.id),
        nullable=False,
    )
    project_id: Mapped[int] = mapped_column(
        "project_id", mysql.INTEGER(11), ForeignKey("projects.id"), nullable=False
    )
    _weight: Mapped[Optional[Decimal]] = mapped_column(
        "weight", WEIGHTING_COL_TYPE, server_default=text("'1.0000'"), nullable=True
    )
    position: Mapped[int] = mapped_column(
        "pos", Integer, nullable=False, server_default=text("'0'")
    )
    number: Mapped[Optional[NumberString]] = mapped_column(
        PositionNumber, nullable=True
    )

    question_def: Mapped["QuestionDefinition"] = relationship(
        QuestionDefinition, lazy="joined", innerjoin=True, back_populates="instances"
    )
    project: Mapped["Project"] = relationship("Project")
    section: Mapped["Section"] = relationship("Section", back_populates="questions")
    parent: Mapped["Section"] = relationship("Section", viewonly=True)
    _answers: DynamicMapped["Answer"] = relationship(
        "Answer",
        lazy="dynamic",
        cascade="all, delete",
        passive_deletes=True,
        back_populates="question_instance",
    )

    total_weightings: Mapped["TotalWeighting"] = relationship(
        "TotalWeighting", primaryjoin=qi_weight_join, backref="question", cascade="all"
    )

    meta: Mapped[Optional["QuestionMeta"]] = relationship(
        "QuestionMeta",
        uselist=False,
        back_populates="question_instance",
        secondary="questions",
        viewonly=True,
    )
    tags: Mapped[list["Tag"]] = relationship(
        "Tag",
        secondary="tags_qinstances",
        back_populates="question_instances",
        passive_deletes=True,
    )

    def __repr__(self):
        return f"<Question - InstanceId: {self.id}, DefId: {self.question_def_id}>"

    def __eq__(self, o: object) -> bool:
        """Two instances are equal when their question definitions are equal."""
        if isinstance(o, QuestionInstance):
            return self.question_def == o.question_def
        return False

    def __hash__(self) -> int:
        return super().__hash__()

    def all_answers(self):
        """All Answers for this question and scoreable issues"""
        return (
            self._answers.join(Issue).filter(Issue.scoreable_filter(self.project)).all()
        )

    def answers_for_issue(self, issue_id):
        """Return a query of this question's answers belonging to `issue_id`."""
        return self._answers.filter(Answer.issue_id == issue_id)

    def single_vendor_dict(self, issue):
        """Return this question as a dict in vendor view, with `issue`'s answers attached."""
        lookup = {a.element_id: a.answer for a in self.answers_for_issue(issue.id)}
        return {
            "number": self.safe_number,
            "id": self.id,
            "title": self.title,
            "respondent": issue.respondent.as_dict(),
            "elements": self.question_def.as_dict(
                vendor_view=True, answer_lookup=lookup
            ),
        }

    @property
    def title(self):
        return self.question_def.title

    @property
    def elements(self):
        return self.question_def.elements

    @property
    def weight(self):
        """
        Strange behaviour inherited from java land.
        Returns
        1) WeightingSet value, if loaded by LoadWeightSetVisitor
        or
        2) self._weight if weight defined for this QuestionInstance
        or, finally
        3) The weight from this Instances QuestionDefinition
        """
        try:
            return self._weightset_weight
        except AttributeError:
            return (
                self._weight if self._weight is not None else self.question_def.weight
            )

    @weight.setter
    def weight(self, weight_value):
        """
        Set the weight on the question definition if the definition is not
        shared. The instance's own weight is always set as well.
        """
        if not self.question_def.is_shared:
            self.question_def.weight = weight_value
        # Mirrors behaviour in Java land - SqlMapQuestionDao line 322
        self._weight = weight_value

    safe_number = SafeNumber()

    def as_dict(self, answer_lookup=None, vendor_view=False):
        """Return this question as a dict; both arguments are forwarded to the definition's as_dict."""
        return {
            "id": self.id,
            "title": self.title,
            "number36": self.number,
            "number": self.safe_number,
            "elements": self.question_def.as_dict(
                answer_lookup=answer_lookup, vendor_view=vendor_view
            ),
            "type": "question",
        }

    @property
    def is_autoscored(self):
        """True if any multiple choice element has a choice with a truthy autoscore."""
        for element in self.elements:
            if isinstance(element, MultipleChoice) and element.choices:
                for choice in element.choices:
                    if choice.get("autoscore", None):
                        return True
        return False

    def calculate_autoscores(self):
        """
        Return a mapping of Issue to autoscore for this question.

        For every element containing choices, each answer from a scoreable
        issue is matched against the choice labels; the autoscore of each
        matching choice is collected per issue. A list is used while
        collecting in case of multiple autoscored elements in one question;
        the collected scores are then summed and capped at the project's
        maximum_score.

        Choices with no autoscore (key missing or None) are skipped.
        """
        score_map = defaultdict(list)
        multichoice_elements = [
            el for el in self.question_def.elements if el.contains_choices
        ]
        max_score = self.project.maximum_score
        issue_filter = Issue.scoreable_filter(self.project)
        for multichoice in multichoice_elements:
            mq = multichoice._answers.join(Issue).filter(issue_filter)
            for answer in mq:
                for choice in multichoice.choices:
                    # .get(): a choice may lack the "autoscore" key altogether,
                    # which is_autoscored and MultipleChoice.summary already
                    # tolerate; indexing here would raise KeyError.
                    autoscore = choice.get("autoscore")
                    if answer.answer == choice["label"] and autoscore is not None:
                        score_map[answer.issue].append(int(autoscore))

        # Only existing keys are reassigned, so iterating items() is safe.
        for issue, score_list in score_map.items():
            total_score = sum(score_list)
            score_map[issue] = total_score if total_score <= max_score else max_score

        return score_map

    @property
    def absolute_weight(self):
        """
        normalised_weight times the section's absolute_weight, cached on the
        instance after the first computation.

        Raises WeightingsNotLoadedException if an AttributeError occurs while
        computing it (e.g. normalised_weight has not been set).
        """
        if not hasattr(self, "_absolute_weight"):
            try:
                self._absolute_weight = (
                    self.normalised_weight * self.section.absolute_weight
                )
            except AttributeError:
                m = "normalised weights not loaded - run HierarchicalWeightingVisitor"
                raise WeightingsNotLoadedException(m)
        return self._absolute_weight

    @absolute_weight.setter
    def absolute_weight(self, abs_weight):
        self._absolute_weight = abs_weight

    def validate_and_save_answers(self, answer_doc, issue) -> SaveAnswersResult:
        """
        Validate the answers in `answer_doc` (element id -> answer) and apply
        them to this question for `issue`.

        Existing answers are updated in place when the value differs; missing
        ones are created. A None answer is skipped. Note that matched keys are
        popped from `answer_doc`, i.e. the caller's mapping is consumed.

        Returns a SaveAnswersResult carrying: whether no answers existed
        beforehand, the list of (old, new) changes, and whether a mandatory
        element was left without an answer.

        Raises ValidationFailure if `answer_doc` contains keys that match no
        answerable element; element.validate may raise for invalid values.
        """
        changes = []
        unanswered_mandatory = False
        current_answers = {a.element_id: a for a in self.answers_for_issue(issue.id)}
        for element in self.question_def.answerable_elements:
            el_id = element.id
            if el_id in answer_doc:
                new_answer = answer_doc.pop(el_id)
                if new_answer is None:
                    continue
                element.validate(new_answer)
                if el_id in current_answers:
                    current_answer = current_answers[el_id]
                    old_answer = current_answer.answer
                    if old_answer != new_answer:
                        current_answer.answer = new_answer
                        changes.append((old_answer, new_answer))
                else:
                    a = Answer(element_id=el_id, answer=new_answer, issue_id=issue.id)
                    self._answers.append(a)
                    changes.append((None, new_answer))
            else:
                if element.mandatory and el_id not in current_answers:
                    unanswered_mandatory = True

        # Anything still left in answer_doc matched no answerable element.
        if len(answer_doc) > 0:
            tmpl = "Unable to match all answers with element IDs: {}"
            ids = ", ".join(str(k) for k in answer_doc.keys())
            raise ValidationFailure(tmpl.format(ids))

        is_new = len(current_answers) == 0

        return SaveAnswersResult(is_new, changes, unanswered_mandatory)
class QElement(Base):
    """
    One cell of a question's grid. Polymorphic on the two-character
    el_type column; subclasses supply the concrete element types.
    """

    __tablename__ = "question_elements"
    __table_args__ = (  # type: ignore
        Index(
            "question_elements_qId_row_col",
            "question_id",
            "row",
            "col",
        ),
        Index("ft_label", "label", mysql_prefix="FULLTEXT"),
    ) + Base.__table_args__

    __mapper_args__ = {"polymorphic_on": "el_type", "polymorphic_identity": ""}

    # Class-level defaults, overridden by some subclasses.
    is_answerable = True

    contains_choices = False

    public_attrs = [
        "id",
        "el_type",
        "colspan",
        "rowspan",
        "label",
        "mandatory",
        "regexp",
        "col",
        "row",
    ]

    answerable_types = {"TX", "CR", "CC", "CB", "AT"}

    question_id: Mapped[int] = mapped_column(
        ForeignKey("questions.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
        server_default=text("'0'"),
    )
    row: Mapped[int] = mapped_column(
        mysql.INTEGER(10), nullable=False, default=1, server_default=text("'0'")
    )
    col: Mapped[int] = mapped_column(
        mysql.INTEGER(10), nullable=False, default=1, server_default=text("'0'")
    )
    el_type: Mapped[str] = mapped_column(
        "type", mysql.CHAR(length=2), nullable=False, server_default=text("''")
    )
    colspan: Mapped[Optional[int]] = mapped_column(Integer, default=1, nullable=True)
    rowspan: Mapped[Optional[int]] = mapped_column(Integer, default=1, nullable=True)
    label: Mapped[Optional[str]] = mapped_column(mysql.MEDIUMTEXT(), nullable=True)
    mandatory: Mapped[Optional[bool]] = mapped_column(
        Boolean, default=False, nullable=True
    )
    height: Mapped[Optional[int]]
    width: Mapped[Optional[int]]
    multitopic: Mapped[Optional[bool]] = mapped_column(
        Boolean, default=False, nullable=True
    )
    regexp: Mapped[Optional[str]] = mapped_column(Unicode(255))
    choices: Mapped[Optional[dict]] = mapped_column(JSON, nullable=True)

    question_def: Mapped["QuestionDefinition"] = relationship(
        "QuestionDefinition", back_populates="elements"
    )
    answers: Mapped[list["Answer"]] = relationship("Answer", back_populates="element")
    # Read-only, query-returning variant of `answers`.
    _answers: DynamicMapped["Answer"] = relationship(
        "Answer", lazy="dynamic", viewonly=True
    )

    def __eq__(self, o: object) -> bool:
        """Equal when `o` is a QElement and every attribute named in el_keys matches."""
        if not isinstance(o, QElement):
            return False
        return all(
            hasattr(self, key) and hasattr(o, key) and getattr(o, key) == getattr(self, key)
            for key in el_keys
        )

    def __hash__(self) -> int:
        return super().__hash__()

    def get_answer(self, issue):
        """
        Get the answer for this element and `issue`.

        NOTE(review): this uses Query.one(), which raises unless exactly one
        row matches; the previous docstring promised None when not yet
        answered - confirm which behaviour callers expect.
        """
        matching = self._answers.filter(Answer.issue == issue)
        return matching.one()

    def get_question_instance(self, project_id):
        """Return the single QuestionInstance of this element's definition in `project_id`."""
        candidates = self.question_def.instances.filter_by(project_id=project_id)
        return candidates.one()

    @staticmethod
    def split_choice_string(choice_string):
        """
        Split a string representation of a multiple choice option,
        e.g. 'Yes <8>' into a tuple - ('Yes', '8'). The autoscore part is
        returned as a string of digits, or None when no '<digits>' suffix
        is present.
        """
        found = re.match(r"([^<]+)<(\d+)>", choice_string)
        if not found:
            return (choice_string.strip(), None)
        label, autoscore = found.groups()
        return (label.strip(), autoscore.strip())

    @classmethod
    def build(cls, type_name, **kwargs):
        """Create a QElement subclass instance of the given type_name"""
        element_class = inspect(cls).polymorphic_map[type_name].class_
        return element_class(**kwargs)

    @property
    def cell_area(self):
        """Number of grid cells covered: colspan multiplied by rowspan."""
        return self.colspan * self.rowspan

    @property
    def summary(self):
        """For describing this element in audit event change log"""
        return self.label
class Label(QElement):
    """Element of type "LB"; flagged as not answerable."""

    __mapper_args__ = {"polymorphic_identity": "LB"}
    is_answerable = False
    public_attrs = ["id", "el_type", "label", "colspan", "rowspan"]
class Checkbox(QElement):
    """Element of type "CB" whose answer is the string 'true' or 'false'."""

    __mapper_args__ = {"polymorphic_identity": "CB"}
    public_attrs = ["id", "el_type", "label", "colspan", "rowspan"]

    def validate(self, answer):
        """Raise ValidationFailure unless `answer` is 'true' or 'false'."""
        if answer in ("true", "false"):
            return
        m = f"Answer for checkbox element {self.id} must be 'true' or 'false'"
        raise ValidationFailure(m)
class TextInput(QElement):
    """Element of type "TX": a text input with optional regexp validation."""

    __mapper_args__ = {"polymorphic_identity": "TX"}

    def as_dict(self):
        """Return the element's id, type, spans, dimensions, mandatory flag and regexp."""
        return {
            "id": self.id,
            "el_type": self.el_type,
            "colspan": self.colspan,
            "rowspan": self.rowspan,
            "height": self.height,
            "width": self.width,
            "mandatory": self.mandatory,
            "regexp": self.regexp,
        }

    def validate(self, answer):
        """
        Raise ValidationFailure if a regexp is set and `answer` does not match
        it (re.match, i.e. anchored at the start of the answer).
        """
        if self.regexp and not re.match(self.regexp, answer):
            tmpl = "Answer {:.10} does not match regular expression {} for element {}"
            # Argument order follows the template: answer (truncated to 10
            # characters), then the regexp, then the element id.
            raise ValidationFailure(tmpl.format(answer, self.regexp, self.id))

    @property
    def summary(self):
        return f"[{self.width} X {self.height}]"
class MultipleChoice(object):
    """
    The choices property on QElement is used by RadioChoices (CR) and SelectChoices(CC)
    subclasses. The property is a JSON field containing an array of label:autoscore objects:
    [
      {label: 'Yes', autoscore: 10},
      {label: 'No', autoscore: 0}
    ]

    This mixin reads `id`, `el_type`, `colspan`, `rowspan`, `mandatory` and
    `choices` from the class it is mixed into. `choices` may be None.
    """

    contains_choices = True

    def as_dict(self, vendor_view=False):
        """
        Return the element as a dict. With `vendor_view` each choice is
        reduced to its label, so autoscore values are not exposed.
        """
        choices = self.choices
        # Guard on truthiness: choices may be None, which previously raised
        # TypeError in vendor view while the non-vendor view returned None.
        if vendor_view and choices:
            choices = [{"label": c["label"]} for c in choices]
        return {
            "id": self.id,
            "el_type": self.el_type,
            "colspan": self.colspan,
            "rowspan": self.rowspan,
            "mandatory": self.mandatory,
            "choices": choices,
        }

    def validate(self, answer):
        """Raise ValidationFailure unless `answer` equals one of the choice labels."""
        # A list (not a set) keeps the declared order, so the error message is
        # deterministic; `or []` treats missing choices as "nothing accepted".
        labels = [c["label"] for c in self.choices or []]
        if answer not in labels:
            tmpl = (
                "Answer '{:.10}' is not one of the accepted values: [{}] for element {}"
            )
            accepted = ",".join(str(label) for label in labels)
            raise ValidationFailure(tmpl.format(str(answer), accepted, self.id))

    @property
    def summary(self):
        """One "label: autoscore" line per choice, or "" when there are no choices."""
        if self.choices:
            return "\n".join(
                f"{c['label']}: {c.get('autoscore', None)}" for c in self.choices
            )
        return ""
class SelectChoices(MultipleChoice, QElement):
    """Multiple choice element of type "CC"."""

    __mapper_args__ = {"polymorphic_identity": "CC"}  # 'ChoiceCombo'
class RadioChoices(MultipleChoice, QElement):
    """Multiple choice element of type "CR"."""

    __mapper_args__ = {"polymorphic_identity": "CR"}
class QuestionAttachment(QElement):
    """An attachment added by the Buyer to the Question (type "QA"); not answerable."""

    __mapper_args__ = {"polymorphic_identity": "QA"}
    is_answerable = False

    public_attrs = ["id", "el_type", "label", "colspan", "rowspan"]

    # At most one QAttachment row per element (uselist=False).
    attachment: Mapped[Optional["QAttachment"]] = relationship(
        "QAttachment", uselist=False
    )
class SupportingAttachment(QElement):
    """
    A File Input form element (type "AT") enabling a Respondent to upload an
    attachment as part of an answer to a Question
    """

    __mapper_args__ = {"polymorphic_identity": "AT"}
    public_attrs = ["id", "el_type", "colspan", "rowspan", "mandatory"]

    def validate(self, answer):
        """Raise ValidationFailure unless `answer` is a str."""
        if isinstance(answer, str):
            return
        raise ValidationFailure(
            "SupportingAttachments answer should be the filename and size"
        )

    @property
    def summary(self):
        return "File upload field"
class ExternalMedia(QElement):
    """A link to an external resource, e.g. video (type "MD")"""

    __mapper_args__ = {"polymorphic_identity": "MD"}
class QAttachment(AttachmentMixin, Base):
    """Attachment row linked to a question element via element_id."""

    __tablename__ = "question_attachments"

    public_attrs = ["id", "size", "filename", "url"]
    element_id = mapped_column(Integer, ForeignKey("question_elements.id"))

    # Read-only link back to the owning element.
    element = relationship(QElement, viewonly=True)
class Answer(Base):
    """
    The answer given for one question element, within one question instance,
    for one issue (that triple is covered by a unique index).
    """

    __tablename__ = "answers"
    __table_args__ = (
        Index(
            "unique_issue_question_element_answer",
            "issue_id",
            "question_instance_id",
            "element_id",
            unique=True,
        ),
        Index("ft_answer", "answer", mysql_prefix="FULLTEXT"),
        {"mysql_engine": "InnoDB", "mysql_charset": "utf8mb4"},
    )

    issue_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey("issues.id", name="answers_ibfk_1", ondelete="CASCADE"),
        nullable=False,
        default=0,
        index=True,
    )

    answer: Mapped[str] = mapped_column(mysql.LONGTEXT(), nullable=True)

    element_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey("question_elements.id", name="answers_ibfk_2"),
        nullable=False,
        index=True,
        server_default=text("'0'"),
    )
    autoscore: Mapped[Optional[int]] = mapped_column(mysql.INTEGER(11))
    question_instance_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey("question_instances.id", ondelete="CASCADE", name="answers_ibfk_3"),
        nullable=False,
        index=True,
    )

    issue: Mapped["Issue"] = relationship("Issue", back_populates="answers")

    element: Mapped["QElement"] = relationship(QElement, back_populates="answers")

    question_instance: Mapped["QuestionInstance"] = relationship(
        QuestionInstance, back_populates="_answers", uselist=False
    )
    attachment: Mapped["AAttachment"] = relationship(
        "AAttachment",
        back_populates="answer",
        passive_deletes=True,
        uselist=False,
    )

    def as_dict(self):
        """Return the answer's id, issue id and answer text as a dict."""
        return dict(answer_id=self.id, issue_id=self.issue_id, answer=self.answer)

    def __repr__(self):
        return (
            f"Answer ID {self.id}, Issue: {self.issue_id}, "
            f"QuestionInstance: {self.question_instance_id}"
        )
class AnswerReport(Base):
    """A titled report definition belonging to a Project (table "answer_reports")"""

    __tablename__ = "answer_reports"
    __table_args__ = {"mysql_engine": "InnoDB", "mysql_charset": "utf8mb4"}

    id: Mapped[int] = mapped_column(Integer, primary_key=True)

    # Rows are removed together with their project (ON DELETE CASCADE)
    project_id: Mapped[int] = mapped_column(
        ForeignKey("projects.id", ondelete="CASCADE", name="answer_reports_project_id"),
        nullable=False,
        index=True,
    )
    title: Mapped[str] = mapped_column(Unicode(100), nullable=False)

    # Report definition stored as text; format not visible here
    definition: Mapped[str] = mapped_column(mysql.MEDIUMTEXT(), nullable=False)

    project: Mapped["Project"] = relationship(
        "Project", back_populates="answer_reports"
    )

    def __repr__(self):
        return f"<AnswerReport {self.id} - {self.title}>"
class ResponseStatus(Enum):
    """
    Status values for a question response. The integer values are the
    ones persisted by ResponseStatusCol.
    """

    NOT_ANSWERED = 0
    ANSWERED = 10
    FOR_REVIEW = 20
    REJECTED = 30
    APPROVED = 40
class ResponseStatusCol(types.TypeDecorator):
    """
    A custom SQLAlchemy type that maps database integer
    values to ResponseStatus Enum values
    """

    impl = mysql.TINYINT(4)

    cache_ok = True

    def process_bind_param(self, response_status_enum, dialect):
        """
        Convert a ResponseStatus member to its integer value for the database.

        None is passed through unchanged so that NULL can be bound without
        raising AttributeError.
        """
        if response_status_enum is None:
            return None
        return response_status_enum.value

    def process_result_value(self, int_value, dialect):
        """
        Convert an integer loaded from the database to a ResponseStatus member.

        None is passed through unchanged: a NULL can reach this method,
        e.g. for the unmatched side of an outer join, and ResponseStatus(None)
        would raise ValueError.
        """
        if int_value is None:
            return None
        return ResponseStatus(int_value)
class QuestionResponseState(Base):
    """
    Status and user references (allocation, approval, last update) recorded
    for one question instance within one issue. The unique index permits a
    single row per (issue, question instance) pair.
    """

    __tablename__ = "question_response_states"
    __table_args__ = (
        Index("unique_resp_state", "issue_id", "question_instance_id", unique=True),
        {"mysql_engine": "InnoDB", "mysql_charset": "utf8mb4"},
    )
    public_attrs = [
        "status",
        "allocated_by",
        "allocated_to",
        "approved_by",
        "updated_by",
        "date_updated",
    ]

    id: Mapped[int] = mapped_column(Integer, primary_key=True)

    issue_id: Mapped[int] = mapped_column(
        ForeignKey("issues.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
        server_default=text("'0'"),
    )

    question_instance_id: Mapped[int] = mapped_column(
        ForeignKey("question_instances.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
        server_default=text("'0'"),
    )

    # Stored as an integer; the server default '0' is ResponseStatus.NOT_ANSWERED
    status: Mapped[ResponseStatus] = mapped_column(
        ResponseStatusCol, nullable=False, server_default=text("'0'")
    )

    # User references: each is set to NULL if the referenced user is deleted
    allocated_by: Mapped[Optional[str]] = mapped_column(
        types.VARCHAR(length=50), ForeignKey("users.id", ondelete="SET NULL")
    )
    allocated_to: Mapped[Optional[str]] = mapped_column(
        types.VARCHAR(length=50), ForeignKey("users.id", ondelete="SET NULL")
    )
    approved_by: Mapped[Optional[str]] = mapped_column(
        types.VARCHAR(length=50), ForeignKey("users.id", ondelete="SET NULL")
    )
    updated_by: Mapped[Optional[str]] = mapped_column(
        types.VARCHAR(length=50), ForeignKey("users.id", ondelete="SET NULL")
    )

    date_updated: Mapped[Optional[datetime]] = mapped_column(DateTime)

    issue: Mapped["Issue"] = relationship(
        "Issue",
        back_populates="response_states",
    )
    question_instance: Mapped["QuestionInstance"] = relationship("QuestionInstance")
class AAttachment(AttachmentMixin, Base):
    """Attachment row linked to an Answer (table "answer_attachments")"""

    __tablename__ = "answer_attachments"

    public_attrs = [
        "id",
        "size",
        "filename",
        "url",
        "respondent",
        "question_number",
        "question_id",
    ]

    # unique=True: at most one attachment per answer. The reference is set
    # to NULL if the answer row is deleted.
    answer_id: Mapped[Optional[int]] = mapped_column(
        Integer, ForeignKey("answers.id", ondelete="SET NULL"), unique=True
    )

    answer: Mapped[Optional["Answer"]] = relationship(
        Answer,
        uselist=False,
        back_populates="attachment",
    )

    def __repr__(self) -> str:
        return f"<AAttachment #{self.id} - {self.filename}>"
class WeightingSet(Base):
    """A named collection of Weighting rows belonging to a Project"""

    __tablename__ = "weighting_sets"

    public_attrs = ("id", "name")

    name: Mapped[Optional[str]] = mapped_column(Unicode(255), default=None)
    project_id: Mapped[int] = mapped_column(
        Integer, ForeignKey("projects.id", ondelete="CASCADE"), nullable=False
    )

    project: Mapped["Project"] = relationship(
        "Project", back_populates="weighting_sets"
    )

    # lazy="dynamic": attribute access returns a query rather than a list
    weightings: DynamicMapped["Weighting"] = relationship(
        "Weighting",
        back_populates="weighting_set",
        lazy="dynamic",
        cascade="all,delete",
        passive_deletes=True,
    )

    def __repr__(self):
        return f'Weighting Set ID: {self.id} "{self.name}"'

    def lookup_table(self):
        """
        Build weight lookups for this weighting set.

        Returns a dict with two keys: "questions" maps question instance id
        to weight value, "sections" maps section id to weight value. A row
        with a truthy section_id is treated as a section weight; every
        other row is treated as a question weight.
        """
        lookup = {"questions": {}, "sections": {}}
        # Fetch only the three columns needed rather than full entities
        rows = self.weightings.with_entities(
            Weighting.section_id, Weighting.question_instance_id, Weighting.value
        )
        for section_id, question_instance_id, value in rows:
            if section_id:
                lookup["sections"][section_id] = value
            else:
                lookup["questions"][question_instance_id] = value
        return lookup
class Weighting(Base):
    """
    A weight value within a WeightingSet, referencing a question instance
    or a section (both references are nullable).
    """

    __tablename__ = "weightings"

    weighting_set_id: Mapped[int] = mapped_column(
        ForeignKey("weighting_sets.id", ondelete="CASCADE"),
        nullable=False,
    )

    # Nullable references to the weighted item
    question_instance_id: Mapped[Optional[int]] = mapped_column(
        ForeignKey("question_instances.id", ondelete="CASCADE")
    )
    section_id: Mapped[Optional[int]] = mapped_column(
        ForeignKey("sections.id", ondelete="CASCADE")
    )

    value: Mapped[Decimal] = mapped_column(
        WEIGHTING_COL_TYPE, server_default=text("'1.0000'"), nullable=False
    )

    question: Mapped[Optional["QuestionInstance"]] = relationship(QuestionInstance)
    section: Mapped[Optional["Section"]] = relationship("Section")
    weighting_set: Mapped["WeightingSet"] = relationship(
        WeightingSet, back_populates="weightings"
    )

    def __repr__(self):
        return f"<Weighting {self.id} - {self.value}>"
class TotalWeighting(Base):
    """
    The total_weightings table is essentially a cache. It saves the absolute, normalised weight
    values for question instances and sections for a given weighting_set_id. Null weighting_set_id
    indicates the default weighting set.

    Absolute weights are derived hierarchically - the entire questionnaire needs to be loaded
    as a tree structure in order to figure out these weights. e.g. question weight depends
    on the weights of parent sections. It is therefore expensive to derive on the fly - hence
    the requirement for this table.

    The cache is recalculated when weightings are saved
    """

    __tablename__ = "total_weightings"
    __table_args__ = (
        Index(
            "weightings",
            "project_id",
            "weighting_set_id",
            "question_instance_id",
            "section_id",
            unique=True,
        ),
        {"mysql_engine": "InnoDB", "mysql_charset": "utf8mb4"},
    )

    project_id: Mapped[int] = mapped_column(
        ForeignKey("projects.id", ondelete="CASCADE", name="total_weightings_wset"),
        nullable=False,
    )
    weighting_set_id: Mapped[Optional[int]] = mapped_column(
        Integer, nullable=True, default=None
    )

    # Plain integers with no foreign key; the server default is '0'
    question_instance_id: Mapped[int] = mapped_column(
        Integer, nullable=False, server_default=text("'0'")
    )
    section_id: Mapped[int] = mapped_column(
        Integer, nullable=False, server_default=text("'0'")
    )

    # asdecimal=True makes the column return Decimal values, not floats
    weight: Mapped[Optional[Decimal]] = mapped_column(
        mysql.FLOAT(asdecimal=True, scale=7), nullable=True
    )

    project: Mapped["Project"] = relationship(
        "Project",
        back_populates="total_weightings",
    )

    def __repr__(self):
        return f"<TotalWeighting {self.id} - {self.weight}>"
"""
TODO
 - Add columns: one for normalised_weight, one for absolute_weight
 - Rename the index in the DB schema to avoid a name clash with the "weightings" table
 - For API use, provide one weighting set at a time and load others on demand,
   or provide weightings as a lookup dict in JS
"""
class LoadWeightSetVisitor(Visitor):
    """
    Updates Question and Section weights in the questionnaire with values
    from the provided weighting_set
    """

    def __init__(self, weighting_set):
        """
        Build the question and section weight lookups from ``weighting_set``
        (via its ``lookup_table()`` method).
        """
        # super(Visitor, self) would start the MRO lookup *after* Visitor and
        # so skip any initialisation Visitor performs; use zero-argument super
        super().__init__()
        weight_lookup = weighting_set.lookup_table()
        self.question_lookup = weight_lookup["questions"]
        self.section_lookup = weight_lookup["sections"]

    def hello_section(self, sec):
        """
        Set the weighting-set weight on a section.

        Raises KeyError if the section id is not in the lookup.
        """
        sec._weightset_weight = self.section_lookup[sec.id]

    def visit_question(self, question):
        """
        Set the weighting-set weight on a question.

        Raises KeyError if the question id is not in the lookup.
        """
        question._weightset_weight = self.question_lookup[question.id]
class HierarchyWeightingsVisitor(Visitor):
    """
    Sets normalised weights (percentage weight within sub-tree)
    for all sections and questions
    """

    def hello_section(self, sec):
        """Normalise the weights of the section's direct subsections and questions"""
        if sec.is_top_level:
            # A top level section is given the full weight
            sec.normalised_weight = Decimal(1)
        self._set_subsection_normalised_weights(sec)
        self._set_question_normalised_weights(sec)

    def _set_subsection_normalised_weights(self, sec):
        """Give each subsection its share of the subsections' summed weight"""
        total = sum(child.weight for child in sec.subsections)
        for child in sec.subsections:
            if total != 0:
                child.normalised_weight = Decimal(child.weight) / total
            else:
                # Avoid divide by zero errors
                child.normalised_weight = Decimal(0)

    def _set_question_normalised_weights(self, sec):
        """Give each question its share of the questions' summed weight"""
        total = sum(question.weight for question in sec.questions)
        for question in sec.questions:
            if total != 0:
                question.normalised_weight = Decimal(question.weight) / total
            else:
                # Avoid divide by zero errors
                question.normalised_weight = Decimal(0)
class SaveTotalWeightingsVisitor(Visitor):
    """
    Saves TotalWeighting objects for the given project and weighting_set,
    should be invoked after HierarchyWeightingsVisitor has had a chance
    to save normalised weights
    """

    def __init__(self, session, project, weighting_set_id=None):
        """
        Record the session, the project id and the optional weighting set id.

        ``weighting_set_id`` defaults to None and is written unchanged into
        every collected mapping.
        """
        # super(Visitor, self) would start the MRO lookup *after* Visitor and
        # so skip any initialisation Visitor performs; use zero-argument super
        super().__init__()
        self.session = session
        self.project_id = project.id
        self.weighting_set_id = weighting_set_id
        # Row mappings collected during the visit, inserted by finalise()
        self.total_weightings = []

    def hello_section(self, sec):
        """Queue a total weighting mapping for a section"""
        self.total_weightings.append(
            {
                "project_id": self.project_id,
                "weight": sec.absolute_weight,
                "section_id": sec.id,
                "weighting_set_id": self.weighting_set_id,
            }
        )

    def visit_question(self, question):
        """Queue a total weighting mapping for a question"""
        self.total_weightings.append(
            {
                "project_id": self.project_id,
                "weight": question.absolute_weight,
                "question_instance_id": question.id,
                "weighting_set_id": self.weighting_set_id,
            }
        )

    def finalise(self):
        """Bulk insert all queued mappings as TotalWeighting rows"""
        self.session.bulk_insert_mappings(TotalWeighting, self.total_weightings)