Coverage for rfpy/model/questionnaire.py: 99%

639 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-31 16:00 +0000

1from collections import defaultdict 

2from operator import attrgetter 

3from typing import Dict 

4from decimal import Decimal 

5from enum import Enum, IntEnum 

6import logging 

7import re 

8 

9from sqlalchemy import ( 

10 Column, 

11 Unicode, 

12 Boolean, 

13 Integer, 

14 JSON, 

15 ForeignKey, 

16 Index, 

17 text, 

18 DateTime, 

19 and_, 

20 func, 

21 inspect, 

22) 

23from sqlalchemy.orm import relationship, backref, remote, foreign, validates 

24import sqlalchemy.types as types 

25from sqlalchemy.dialects import mysql 

26 

27 

28from rfpy.model.meta import Base, AttachmentMixin, Visitor 

29from .issue import Issue 

30 

31from .exc import ValidationFailure, QuestionnaireStructureException, WeightingsNotLoadedException 

32 

33""" 

34Note - default SQL JOIN / load strategy is defined by arguments 

35to 'relationship' 

36 

37Lots of joins involved when loading questionnaires. Current strategy is: 

38 

39Section -> questions (QuestionInstance): 'dynamic' - doesn't join by default, 

40 returns a query 

41 

42QuestionInstance -> QuestionDefinition: 'joined' - eager loading / inner join 

43QuestionDefinition -> [QElement] : 'joined' - eager loading by inner join 

44 

45For loading large collections (e.g. full questionnaire), these loading 

46strategies are overridden, e.g. loading question elements in a subquery load 

47""" 

48 

49log = logging.getLogger(__name__) 

50 

51 

52get_el_type = attrgetter("el_type") 

53get_cell = attrgetter("cell") 

54el_keys = ["col", "colspan", "row", "rowspan", "el_type", "label", 

55 "mandatory", "height", "width", "multitopic", "regexp", "choices"] 

56 

57 

58def to_b36(string_number): 

59 """ 

60 Converts a position number (e.g. 23.1.8) into a base36 string (0N0108) 

61 """ 

62 if not string_number: 

63 return string_number 

64 return "".join(base36encode(int(el)) for el in string_number.split(".")) 

65 

66 

67_b36_cache: Dict[int, str] = dict() 

68 

69 

70def base36encode(number: int): 

71 """ 

72 Converts an integer into a base36 string two characters long 

73 

74 @param number - an int or long between 0 and 1296 

75 """ 

76 initial_value = number 

77 if number not in _b36_cache: 

78 if not isinstance(number, int): 

79 raise TypeError("number must be an integer") 

80 if not 0 < number < 1296: 

81 raise ValueError("number must in range 1 - 1295, received %s" % number) 

82 

83 alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ" 

84 base36 = "" 

85 while number: 

86 number, i = divmod(number, 36) 

87 base36 = alphabet[i] + base36 

88 _b36_cache[initial_value] = base36.rjust(2, "0") 

89 

90 return _b36_cache[initial_value] 

91 

92 

93_number_cache: Dict[str, str] = dict() 

94 

95WEIGHTING_COL_TYPE = mysql.DECIMAL(precision=15, scale=4) 

96 

97 

98def from_b36(db_string): 

99 """ 

100 Converts a base36 encoded position string into human readable form, 

101 e.g. '0N0108' -> 23.1.18. 

102 

103 Called by every row for a question or section query, and surprisingly expensive 

104 For fetch question number and id for 1,500 this function accounts for 50% of 

105 the runtime. 

106 

107 To eliminate this a cache is used to store values. 

108 """ 

109 if db_string not in _number_cache: 

110 if not db_string: 

111 return db_string 

112 _number_cache[db_string] = ".".join( 

113 "%s" % int(db_string[x: x + 2], 36) for x in range(0, len(db_string), 2) 

114 ) 

115 return _number_cache[db_string] 

116 

117 

118class ImportType(IntEnum): 

119 SHARE = 0 

120 COPY = 10 

121 

122 

123class NumberString(str): 

124 def __init__(self, raw_value): 

125 self.raw_value = raw_value 

126 

127 @property 

128 def dotted(self): 

129 return from_b36(self.raw_value) 

130 

131 def next_increment(self): 

132 parts = self.dotted.split(".") 

133 parts[-1] = str(int(parts[-1]) + 1) 

134 return NumberString(to_b36(".".join(p for p in parts if p))) 

135 

136 def first_child(self): 

137 parts = self.dotted.split(".") 

138 parts.append("1") 

139 new_dotted = ".".join(p for p in parts if p) 

140 b36ed = to_b36(new_dotted) 

141 return NumberString(b36ed) 

142 

143 @classmethod 

144 def from_dotted(cls, value): 

145 return cls(to_b36(value)) 

146 

147 @classmethod 

148 def visible_relatives_regex(cls, section_number): 

149 ''' 

150 Create a regular expression for finding ancestor and ancestor sibling sections for 

151 the given section number. This is useful for returning a subset of the questionnaire tree 

152 necessary for rendering a tree menu without returning the entire questionnaire. 

153 

154 ''' 

155 # Build regex by processing each 2-character chunk of the section Number 

156 # starting from the parent of the section given by section_number 

157 parent_index = len(section_number) 

158 n = section_number 

159 # For each chunk build a regex similar to ^02\w{2}$ - i.e. find immediate offspring 

160 # for each chunk 

161 rels_regex_list = ["^" + n[0:i+2] + r'.{2}$' for i in range(0, parent_index, 2)] 

162 

163 # The root section has no number so root + immediate children must be special-cased: 

164 rels_regex_list.append(r"^.{0,2}$") 

165 

166 return r"|".join(rels_regex_list) 

167 

168 

169class SafeNumber: 

170 

171 """ 

172 A Property that returns either number.dotted 

173 or just number, depending on whether the instance's number 

174 attribute is an instance of NumberString or not 

175 """ 

176 

177 def __get__(self, obj, cls=None): 

