Coverage for rfpy/model/questionnaire.py: 99%
639 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-31 16:00 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-31 16:00 +0000
1from collections import defaultdict
2from operator import attrgetter
3from typing import Dict
4from decimal import Decimal
5from enum import Enum, IntEnum
6import logging
7import re
9from sqlalchemy import (
10 Column,
11 Unicode,
12 Boolean,
13 Integer,
14 JSON,
15 ForeignKey,
16 Index,
17 text,
18 DateTime,
19 and_,
20 func,
21 inspect,
22)
23from sqlalchemy.orm import relationship, backref, remote, foreign, validates
24import sqlalchemy.types as types
25from sqlalchemy.dialects import mysql
28from rfpy.model.meta import Base, AttachmentMixin, Visitor
29from .issue import Issue
31from .exc import ValidationFailure, QuestionnaireStructureException, WeightingsNotLoadedException
33"""
34Note - default SQL JOIN / load strategy is defined by arguments
35to 'relationship'
37Lots of joins involved when loading questionnaires. Current strategy is:
39Section -> questions (QuestionInstance): 'dynamic' - doesn't join by default,
40 returns a query
42QuestionInstance -> QuestionDefinition: 'joined' - eager loading / inner join
43QuestionDefinition -> [QElement] : 'joined' - eager loading by inner join
45For loading large collections (e.g. full questionnaire), these loading
46strategies are overridden, e.g. loading question elements in a subquery load
47"""
49log = logging.getLogger(__name__)
52get_el_type = attrgetter("el_type")
53get_cell = attrgetter("cell")
54el_keys = ["col", "colspan", "row", "rowspan", "el_type", "label",
55 "mandatory", "height", "width", "multitopic", "regexp", "choices"]
58def to_b36(string_number):
59 """
60 Converts a position number (e.g. 23.1.8) into a base36 string (0N0108)
61 """
62 if not string_number:
63 return string_number
64 return "".join(base36encode(int(el)) for el in string_number.split("."))
67_b36_cache: Dict[int, str] = dict()
70def base36encode(number: int):
71 """
72 Converts an integer into a base36 string two characters long
74 @param number - an int or long between 0 and 1296
75 """
76 initial_value = number
77 if number not in _b36_cache:
78 if not isinstance(number, int):
79 raise TypeError("number must be an integer")
80 if not 0 < number < 1296:
81 raise ValueError("number must in range 1 - 1295, received %s" % number)
83 alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
84 base36 = ""
85 while number:
86 number, i = divmod(number, 36)
87 base36 = alphabet[i] + base36
88 _b36_cache[initial_value] = base36.rjust(2, "0")
90 return _b36_cache[initial_value]
93_number_cache: Dict[str, str] = dict()
95WEIGHTING_COL_TYPE = mysql.DECIMAL(precision=15, scale=4)
98def from_b36(db_string):
99 """
100 Converts a base36 encoded position string into human readable form,
101 e.g. '0N0108' -> 23.1.18.
103 Called by every row for a question or section query, and surprisingly expensive
104 For fetch question number and id for 1,500 this function accounts for 50% of
105 the runtime.
107 To eliminate this a cache is used to store values.
108 """
109 if db_string not in _number_cache:
110 if not db_string:
111 return db_string
112 _number_cache[db_string] = ".".join(
113 "%s" % int(db_string[x: x + 2], 36) for x in range(0, len(db_string), 2)
114 )
115 return _number_cache[db_string]
118class ImportType(IntEnum):
119 SHARE = 0
120 COPY = 10
123class NumberString(str):
124 def __init__(self, raw_value):
125 self.raw_value = raw_value
127 @property
128 def dotted(self):
129 return from_b36(self.raw_value)
131 def next_increment(self):
132 parts = self.dotted.split(".")
133 parts[-1] = str(int(parts[-1]) + 1)
134 return NumberString(to_b36(".".join(p for p in parts if p)))
136 def first_child(self):
137 parts = self.dotted.split(".")
138 parts.append("1")
139 new_dotted = ".".join(p for p in parts if p)
140 b36ed = to_b36(new_dotted)
141 return NumberString(b36ed)
143 @classmethod
144 def from_dotted(cls, value):
145 return cls(to_b36(value))
147 @classmethod
148 def visible_relatives_regex(cls, section_number):
149 '''
150 Create a regular expression for finding ancestor and ancestor sibling sections for
151 the given section number. This is useful for returning a subset of the questionnaire tree
152 necessary for rendering a tree menu without returning the entire questionnaire.
154 '''
155 # Build regex by processing each 2-character chunk of the section Number
156 # starting from the parent of the section given by section_number
157 parent_index = len(section_number)
158 n = section_number
159 # For each chunk build a regex similar to ^02\w{2}$ - i.e. find immediate offspring
160 # for each chunk
161 rels_regex_list = ["^" + n[0:i+2] + r'.{2}$' for i in range(0, parent_index, 2)]
163 # The root section has no number so root + immediate children must be special-cased:
164 rels_regex_list.append(r"^.{0,2}$")
166 return r"|".join(rels_regex_list)
169class SafeNumber:
171 """
172 A Property that returns either number.dotted
173 or just number, depending on whether the instance's number
174 attribute is an instance of NumberString or not
175 """
177 def __get__(self, obj, cls=None):
178 if isinstance(obj.number, NumberString):
179 return obj.number.dotted
180 return obj.number
182 def __set__(self, obj, val):
183 if isinstance(val, NumberString):
184 obj.number = val
185 return
186 if isinstance(val, str):
187 obj.number = NumberString(to_b36(val))
190class PositionNumber(types.TypeDecorator):
192 """
193 A custom SQLAlchemy type that translates unicode strings
194 between display numbers like 1.2.3 and
195 suppplierselect's internal database equivalent ('010203')
196 """
198 impl = types.Unicode(30)
200 cache_ok = True
202 def process_bind_param(self, value, dialect):
203 return value
205 def process_result_value(self, value, dialect):
206 return NumberString(value)
208 def copy(self):
209 return PositionNumber(self.impl.length)
212class Section(Base):
213 __tablename__ = "sections"
215 id = Column(mysql.INTEGER(10), primary_key=True)
216 title = Column(mysql.VARCHAR(255), nullable=False)
217 number = Column(PositionNumber, default="")
218 description = Column(mysql.LONGTEXT())
219 _weight = Column(
220 "weight",
221 WEIGHTING_COL_TYPE,
222 server_default=text("'1.0000'")
223 )
224 project_id = Column(mysql.INTEGER(11), ForeignKey("projects.id"), nullable=False)
225 parent_id = Column(mysql.INTEGER(10), ForeignKey("sections.id", ondelete='CASCADE'))
226 position = Column("pos", Integer, default=0, nullable=False)
228 subsections = relationship("Section", order_by="Section.position")
230 project = relationship("Project", primaryjoin="Section.project_id==Project.id",
231 back_populates='sections')
233 questions = relationship(
234 "QuestionInstance",
235 order_by="QuestionInstance.position",
236 back_populates='section'
237 )
239 parent: 'Section' = relationship('Section', remote_side=[id], back_populates='subsections')
241 questions_query = relationship(
242 "QuestionInstance", lazy="dynamic", order_by="QuestionInstance.position", viewonly=True
243 )
245 perms = relationship("SectionPermission", lazy="dynamic")
247 descendants = relationship(
248 "Section",
249 viewonly=True,
250 order_by=number,
251 primaryjoin=and_(
252 remote(foreign(number)).startswith(number),
253 remote(foreign(project_id)) == project_id,
254 remote(foreign(id)) != id,
255 ),
256 )
258 safe_number = SafeNumber()
260 ancestors = relationship(
261 "Section",
262 viewonly=True,
263 order_by=number,
264 primaryjoin=and_(
265 remote(foreign(number))
266 == func.LEFT(number, func.LENGTH(remote(foreign(number)))),
267 remote(foreign(project_id)) == project_id,
268 remote(foreign(id)) != id
269 ),
270 )
272 def __repr__(self):
273 return "<Sec id:%s Num:%s Parent: %s - %s>" % (
274 self.id,
275 self.number,
276 self.parent_id,
277 self.title,
278 )
280 def as_dict(self):
281 return {
282 "id": self.id,
283 "type": "section",
284 "title": self.title,
285 "description": self.description,
286 "number": self.safe_number,
287 "number36": self.number,
288 "parent_id": self.parent_id,
289 "subsections": [],
290 "questions": [],
291 }
293 def renumber(self, new_dotted_number=""):
294 """
295 Set number and recursively set number for all children based on order_by
296 on relationships (position attribute).
297 """
298 self.safe_number = new_dotted_number
299 fmt = "%s.%s" if new_dotted_number else "%s%s"
300 i = 1
301 for idx, subsection in enumerate(self.subsections):
302 if subsection.id == self.id:
303 raise QuestionnaireStructureException(f"Section {self.id} cannot be its own parent")
305 sec_num = fmt % (new_dotted_number, i)
306 subsection.position = idx
307 subsection.renumber(sec_num)
308 i += 1
310 for idx, question in enumerate(self.questions):
311 q_num = fmt % (new_dotted_number, i)
312 question.safe_number = q_num
313 question.position = idx
314 i += 1
316 def accept(self, visitor):
317 """
318 Call hello_section, visit_question(s), goodbye_section
319 on the passed visitor and recurse to questions and subsections
320 """
321 visitor.hello_section(self)
323 # check if method is implemented in visitor's own class, not in parent
324 # i.e. only load questions if the visitor has its own implementation
325 # of this method
326 if "visit_question" in visitor.__class__.__dict__:
327 for q in self.questions:
328 visitor.visit_question(q)
330 for subsec in self.subsections:
331 subsec.accept(visitor)
333 visitor.goodbye_section(self)
335 @property
336 def is_top_level(self):
337 return self.parent_id is None
339 @property
340 def weight(self):
341 try:
342 return self._weightset_weight
343 except AttributeError:
344 return self._weight
346 @weight.setter
347 def weight(self, weight):
348 self._weight = weight
350 @property
351 def absolute_weight(self):
352 if self.is_top_level:
353 return Decimal(1)
354 try:
355 return self._absolute_weight
356 except AttributeError:
357 try:
358 nm = self.normalised_weight
359 except AttributeError:
360 m = 'normalised weights not loaded - run HierarchicalWeightingVisitor'
361 raise WeightingsNotLoadedException(m)
362 self._absolute_weight = nm * self.parent.absolute_weight
363 return self._absolute_weight
365 @absolute_weight.setter
366 def absolute_weight(self, abs_weight):
367 self._absolute_weight = abs_weight
369 @validates("questions", "subsections")
370 def check_position(self, attr_name, instance):
371 collection = getattr(self, attr_name)
373 if instance.position is None:
374 new_position = 0
375 if len(collection) > 0:
376 previous = collection[-1]
377 if previous.position is not None:
378 new_position = previous.position + 1
379 instance.position = new_position
381 if instance.number is None:
382 if len(collection) > 0:
383 previous = collection[-1]
384 if hasattr(previous.number, "next_increment"):
385 instance.number = previous.number.next_increment()
386 else:
387 if hasattr(self.number, "first_child"):
388 instance.number = self.number.first_child()
390 if instance.project_id is None:
391 instance.project_id = self.project_id
393 return instance
396class QuestionDefinition(Base):
397 __tablename__ = "questions"
399 title = Column(mysql.MEDIUMTEXT(), nullable=False)
400 weight = Column(WEIGHTING_COL_TYPE, server_default=text("'1.0000'"))
401 parent_id = Column(Integer, index=True)
402 refcount = Column(
403 "referenceCount", Integer, nullable=False, server_default=text("'1'")
404 )
405 elements = relationship(
406 "QElement",
407 lazy="joined",
408 order_by="QElement.row,QElement.col",
409 cascade='all, delete-orphan',
410 passive_deletes=True,
411 back_populates='question_def'
412 )
414 _elements = relationship("QElement", lazy="dynamic", viewonly=True)
416 instances = relationship("QuestionInstance", lazy="dynamic", back_populates='question_def')
418 def __repr__(self):
419 return f'<QDef Id: {self.id}, Title: "{self.title}">'
421 def __eq__(self, o: object) -> bool:
422 if isinstance(o, QuestionDefinition):
423 src_elements = sorted(self.elements, key=lambda e: [getattr(e, k) for k in el_keys])
424 des_elements = sorted(o.elements, key=lambda e: [getattr(e, k) for k in el_keys])
425 return self.title == o.title and src_elements == des_elements
426 return False
428 def __hash__(self) -> int:
429 return super().__hash__()
431 @property
432 def is_shared(self):
433 if self.refcount is None:
434 return False
435 return self.refcount > 1
437 def get_element(self, element_id):
438 return self._elements.filter(QElement.id == element_id).one()
440 def as_dict(self, answer_lookup=None, vendor_view=False):
441 """
442 Returns a list of question element lists, each inner list corresponding
443 to a row in a question table
445 Groups elements into rows - returning a list of lists
447 """
448 row_list = []
449 current_row = -1
450 for el in self.elements:
452 if el.row > current_row:
453 row_list.append([])
454 current_row = el.row
456 if el.contains_choices:
457 el_dict = el.as_dict(vendor_view=vendor_view)
458 else:
459 el_dict = el.as_dict()
461 if el.is_answerable and answer_lookup is not None:
462 el_dict["answer"] = answer_lookup.get(el.id, None)
464 row_list[-1].append(el_dict)
466 return row_list
468 @property
469 def answerable_elements(self):
470 return [e for e in self.elements if e.is_answerable]
472 def add_element(self, el_name, **kwargs):
473 el = QElement.build(el_name, **kwargs)
474 self.elements.append(el)
475 return el
477 def is_tabular(self):
478 return self.column_count != 1
480 @property
481 def column_count(self):
482 if not self.elements:
483 return 0
484 return max(el.col for el in self.elements)
486 @property
487 def row_count(self):
488 if not self.elements:
489 return 0
490 return max(el.row for el in self.elements)
492 @property
493 def grid_area(self) -> int:
494 '''Number of available grid cells available according to row and col values of Elements'''
495 return self.row_count * self.column_count
497 @property
498 def occupied_area(self) -> int:
499 '''
500 The number of grid cells occupied by cells according to their rowspan and colspan values
501 If rowspans or colspans are set correctly this value will be equal to self.grid_area
502 '''
503 return sum(el.cell_area for el in self.elements)
505 @property
506 def is_table_valid(self):
507 return self.grid_area == self.occupied_area
509 @classmethod
510 def build(cls, qdict, strip_ids=False):
511 """
512 Create a QuestionDefinition, complete with QElements, from a
513 a dict datastructure as provided by serial.QuestionDef
514 """
515 qdef = cls(title=qdict["title"])
516 for row_idx, row in enumerate(qdict["elements"], start=1):
517 for col_idx, el in enumerate(row, start=1):
518 el['row'] = row_idx
519 el['col'] = col_idx
520 if strip_ids:
521 el['id'] = None
522 qdef.add_element(el["el_type"], **el)
523 return qdef
526qi_weight_join = "foreign(TotalWeighting.question_instance_id)==QuestionInstance.id"
529class SaveAnswersResult:
530 def __init__(self, is_new, change_list, unanswered_mandatory):
531 self.is_new = is_new
532 self.change_list = change_list
533 self.unanswered_mandatory = unanswered_mandatory
536class QuestionInstance(Base):
538 """
539 @DynamicAttrs
540 """
542 __tablename__ = "question_instances"
544 __table_args__ = (
545 Index(
546 "unique_question_project",
547 "project_id",
548 "question_id",
549 unique=True,
550 ),
551 ) + Base.__table_args__
553 public_attrs = "id,title,position,section_id,number,url".split(",")
555 section_id = Column("section_id", mysql.INTEGER(10), ForeignKey("sections.id"), nullable=False)
556 question_def_id = Column(
557 "question_id",
558 mysql.INTEGER(10),
559 ForeignKey(QuestionDefinition.id),
560 nullable=False
561 )
562 project_id = Column("project_id", mysql.INTEGER(11), ForeignKey("projects.id"), nullable=False)
563 _weight = Column("weight", WEIGHTING_COL_TYPE, server_default=text("'1.0000'"))
564 position = Column("pos", Integer, nullable=False, server_default=text("'0'"))
565 number = Column(PositionNumber)
567 question_def: QuestionDefinition = relationship(
568 QuestionDefinition, lazy="joined", innerjoin=True, back_populates='instances'
569 )
570 project = relationship("Project")
571 section: Section = relationship("Section", back_populates='questions')
572 parent: Section = relationship("Section", viewonly=True)
573 _answers = relationship(
574 "Answer", lazy="dynamic", cascade="all, delete", passive_deletes=True,
575 back_populates="question_instance"
576 )
578 total_weightings = relationship(
579 "TotalWeighting", primaryjoin=qi_weight_join, backref="question", cascade='all'
580 )
582 def __repr__(self):
583 return f"<Question - InstanceId: {self.id}, DefId: {self.question_def_id}>"
585 def __eq__(self, o: object) -> bool:
586 if isinstance(o, QuestionInstance):
587 return self.question_def == o.question_def
588 return False
590 def __hash__(self) -> int:
591 return super().__hash__()
593 def all_answers(self):
594 """All Answers for this question and scoreable issues"""
595 return (
596 self._answers.join(Issue).filter(Issue.scoreable_filter(self.project)).all()
597 )
599 def answers_for_issue(self, issue_id):
600 return self._answers.filter(Answer.issue_id == issue_id)
602 def single_vendor_dict(self, issue):
603 lookup = {a.element_id: a.answer for a in self.answers_for_issue(issue.id)}
604 return {
605 "number": self.safe_number,
606 "id": self.id,
607 "title": self.title,
608 "respondent": issue.respondent.as_dict(),
609 "elements": self.question_def.as_dict(
610 vendor_view=True, answer_lookup=lookup
611 ),
612 }
614 @property
615 def title(self):
616 return self.question_def.title
618 @property
619 def elements(self):
620 return self.question_def.elements
622 @property
623 def weight(self):
624 """
625 Strange behaviour inherited from java land.
626 Returns
627 1) WeightingSet value, if loaded by LoadWeightSetVisitor
628 or
629 2) self._weight if weight defined for this QuestionInstance
630 or, finally
631 3) The weight from this Instances QuestionDefinition
632 """
633 try:
634 return self._weightset_weight
635 except AttributeError:
636 return self._weight if self._weight is not None else self.question_def.weight
638 @weight.setter
639 def weight(self, weight_value):
640 """
641 Set the weight on the question definition if the definition is not
642 shared. If the weight has already been set for this instance, carry on using
643 it
644 """
645 if not self.question_def.is_shared:
646 self.question_def.weight = weight_value
647 # Mirrors behaviour in Java land - SqlMapQuestionDao line 322
648 self._weight = weight_value
650 safe_number = SafeNumber()
652 def as_dict(self, answer_lookup=None, vendor_view=False):
653 return {
654 "id": self.id,
655 "title": self.title,
656 "number36": self.number,
657 "number": self.safe_number,
658 "elements": self.question_def.as_dict(
659 answer_lookup=answer_lookup, vendor_view=vendor_view
660 ),
661 "type": "question",
662 }
664 @property
665 def is_autoscored(self):
666 for element in self.elements:
667 if isinstance(element, MultipleChoice) and element.choices:
668 for choice in element.choices:
669 if choice.get('autoscore', None):
670 return True
671 return False
673 def calculate_autoscores(self):
674 '''
675 Mapping of issue_id to score, use a list in case of multiple
676 autoscored elements in one question - take the average later
677 '''
678 score_map = defaultdict(list)
679 multichoice_elements = [
680 el for el in self.question_def.elements if el.contains_choices
681 ]
682 max_score = self.project.maximum_score
683 issue_filter = Issue.scoreable_filter(self.project)
684 for multichoice in multichoice_elements:
685 mq = multichoice._answers.join(Issue).filter(issue_filter)
686 for answer in mq:
687 for choice in multichoice.choices:
688 if answer.answer == choice['label'] and choice['autoscore'] is not None:
689 score_map[answer.issue].append(int(choice['autoscore']))
691 for issue, score_list in score_map.items():
692 total_score = sum(score_list)
693 score_map[issue] = total_score if total_score <= max_score else max_score
695 return score_map
697 @property
698 def absolute_weight(self):
699 if not hasattr(self, "_absolute_weight"):
700 try:
701 self._absolute_weight = self.normalised_weight * self.section.absolute_weight
702 except AttributeError:
703 m = 'normalised weights not loaded - run HierarchicalWeightingVisitor'
704 raise WeightingsNotLoadedException(m)
705 return self._absolute_weight
707 @absolute_weight.setter
708 def absolute_weight(self, abs_weight):
709 self._absolute_weight = abs_weight
711 def validate_and_save_answers(self, answer_doc, issue) -> SaveAnswersResult:
712 changes = []
713 unanswered_mandatory = False
714 current_answers = {a.element_id: a for a in self.answers_for_issue(issue.id)}
715 for element in self.question_def.answerable_elements:
716 el_id = element.id
717 if el_id in answer_doc:
718 new_answer = answer_doc.pop(el_id)
719 if new_answer is None:
720 continue
721 element.validate(new_answer)
722 if el_id in current_answers:
723 current_answer = current_answers[el_id]
724 old_answer = current_answer.answer
725 if old_answer != new_answer:
726 current_answer.answer = new_answer
727 changes.append((old_answer, new_answer))
728 else:
729 a = Answer(element_id=el_id, answer=new_answer, issue_id=issue.id)
730 self._answers.append(a)
731 changes.append((None, new_answer))
732 else:
733 if element.mandatory and el_id not in current_answers:
734 unanswered_mandatory = True
736 if len(answer_doc) > 0:
737 tmpl = "Unable to match all answers with element IDs: {}"
738 ids = ', '.join(str(k) for k in answer_doc.keys())
739 raise ValidationFailure(tmpl.format(ids))
741 is_new = len(current_answers) == 0
743 return SaveAnswersResult(is_new, changes, unanswered_mandatory)
746class QElement(Base):
747 __tablename__ = "question_elements"
748 __table_args__ = (
749 Index(
750 "question_elements_qId_row_col",
751 "question_id",
752 "row",
753 "col",
754 ),
755 Index('ft_label', 'label', mysql_prefix='FULLTEXT')
756 ) + Base.__table_args__
758 __mapper_args__ = {"polymorphic_on": "el_type", "polymorphic_identity": ""}
760 is_answerable = True
762 contains_choices = False
764 public_attrs = ("id,el_type,colspan,rowspan,label,mandatory,regexp,col,row").split(",")
766 answerable_types = {"TX", "CR", "CC", "CB", "AT"}
768 question_id = Column(
769 ForeignKey("questions.id", ondelete="CASCADE"),
770 nullable=False,
771 index=True,
772 server_default=text("'0'"),
773 )
774 row = Column(mysql.INTEGER(10), nullable=False, default=1, server_default=text("'0'"))
775 col = Column(mysql.INTEGER(10), nullable=False, default=1, server_default=text("'0'"))
776 el_type = Column("type", mysql.CHAR(length=2),
777 nullable=False, server_default=text("''"))
778 colspan = Column(Integer, default=1)
779 rowspan = Column(Integer, default=1)
780 label = Column(mysql.MEDIUMTEXT(), nullable=True)
781 mandatory = Column(Boolean, default=False)
782 height = Column(Integer)
783 width = Column(Integer)
784 multitopic = Column(Boolean, default=False)
785 regexp = Column(Unicode(255))
786 choices = Column(JSON, nullable=True)
788 question_def = relationship("QuestionDefinition", back_populates='elements')
789 answers = relationship("Answer", back_populates='element')
790 _answers = relationship("Answer", lazy="dynamic", viewonly=True)
792 def __eq__(self, o: object) -> bool:
793 if isinstance(o, QElement):
794 equal: bool = True
795 for k in el_keys:
796 if hasattr(self, k) and hasattr(o, k) and getattr(o, k) == getattr(self, k):
797 continue
798 else:
799 equal = False
800 break
801 return equal
802 return False
804 def __hash__(self) -> int:
805 return super().__hash__()
807 def get_answer(self, issue):
808 """Get the answer for this element and `issue` or None if not yet answered"""
809 return self._answers.filter(Answer.issue == issue).one()
811 def get_question_instance(self, project_id):
812 return self.question_def.instances.filter_by(project_id=project_id).one()
814 @staticmethod
815 def split_choice_string(choice_string):
816 """
817 Split a string representation of a multiple choice option,
818 e.g. 'Yes <8>' into a tuple - ('Yes', 8)
819 """
820 regex_match = re.match(r"([^<]+)<(\d+)>", choice_string)
821 if regex_match:
822 label, autoscore = regex_match.groups()
823 return (label.strip(), autoscore.strip())
824 else:
825 return (choice_string.strip(), None)
827 @classmethod
828 def build(cls, type_name, **kwargs):
829 """Create a QElement subclass instance of the given type_name"""
830 mapper = inspect(cls).polymorphic_map[type_name]
831 el = mapper.class_(**kwargs)
832 return el
834 @property
835 def cell_area(self):
836 return self.colspan * self.rowspan
838 @property
839 def summary(self):
840 '''For describing this element in audit event change log'''
841 return self.label
844class Label(QElement):
845 __mapper_args__ = {"polymorphic_identity": "LB"}
846 is_answerable = False
847 public_attrs = "id,el_type,label,colspan,rowspan".split(",")
850class Checkbox(QElement):
851 __mapper_args__ = {"polymorphic_identity": "CB"}
852 public_attrs = "id,el_type,label,colspan,rowspan".split(",")
854 def validate(self, answer):
855 if answer not in ("true", "false"):
856 m = f"Answer for checkbox element {self.id} must be 'true' or 'false'"
857 raise ValidationFailure(m)
860class TextInput(QElement):
861 __mapper_args__ = {"polymorphic_identity": "TX"}
863 def as_dict(self):
864 return {
865 "id": self.id,
866 "el_type": self.el_type,
867 "colspan": self.colspan,
868 "rowspan": self.rowspan,
869 "height": self.height,
870 "width": self.width,
871 "mandatory": self.mandatory,
872 "regexp": self.regexp,
873 }
875 def validate(self, answer):
876 if self.regexp and not re.match(self.regexp, answer):
877 tmpl = "Answer {:.10} does not match regular expression {} for element {}"
878 raise ValidationFailure(tmpl.format(self.regexp, answer, self.id))
880 @property
881 def summary(self):
882 return f"[{self.width} X {self.height}]"
885class MultipleChoice(object):
886 '''
887 The choices property on QElement is used by RadioChoices (CR) and SelectChoices(CC)
888 subclasses. The property is a JSON field containing an array of label:autoscore objects:
889 [
890 {label: 'Yes', autoscore: 10},
891 {label: 'No', autoscore: 0}
892 ]
894 '''
895 contains_choices = True
896 # `choices` attribute is set by QuestionDefinition.elements()
898 def as_dict(self, vendor_view=False):
899 if vendor_view:
900 choices = [{'label': c['label']} for c in self.choices]
901 else:
902 choices = self.choices
903 return {
904 "id": self.id,
905 "el_type": self.el_type,
906 "colspan": self.colspan,
907 "rowspan": self.rowspan,
908 "mandatory": self.mandatory,
909 "choices": choices,
910 }
912 def validate(self, answer):
913 choice_vals = {c['label'] for c in self.choices}
914 if answer not in choice_vals:
915 tmpl = "Answer '{:.10}' is not one of the accepted values: [{}] for element {}"
916 raise ValidationFailure(tmpl.format(answer, ",".join(choice_vals), self.id))
918 @property
919 def summary(self):
920 if self.choices:
921 return '\n'.join(f"{c['label']}: {c.get('autoscore', None)}" for c in self.choices)
922 return ''
925class SelectChoices(MultipleChoice, QElement):
926 __mapper_args__ = {"polymorphic_identity": "CC"} # 'ChoiceCombo'
929class RadioChoices(MultipleChoice, QElement):
930 __mapper_args__ = {"polymorphic_identity": "CR"}
933class QuestionAttachment(QElement):
935 """An attachment added by the Buyer to the Question"""
937 __mapper_args__ = {"polymorphic_identity": "QA"}
938 is_answerable = False
940 public_attrs = "id,el_type,label,colspan,rowspan".split(",")
942 attachment = relationship("QAttachment", uselist=False)
945class SupportingAttachment(QElement):
947 """
948 A File Input form element enabling a Respondent to upload an attachment
949 as part of an answer to to a Question
950 """
952 __mapper_args__ = {"polymorphic_identity": "AT"}
953 public_attrs = "id,el_type,colspan,rowspan,mandatory".split(",")
955 def validate(self, answer):
956 if not isinstance(answer, str):
957 raise ValidationFailure('SupportingAttachments answer should be the filename and size')
959 @property
960 def summary(self):
961 return 'File upload field'
964class ExternalMedia(QElement):
966 """A link to an external resource, e.g. video"""
968 __mapper_args__ = {"polymorphic_identity": "MD"}
971class QAttachment(AttachmentMixin, Base):
972 __tablename__ = "question_attachments"
974 public_attrs = ("id,size,filename,url").split(",")
975 element_id = Column(Integer, ForeignKey("question_elements.id"))
977 element = relationship(QElement, viewonly=True)
980class Answer(Base):
981 __tablename__ = "answers"
982 __table_args__ = (
983 Index(
984 "unique_issue_question_element_answer",
985 "issue_id",
986 "question_instance_id",
987 "element_id",
988 unique=True,
989 ),
990 Index('ft_answer', 'answer', mysql_prefix='FULLTEXT'),
991 {"mysql_engine": "InnoDB", "mysql_charset": 'utf8mb4'}
992 )
994 issue_id = Column(
995 Integer,
996 ForeignKey("issues.id",
997 name='answers_ibfk_1',
998 ondelete="CASCADE"),
999 nullable=False,
1000 default=0,
1001 index=True
1002 )
1004 answer = Column(mysql.LONGTEXT())
1006 element_id = Column(
1007 Integer,
1008 ForeignKey("question_elements.id", name='answers_ibfk_2'),
1009 nullable=False,
1010 index=True,
1011 server_default=text("'0'")
1012 )
1013 autoscore = Column(mysql.INTEGER(11))
1014 question_instance_id = Column(
1015 Integer,
1016 ForeignKey("question_instances.id",
1017 ondelete='CASCADE',
1018 name='answers_ibfk_3'),
1019 nullable=False,
1020 index=True
1021 )
1023 issue = relationship("Issue",
1024 backref=backref("answers",
1025 lazy="dynamic",
1026 cascade='all,delete',
1027 passive_deletes=True))
1028 element = relationship(QElement, back_populates='answers')
1029 question_instance = relationship(QuestionInstance, back_populates="_answers", uselist=False)
1031 def as_dict(self):
1033 return {"answer_id": self.id, "issue_id": self.issue_id, "answer": self.answer}
1035 def __repr__(self):
1036 tmpl = "Answer ID %s, Issue: %s, QuestionInstance: %s"
1037 return tmpl % (self.id, self.issue_id, self.question_instance_id)
1040class AnswerReport(Base):
1041 __tablename__ = "answer_reports"
1042 __table_args__ = {"mysql_engine": "InnoDB", "mysql_charset": 'utf8mb4'}
1044 id = Column(Integer, primary_key=True)
1045 project_id = Column(
1046 ForeignKey("projects.id", ondelete="CASCADE", name='answer_reports_project_id'),
1047 nullable=False,
1048 index=True
1049 )
1050 title = Column(Unicode(100), nullable=False)
1051 definition = Column(mysql.MEDIUMTEXT(), nullable=False)
1052 project = relationship("Project", back_populates='answer_reports')
1055class ResponseStatus(Enum):
1056 NOT_ANSWERED = 0
1057 ANSWERED = 10
1058 FOR_REVIEW = 20
1059 REJECTED = 30
1060 APPROVED = 40
1063class ResponseStatusCol(types.TypeDecorator):
1065 """
1066 A custom SQLAlchemy type that maps database integer
1067 values to ResponseStatus Enum values
1068 """
1070 impl = mysql.TINYINT(4)
1072 cache_ok = True
1074 def process_bind_param(self, response_status_enum, dialect):
1075 return response_status_enum.value
1077 def process_result_value(self, int_value, dialect):
1078 return ResponseStatus(int_value)
1081class QuestionResponseState(Base):
1082 __tablename__ = "question_response_states"
1083 __table_args__ = (
1084 Index("unique_resp_state", "issue_id", "question_instance_id", unique=True),
1085 {"mysql_engine": "InnoDB", "mysql_charset": 'utf8mb4'},
1086 )
1087 public_attrs = [
1088 "status",
1089 "allocated_by",
1090 "allocated_to",
1091 "approved_by",
1092 "updated_by",
1093 "date_updated",
1094 ]
1096 id = Column(Integer, primary_key=True)
1098 issue_id = Column(
1099 ForeignKey("issues.id", ondelete="CASCADE"),
1100 nullable=False,
1101 index=True,
1102 server_default=text("'0'"),
1103 )
1105 question_instance_id = Column(
1106 ForeignKey("question_instances.id", ondelete="CASCADE"),
1107 nullable=False,
1108 index=True,
1109 server_default=text("'0'"),
1110 )
1112 status = Column(ResponseStatusCol, nullable=False, server_default=text("'0'"))
1114 allocated_by = Column(types.VARCHAR(length=50), ForeignKey("users.id", ondelete='SET NULL'))
1115 allocated_to = Column(types.VARCHAR(length=50), ForeignKey("users.id", ondelete='SET NULL'))
1116 approved_by = Column(types.VARCHAR(length=50), ForeignKey("users.id", ondelete='SET NULL'))
1117 updated_by = Column(types.VARCHAR(length=50), ForeignKey("users.id", ondelete='SET NULL'))
1119 date_updated = Column(DateTime)
1121 issue = relationship(
1122 "Issue", backref=backref("response_states", lazy="dynamic", passive_deletes=True)
1123 )
1124 question_instance = relationship("QuestionInstance")
1127class AAttachment(AttachmentMixin, Base):
1128 __tablename__ = "answer_attachments"
1130 public_attrs = ("id,size,filename,url,respondent,question_number,question_id").split(
1131 ","
1132 )
1133 answer_id = Column(
1134 Integer, ForeignKey("answers.id", ondelete="SET NULL"), unique=True
1135 )
1137 answer = relationship(
1138 Answer,
1139 uselist=False,
1140 backref=backref("attachment", uselist=False, passive_deletes=True),
1141 )
1143 def __repr__(self) -> str:
1144 return f"<AAttachment #{self.id} - {self.filename}>"
1147class WeightingSet(Base):
1148 __tablename__ = "weighting_sets"
1150 public_attrs = ("id", "name")
1151 name = Column(Unicode(255), nullable=True, default=None)
1152 project_id = Column(Integer, ForeignKey("projects.id", ondelete="CASCADE"), nullable=False)
1154 project = relationship("Project", back_populates="weighting_sets")
1156 weightings = relationship('Weighting', back_populates='weighting_set',
1157 lazy="dynamic",
1158 cascade='all,delete', passive_deletes=True)
1160 def __repr__(self):
1161 return 'Weighting Set ID: %s "%s"' % (self.id, self.name)
1163 def lookup_table(self):
1164 q_lookup = {}
1165 sec_lookup = {}
1166 for weight in self.weightings.with_entities(
1167 Weighting.section_id, Weighting.question_instance_id, Weighting.value
1168 ):
1169 if weight.section_id:
1170 sec_lookup[weight.section_id] = weight.value
1171 else:
1172 q_lookup[weight.question_instance_id] = weight.value
1173 return {"questions": q_lookup, "sections": sec_lookup}
1176class Weighting(Base):
1177 __tablename__ = "weightings"
1179 weighting_set_id = Column(
1180 mysql.INTEGER(11), ForeignKey("weighting_sets.id", ondelete="CASCADE"), nullable=False
1181 )
1182 question_instance_id = Column(
1183 mysql.INTEGER(11), ForeignKey("question_instances.id", ondelete="CASCADE")
1184 )
1185 section_id = Column(
1186 "section_id", Integer, ForeignKey("sections.id", ondelete="CASCADE")
1187 )
1188 value = Column(WEIGHTING_COL_TYPE, server_default=text("'1.0000'"))
1190 question = relationship(QuestionInstance)
1191 section = relationship("Section")
1192 weighting_set = relationship(WeightingSet, back_populates="weightings")
1195class TotalWeighting(Base):
1197 """
1198 The total_weightings table is essentially a cache. It saves the absolute, normalised weight
1199 values for question instances and sections for a given weighting_set_id. Null weighting_set_id
1200 indicates the default weighting set.
1202 Absolute weights are derived hierachically - the entire questionnaire needs to be loaded
1203 as a tree structure in order to figure out these weights. e.g. question weight depends
1204 on the weights of parent sections. It is therefore expense to derive on the fly - hence
1205 the requirement for this table.
1207 The cache is recalculated when weightings are saved
1208 """
1210 __tablename__ = "total_weightings"
1211 __table_args__ = (
1212 Index(
1213 "weightings",
1214 "project_id",
1215 "weighting_set_id",
1216 "question_instance_id",
1217 "section_id",
1218 unique=True,
1219 ),
1220 {"mysql_engine": "InnoDB", "mysql_charset": 'utf8mb4'},
1221 )
1223 project_id = Column(
1224 ForeignKey("projects.id", ondelete="CASCADE", name="total_weightings_wset"),
1225 nullable=False,
1226 )
1227 weighting_set_id = Column(Integer, nullable=True, default=None)
1228 question_instance_id = Column(Integer, nullable=False, server_default=text("'0'"))
1229 section_id = Column(Integer, nullable=False, server_default=text("'0'"))
1230 weight = Column(mysql.FLOAT(asdecimal=True, scale=7))
1232 project = relationship(
1233 "Project", backref=backref("total_weightings", lazy="dynamic",
1234 cascade='all,delete', passive_deletes=True)
1235 )
1238"""
1239TODO
1240- add column. One for normalised_weight, one for absolute_weight
1241 - Rename index in DB Schema to avoid name clash with "weightings" table
1242 - For API use, provide one weighting set at a time, load others on demand.
1243 - or provide weightings as a lookup dict in JS
1244"""
1247class LoadWeightSetVisitor(Visitor):
1249 """
1250 Updates Question and Section weights in the questionnaire with values
1251 from the provided weighting_set
1252 """
1254 def __init__(self, weighting_set):
1255 super(Visitor, self).__init__()
1256 weight_lookup = weighting_set.lookup_table()
1257 self.question_lookup = weight_lookup["questions"]
1258 self.section_lookup = weight_lookup["sections"]
1260 def hello_section(self, sec):
1261 sec._weightset_weight = self.section_lookup[sec.id]
1263 def visit_question(self, question):
1264 question._weightset_weight = self.question_lookup[question.id]
1267class HierarchyWeightingsVisitor(Visitor):
1269 """
1270 Sets normalised weights (percentage weight within sub-tree)
1271 for all sections and questions
1272 """
1274 def hello_section(self, sec):
1275 if sec.is_top_level:
1276 sec.normalised_weight = Decimal(1)
1277 self._set_subsection_normalised_weights(sec)
1278 self._set_question_normalised_weights(sec)
1280 def _set_subsection_normalised_weights(self, sec):
1282 subtree_total = sum(s.weight for s in sec.subsections)
1284 if subtree_total == 0:
1285 # Avoid divide by zero errors
1286 for subsec in sec.subsections:
1287 subsec.normalised_weight = Decimal(0)
1288 else:
1289 for subsec in sec.subsections:
1290 subsec.normalised_weight = Decimal(subsec.weight) / subtree_total
1292 def _set_question_normalised_weights(self, sec):
1294 q_total = sum(q.weight for q in sec.questions)
1296 if q_total == 0:
1297 # Avoid divide by zero errors
1298 for q in sec.questions:
1299 q.normalised_weight = Decimal(0)
1300 else:
1301 for q in sec.questions:
1302 q.normalised_weight = Decimal(q.weight) / q_total
1305class SaveTotalWeightingsVisitor(Visitor):
1307 """
1308 Saves TotalWeighting objects for the given project and weighting_set,
1309 should be invoked after HierarchyWeightingsVisitor has had a chance
1310 to save normalised weights
1311 """
1313 def __init__(self, session, project, weighting_set_id=None):
1314 super(Visitor, self).__init__()
1315 self.session = session
1316 self.project_id = project.id
1317 self.weighting_set_id = weighting_set_id
1318 self.total_weightings = []
1320 def hello_section(self, sec):
1321 tw = dict(
1322 project_id=self.project_id,
1323 weight=sec.absolute_weight,
1324 section_id=sec.id,
1325 weighting_set_id=self.weighting_set_id,
1326 )
1327 self.total_weightings.append(tw)
1329 def visit_question(self, question):
1330 tw = dict(
1331 project_id=self.project_id,
1332 weight=question.absolute_weight,
1333 question_instance_id=question.id,
1334 weighting_set_id=self.weighting_set_id,
1335 )
1337 self.total_weightings.append(tw)
1339 def finalise(self):
1340 self.session.bulk_insert_mappings(TotalWeighting, self.total_weightings)