Coverage for rfpy/model/questionnaire.py: 99%

646 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-24 10:52 +0000

1from collections import defaultdict 

2from operator import attrgetter 

3from typing import Optional, TYPE_CHECKING 

4from datetime import datetime 

5from decimal import Decimal 

6from enum import Enum, IntEnum 

7import logging 

8import re 

9 

10from sqlalchemy import ( 

11 Unicode, 

12 Boolean, 

13 Integer, 

14 JSON, 

15 ForeignKey, 

16 Index, 

17 text, 

18 DateTime, 

19 and_, 

20 func, 

21 inspect, 

22) 

23from sqlalchemy.orm import ( 

24 Mapped, 

25 DynamicMapped, 

26 mapped_column, 

27 relationship, 

28 remote, 

29 foreign, 

30 validates, 

31) 

32import sqlalchemy.types as types 

33from sqlalchemy.dialects import mysql 

34 

35from rfpy.model.meta import AttachmentMixin, Visitor, Base 

36from .issue import Issue 

37 

38from .exc import ( 

39 ValidationFailure, 

40 QuestionnaireStructureException, 

41 WeightingsNotLoadedException, 

42) 

43 

44if TYPE_CHECKING: 

45 from rfpy.model.composite import QuestionMeta 

46 from rfpy.model.project import Project 

47 from rfpy.model.acl import SectionPermission 

48 from rfpy.model.tags import Tag 

49 

50""" 

51Note - default SQL JOIN / load strategy is defined by arguments 

52to 'relationship' 

53 

54Lots of joins involved when loading questionnaires. Current strategy is: 

55 

56Section -> questions (QuestionInstance): 'dynamic' - doesn't join by default, 

57 returns a query 

58 

59QuestionInstance -> QuestionDefinition: 'joined' - eager loading / inner join 

60QuestionDefinition -> [QElement] : 'joined' - eager loading by inner join 

61 

62For loading large collections (e.g. full questionnaire), these loading 

63strategies are overridden, e.g. loading question elements in a subquery load 

64""" 

65 

66log = logging.getLogger(__name__) 

67 

68 

69get_el_type = attrgetter("el_type") 

70get_cell = attrgetter("cell") 

71el_keys = [ 

72 "col", 

73 "colspan", 

74 "row", 

75 "rowspan", 

76 "el_type", 

77 "label", 

78 "mandatory", 

79 "height", 

80 "width", 

81 "multitopic", 

82 "regexp", 

83 "choices", 

84] 

85 

86 

87def to_b36(string_number): 

88 """ 

89 Converts a position number (e.g. 23.1.8) into a base36 string (0N0108) 

90 """ 

91 if not string_number: 

92 return string_number 

93 return "".join(base36encode(int(el)) for el in string_number.split(".")) 

94 

95 

96_b36_cache: dict[int, str] = dict() 

97 

98 

99def base36encode(number: int): 

100 """ 

101 Converts an integer into a base36 string two characters long 

102 

103 @param number - an int or long between 0 and 1296 

104 """ 

105 initial_value = number 

106 if number not in _b36_cache: 

107 if not isinstance(number, int): 

108 raise TypeError("number must be an integer") 

109 if not 0 < number < 1296: 

110 raise ValueError("number must in range 1 - 1295, received %s" % number) 

111 

112 alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ" 

113 base36 = "" 

114 while number: 

115 number, i = divmod(number, 36) 

116 base36 = alphabet[i] + base36 

117 _b36_cache[initial_value] = base36.rjust(2, "0") 

118 

119 return _b36_cache[initial_value] 

120 

121 

122_number_cache: dict[str, str] = dict() 

123 

124WEIGHTING_COL_TYPE = mysql.DECIMAL(precision=15, scale=4) 

125 

126 

127def from_b36(db_string): 

128 """ 

129 Converts a base36 encoded position string into human readable form, 

130 e.g. '0N0108' -> 23.1.18. 

131 

132 Called by every row for a question or section query, and surprisingly expensive 

133 For fetch question number and id for 1,500 this function accounts for 50% of 

134 the runtime. 

135 

136 To eliminate this a cache is used to store values. 

137 """ 

138 if db_string not in _number_cache: 

139 if not db_string: 

140 return db_string 

141 _number_cache[db_string] = ".".join( 

142 "%s" % int(db_string[x : x + 2], 36) for x in range(0, len(db_string), 2) 

143 ) 

144 return _number_cache[db_string] 

145 

146 

147class ImportType(IntEnum): 

148 SHARE = 0 

149 COPY = 10 

150 

151 

152class NumberString(str): 

153 def __init__(self, raw_value): 

154 self.raw_value = raw_value 

155 

156 @property 

157 def dotted(self): 

158 return from_b36(self.raw_value) 

159 

160 def next_increment(self): 

161 parts = self.dotted.split(".") 

162 parts[-1] = str(int(parts[-1]) + 1) 

163 return NumberString(to_b36(".".join(p for p in parts if p))) 

164 

165 def first_child(self): 

166 parts = self.dotted.split(".") 

167 parts.append("1") 

168 new_dotted = ".".join(p for p in parts if p) 

169 b36ed = to_b36(new_dotted) 

170 return NumberString(b36ed) 

171 

172 @classmethod 

173 def from_dotted(cls, value): 

174 return cls(to_b36(value)) 

175 

176 @classmethod 

177 def visible_relatives_regex(cls, section_number): 

178 """ 

179 Create a regular expression for finding ancestor and ancestor sibling sections for 

180 the given section number. This is useful for returning a subset of the questionnaire tree 

181 necessary for rendering a tree menu without returning the entire questionnaire. 

182 

183 """ 

184 # Build regex by processing each 2-character chunk of the section Number 

185 # starting from the parent of the section given by section_number 

186 parent_index = len(section_number) 

187 n = section_number 

188 # For each chunk build a regex similar to ^02\w{2}$ - i.e. find immediate offspring 

189 # for each chunk 

190 rels_regex_list = [ 

191 "^" + n[0 : i + 2] + r".{2}$" for i in range(0, parent_index, 2) 

192 ] 

193 

194 # The root section has no number so root + immediate children must be special-cased: 

195 rels_regex_list.append(r"^.{0,2}$") 

196 

197 return r"|".join(rels_regex_list) 

198 

199 

200class SafeNumber: 

201 """ 

202 A Property that returns either number.dotted 

203 or just number, depending on whether the instance's number 

204 attribute is an instance of NumberString or not 

205 """ 

206 

207 def __get__(self, obj, cls=None): 

208 if isinstance(obj.number, NumberString): 

209 return obj.number.dotted 

210 return obj.number 

211 

212 def __set__(self, obj, val): 

213 if isinstance(val, NumberString): 

214 obj.number = val 