178 if isinstance(obj.number, NumberString): 

179 return obj.number.dotted 

180 return obj.number 

181 

182 def __set__(self, obj, val): 

183 if isinstance(val, NumberString): 

184 obj.number = val 

185 return 

186 if isinstance(val, str): 

187 obj.number = NumberString(to_b36(val)) 

188 

189 

190class PositionNumber(types.TypeDecorator): 

191 

192 """ 

193 A custom SQLAlchemy type that translates unicode strings 

194 between display numbers like 1.2.3 and 

195 suppplierselect's internal database equivalent ('010203') 

196 """ 

197 

198 impl = types.Unicode(30) 

199 

200 cache_ok = True 

201 

202 def process_bind_param(self, value, dialect): 

203 return value 

204 

205 def process_result_value(self, value, dialect): 

206 return NumberString(value) 

207 

208 def copy(self): 

209 return PositionNumber(self.impl.length) 

210 

211 

212class Section(Base): 

213 __tablename__ = "sections" 

214 

215 id = Column(mysql.INTEGER(10), primary_key=True) 

216 title = Column(mysql.VARCHAR(255), nullable=False) 

217 number = Column(PositionNumber, default="") 

218 description = Column(mysql.LONGTEXT()) 

219 _weight = Column( 

220 "weight", 

221 WEIGHTING_COL_TYPE, 

222 server_default=text("'1.0000'") 

223 ) 

224 project_id = Column(mysql.INTEGER(11), ForeignKey("projects.id"), nullable=False) 

225 parent_id = Column(mysql.INTEGER(10), ForeignKey("sections.id", ondelete='CASCADE')) 

226 position = Column("pos", Integer, default=0, nullable=False) 

227 

228 subsections = relationship("Section", order_by="Section.position") 

229 

230 project = relationship("Project", primaryjoin="Section.project_id==Project.id", 

231 back_populates='sections') 

232 

233 questions = relationship( 

234 "QuestionInstance", 

235 order_by="QuestionInstance.position", 

236 back_populates='section' 

237 ) 

238 

239 parent: 'Section' = relationship('Section', remote_side=[id], back_populates='subsections') 

240 

241 questions_query = relationship( 

242 "QuestionInstance", lazy="dynamic", order_by="QuestionInstance.position", viewonly=True 

243 ) 

244 

245 perms = relationship("SectionPermission", lazy="dynamic") 

246 

247 descendants = relationship( 

248 "Section", 

249 viewonly=True, 

250 order_by=number, 

251 primaryjoin=and_( 

252 remote(foreign(number)).startswith(number), 

253 remote(foreign(project_id)) == project_id, 

254 remote(foreign(id)) != id, 

255 ), 

256 ) 

257 

258 safe_number = SafeNumber() 

259 

260 ancestors = relationship( 

261 "Section", 

262 viewonly=True, 

263 order_by=number, 

264 primaryjoin=and_( 

265 remote(foreign(number)) 

266 == func.LEFT(number, func.LENGTH(remote(foreign(number)))), 

267 remote(foreign(project_id)) == project_id, 

268 remote(foreign(id)) != id 

269 ), 

270 ) 

271 

272 def __repr__(self): 

273 return "<Sec id:%s Num:%s Parent: %s - %s>" % ( 

274 self.id, 

275 self.number, 

276 self.parent_id, 

277 self.title, 

278 ) 

279 

280 def as_dict(self): 

281 return { 

282 "id": self.id, 

283 "type": "section", 

284 "title": self.title, 

285 "description": self.description, 

286 "number": self.safe_number, 

287 "number36": self.number, 

288 "parent_id": self.parent_id, 

289 "subsections": [], 

290 "questions": [], 

291 } 

292 

293 def renumber(self, new_dotted_number=""): 

294 """ 

295 Set number and recursively set number for all children based on order_by 

296 on relationships (position attribute). 

297 """ 

298 self.safe_number = new_dotted_number 

299 fmt = "%s.%s" if new_dotted_number else "%s%s" 

300 i = 1 

301 for idx, subsection in enumerate(self.subsections): 

302 if subsection.id == self.id: 

303 raise QuestionnaireStructureException(f"Section {self.id} cannot be its own parent") 

304 

305 sec_num = fmt % (new_dotted_number, i) 

306 subsection.position = idx 

307 subsection.renumber(sec_num) 

308 i += 1 

309 

310 for idx, question in enumerate(self.questions): 

311 q_num = fmt % (new_dotted_number, i) 

312 question.safe_number = q_num 

313 question.position = idx 

314 i += 1 

315 

316 def accept(self, visitor): 

317 """ 

318 Call hello_section, visit_question(s), goodbye_section 

319 on the passed visitor and recurse to questions and subsections 

320 """ 

321 visitor.hello_section(self) 

322 

323 # check if method is implemented in visitor's own class, not in parent 

324 # i.e. only load questions if the visitor has its own implementation 

325 # of this method 

326 if "visit_question" in visitor.__class__.__dict__: 

327 for q in self.questions: 

328 visitor.visit_question(q) 

329 

330 for subsec in self.subsections: 

331 subsec.accept(visitor) 

332 

333 visitor.goodbye_section(self) 

334 

335 @property 

336 def is_top_level(self): 

337 return self.parent_id is None 

338 

339 @property 

340 def weight(self): 

341 try: 

342 return self._weightset_weight 

343 except AttributeError: 

344 return self._weight 

345 

346 @weight.setter 

347 def weight(self, weight): 

348 self._weight = weight 

349 

350 @property 

351 def absolute_weight(self): 

352 if self.is_top_level: 

353 return Decimal(1) 

354 try: 

355 return self._absolute_weight 

356 except AttributeError: 

357 try: 

358 nm = self.normalised_weight 

359 except AttributeError: 

360 m = 'normalised weights not loaded - run HierarchicalWeightingVisitor' 

361 raise WeightingsNotLoadedException(m) 

362 self._absolute_weight = nm * self.parent.absolute_weight 

363 return self._absolute_weight 

364 

365 @absolute_weight.setter 

366 def absolute_weight(self, abs_weight): 

367 self._absolute_weight = abs_weight 

368 

369 @validates("questions", "subsections") 

370 def check_position(self, attr_name, instance): 

371 collection = getattr(self, attr_name) 

372 

373 if instance.position is None: 

374 new_position = 0 

375 if len(collection) > 0: 

376 previous = collection[-1] 

377 if previous.position is not None: 

378 new_position = previous.position + 1 

379 instance.position = new_position 

380 

381 if instance.number is None: 

382 if len(collection) > 0: 

383 previous = collection[-1] 

384 if hasattr(previous.number, "next_increment"): 

385 instance.number = previous.number.next_increment() 

386 else: 

387 if hasattr(self.number, "first_child"): 

388 instance.number = self.number.first_child() 

389 

390 if instance.project_id is None: 

391 instance.project_id = self.project_id 

392 

393 return instance 

394 

395 

396class QuestionDefinition(Base): 

397 __tablename__ = "questions" 

398 

399 title = Column(mysql.MEDIUMTEXT(), nullable=False) 

400 weight = Column(WEIGHTING_COL_TYPE, server_default=text("'1.0000'")) 

401 parent_id = Column(Integer, index=True) 

402 refcount = Column( 

403 "referenceCount", Integer, nullable=False, server_default=text("'1'") 

404 ) 

405 elements = relationship( 

406 "QElement", 

407 lazy="joined", 

408 order_by="QElement.row,QElement.col", 

409 cascade='all, delete-orphan', 

410 passive_deletes=True, 

411 back_populates='question_def' 

412 ) 

413 

414 _elements = relationship("QElement", lazy="dynamic", viewonly=True) 

415 

416 instances = relationship("QuestionInstance", lazy="dynamic", back_populates='question_def') 

417 

418 def __repr__(self): 

419 return f'<QDef Id: {self.id}, Title: "{self.title}">' 

420 

421 def __eq__(self, o: object) -> bool: 

422 if isinstance(o, QuestionDefinition): 

423 src_elements = sorted(self.elements, key=lambda e: [getattr(e, k) for k in el_keys]) 

424 des_elements = sorted(o.elements, key=lambda e: [getattr(e, k) for k in el_keys]) 

425 return self.title == o.title and src_elements == des_elements 

426 return False 

427 

428 def __hash__(self) -> int: 

429 return super().__hash__() 

430 

431 @property 

432 def is_shared(self): 

433 if self.refcount is None: 

434 return False 

435 return self.refcount > 1 

436 

437 def get_element(self, element_id): 

438 return self._elements.filter(QElement.id == element_id).one() 

439 

440 def as_dict(self, answer_lookup=None, vendor_view=False): 

441 """ 

442 Returns a list of question element lists, each inner list corresponding 

443 to a row in a question table 

444 

445 Groups elements into rows - returning a list of lists 

446 

447 """ 

448 row_list = [] 

449 current_row = -1 

450 for el in self.elements: 

451 

452 if el.row > current_row: 

453 row_list.append([]) 

454 current_row = el.row 

455 

456 if el.contains_choices: 

457 el_dict = el.as_dict(vendor_view=vendor_view) 

458 else: 

459 el_dict = el.as_dict() 

460 

461 if el.is_answerable and answer_lookup is not None: 

462 el_dict["answer"] = answer_lookup.get(el.id, None) 

463 

464 row_list[-1].append(el_dict) 

465 

466 return row_list 

467 

468 @property 

469 def answerable_elements(self): 

470 return [e for e in self.elements if e.is_answerable] 

471 

472 def add_element(self, el_name, **kwargs): 

473 el = QElement.build(el_name, **kwargs) 

474 self.elements.append(el) 

475 return el 

476 

477 def is_tabular(self): 

478 return self.column_count != 1 

479 

480 @property 

481 def column_count(self): 

482 if not self.elements: 

483 return 0 

484 return max(el.col for el in self.elements) 

485 

486 @property 

487 def row_count(self): 

488 if not self.elements: 

489 return 0 

490 return max(el.row for el in self.elements) 

491 

492 @property 

493 def grid_area(self) -> int: 

494 '''Number of available grid cells available according to row and col values of Elements''' 

495 return self.row_count * self.column_count 

496 

497 @property 

498 def occupied_area(self) -> int: 

499 ''' 

500 The number of grid cells occupied by cells according to their rowspan and colspan values 

501 If rowspans or colspans are set correctly this value will be equal to self.grid_area 

502 ''' 

503 return sum(el.cell_area for el in self.elements) 

504 

505 @property 

506 def is_table_valid(self): 

507 return self.grid_area == self.occupied_area 

508 

509 @classmethod 

510 def build(cls, qdict, strip_ids=False): 

511 """ 

512 Create a QuestionDefinition, complete with QElements, from a 

513 a dict datastructure as provided by serial.QuestionDef 

514 """ 

515 qdef = cls(title=qdict["title"]) 

516 for row_idx, row in enumerate(qdict["elements"], start=1): 

517 for col_idx, el in enumerate(row, start=1): 

518 el['row'] = row_idx 

519 el['col'] = col_idx 

520 if strip_ids: 

521 el['id'] = None 

522 qdef.add_element(el["el_type"], **el) 

523 return qdef 

524 

525 

526qi_weight_join = "foreign(TotalWeighting.question_instance_id)==QuestionInstance.id" 

527 

528 

529class SaveAnswersResult: 

530 def __init__(self, is_new, change_list, unanswered_mandatory): 

531 self.is_new = is_new 

532 self.change_list = change_list 

533 self.unanswered_mandatory = unanswered_mandatory 

534 

535 

536class QuestionInstance(Base): 

537 

538 """ 

539 @DynamicAttrs 

540 """ 

541 

542 __tablename__ = "question_instances" 

543 