215 return 

216 if isinstance(val, str): 

217 obj.number = NumberString(to_b36(val)) 

218 

219 

220class PositionNumber(types.TypeDecorator): 

221 """ 

222 A custom SQLAlchemy type that translates unicode strings 

223 between display numbers like 1.2.3 and 

224 suppplierselect's internal database equivalent ('010203') 

225 """ 

226 

227 impl = types.Unicode(30) 

228 

229 cache_ok = True 

230 

231 def process_bind_param(self, value, dialect): 

232 return value 

233 

234 def process_result_value(self, value, dialect): 

235 return NumberString(value) 

236 

237 def copy(self): 

238 return PositionNumber(self.impl.length) 

239 

240 

241class Section(Base): 

242 __tablename__ = "sections" 

243 

244 id: Mapped[int] = mapped_column(mysql.INTEGER(10), primary_key=True) 

245 title: Mapped[str] = mapped_column(mysql.VARCHAR(length=255), nullable=False) 

246 number: Mapped[Optional[NumberString]] = mapped_column(PositionNumber, default="", nullable=True) 

247 description: Mapped[Optional[str]] = mapped_column(mysql.LONGTEXT()) 

248 _weight: Mapped[Optional[Decimal]] = mapped_column( 

249 "weight", WEIGHTING_COL_TYPE, server_default=text("'1.0000'"), nullable=True 

250 ) 

251 project_id: Mapped[int] = mapped_column( 

252 mysql.INTEGER(11), ForeignKey("projects.id"), nullable=False 

253 ) 

254 parent_id: Mapped[Optional[int]] = mapped_column( 

255 mysql.INTEGER(10), ForeignKey("sections.id", ondelete="CASCADE") 

256 ) 

257 position: Mapped[int] = mapped_column("pos", Integer, default=0, nullable=False) 

258 

259 subsections: Mapped[list["Section"]] = relationship( 

260 "Section", order_by="Section.position" 

261 ) 

262 

263 project: Mapped["Project"] = relationship( 

264 "Project", 

265 primaryjoin="Section.project_id==Project.id", 

266 back_populates="sections", 

267 ) 

268 

269 questions: Mapped[list["QuestionInstance"]] = relationship( 

270 "QuestionInstance", 

271 order_by="QuestionInstance.position", 

272 back_populates="section", 

273 ) 

274 

275 parent: Mapped[Optional["Section"]] = relationship( 

276 "Section", 

277 remote_side=[id], 

278 back_populates="subsections", 

279 cascade_backrefs=False, 

280 ) 

281 

282 questions_query: DynamicMapped["QuestionInstance"] = relationship( 

283 "QuestionInstance", 

284 lazy="dynamic", 

285 order_by="QuestionInstance.position", 

286 viewonly=True, 

287 ) 

288 

289 perms: DynamicMapped["SectionPermission"] = relationship( 

290 "SectionPermission", lazy="dynamic" 

291 ) 

292 

293 descendants: Mapped[list["Section"]] = relationship( 

294 "Section", 

295 viewonly=True, 

296 order_by=number, 

297 primaryjoin=and_( 

298 remote(foreign(number)).startswith(number), 

299 remote(foreign(project_id)) == project_id, 

300 remote(foreign(id)) != id, 

301 ), 

302 ) 

303 

304 safe_number = SafeNumber() 

305 

306 ancestors: Mapped[list["Section"]] = relationship( 

307 "Section", 

308 viewonly=True, 

309 order_by=number, 

310 primaryjoin=and_( 

311 remote(foreign(number)) 

312 == func.LEFT(number, func.LENGTH(remote(foreign(number)))), 

313 remote(foreign(project_id)) == project_id, 

314 remote(foreign(id)) != id, 

315 ), 

316 ) 

317 

318 def __repr__(self): 

319 return f"<Sec id:{self.id} Num:{self.number} Parent: {self.parent_id} - {self.title}>" 

320 

321 def as_dict(self): 

322 return { 

323 "id": self.id, 

324 "type": "section", 

325 "title": self.title, 

326 "description": self.description, 

327 "number": self.safe_number, 

328 "number36": self.number, 

329 "parent_id": self.parent_id, 

330 "subsections": [], 

331 "questions": [], 

332 } 

333 

334 def renumber(self, new_dotted_number=""): 

335 """ 

336 Set number and recursively set number for all children based on order_by 

337 on relationships (position attribute). 

338 """ 

339 self.safe_number = new_dotted_number 

340 fmt = "%s.%s" if new_dotted_number else "%s%s" 

341 i = 1 

342 for idx, subsection in enumerate(self.subsections): 

343 if subsection.id == self.id: 

344 raise QuestionnaireStructureException( 

345 f"Section {self.id} cannot be its own parent" 

346 ) 

347 

348 sec_num = fmt % (new_dotted_number, i) 

349 subsection.position = idx 

350 subsection.renumber(sec_num) 

351 i += 1 

352 

353 for idx, question in enumerate(self.questions): 

354 q_num = fmt % (new_dotted_number, i) 

355 question.safe_number = q_num 

356 question.position = idx 

357 i += 1 

358 

359 def accept(self, visitor): 

360 """ 

361 Call hello_section, visit_question(s), goodbye_section 

362 on the passed visitor and recurse to questions and subsections 

363 """ 

364 visitor.hello_section(self) 

365 

366 # check if method is implemented in visitor's own class, not in parent 

367 # i.e. only load questions if the visitor has its own implementation 

368 # of this method 

369 if "visit_question" in visitor.__class__.__dict__: 

370 for q in self.questions: 

371 visitor.visit_question(q) 

372 

373 for subsec in self.subsections: 

374 subsec.accept(visitor) 

375 

376 visitor.goodbye_section(self) 

377 

378 @property 

379 def is_top_level(self): 

380 return self.parent_id is None 

381 

382 @property 

383 def weight(self): 

384 try: 

385 return self._weightset_weight 

386 except AttributeError: 

387 return self._weight 

388 

389 @weight.setter 

390 def weight(self, weight): 

391 self._weight = weight 

392 

393 @property 

394 def absolute_weight(self): 

395 if self.is_top_level: 

396 return Decimal(1) 

397 try: 

398 return self._absolute_weight 

399 except AttributeError: 

400 try: 

401 nm = self.normalised_weight 

402 except AttributeError: 

403 m = "normalised weights not loaded - run HierarchicalWeightingVisitor" 

404 raise WeightingsNotLoadedException(m) 

405 self._absolute_weight = nm * self.parent.absolute_weight 

406 return self._absolute_weight 

407 

408 @absolute_weight.setter 

409 def absolute_weight(self, abs_weight): 

410 self._absolute_weight = abs_weight 

411 

412 @validates("questions", "subsections") 