544 __table_args__ = ( 

545 Index( 

546 "unique_question_project", 

547 "project_id", 

548 "question_id", 

549 unique=True, 

550 ), 

551 ) + Base.__table_args__ 

552 

553 public_attrs = "id,title,position,section_id,number,url".split(",") 

554 

555 section_id = Column("section_id", mysql.INTEGER(10), ForeignKey("sections.id"), nullable=False) 

556 question_def_id = Column( 

557 "question_id", 

558 mysql.INTEGER(10), 

559 ForeignKey(QuestionDefinition.id), 

560 nullable=False 

561 ) 

562 project_id = Column("project_id", mysql.INTEGER(11), ForeignKey("projects.id"), nullable=False) 

563 _weight = Column("weight", WEIGHTING_COL_TYPE, server_default=text("'1.0000'")) 

564 position = Column("pos", Integer, nullable=False, server_default=text("'0'")) 

565 number = Column(PositionNumber) 

566 

567 question_def: QuestionDefinition = relationship( 

568 QuestionDefinition, lazy="joined", innerjoin=True, back_populates='instances' 

569 ) 

570 project = relationship("Project") 

571 section: Section = relationship("Section", back_populates='questions') 

572 parent: Section = relationship("Section", viewonly=True) 

573 _answers = relationship( 

574 "Answer", lazy="dynamic", cascade="all, delete", passive_deletes=True, 

575 back_populates="question_instance" 

576 ) 

577 

578 total_weightings = relationship( 

579 "TotalWeighting", primaryjoin=qi_weight_join, backref="question", cascade='all' 

580 ) 

581 

582 def __repr__(self): 

583 return f"<Question - InstanceId: {self.id}, DefId: {self.question_def_id}>" 

584 

585 def __eq__(self, o: object) -> bool: 

586 if isinstance(o, QuestionInstance): 

587 return self.question_def == o.question_def 

588 return False 

589 

590 def __hash__(self) -> int: 

591 return super().__hash__() 

592 

593 def all_answers(self): 

594 """All Answers for this question and scoreable issues""" 

595 return ( 

596 self._answers.join(Issue).filter(Issue.scoreable_filter(self.project)).all() 

597 ) 

598 

599 def answers_for_issue(self, issue_id): 

600 return self._answers.filter(Answer.issue_id == issue_id) 

601 

602 def single_vendor_dict(self, issue): 

603 lookup = {a.element_id: a.answer for a in self.answers_for_issue(issue.id)} 

604 return { 

605 "number": self.safe_number, 

606 "id": self.id, 

607 "title": self.title, 

608 "respondent": issue.respondent.as_dict(), 

609 "elements": self.question_def.as_dict( 

610 vendor_view=True, answer_lookup=lookup 

611 ), 

612 } 

613 

614 @property 

615 def title(self): 

616 return self.question_def.title 

617 

618 @property 

619 def elements(self): 

620 return self.question_def.elements 

621 

622 @property 

623 def weight(self): 

624 """ 

625 Strange behaviour inherited from java land. 

626 Returns 

627 1) WeightingSet value, if loaded by LoadWeightSetVisitor 

628 or 

629 2) self._weight if weight defined for this QuestionInstance 

630 or, finally 

631 3) The weight from this Instances QuestionDefinition 

632 """ 

633 try: 

634 return self._weightset_weight 

635 except AttributeError: 

636 return self._weight if self._weight is not None else self.question_def.weight 

637 

638 @weight.setter 

639 def weight(self, weight_value): 

640 """ 

641 Set the weight on the question definition if the definition is not 

642 shared. If the weight has already been set for this instance, carry on using 

643 it 

644 """ 

645 if not self.question_def.is_shared: 

646 self.question_def.weight = weight_value 

647 # Mirrors behaviour in Java land - SqlMapQuestionDao line 322 

648 self._weight = weight_value 

649 

650 safe_number = SafeNumber() 

651 

652 def as_dict(self, answer_lookup=None, vendor_view=False): 

653 return { 

654 "id": self.id, 

655 "title": self.title, 

656 "number36": self.number, 

657 "number": self.safe_number, 

658 "elements": self.question_def.as_dict( 

659 answer_lookup=answer_lookup, vendor_view=vendor_view 

660 ), 

661 "type": "question", 

662 } 

663 

664 @property 

665 def is_autoscored(self): 

666 for element in self.elements: 

667 if isinstance(element, MultipleChoice) and element.choices: 

668 for choice in element.choices: 

669 if choice.get('autoscore', None): 

670 return True 

671 return False 

672 

673 def calculate_autoscores(self): 

674 ''' 

675 Mapping of issue_id to score, use a list in case of multiple 

676 autoscored elements in one question - take the average later 

677 ''' 

678 score_map = defaultdict(list) 

679 multichoice_elements = [ 

680 el for el in self.question_def.elements if el.contains_choices 

681 ] 

682 max_score = self.project.maximum_score 

683 issue_filter = Issue.scoreable_filter(self.project) 

684 for multichoice in multichoice_elements: 

685 mq = multichoice._answers.join(Issue).filter(issue_filter) 

686 for answer in mq: 

687 for choice in multichoice.choices: 

688 if answer.answer == choice['label'] and choice['autoscore'] is not None: 

689 score_map[answer.issue].append(int(choice['autoscore'])) 

690 

691 for issue, score_list in score_map.items(): 

692 total_score = sum(score_list) 

693 score_map[issue] = total_score if total_score <= max_score else max_score 

694 

695 return score_map 

696 

697 @property 

698 def absolute_weight(self): 

699 if not hasattr(self, "_absolute_weight"): 

700 try: 

701 self._absolute_weight = self.normalised_weight * self.section.absolute_weight 

702 except AttributeError: 

703 m = 'normalised weights not loaded - run HierarchicalWeightingVisitor' 

704 raise WeightingsNotLoadedException(m) 

705 return self._absolute_weight 

706 

707 @absolute_weight.setter 

708 def absolute_weight(self, abs_weight): 