413 def check_position(self, attr_name, instance): 

414 collection = getattr(self, attr_name) 

415 

416 if instance.position is None: 

417 new_position = 0 

418 if len(collection) > 0: 

419 previous = collection[-1] 

420 if previous.position is not None: 

421 new_position = previous.position + 1 

422 instance.position = new_position 

423 

424 if instance.number is None: 

425 if len(collection) > 0: 

426 previous = collection[-1] 

427 if hasattr(previous.number, "next_increment"): 

428 instance.number = previous.number.next_increment() 

429 else: 

430 if hasattr(self.number, "first_child"): 

431 instance.number = self.number.first_child() 

432 

433 if instance.project_id is None: 

434 instance.project_id = self.project_id 

435 

436 return instance 

437 

438 

439class QuestionDefinition(Base): 

440 __tablename__ = "questions" 

441 

442 title: Mapped[str] = mapped_column(mysql.MEDIUMTEXT(), nullable=False) 

443 weight: Mapped[Optional[Decimal]] = mapped_column( 

444 WEIGHTING_COL_TYPE, server_default=text("'1.0000'"), nullable=True 

445 ) 

446 parent_id: Mapped[Optional[int]] = mapped_column(Integer, index=True) 

447 refcount: Mapped[int] = mapped_column( 

448 "referenceCount", Integer, nullable=False, server_default=text("'1'") 

449 ) 

450 elements: Mapped[list["QElement"]] = relationship( 

451 "QElement", 

452 lazy="joined", 

453 order_by="QElement.row,QElement.col", 

454 cascade="all, delete-orphan", 

455 passive_deletes=True, 

456 back_populates="question_def", 

457 ) 

458 

459 _elements: DynamicMapped["QElement"] = relationship( 

460 "QElement", lazy="dynamic", viewonly=True 

461 ) 

462 

463 instances: DynamicMapped["QuestionInstance"] = relationship( 

464 "QuestionInstance", lazy="dynamic", back_populates="question_def" 

465 ) 

466 

467 meta: Mapped[Optional["QuestionMeta"]] = relationship( 

468 "QuestionMeta", 

469 uselist=False, 

470 back_populates="question_def", 

471 cascade="all, delete", 

472 passive_deletes=True, 

473 ) 

474 

475 def __repr__(self): 

476 return f'<QDef Id: {self.id}, Title: "{self.title}">' 

477 

478 def __eq__(self, o: object) -> bool: 

479 if isinstance(o, QuestionDefinition): 

480 src_elements = sorted( 

481 self.elements, key=lambda e: [getattr(e, k) for k in el_keys] 

482 ) 

483 des_elements = sorted( 

484 o.elements, key=lambda e: [getattr(e, k) for k in el_keys] 

485 ) 

486 return self.title == o.title and src_elements == des_elements 

487 return False 

488 

489 def __hash__(self) -> int: 

490 return super().__hash__() 

491 

492 @property 

493 def is_shared(self): 

494 if self.refcount is None: 

495 return False 

496 return self.refcount > 1 

497 

498 def get_element(self, element_id): 

499 return self._elements.filter(QElement.id == element_id).one() 

500 

501 def as_dict(self, answer_lookup=None, vendor_view=False): 

502 """ 

503 Returns a list of question element lists, each inner list corresponding 

504 to a row in a question table 

505 

506 Groups elements into rows - returning a list of lists 

507 

508 """ 

509 row_list = [] 

510 current_row = -1 

511 for el in self.elements: 

512 if el.row > current_row: 

513 row_list.append([]) 

514 current_row = el.row 

515 

516 if el.contains_choices: 

517 el_dict = el.as_dict(vendor_view=vendor_view) 

518 else: 

519 el_dict = el.as_dict() 

520 

521 if el.is_answerable and answer_lookup is not None: 

522 el_dict["answer"] = answer_lookup.get(el.id, None) 

523 

524 row_list[-1].append(el_dict) 

525 

526 return row_list 

527 

528 @property 

529 def answerable_elements(self): 

530 return [e for e in self.elements if e.is_answerable] 

531 

532 def add_element(self, el_name, **kwargs): 

533 el = QElement.build(el_name, **kwargs) 

534 self.elements.append(el) 

535 return el 

536 

537 def is_tabular(self): 

538 return self.column_count != 1 

539 

540 @property 

541 def column_count(self): 

542 if not self.elements: 

543 return 0 

544 return max(el.col for el in self.elements) 

545 

546 @property 

547 def row_count(self): 

548 if not self.elements: 

549 return 0 

550 return max(el.row for el in self.elements) 

551 

552 @property 

553 def grid_area(self) -> int: 

554 """Number of available grid cells available according to row and col values of Elements""" 

555 return self.row_count * self.column_count 

556 

557 @property 

558 def occupied_area(self) -> int: 

559 """ 

560 The number of grid cells occupied by cells according to their rowspan and colspan values 

561 If rowspans or colspans are set correctly this value will be equal to self.grid_area 

562 """ 

563 return sum(el.cell_area for el in self.elements) 

564 

565 @property 

566 def is_table_valid(self): 

567 return self.grid_area == self.occupied_area 

568 

569 @classmethod 

570 def build(cls, qdict, strip_ids=False): 

571 """ 

572 Create a QuestionDefinition, complete with QElements, from a 

573 a dict datastructure as provided by serial.QuestionDef 

574 """ 

575 if not isinstance(qdict, dict): 

576 qdict = qdict.model_dump() 

577 

578 qdef = cls(title=qdict["title"]) 

579 for row_idx, row in enumerate(qdict["elements"], start=1): 

580 for col_idx, el in enumerate(row, start=1): 

581 el["row"] = row_idx 

582 el["col"] = col_idx 

583 if strip_ids: 

584 el["id"] = None 

585 qdef.add_element(el["el_type"], **el) 

586 return qdef 

587 

588 

589qi_weight_join = "foreign(TotalWeighting.question_instance_id)==QuestionInstance.id" 

590 

591 

592class SaveAnswersResult: 

593 def __init__(self, is_new, change_list, unanswered_mandatory): 

594 self.is_new = is_new 

595 self.change_list = change_list 

596 self.unanswered_mandatory = unanswered_mandatory 

597 

598 

599class QuestionInstance(Base): 

600 """ 

601 @DynamicAttrs 

602 """ 

603 

604 __tablename__ = "question_instances" 

605 

606 __table_args__ = ( # type: ignore 

607 Index( 

608 "unique_question_project", 

609 "project_id", 

610 "question_id", 

611 unique=True, 

612 ), 

613 ) + Base.__table_args__ 

614 

615 public_attrs = "id,title,position,section_id,number,url".split(",") 

616 

617 section_id: Mapped[int] = mapped_column( 

618 "section_id", mysql.INTEGER(10), ForeignKey("sections.id"), nullable=False 

619 ) 

620 question_def_id: Mapped[int] = mapped_column( 

621 "question_id", 

622 mysql.INTEGER(10), 

623 ForeignKey(QuestionDefinition.id), 

624 nullable=False, 

625 ) 

626 project_id: Mapped[int] = mapped_column( 

627 "project_id", mysql.INTEGER(11), ForeignKey("projects.id"), nullable=False 

628 ) 

629 _weight: Mapped[Optional[Decimal]] = mapped_column( 

630 "weight", WEIGHTING_COL_TYPE, server_default=text("'1.0000'"), nullable=True 

631 ) 

632 position: Mapped[int] = mapped_column( 

633 "pos", Integer, nullable=False, server_default=text("'0'") 

634 ) 

635 number: Mapped[Optional[NumberString]] = mapped_column(PositionNumber, nullable=True) 

636 

637 question_def: Mapped["QuestionDefinition"] = relationship( 

638 QuestionDefinition, lazy="joined", innerjoin=True, back_populates="instances" 

639 ) 

640 project: Mapped["Project"] = relationship("Project") 

641 section: Mapped["Section"] = relationship("Section", back_populates="questions") 

642 parent: Mapped["Section"] = relationship("Section", viewonly=True) 

643 _answers: DynamicMapped["Answer"] = relationship( 

644 "Answer", 

645 lazy="dynamic", 

646 cascade="all, delete", 

647 passive_deletes=True, 

648 back_populates="question_instance", 

649 ) 

650 

651 total_weightings: Mapped["TotalWeighting"] = relationship( 

652 "TotalWeighting", primaryjoin=qi_weight_join, backref="question", cascade="all" 

653 ) 

654 

655 meta: Mapped[Optional["QuestionMeta"]] = relationship( 

656 "QuestionMeta", 

657 uselist=False, 

658 back_populates="question_instance", 

659 secondary="questions", 

660 viewonly=True, 

661 ) 

662 tags: Mapped[list["Tag"]] = relationship( 

663 "Tag", 

664 secondary="tags_qinstances", 

665 back_populates="question_instances", 

666 passive_deletes=True, 

667 ) 

668 

669 def __repr__(self): 

670 return f"<Question - InstanceId: {self.id}, DefId: {self.question_def_id}>" 

671 

672 def __eq__(self, o: object) -> bool: 

673 if isinstance(o, QuestionInstance): 

674 return self.question_def == o.question_def 

675 return False 

676 

677 def __hash__(self) -> int: 

678 return super().__hash__() 

679 

680 def all_answers(self): 

681 """All Answers for this question and scoreable issues""" 

682 return ( 

683 self._answers.join(Issue).filter(Issue.scoreable_filter(self.project)).all() 

684 ) 

685 

686 def answers_for_issue(self, issue_id): 

687 return self._answers.filter(Answer.issue_id == issue_id) 

688 

689 def single_vendor_dict(self, issue): 

690 lookup = {a.element_id: a.answer for a in self.answers_for_issue(issue.id)} 

691 return { 

692 "number": self.safe_number, 

693 "id": self.id, 

694 "title": self.title, 

695 "respondent": issue.respondent.as_dict(), 

696 "elements": self.question_def.as_dict( 

697 vendor_view=True, answer_lookup=lookup 

698 ), 

699 } 

700 

701 @property 

702 def title(self): 

703 return self.question_def.title 

704 

705 @property 

706 def elements(self): 

707 return self.question_def.elements 

708 

709 @property 

710 def weight(self): 

711 """ 

712 Strange behaviour inherited from java land. 

713 Returns 

714 1) WeightingSet value, if loaded by LoadWeightSetVisitor 

715 or 

716 2) self._weight if weight defined for this QuestionInstance 

717 or, finally 

718 3) The weight from this Instances QuestionDefinition 

719 """ 

720 try: 

721 return self._weightset_weight 

722 except AttributeError: 

723 return ( 

724 self._weight if self._weight is not None else self.question_def.weight 

725 ) 

726 

727 @weight.setter 

728 def weight(self, weight_value): 

729 """ 

730 Set the weight on the question definition if the definition is not 

731 shared. If the weight has already been set for this instance, carry on using 

732 it 

733 """ 

734 if not self.question_def.is_shared: 

735 self.question_def.weight = weight_value 

736 # Mirrors behaviour in Java land - SqlMapQuestionDao line 322 

737 self._weight = weight_value 

738 

739 safe_number = SafeNumber() 

740 

741 def as_dict(self, answer_lookup=None, vendor_view=False): 

742 return { 

743 "id": self.id, 

744 "title": self.title, 

745 "number36": self.number, 

746 "number": self.safe_number, 

747 "elements": self.question_def.as_dict( 

748 answer_lookup=answer_lookup, vendor_view=vendor_view 

749 ), 

750 "type": "question", 

751 } 

752 

753 @property 

754 def is_autoscored(self): 

755 for element in self.elements: 

756 if isinstance(element, MultipleChoice) and element.choices: 

757 for choice in element.choices: 

758 if choice.get("autoscore", None): 

759 return True 

760 return False 

761 

762 def calculate_autoscores(self): 

763 """ 

764 Mapping of issue_id to score, use a list in case of multiple 

765 autoscored elements in one question - take the average later 

766 """ 

767 score_map = defaultdict(list) 

768 multichoice_elements = [ 

769 el for el in self.question_def.elements if el.contains_choices 

770 ] 

771 max_score = self.project.maximum_score 

772 issue_filter = Issue.scoreable_filter(self.project) 

773 for multichoice in multichoice_elements: 

774 mq = multichoice._answers.join(Issue).filter(issue_filter) 

775 for answer in mq: 

776 for choice in multichoice.choices: 

777 if ( 

778 answer.answer == choice["label"] 

779 and choice["autoscore"] is not None 

780 ): 

781 score_map[answer.issue].append(int(choice["autoscore"])) 

782 

783 for issue, score_list in score_map.items(): 

784 total_score = sum(score_list) 

785 score_map[issue] = total_score if total_score <= max_score else max_score 

786 

787 return score_map 

788 

789 @property 

790 def absolute_weight(self): 

791 if not hasattr(self, "_absolute_weight"): 

792 try: 

793 self._absolute_weight = ( 

794 self.normalised_weight * self.section.absolute_weight 

795 ) 

796 except AttributeError: 

797 m = "normalised weights not loaded - run HierarchicalWeightingVisitor" 

798 raise WeightingsNotLoadedException(m) 

799 return self._absolute_weight 

800 

801 @absolute_weight.setter 

802 def absolute_weight(self, abs_weight): 