709 self._absolute_weight = abs_weight 

710 

711 def validate_and_save_answers(self, answer_doc, issue) -> SaveAnswersResult: 

712 changes = [] 

713 unanswered_mandatory = False 

714 current_answers = {a.element_id: a for a in self.answers_for_issue(issue.id)} 

715 for element in self.question_def.answerable_elements: 

716 el_id = element.id 

717 if el_id in answer_doc: 

718 new_answer = answer_doc.pop(el_id) 

719 if new_answer is None: 

720 continue 

721 element.validate(new_answer) 

722 if el_id in current_answers: 

723 current_answer = current_answers[el_id] 

724 old_answer = current_answer.answer 

725 if old_answer != new_answer: 

726 current_answer.answer = new_answer 

727 changes.append((old_answer, new_answer)) 

728 else: 

729 a = Answer(element_id=el_id, answer=new_answer, issue_id=issue.id) 

730 self._answers.append(a) 

731 changes.append((None, new_answer)) 

732 else: 

733 if element.mandatory and el_id not in current_answers: 

734 unanswered_mandatory = True 

735 

736 if len(answer_doc) > 0: 

737 tmpl = "Unable to match all answers with element IDs: {}" 

738 ids = ', '.join(str(k) for k in answer_doc.keys()) 

739 raise ValidationFailure(tmpl.format(ids)) 

740 

741 is_new = len(current_answers) == 0 

742 

743 return SaveAnswersResult(is_new, changes, unanswered_mandatory) 

744 

745 

746class QElement(Base): 

747 __tablename__ = "question_elements" 

748 __table_args__ = ( 

749 Index( 

750 "question_elements_qId_row_col", 

751 "question_id", 

752 "row", 

753 "col", 

754 ), 

755 Index('ft_label', 'label', mysql_prefix='FULLTEXT') 

756 ) + Base.__table_args__ 

757 

758 __mapper_args__ = {"polymorphic_on": "el_type", "polymorphic_identity": ""} 

759 

760 is_answerable = True 

761 

762 contains_choices = False 

763 

764 public_attrs = ("id,el_type,colspan,rowspan,label,mandatory,regexp,col,row").split(",") 

765 

766 answerable_types = {"TX", "CR", "CC", "CB", "AT"} 

767 

768 question_id = Column( 

769 ForeignKey("questions.id", ondelete="CASCADE"), 

770 nullable=False, 

771 index=True, 

772 server_default=text("'0'"), 

773 ) 

774 row = Column(mysql.INTEGER(10), nullable=False, default=1, server_default=text("'0'")) 

775 col = Column(mysql.INTEGER(10), nullable=False, default=1, server_default=text("'0'")) 

776 el_type = Column("type", mysql.CHAR(length=2), 

777 nullable=False, server_default=text("''")) 

778 colspan = Column(Integer, default=1) 

779 rowspan = Column(Integer, default=1) 

780 label = Column(mysql.MEDIUMTEXT(), nullable=True) 

781 mandatory = Column(Boolean, default=False) 

782 height = Column(Integer) 

783 width = Column(Integer) 

784 multitopic = Column(Boolean, default=False) 

785 regexp = Column(Unicode(255)) 

786 choices = Column(JSON, nullable=True) 

787 

788 question_def = relationship("QuestionDefinition", back_populates='elements') 

789 answers = relationship("Answer", back_populates='element') 

790 _answers = relationship("Answer", lazy="dynamic", viewonly=True) 

791 

792 def __eq__(self, o: object) -> bool: 

793 if isinstance(o, QElement): 

794 equal: bool = True 

795 for k in el_keys: 

796 if hasattr(self, k) and hasattr(o, k) and getattr(o, k) == getattr(self, k): 

797 continue 

798 else: 

799 equal = False 

800 break 

801 return equal 

802 return False 

803 

804 def __hash__(self) -> int: 

805 return super().__hash__() 

806 

807 def get_answer(self, issue): 

808 """Get the answer for this element and `issue` or None if not yet answered""" 

809 return self._answers.filter(Answer.issue == issue).one() 

810 

811 def get_question_instance(self, project_id): 

812 return self.question_def.instances.filter_by(project_id=project_id).one() 

813 

814 @staticmethod 

815 def split_choice_string(choice_string): 

816 """ 

817 Split a string representation of a multiple choice option, 

818 e.g. 'Yes <8>' into a tuple - ('Yes', 8) 

819 """ 

820 regex_match = re.match(r"([^<]+)<(\d+)>", choice_string) 

821 if regex_match: 

822 label, autoscore = regex_match.groups() 

823 return (label.strip(), autoscore.strip()) 

824 else: 

825 return (choice_string.strip(), None) 

826 

827 @classmethod 

828 def build(cls, type_name, **kwargs): 

829 """Create a QElement subclass instance of the given type_name""" 

830 mapper = inspect(cls).polymorphic_map[type_name] 

831 el = mapper.class_(**kwargs) 

832 return el 

833 

834 @property 

835 def cell_area(self): 

836 return self.colspan * self.rowspan 

837 

838 @property 

839 def summary(self): 

840 '''For describing this element in audit event change log''' 

841 return self.label 

842 

843 

844class Label(QElement): 

845 __mapper_args__ = {"polymorphic_identity": "LB"} 

846 is_answerable = False 

847 public_attrs = "id,el_type,label,colspan,rowspan".split(",") 

848 

849 

850class Checkbox(QElement): 

851 __mapper_args__ = {"polymorphic_identity": "CB"} 

852 public_attrs = "id,el_type,label,colspan,rowspan".split(",") 

853 

854 def validate(self, answer): 

855 if answer not in ("true", "false"): 

856 m = f"Answer for checkbox element {self.id} must be 'true' or 'false'" 

857 raise ValidationFailure(m) 

858 

859 

860class TextInput(QElement): 

861 __mapper_args__ = {"polymorphic_identity": "TX"} 

862 

863 def as_dict(self): 

864 return { 

865 "id": self.id, 

866 "el_type": self.el_type, 

867 "colspan": self.colspan, 

868 "rowspan": self.rowspan, 

869 "height": self.height, 

870 "width": self.width, 

871 "mandatory": self.mandatory, 

872 "regexp": self.regexp, 

873 } 

874 

875 def validate(self, answer): 

876 if self.regexp and not re.match(self.regexp, answer): 

877 tmpl = "Answer {:.10} does not match regular expression {} for element {}" 

878 raise ValidationFailure(tmpl.format(self.regexp, answer, self.id)) 

879 

880 @property 

881 def summary(self): 

882 return f"[{self.width} X {self.height}]" 

883 

884 

885class MultipleChoice(object): 

886 ''' 

887 The choices property on QElement is used by RadioChoices (CR) and SelectChoices(CC) 

888 subclasses. The property is a JSON field containing an array of label:autoscore objects: 

889 [ 

890 {label: 'Yes', autoscore: 10}, 

891 {label: 'No', autoscore: 0} 

892 ] 

893 

894 ''' 

895 contains_choices = True 

896 # `choices` attribute is set by QuestionDefinition.elements() 

897 

898 def as_dict(self, vendor_view=False): 

899 if vendor_view: 

900 choices = [{'label': c['label']} for c in self.choices] 

901 else: 

902 choices = self.choices 

903 return { 

904 "id": self.id, 

905 "el_type": self.el_type, 

906 "colspan": self.colspan, 

907 "rowspan": self.rowspan, 

908 "mandatory": self.mandatory, 

909 "choices": choices, 

910 } 

911 

912 def validate(self, answer): 

913 choice_vals = {c['label'] for c in self.choices} 

914 if answer not in choice_vals: 

915 tmpl = "Answer '{:.10}' is not one of the accepted values: [{}] for element {}" 

916 raise ValidationFailure(tmpl.format(answer, ",".join(choice_vals), self.id)) 

917 

918 @property 

919 def summary(self): 

920 if self.choices: 

921 return '\n'.join(f"{c['label']}: {c.get('autoscore', None)}" for c in self.choices) 

922 return '' 

923 

924 

925class SelectChoices(MultipleChoice, QElement): 

926 __mapper_args__ = {"polymorphic_identity": "CC"} # 'ChoiceCombo' 

927 

928 

929class RadioChoices(MultipleChoice, QElement): 

930 __mapper_args__ = {"polymorphic_identity": "CR"} 

931 

932 

933class QuestionAttachment(QElement): 

934 

935 """An attachment added by the Buyer to the Question""" 

936 

937 __mapper_args__ = {"polymorphic_identity": "QA"} 

938 is_answerable = False 

939 

940 public_attrs = "id,el_type,label,colspan,rowspan".split(",") 

941 

942 attachment = relationship("QAttachment", uselist=False) 

943 

944 

945class SupportingAttachment(QElement): 

946 

947 """ 

948 A File Input form element enabling a Respondent to upload an attachment 

949 as part of an answer to to a Question 

950 """ 

951 

952 __mapper_args__ = {"polymorphic_identity": "AT"} 

953 public_attrs = "id,el_type,colspan,rowspan,mandatory".split(",") 

954 

955 def validate(self, answer): 

956 if not isinstance(answer, str): 

957 raise ValidationFailure('SupportingAttachments answer should be the filename and size') 

958 

959 @property 

960 def summary(self): 

961 return 'File upload field' 

962 

963 

964class ExternalMedia(QElement): 

965 

966 """A link to an external resource, e.g. video""" 

967 

968 __mapper_args__ = {"polymorphic_identity": "MD"} 

969 

970 

971class QAttachment(AttachmentMixin, Base): 

972 __tablename__ = "question_attachments" 

973 

974 public_attrs = ("id,size,filename,url").split(",") 

975 element_id = Column(Integer, ForeignKey("question_elements.id")) 

976 

977 element = relationship(QElement, viewonly=True) 

978 

979 

980class Answer(Base): 

981 __tablename__ = "answers" 

982 __table_args__ = ( 

983 Index( 

984 "unique_issue_question_element_answer", 

985 "issue_id", 

986 "question_instance_id", 

987 "element_id", 

988 unique=True, 

989 ), 

990 Index('ft_answer', 'answer', mysql_prefix='FULLTEXT'), 

991 {"mysql_engine": "InnoDB", "mysql_charset": 'utf8mb4'} 

992 ) 

993 

994 issue_id = Column( 

995 Integer, 

996 ForeignKey("issues.id", 

997 name='answers_ibfk_1', 

998 ondelete="CASCADE"), 

999 nullable=False, 

1000 default=0, 

1001 index=True 

1002 ) 

1003 

1004 answer = Column(mysql.LONGTEXT()) 

1005 

1006 element_id = Column( 

1007 Integer, 

1008 ForeignKey("question_elements.id", name='answers_ibfk_2'), 

1009 nullable=False, 

1010 index=True, 

1011 server_default=text("'0'") 

1012 ) 

1013 autoscore = Column(mysql.INTEGER(11)) 

1014 question_instance_id = Column( 

1015 Integer, 

1016 ForeignKey("question_instances.id", 

1017 ondelete='CASCADE', 

1018 name='answers_ibfk_3'), 

1019 nullable=False, 

1020 index=True 

1021 ) 

1022 

1023 issue = relationship("Issue", 

1024 backref=backref("answers", 

1025 lazy="dynamic", 

1026 cascade='all,delete', 

1027 passive_deletes=True)) 

1028 element = relationship(QElement, back_populates='answers') 

1029 question_instance = relationship(QuestionInstance, back_populates="_answers", uselist=False) 

1030 

1031 def as_dict(self): 

1032 

1033 return {"answer_id": self.id, "issue_id": self.issue_id, "answer": self.answer} 

1034 

1035 def __repr__(self): 

1036 tmpl = "Answer ID %s, Issue: %s, QuestionInstance: %s" 