803 self._absolute_weight = abs_weight 

804 

805 def validate_and_save_answers(self, answer_doc, issue) -> SaveAnswersResult: 

806 changes = [] 

807 unanswered_mandatory = False 

808 current_answers = {a.element_id: a for a in self.answers_for_issue(issue.id)} 

809 for element in self.question_def.answerable_elements: 

810 el_id = element.id 

811 if el_id in answer_doc: 

812 new_answer = answer_doc.pop(el_id) 

813 if new_answer is None: 

814 continue 

815 element.validate(new_answer) 

816 if el_id in current_answers: 

817 current_answer = current_answers[el_id] 

818 old_answer = current_answer.answer 

819 if old_answer != new_answer: 

820 current_answer.answer = new_answer 

821 changes.append((old_answer, new_answer)) 

822 else: 

823 a = Answer(element_id=el_id, answer=new_answer, issue_id=issue.id) 

824 self._answers.append(a) 

825 changes.append((None, new_answer)) 

826 else: 

827 if element.mandatory and el_id not in current_answers: 

828 unanswered_mandatory = True 

829 

830 if len(answer_doc) > 0: 

831 tmpl = "Unable to match all answers with element IDs: {}" 

832 ids = ", ".join(str(k) for k in answer_doc.keys()) 

833 raise ValidationFailure(tmpl.format(ids)) 

834 

835 is_new = len(current_answers) == 0 

836 

837 return SaveAnswersResult(is_new, changes, unanswered_mandatory) 

838 

839 

840class QElement(Base): 

841 __tablename__ = "question_elements" 

842 __table_args__ = ( # type: ignore 

843 Index( 

844 "question_elements_qId_row_col", 

845 "question_id", 

846 "row", 

847 "col", 

848 ), 

849 Index("ft_label", "label", mysql_prefix="FULLTEXT"), 

850 ) + Base.__table_args__ 

851 

852 __mapper_args__ = {"polymorphic_on": "el_type", "polymorphic_identity": ""} 

853 

854 is_answerable = True 

855 

856 contains_choices = False 

857 

858 public_attrs = ("id,el_type,colspan,rowspan,label,mandatory,regexp,col,row").split( 

859 "," 

860 ) 

861 

862 answerable_types = {"TX", "CR", "CC", "CB", "AT"} 

863 

864 question_id: Mapped[int] = mapped_column( 

865 ForeignKey("questions.id", ondelete="CASCADE"), 

866 nullable=False, 

867 index=True, 

868 server_default=text("'0'"), 

869 ) 

870 row: Mapped[int] = mapped_column( 

871 mysql.INTEGER(10), nullable=False, default=1, server_default=text("'0'") 

872 ) 

873 col: Mapped[int] = mapped_column( 

874 mysql.INTEGER(10), nullable=False, default=1, server_default=text("'0'") 

875 ) 

876 el_type: Mapped[str] = mapped_column( 

877 "type", mysql.CHAR(length=2), nullable=False, server_default=text("''") 

878 ) 

879 colspan: Mapped[Optional[int]] = mapped_column(Integer, default=1, nullable=True) 

880 rowspan: Mapped[Optional[int]] = mapped_column(Integer, default=1, nullable=True) 

881 label: Mapped[Optional[str]] = mapped_column(mysql.MEDIUMTEXT(), nullable=True) 

882 mandatory: Mapped[Optional[bool]] = mapped_column(Boolean, default=False, nullable=True) 

883 height: Mapped[Optional[int]] 

884 width: Mapped[Optional[int]] 

885 multitopic: Mapped[Optional[bool]] = mapped_column(Boolean, default=False, nullable=True) 

886 regexp: Mapped[Optional[str]] = mapped_column(Unicode(255)) 

887 choices: Mapped[Optional[dict]] = mapped_column(JSON, nullable=True) 

888 

889 question_def: Mapped["QuestionDefinition"] = relationship( 

890 "QuestionDefinition", back_populates="elements" 

891 ) 

892 answers: Mapped[list["Answer"]] = relationship("Answer", back_populates="element") 

893 _answers: DynamicMapped["Answer"] = relationship( 

894 "Answer", lazy="dynamic", viewonly=True 

895 ) 

896 

897 def __eq__(self, o: object) -> bool: 

898 if isinstance(o, QElement): 

899 equal: bool = True 

900 for k in el_keys: 

901 if ( 

902 hasattr(self, k) 

903 and hasattr(o, k) 

904 and getattr(o, k) == getattr(self, k) 

905 ): 

906 continue 

907 else: 

908 equal = False 

909 break 

910 return equal 

911 return False 

912 

913 def __hash__(self) -> int: 

914 return super().__hash__() 

915 

916 def get_answer(self, issue): 

917 """Get the answer for this element and `issue` or None if not yet answered""" 

918 return self._answers.filter(Answer.issue == issue).one() 

919 

920 def get_question_instance(self, project_id): 

921 return self.question_def.instances.filter_by(project_id=project_id).one() 

922 

923 @staticmethod 

924 def split_choice_string(choice_string): 

925 """ 

926 Split a string representation of a multiple choice option, 

927 e.g. 'Yes <8>' into a tuple - ('Yes', 8) 

928 """ 

929 regex_match = re.match(r"([^<]+)<(\d+)>", choice_string) 

930 if regex_match: 

931 label, autoscore = regex_match.groups() 

932 return (label.strip(), autoscore.strip()) 

933 else: 

934 return (choice_string.strip(), None) 

935 

936 @classmethod 

937 def build(cls, type_name, **kwargs): 

938 """Create a QElement subclass instance of the given type_name""" 

939 mapper = inspect(cls).polymorphic_map[type_name] 

940 el = mapper.class_(**kwargs) 

941 return el 

942 

943 @property 

944 def cell_area(self): 

945 return self.colspan * self.rowspan 

946 

947 @property 

948 def summary(self): 

949 """For describing this element in audit event change log""" 

950 return self.label 

951 

952 

953class Label(QElement): 

954 __mapper_args__ = {"polymorphic_identity": "LB"} 

955 is_answerable = False 

956 public_attrs = "id,el_type,label,colspan,rowspan".split(",") 

957 

958 

959class Checkbox(QElement): 

960 __mapper_args__ = {"polymorphic_identity": "CB"} 

961 public_attrs = "id,el_type,label,colspan,rowspan".split(",") 

962 

963 def validate(self, answer): 

964 if answer not in ("true", "false"): 

965 m = f"Answer for checkbox element {self.id} must be 'true' or 'false'" 

966 raise ValidationFailure(m) 

967 

968 

969class TextInput(QElement): 

970 __mapper_args__ = {"polymorphic_identity": "TX"} 

971 

972 def as_dict(self): 

973 return { 

974 "id": self.id, 

975 "el_type": self.el_type, 

976 "colspan": self.colspan, 

977 "rowspan": self.rowspan, 

978 "height": self.height, 

979 "width": self.width, 

980 "mandatory": self.mandatory, 

981 "regexp": self.regexp, 

982 } 

983 

984 def validate(self, answer): 

985 if self.regexp and not re.match(self.regexp, answer): 

986 tmpl = "Answer {:.10} does not match regular expression {} for element {}" 

987 raise ValidationFailure(tmpl.format(self.regexp, answer, self.id)) 

988 

989 @property 

990 def summary(self): 

991 return f"[{self.width} X {self.height}]" 

992 

993 

994class MultipleChoice(object): 

995 """ 

996 The choices property on QElement is used by RadioChoices (CR) and SelectChoices(CC) 

997 subclasses. The property is a JSON field containing an array of label:autoscore objects: 

998 [ 

999 {label: 'Yes', autoscore: 10}, 

1000 {label: 'No', autoscore: 0} 

1001 ] 

1002 

1003 """ 

1004 

1005 contains_choices = True 

1006 # `choices` attribute is set by QuestionDefinition.elements() 

1007 

1008 def as_dict(self, vendor_view=False): 

1009 if vendor_view: 

1010 choices = [{"label": c["label"]} for c in self.choices] 

1011 else: 

1012 choices = self.choices 

1013 return { 

1014 "id": self.id, 

1015 "el_type": self.el_type, 

1016 "colspan": self.colspan, 

1017 "rowspan": self.rowspan, 

1018 "mandatory": self.mandatory, 

1019 "choices": choices, 

1020 } 

1021 

1022 def validate(self, answer): 

1023 choice_vals = {c["label"] for c in self.choices} 

1024 if answer not in choice_vals: 

1025 tmpl = ( 

1026 "Answer '{:.10}' is not one of the accepted values: [{}] for element {}" 

1027 ) 

1028 raise ValidationFailure(tmpl.format(answer, ",".join(choice_vals), self.id)) 

1029 

1030 @property 

1031 def summary(self): 

1032 if self.choices: 

1033 return "\n".join( 

1034 f"{c['label']}: {c.get('autoscore', None)}" for c in self.choices 

1035 ) 

1036 return "" 

1037 

1038 

1039class SelectChoices(MultipleChoice, QElement): 

1040 __mapper_args__ = {"polymorphic_identity": "CC"} # 'ChoiceCombo' 

1041 

1042 

1043class RadioChoices(MultipleChoice, QElement): 

1044 __mapper_args__ = {"polymorphic_identity": "CR"} 

1045 

1046 

1047class QuestionAttachment(QElement): 

1048 """An attachment added by the Buyer to the Question""" 

1049 

1050 __mapper_args__ = {"polymorphic_identity": "QA"} 

1051 is_answerable = False 

1052 

1053 public_attrs = "id,el_type,label,colspan,rowspan".split(",") 

1054 

1055 attachment: Mapped[Optional["QAttachment"]] = relationship( 

1056 "QAttachment", uselist=False 

1057 ) 

1058 

1059 

1060class SupportingAttachment(QElement): 

1061 """ 

1062 A File Input form element enabling a Respondent to upload an attachment 

1063 as part of an answer to to a Question 

1064 """ 

1065 

1066 __mapper_args__ = {"polymorphic_identity": "AT"} 

1067 public_attrs = "id,el_type,colspan,rowspan,mandatory".split(",") 

1068 

1069 def validate(self, answer): 

1070 if not isinstance(answer, str): 

1071 raise ValidationFailure( 

1072 "SupportingAttachments answer should be the filename and size" 

1073 ) 

1074 

1075 @property 

1076 def summary(self): 

1077 return "File upload field" 

1078 

1079 

1080class ExternalMedia(QElement): 

1081 """A link to an external resource, e.g. video""" 

1082 

1083 __mapper_args__ = {"polymorphic_identity": "MD"} 

1084 

1085 

1086class QAttachment(AttachmentMixin, Base): 

1087 __tablename__ = "question_attachments" 

1088 

1089 public_attrs = ("id,size,filename,url").split(",") 

1090 element_id = mapped_column(Integer, ForeignKey("question_elements.id")) 

1091 

1092 element = relationship(QElement, viewonly=True) 

1093 

1094 

1095class Answer(Base): 

1096 __tablename__ = "answers" 

1097 __table_args__ = ( 

1098 Index( 

1099 "unique_issue_question_element_answer", 

1100 "issue_id", 

1101 "question_instance_id", 

1102 "element_id", 

1103 unique=True, 

1104 ), 

1105 Index("ft_answer", "answer", mysql_prefix="FULLTEXT"), 

1106 {"mysql_engine": "InnoDB", "mysql_charset": "utf8mb4"}, 

1107 ) 

1108 

1109 issue_id: Mapped[int] = mapped_column( 

1110 Integer, 

1111 ForeignKey("issues.id", name="answers_ibfk_1", ondelete="CASCADE"), 

1112 nullable=False, 

1113 default=0, 

1114 index=True, 

1115 ) 

1116 

1117 answer: Mapped[str] = mapped_column(mysql.LONGTEXT(), nullable=True) 

1118 

1119 element_id: Mapped[int] = mapped_column( 

1120 Integer, 

1121 ForeignKey("question_elements.id", name="answers_ibfk_2"), 

1122 nullable=False, 

1123 index=True, 

1124 server_default=text("'0'"), 

1125 ) 

1126 autoscore: Mapped[Optional[int]] = mapped_column(mysql.INTEGER(11)) 

1127 question_instance_id: Mapped[int] = mapped_column( 

1128 Integer, 

1129 ForeignKey("question_instances.id", ondelete="CASCADE", name="answers_ibfk_3"), 

1130 nullable=False, 

1131 index=True, 

1132 ) 

1133 

1134 issue: Mapped["Issue"] = relationship("Issue", back_populates="answers") 

1135 

1136 element: Mapped["QElement"] = relationship(QElement, back_populates="answers") 

1137 

1138 question_instance: Mapped["QuestionInstance"] = relationship( 

1139 QuestionInstance, back_populates="_answers", uselist=False 

1140 ) 

1141 attachment: Mapped["AAttachment"] = relationship( 

1142 "AAttachment", 

1143 back_populates="answer", 

1144 passive_deletes=True, 

1145 uselist=False, 

1146 ) 

1147 

1148 def as_dict(self): 

1149 return {"answer_id": self.id, "issue_id": self.issue_id, "answer": self.answer} 

1150 

1151 def __repr__(self): 

1152 return f"Answer ID {self.id}, Issue: {self.issue_id}, QuestionInstance: {self.question_instance_id}" 