1037 return tmpl % (self.id, self.issue_id, self.question_instance_id) 

1038 

1039 

1040class AnswerReport(Base): 

1041 __tablename__ = "answer_reports" 

1042 __table_args__ = {"mysql_engine": "InnoDB", "mysql_charset": 'utf8mb4'} 

1043 

1044 id = Column(Integer, primary_key=True) 

1045 project_id = Column( 

1046 ForeignKey("projects.id", ondelete="CASCADE", name='answer_reports_project_id'), 

1047 nullable=False, 

1048 index=True 

1049 ) 

1050 title = Column(Unicode(100), nullable=False) 

1051 definition = Column(mysql.MEDIUMTEXT(), nullable=False) 

1052 project = relationship("Project", back_populates='answer_reports') 

1053 

1054 

1055class ResponseStatus(Enum): 

1056 NOT_ANSWERED = 0 

1057 ANSWERED = 10 

1058 FOR_REVIEW = 20 

1059 REJECTED = 30 

1060 APPROVED = 40 

1061 

1062 

1063class ResponseStatusCol(types.TypeDecorator): 

1064 

1065 """ 

1066 A custom SQLAlchemy type that maps database integer 

1067 values to ResponseStatus Enum values 

1068 """ 

1069 

1070 impl = mysql.TINYINT(4) 

1071 

1072 cache_ok = True 

1073 

1074 def process_bind_param(self, response_status_enum, dialect): 

1075 return response_status_enum.value 

1076 

1077 def process_result_value(self, int_value, dialect): 

1078 return ResponseStatus(int_value) 

1079 

1080 

1081class QuestionResponseState(Base): 

1082 __tablename__ = "question_response_states" 

1083 __table_args__ = ( 

1084 Index("unique_resp_state", "issue_id", "question_instance_id", unique=True), 

1085 {"mysql_engine": "InnoDB", "mysql_charset": 'utf8mb4'}, 

1086 ) 

1087 public_attrs = [ 

1088 "status", 

1089 "allocated_by", 

1090 "allocated_to", 

1091 "approved_by", 

1092 "updated_by", 

1093 "date_updated", 

1094 ] 

1095 

1096 id = Column(Integer, primary_key=True) 

1097 

1098 issue_id = Column( 

1099 ForeignKey("issues.id", ondelete="CASCADE"), 

1100 nullable=False, 

1101 index=True, 

1102 server_default=text("'0'"), 

1103 ) 

1104 

1105 question_instance_id = Column( 

1106 ForeignKey("question_instances.id", ondelete="CASCADE"), 

1107 nullable=False, 

1108 index=True, 

1109 server_default=text("'0'"), 

1110 ) 

1111 

1112 status = Column(ResponseStatusCol, nullable=False, server_default=text("'0'")) 

1113 

1114 allocated_by = Column(types.VARCHAR(length=50), ForeignKey("users.id", ondelete='SET NULL')) 

1115 allocated_to = Column(types.VARCHAR(length=50), ForeignKey("users.id", ondelete='SET NULL')) 

1116 approved_by = Column(types.VARCHAR(length=50), ForeignKey("users.id", ondelete='SET NULL')) 

1117 updated_by = Column(types.VARCHAR(length=50), ForeignKey("users.id", ondelete='SET NULL')) 

1118 

1119 date_updated = Column(DateTime) 

1120 

1121 issue = relationship( 

1122 "Issue", backref=backref("response_states", lazy="dynamic", passive_deletes=True) 

1123 ) 

1124 question_instance = relationship("QuestionInstance") 

1125 

1126 

1127class AAttachment(AttachmentMixin, Base): 

1128 __tablename__ = "answer_attachments" 

1129 

1130 public_attrs = ("id,size,filename,url,respondent,question_number,question_id").split( 

1131 "," 

1132 ) 

1133 answer_id = Column( 

1134 Integer, ForeignKey("answers.id", ondelete="SET NULL"), unique=True 

1135 ) 

1136 

1137 answer = relationship( 

1138 Answer, 

1139 uselist=False, 

1140 backref=backref("attachment", uselist=False, passive_deletes=True), 

1141 ) 

1142 

1143 def __repr__(self) -> str: 

1144 return f"<AAttachment #{self.id} - {self.filename}>" 

1145 

1146 

1147class WeightingSet(Base): 

1148 __tablename__ = "weighting_sets" 

1149 

1150 public_attrs = ("id", "name") 

1151 name = Column(Unicode(255), nullable=True, default=None) 

1152 project_id = Column(Integer, ForeignKey("projects.id", ondelete="CASCADE"), nullable=False) 

1153 

1154 project = relationship("Project", back_populates="weighting_sets") 

1155 

1156 weightings = relationship('Weighting', back_populates='weighting_set', 

1157 lazy="dynamic", 

1158 cascade='all,delete', passive_deletes=True) 

1159 

1160 def __repr__(self): 

1161 return 'Weighting Set ID: %s "%s"' % (self.id, self.name) 

1162 

1163 def lookup_table(self): 

1164 q_lookup = {} 

1165 sec_lookup = {} 

1166 for weight in self.weightings.with_entities( 

1167 Weighting.section_id, Weighting.question_instance_id, Weighting.value 

1168 ): 

1169 if weight.section_id: 

1170 sec_lookup[weight.section_id] = weight.value 

1171 else: 

1172 q_lookup[weight.question_instance_id] = weight.value 

1173 return {"questions": q_lookup, "sections": sec_lookup} 

1174 

1175 

1176class Weighting(Base): 

1177 __tablename__ = "weightings" 

1178 

1179 weighting_set_id = Column( 

1180 mysql.INTEGER(11), ForeignKey("weighting_sets.id", ondelete="CASCADE"), nullable=False 

1181 ) 

1182 question_instance_id = Column( 

1183 mysql.INTEGER(11), ForeignKey("question_instances.id", ondelete="CASCADE") 

1184 ) 