1153 

1154 

1155class AnswerReport(Base): 

1156 __tablename__ = "answer_reports" 

1157 __table_args__ = {"mysql_engine": "InnoDB", "mysql_charset": "utf8mb4"} 

1158 

1159 id: Mapped[int] = mapped_column(Integer, primary_key=True) 

1160 project_id: Mapped[int] = mapped_column( 

1161 ForeignKey("projects.id", ondelete="CASCADE", name="answer_reports_project_id"), 

1162 nullable=False, 

1163 index=True, 

1164 ) 

1165 title: Mapped[str] = mapped_column(Unicode(100), nullable=False) 

1166 definition: Mapped[str] = mapped_column(mysql.MEDIUMTEXT(), nullable=False) 

1167 project: Mapped["Project"] = relationship( 

1168 "Project", back_populates="answer_reports" 

1169 ) 

1170 

1171 def __repr__(self): 

1172 return f"<AnswerReport {self.id} - {self.title}>" 

1173 

1174 

1175class ResponseStatus(Enum): 

1176 NOT_ANSWERED = 0 

1177 ANSWERED = 10 

1178 FOR_REVIEW = 20 

1179 REJECTED = 30 

1180 APPROVED = 40 

1181 

1182 

1183class ResponseStatusCol(types.TypeDecorator): 

1184 """ 

1185 A custom SQLAlchemy type that maps database integer 

1186 values to ResponseStatus Enum values 

1187 """ 

1188 

1189 impl = mysql.TINYINT(4) 

1190 

1191 cache_ok = True 

1192 

1193 def process_bind_param(self, response_status_enum, dialect): 

1194 return response_status_enum.value 

1195 

1196 def process_result_value(self, int_value, dialect): 

1197 return ResponseStatus(int_value) 

1198 

1199 

1200class QuestionResponseState(Base): 

1201 __tablename__ = "question_response_states" 

1202 __table_args__ = ( 

1203 Index("unique_resp_state", "issue_id", "question_instance_id", unique=True), 

1204 {"mysql_engine": "InnoDB", "mysql_charset": "utf8mb4"}, 

1205 ) 

1206 public_attrs = [ 

1207 "status", 

1208 "allocated_by", 

1209 "allocated_to", 

1210 "approved_by", 

1211 "updated_by", 

1212 "date_updated", 

1213 ] 

1214 

1215 id: Mapped[int] = mapped_column(Integer, primary_key=True) 

1216 

1217 issue_id: Mapped[int] = mapped_column( 

1218 ForeignKey("issues.id", ondelete="CASCADE"), 

1219 nullable=False, 

1220 index=True, 

1221 server_default=text("'0'"), 

1222 ) 

1223 

1224 question_instance_id: Mapped[int] = mapped_column( 

1225 ForeignKey("question_instances.id", ondelete="CASCADE"), 

1226 nullable=False, 

1227 index=True, 

1228 server_default=text("'0'"), 

1229 ) 

1230 

1231 status: Mapped[ResponseStatus] = mapped_column( 

1232 ResponseStatusCol, nullable=False, server_default=text("'0'") 

1233 ) 

1234 

1235 allocated_by: Mapped[Optional[str]] = mapped_column( 

1236 types.VARCHAR(length=50), ForeignKey("users.id", ondelete="SET NULL") 

1237 ) 

1238 allocated_to: Mapped[Optional[str]] = mapped_column( 

1239 types.VARCHAR(length=50), ForeignKey("users.id", ondelete="SET NULL") 

1240 ) 

1241 approved_by: Mapped[Optional[str]] = mapped_column( 

1242 types.VARCHAR(length=50), ForeignKey("users.id", ondelete="SET NULL") 

1243 ) 

1244 updated_by: Mapped[Optional[str]] = mapped_column( 

1245 types.VARCHAR(length=50), ForeignKey("users.id", ondelete="SET NULL") 

1246 ) 

1247 

1248 date_updated: Mapped[Optional[datetime]] = mapped_column(DateTime) 

1249 

1250 issue: Mapped["Issue"] = relationship( 

1251 "Issue", 

1252 back_populates="response_states", 

1253 ) 

1254 question_instance: Mapped["QuestionInstance"] = relationship("QuestionInstance") 

1255 

1256 

1257class AAttachment(AttachmentMixin, Base): 

1258 __tablename__ = "answer_attachments" 

1259 

1260 public_attrs = ( 

1261 "id,size,filename,url,respondent,question_number,question_id" 

1262 ).split(",") 

1263 answer_id: Mapped[Optional[int]] = mapped_column( 

1264 Integer, ForeignKey("answers.id", ondelete="SET NULL"), unique=True 

1265 ) 

1266 

1267 answer: Mapped[Optional["Answer"]] = relationship( 

1268 Answer, 

1269 uselist=False, 

1270 back_populates="attachment", 

1271 ) 

1272 

1273 def __repr__(self) -> str: 

1274 return f"<AAttachment #{self.id} - {self.filename}>" 

1275 

1276 

1277class WeightingSet(Base): 

1278 __tablename__ = "weighting_sets" 

1279 

1280 public_attrs = ("id", "name") 

1281 name: Mapped[Optional[str]] = mapped_column(Unicode(255), default=None) 

1282 project_id: Mapped[int] = mapped_column( 

1283 Integer, ForeignKey("projects.id", ondelete="CASCADE"), nullable=False 

1284 ) 

1285 

1286 project: Mapped["Project"] = relationship( 

1287 "Project", back_populates="weighting_sets" 

1288 ) 

1289 

1290 weightings: DynamicMapped["Weighting"] = relationship( 

1291 "Weighting", 

1292 back_populates="weighting_set", 

1293 lazy="dynamic", 

1294 cascade="all,delete", 

1295 passive_deletes=True, 

1296 ) 

1297 

1298 def __repr__(self): 

1299 return f'Weighting Set ID: {self.id} "{self.name}"' 

1300 

1301 def lookup_table(self): 

1302 q_lookup = {} 

1303 sec_lookup = {} 

1304 for weight in self.weightings.with_entities( 

1305 Weighting.section_id, Weighting.question_instance_id, Weighting.value 

1306 ): 

1307 if weight.section_id: 

1308 sec_lookup[weight.section_id] = weight.value 

1309 else: 

1310 q_lookup[weight.question_instance_id] = weight.value 

1311 return {"questions": q_lookup, "sections": sec_lookup} 

1312 

1313 

1314class Weighting(Base): 

1315 __tablename__ = "weightings" 

1316 

1317 weighting_set_id: Mapped[int] = mapped_column( 

1318 ForeignKey("weighting_sets.id", ondelete="CASCADE"), 

1319 nullable=False, 

1320 ) 