1185 section_id = Column( 

1186 "section_id", Integer, ForeignKey("sections.id", ondelete="CASCADE") 

1187 ) 

1188 value = Column(WEIGHTING_COL_TYPE, server_default=text("'1.0000'")) 

1189 

1190 question = relationship(QuestionInstance) 

1191 section = relationship("Section") 

1192 weighting_set = relationship(WeightingSet, back_populates="weightings") 

1193 

1194 

1195class TotalWeighting(Base): 

1196 

1197 """ 

1198 The total_weightings table is essentially a cache. It saves the absolute, normalised weight 

1199 values for question instances and sections for a given weighting_set_id. Null weighting_set_id 

1200 indicates the default weighting set. 

1201 

1202 Absolute weights are derived hierachically - the entire questionnaire needs to be loaded 

1203 as a tree structure in order to figure out these weights. e.g. question weight depends 

1204 on the weights of parent sections. It is therefore expense to derive on the fly - hence 

1205 the requirement for this table. 

1206 

1207 The cache is recalculated when weightings are saved 

1208 """ 

1209 

1210 __tablename__ = "total_weightings" 

1211 __table_args__ = ( 

1212 Index( 

1213 "weightings", 

1214 "project_id", 

1215 "weighting_set_id", 

1216 "question_instance_id", 

1217 "section_id", 

1218 unique=True, 

1219 ), 

1220 {"mysql_engine": "InnoDB", "mysql_charset": 'utf8mb4'}, 

1221 ) 

1222 

1223 project_id = Column( 

1224 ForeignKey("projects.id", ondelete="CASCADE", name="total_weightings_wset"), 

1225 nullable=False, 

1226 ) 

1227 weighting_set_id = Column(Integer, nullable=True, default=None) 

1228 question_instance_id = Column(Integer, nullable=False, server_default=text("'0'")) 

1229 section_id = Column(Integer, nullable=False, server_default=text("'0'")) 

1230 weight = Column(mysql.FLOAT(asdecimal=True, scale=7)) 

1231 

1232 project = relationship( 

1233 "Project", backref=backref("total_weightings", lazy="dynamic", 

1234 cascade='all,delete', passive_deletes=True) 

1235 ) 

1236 

1237 

1238""" 

1239TODO 

1240- add column. One for normalised_weight, one for absolute_weight 

1241 - Rename index in DB Schema to avoid name clash with "weightings" table 

1242 - For API use, provide one weighting set at a time, load others on demand. 

1243 - or provide weightings as a lookup dict in JS 

1244""" 

1245 

1246 

1247class LoadWeightSetVisitor(Visitor): 

1248 

1249 """ 

1250 Updates Question and Section weights in the questionnaire with values 

1251 from the provided weighting_set 

1252 """ 

1253 

1254 def __init__(self, weighting_set): 

1255 super(Visitor, self).__init__() 

1256 weight_lookup = weighting_set.lookup_table() 

1257 self.question_lookup = weight_lookup["questions"] 

1258 self.section_lookup = weight_lookup["sections"] 

1259 

1260 def hello_section(self, sec): 

1261 sec._weightset_weight = self.section_lookup[sec.id] 

1262 

1263 def visit_question(self, question): 

1264 question._weightset_weight = self.question_lookup[question.id] 

1265 

1266 

1267class HierarchyWeightingsVisitor(Visitor): 

1268 

1269 """ 

1270 Sets normalised weights (percentage weight within sub-tree) 

1271 for all sections and questions 

1272 """ 

1273 

1274 def hello_section(self, sec): 

1275 if sec.is_top_level: 

1276 sec.normalised_weight = Decimal(1) 

1277 self._set_subsection_normalised_weights(sec) 

1278 self._set_question_normalised_weights(sec) 

1279 

1280 def _set_subsection_normalised_weights(self, sec): 

1281 

1282 subtree_total = sum(s.weight for s in sec.subsections) 

1283 

1284 if subtree_total == 0: 

1285 # Avoid divide by zero errors 

1286 for subsec in sec.subsections: 

1287 subsec.normalised_weight = Decimal(0) 

1288 else: 

1289 for subsec in sec.subsections: 

1290 subsec.normalised_weight = Decimal(subsec.weight) / subtree_total 

1291 

1292 def _set_question_normalised_weights(self, sec): 

1293 

1294 q_total = sum(q.weight for q in sec.questions) 

1295 

1296 if q_total == 0: 

1297 # Avoid divide by zero errors 

1298 for q in sec.questions: 

1299 q.normalised_weight = Decimal(0) 

1300 else: 

1301 for q in sec.questions: 

1302 q.normalised_weight = Decimal(q.weight) / q_total 

1303 

1304 

1305class SaveTotalWeightingsVisitor(Visitor): 

1306 

1307 """ 

1308 Saves TotalWeighting objects for the given project and weighting_set, 

1309 should be invoked after HierarchyWeightingsVisitor has had a chance 

1310 to save normalised weights 

1311 """ 

1312 

1313 def __init__(self, session, project, weighting_set_id=None): 

1314 super(Visitor, self).__init__() 

1315 self.session = session 

1316 self.project_id = project.id 

1317 self.weighting_set_id = weighting_set_id 

1318 self.total_weightings = [] 

1319 

1320 def hello_section(self, sec): 

1321 tw = dict( 

1322 project_id=self.project_id, 

1323 weight=sec.absolute_weight, 

1324 section_id=sec.id, 

1325 weighting_set_id=self.weighting_set_id, 

1326 ) 

1327 self.total_weightings.append(tw) 

1328 

1329 def visit_question(self, question): 

1330 tw = dict( 

1331 project_id=self.project_id, 

1332 weight=question.absolute_weight, 

1333 question_instance_id=question.id, 

1334 weighting_set_id=self.weighting_set_id, 

1335 ) 

1336 

1337 self.total_weightings.append(tw) 

1338 

1339 def finalise(self): 

1340 self.session.bulk_insert_mappings(TotalWeighting, self.total_weightings)