1321 question_instance_id: Mapped[Optional[int]] = mapped_column( 

1322 ForeignKey("question_instances.id", ondelete="CASCADE") 

1323 ) 

1324 section_id: Mapped[Optional[int]] = mapped_column( 

1325 ForeignKey("sections.id", ondelete="CASCADE") 

1326 ) 

1327 value: Mapped[Decimal] = mapped_column( 

1328 WEIGHTING_COL_TYPE, server_default=text("'1.0000'"), nullable=False 

1329 ) 

1330 

1331 question: Mapped[Optional["QuestionInstance"]] = relationship(QuestionInstance) 

1332 section: Mapped[Optional["Section"]] = relationship("Section") 

1333 weighting_set: Mapped["WeightingSet"] = relationship( 

1334 WeightingSet, back_populates="weightings" 

1335 ) 

1336 

1337 def __repr__(self): 

1338 return f"<Weighting {self.id} - {self.value}>" 

1339 

1340 

1341class TotalWeighting(Base): 

1342 """ 

1343 The total_weightings table is essentially a cache. It saves the absolute, normalised weight 

1344 values for question instances and sections for a given weighting_set_id. Null weighting_set_id 

1345 indicates the default weighting set. 

1346 

1347 Absolute weights are derived hierachically - the entire questionnaire needs to be loaded 

1348 as a tree structure in order to figure out these weights. e.g. question weight depends 

1349 on the weights of parent sections. It is therefore expense to derive on the fly - hence 

1350 the requirement for this table. 

1351 

1352 The cache is recalculated when weightings are saved 

1353 """ 

1354 

1355 __tablename__ = "total_weightings" 

1356 __table_args__ = ( 

1357 Index( 

1358 "weightings", 

1359 "project_id", 

1360 "weighting_set_id", 

1361 "question_instance_id", 

1362 "section_id", 

1363 unique=True, 

1364 ), 

1365 {"mysql_engine": "InnoDB", "mysql_charset": "utf8mb4"}, 

1366 ) 

1367 

1368 project_id: Mapped[int] = mapped_column( 

1369 ForeignKey("projects.id", ondelete="CASCADE", name="total_weightings_wset"), 

1370 nullable=False, 

1371 ) 

1372 weighting_set_id: Mapped[Optional[int]] = mapped_column( 

1373 Integer, nullable=True, default=None 

1374 ) 

1375 question_instance_id: Mapped[int] = mapped_column( 

1376 Integer, nullable=False, server_default=text("'0'") 

1377 ) 

1378 section_id: Mapped[int] = mapped_column( 

1379 Integer, nullable=False, server_default=text("'0'") 

1380 ) 

1381 weight: Mapped[Optional[float]] = mapped_column(mysql.FLOAT(asdecimal=True, scale=7), nullable=True) 

1382 

1383 project: Mapped["Project"] = relationship( 

1384 "Project", 

1385 back_populates="total_weightings", 

1386 ) 

1387 

1388 def __repr__(self): 

1389 return f"<TotalWeighting {self.id} - {self.weight}>" 

1390 

1391 

1392""" 

1393TODO 

1394- add column. One for normalised_weight, one for absolute_weight 

1395 - Rename index in DB Schema to avoid name clash with "weightings" table 

1396 - For API use, provide one weighting set at a time, load others on demand. 

1397 - or provide weightings as a lookup dict in JS 

1398""" 

1399 

1400 

1401class LoadWeightSetVisitor(Visitor): 

1402 """ 

1403 Updates Question and Section weights in the questionnaire with values 

1404 from the provided weighting_set 

1405 """ 

1406 

1407 def __init__(self, weighting_set): 

1408 super(Visitor, self).__init__() 

1409 weight_lookup = weighting_set.lookup_table() 

1410 self.question_lookup = weight_lookup["questions"] 

1411 self.section_lookup = weight_lookup["sections"] 

1412 

1413 def hello_section(self, sec): 

1414 sec._weightset_weight = self.section_lookup[sec.id] 

1415 

1416 def visit_question(self, question): 

1417 question._weightset_weight = self.question_lookup[question.id] 

1418 

1419 

1420class HierarchyWeightingsVisitor(Visitor): 

1421 """ 

1422 Sets normalised weights (percentage weight within sub-tree) 

1423 for all sections and questions 

1424 """ 

1425 

1426 def hello_section(self, sec): 

1427 if sec.is_top_level: 

1428 sec.normalised_weight = Decimal(1) 

1429 self._set_subsection_normalised_weights(sec) 

1430 self._set_question_normalised_weights(sec) 

1431 

1432 def _set_subsection_normalised_weights(self, sec): 

1433 subtree_total = sum(s.weight for s in sec.subsections) 

1434 

1435 if subtree_total == 0: 

1436 # Avoid divide by zero errors 

1437 for subsec in sec.subsections: 

1438 subsec.normalised_weight = Decimal(0) 

1439 else: 

1440 for subsec in sec.subsections: 

1441 subsec.normalised_weight = Decimal(subsec.weight) / subtree_total 

1442 

1443 def _set_question_normalised_weights(self, sec): 

1444 q_total = sum(q.weight for q in sec.questions) 

1445 

1446 if q_total == 0: 

1447 # Avoid divide by zero errors 

1448 for q in sec.questions: 

1449 q.normalised_weight = Decimal(0) 

1450 else: 

1451 for q in sec.questions: 

1452 q.normalised_weight = Decimal(q.weight) / q_total 

1453 

1454 

1455class SaveTotalWeightingsVisitor(Visitor): 

1456 """ 

1457 Saves TotalWeighting objects for the given project and weighting_set, 

1458 should be invoked after HierarchyWeightingsVisitor has had a chance 

1459 to save normalised weights 

1460 """ 

1461 

1462 def __init__(self, session, project, weighting_set_id=None): 

1463 super(Visitor, self).__init__() 

1464 self.session = session 

1465 self.project_id = project.id 

1466 self.weighting_set_id = weighting_set_id 

1467 self.total_weightings = [] 

1468 

1469 def hello_section(self, sec): 

1470 tw = dict( 

1471 project_id=self.project_id, 

1472 weight=sec.absolute_weight, 

1473 section_id=sec.id, 

1474 weighting_set_id=self.weighting_set_id, 

1475 ) 

1476 self.total_weightings.append(tw) 

1477 

1478 def visit_question(self, question): 

1479 tw = dict( 

1480 project_id=self.project_id, 

1481 weight=question.absolute_weight, 

1482 question_instance_id=question.id, 

1483 weighting_set_id=self.weighting_set_id, 

1484 ) 

1485 

1486 self.total_weightings.append(tw) 

1487 

1488 def finalise(self): 

1489 self.session.bulk_insert_mappings(TotalWeighting, self.total_weightings)