Coverage for rfpy/api/fetch.py: 97%

433 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-24 10:52 +0000

1""" 

2Functions which fetch objects from the database without checking for 

3permissions etc 

4""" 

5 

6import logging 

7from operator import itemgetter 

8from rfpy.model.audit.event import EventOrgACL 

9from rfpy.model.notify import WebhookSubscription 

10from typing import ( 

11 Iterable, 

12 List, 

13 Sequence, 

14 Tuple, 

15 Optional, 

16 Dict, 

17 Any, 

18 TYPE_CHECKING, 

19 Union, 

20) 

21 

22from sqlalchemy import ( 

23 select, 

24 literal, 

25 func, 

26 or_, 

27 case, 

28 and_, 

29 cast, 

30 INTEGER, 

31 desc, 

32 null, 

33 text, 

34 Row, 

35 Alias, 

36) 

37from sqlalchemy.orm import ( 

38 subqueryload, 

39 joinedload, 

40 lazyload, 

41 Bundle, 

42 Query, 

43 Session, 

44 undefer, 

45) 

46from sqlalchemy.orm.session import object_session 

47from sqlalchemy.orm.exc import NoResultFound 

48 

49from rfpy.model import ( 

50 Participant, # nopep8 # noqa: F403 

51 ProjectPermission, 

52 Section, 

53 ProjectWatchList, 

54 ProjectNote, 

55 SectionPermission, 

56 QuestionInstance, 

57 Score, 

58 TotalWeighting, 

59 Weighting, 

60 QuestionDefinition, 

61 QElement, 

62 Answer, 

63 AAttachment, 

64 AuditEvent, 

65 Organisation, 

66 Category, 

67 WeightingSet, 

68 QuestionResponseState, 

69 QAttachment, 

70 IssueWatchList, 

71 Issue, 

72 Project, 

73 User, 

74 Edge, 

75 RelationshipType, 

76) 

77 

78from rfpy.model.questionnaire import ( 

79 NumberString, 

80 from_b36, 

81 ResponseStatus, 

82) # nopep8 # noqa: F403 

83from rfpy.model import misc # nopep8 # noqa: F403 

84from rfpy.templates import get_template # nopep8 # noqa: F403 

85from rfpy.auth.password import validate_hash 

86from rfpy.web import serial 

87 

88 

89if TYPE_CHECKING: 

90 from sqlalchemy import Alias, Subquery 

91 

92log = logging.getLogger(__name__) 

93 

94 

95def user(session: Session, user_id: str) -> User: 

96 """ 

97 Fetch a User by ID 

98 @raises NoResultFound 

99 """ 

100 return session.query(User).filter(User.id == user_id).one() 

101 

102 

103def organisation(session: Session, org_id: str) -> Organisation: 

104 """ 

105 Fetch an Organisation by ID, 

106 

107 Raises 

108 ------ 

109 NoResultFound if no Organisation found for the given org_id 

110 """ 

111 return session.query(Organisation).filter(Organisation.id == org_id).one() 

112 

113 

114def user_by_password(session: Session, user_id: str, password: str) -> User: 

115 unauth_user = user(session, user_id) 

116 if not unauth_user.password: 

117 log.error("User %s has no password, refusing to authenticate", user_id) 

118 raise NoResultFound("No matching user found") 

119 if not validate_hash(password, unauth_user.password): 

120 raise NoResultFound("No matching user found") 

121 return unauth_user 

122 

123 

124def dummy_hash(session: Session): 

125 """ 

126 Perform a user fetch a dummy password hash 

127 to mitigate a timing attack 

128 """ 

129 session.query(User).first() 

130 validate_hash("dummy", "dummy") 

131 

132 

133def project( 

134 session: Session, 

135 project_id: int, 

136 with_description: bool = False, 

137) -> Project: 

138 """ 

139 Fetch a Project from the database 

140 

141 Parameters 

142 ---------- 

143 session : Session 

144 

145 project_id : int 

146 

147 Raises 

148 ------ 

149 NoResultFound if no Project found for the given project_id 

150 """ 

151 q = ( 

152 session.query(Project) 

153 .filter(Project.id == project_id) 

154 .options(lazyload(Project.owner_org)) 

155 ) 

156 if with_description: 

157 q.options(undefer(Project.description)) 

158 return q.one() 

159 

160 

161def section(session: Session, section_id: int) -> Section: 

162 """ 

163 Fetch a Section from the database 

164 

165 Parameters 

166 ---------- 

167 session : Session 

168 

169 section_id : int 

170 

171 Raises 

172 ------ 

173 NoResultFound if no Section is found for the given section_id 

174 """ 

175 return session.query(Section).filter(Section.id == section_id).one() 

176 

177 

178pw_cols: Bundle = Bundle( 

179 "listed_project", 

180 Project.id, 

181 Project.title, 

182 Project.deadline, 

183 Organisation.name.label("owner_org_name"), 

184 Project.status, 

185 Organisation.id.label("owner_org_id"), 

186 Project.date_created, 

187 ProjectWatchList.id.isnot(None).label("is_watched"), 

188 ProjectWatchList.date_created.label("watching_since"), 

189 single_entity=True, 

190) 

191 

192 

193def projects_with_watched( 

194 session: Session, user: User, participant_id: Optional[str] = None 

195) -> Query: 

196 org_id = user.org_id 

197 if participant_id: 

198 org_id = participant_id 

199 pq = ( 

200 session.query(pw_cols) 

201 .join(Participant) 

202 .filter(Participant.org_id == org_id) 

203 .outerjoin( 

204 ProjectWatchList, 

205 and_( 

206 ProjectWatchList.user == user, ProjectWatchList.project_id == Project.id 

207 ), 

208 ) 

209 .outerjoin(Organisation, Project.org_id == Organisation.id) 

210 ) 

211 if user.is_restricted: 

212 pq = pq.join(ProjectPermission).filter(ProjectPermission.user == user) 

213 return pq 

214 

215 

216def category_ids_by_project(session: Session, user: User) -> Dict[int, List[int]]: 

217 """Fetch a lookup dictionary mapping project_id: [list of category ids]""" 

218 pcq = ( 

219 session.query( 

220 func.group_concat(Category.id).label("category_ids"), 

221 Project.id.label("project_id"), 

222 ) 

223 .join(Category.projects) 

224 .group_by(Project.id) 

225 .filter(Category.organisation == user.organisation) 

226 ) 

227 

228 return {r.project_id: [int(cid) for cid in r.category_ids.split(",")] for r in pcq} 

229 

230 

231def issue(session: Session, issue_id: int) -> Issue: 

232 return session.query(Issue).filter(Issue.id == issue_id).one() 

233 

234 

235def section_of_project(project: Project, section_id: int) -> Section: 

236 """ 

237 Fetch a section if it belongs to the given project. 

238 It is assumed the user has access to the project. 

239 

240 Raises 

241 ------ 

242 NoResultFound 

243 """ 

244 return project.sections.filter(Section.id == section_id).one() 

245 

246 

247def section_by_id(session: Session, section_id: int) -> Section: 

248 return session.query(Section).filter(Section.id == section_id).one() 

249 

250 

251def sections(project: Project, user: User) -> Query: 

252 """ 

253 Returns a Section query object, filtered by permission 

254 if the user is restricted 

255 """ 

256 if user.is_restricted: 

257 return ( 

258 project.sections.join(SectionPermission) 

259 .filter(SectionPermission.user == user) 

260 .order_by(Section.number) 

261 ) 

262 else: 

263 return project.sections.order_by(Section.number) 

264 

265 

266def visible_subsections_query(parent: Section, user: User) -> Query: 

267 session = object_session(parent) 

268 assert session is not None 

269 sq = ( 

270 session.query(Section) 

271 .filter(Section.parent_id == parent.id) 

272 .order_by(Section.number) 

273 ) 

274 if user.is_restricted: 

275 sq = sq.join(SectionPermission).filter(SectionPermission.user == user) 

276 return sq 

277 

278 

279def qelement(session: Session, q_element_id: int) -> QElement: 

280 element = session.get(QElement, q_element_id) 

281 if element is None: 

282 raise NoResultFound(f"No QElement found with id {q_element_id}") 

283 return element 

284 

285 

286def answer(session: Session, answer_id: int) -> Answer: 

287 return session.query(Answer).filter(Answer.id == answer_id).one() 

288 

289 

290def answers_in_issues_query( 

291 session: Session, project_id: int, issue_id_set: set 

292) -> Query: 

293 return ( 

294 session.query(Answer) 

295 .join(Issue) 

296 .filter(Issue.project_id == project_id, Answer.issue_id.in_(issue_id_set)) 

297 ) 

298 

299 

300def scores( 

301 session: Session, 

302 project: Project, 

303 section: Section, 

304 scoreset_id: str, 

305 user: User, 

306 filter_by, 

307) -> List[Dict[str, Any]]: 

308 Score.validate_scoreset(scoreset_id, project) 

309 Score.check_view_scores(user, scoreset_id) 

310 

311 sq = ( 

312 session.query( 

313 QuestionInstance.id.label("question_id"), 

314 Issue.id.label("issue_id"), 

315 literal(scoreset_id).label("scoreset_id"), 

316 Score.score.label("score"), 

317 ) 

318 .join(Section) 

319 .join(Project, Section.project) 

320 .join(Issue) 

321 .outerjoin( 

322 Score, 

323 and_( 

324 Score.scoreset_id == scoreset_id, 

325 QuestionInstance.id == Score.question_instance_id, 

326 Issue.id == Score.issue_id, 

327 ), 

328 ) 

329 .filter(Project.id == Issue.project_id, Section.id == section.id, filter_by) 

330 .order_by(QuestionInstance.id.asc(), Issue.id.asc()) 

331 ) 

332 return [s._asdict() for s in sq] 

333 

334 

335def question(session: Session, question_id: int) -> QuestionInstance: 

336 """ 

337 Fetch a question instance by ID 

338 """ 

339 return ( 

340 session.query(QuestionInstance).filter(QuestionInstance.id == question_id).one() 

341 ) 

342 

343 

344def question_of_project(project: Project, question_id: int) -> QuestionInstance: 

345 """ 

346 Fetch a question instance if it belongs to the given project. 

347 It is assumed the user has access to the project. 

348 Raises 

349 ------ 

350 NoResultFound 

351 """ 

352 return project.questions.filter(QuestionInstance.id == question_id).one() 

353 

354 

355def question_of_section( 

356 session: Session, section_id: int, question_id: int 

357) -> QuestionInstance: 

358 """ 

359 Fetch question instance by ID only if it belongs in the section whose ID provided 

360 Useful when a sections permission have been checked 

361 """ 

362 return ( 

363 session.query(QuestionInstance) 

364 .filter( 

365 QuestionInstance.id == question_id, 

366 QuestionInstance.section_id == section_id, 

367 ) 

368 .one() 

369 ) 

370 

371 

372def get_subsections_recursive(session: Session, section_id: int) -> Query: 

373 """ 

374 Fetch list of question instance by section ID 

375 """ 

376 beginning_getter = ( 

377 session.query( 

378 Section.id, 

379 Section.parent_id, 

380 literal(0).label("recursive_depth"), 

381 cast(null(), INTEGER).label("parent_lvl_1"), 

382 ) 

383 .filter(Section.id == section_id) 

384 .cte(name="children_for", recursive=True) 

385 ) 

386 with_recursive = beginning_getter.union_all( 

387 session.query( 

388 Section.id, 

389 Section.parent_id, 

390 (beginning_getter.c.recursive_depth + 1).label("recursive_depth"), 

391 case( 

392 (beginning_getter.c.recursive_depth == 0, Section.id), 

393 else_=beginning_getter.c.parent_lvl_1, 

394 ).label("parent_lvl_1"), 

395 ).filter(Section.parent_id == beginning_getter.c.id) 

396 ) 

397 return session.query(with_recursive) 

398 

399 

400def or_create_score( 

401 session: Session, 

402 project: Project, 

403 evaluator: User, 

404 question: QuestionInstance, 

405 score_doc: serial.Score, 

406) -> Tuple[Score, bool]: 

407 """ 

408 Get issue and question required to access score 

409 Do required validation and permission checks 

410 """ 

411 issue_id, scoreset_id = score_doc.issue_id, score_doc.scoreset_id 

412 if scoreset_id is None: 

413 scoreset_id = "" 

414 

415 Score.validate_scoreset(scoreset_id, project) 

416 

417 issue = project.get_issue(issue_id) 

418 

419 # Fetch or create score object 

420 created = False 

421 try: 

422 score = ( 

423 session.query(Score) 

424 .filter( 

425 Score.question_instance_id == question.id, 

426 Score.issue_id == issue.id, 

427 Score.scoreset_id == scoreset_id, 

428 ) 

429 .one() 

430 ) 

431 except NoResultFound: 

432 score = Score(issue=issue, question=question, scoreset_id=scoreset_id) 

433 session.add(score) 

434 created = True 

435 

436 return score, created 

437 

438 

439def light_nodes( 

440 session: Session, project_id: int, with_questions: bool = True 

441) -> Query: 

442 """ 

443 Returns a query of the project's questionnaire nodes. 

444 The query returns rows representing the sections and questions 

445 in the project. Each row has the following keys: 

446 - id: The ID of the section or question 

447 - title: The title of the section or question 

448 - number: The number of the section or question 

449 - type: The type of the node, either 'section' or 'question' 

450 - parent_id: The ID of the parent section of the section or question 

451 - depth: The depth of the section or question in the hierarchy 

452 

453 The query is ordered by the number of the section or question. 

454 If the with_questions flag is set to True, the query will include 

455 the questions in the project. 

456 

457 This function avoids the expense of building ORM objects and is 

458 useful for building a nested structure of the project's questionnaire. 

459 """ 

460 q = session.query( 

461 Section.id, 

462 Section.title, 

463 literal("section").label("type"), 

464 Section.parent_id, 

465 Section.number.label("number"), 

466 (func.length(func.ifnull(Section.number, "")) / 2).label("depth"), 

467 ).filter(Section.project_id == project_id) 

468 

469 if with_questions: 

470 aq = ( 

471 session.query( 

472 QuestionInstance.id, 

473 QuestionDefinition.title, 

474 literal("question").label("type"), 

475 QuestionInstance.section_id.label("parent_id"), 

476 QuestionInstance.number.label("number"), 

477 (func.length(QuestionInstance.number) / 2).label("depth"), 

478 ) 

479 .join(QuestionDefinition) 

480 .filter(QuestionInstance.project_id == project_id) 

481 ) 

482 q = q.union(aq) 

483 

484 return q.order_by("number") 

485 

486 

487def light_tree( 

488 session: Session, project_id: int, with_questions: bool = True 

489) -> dict[str, Any]: 

490 """ 

491 Returns a nested structure of the project's questionnaire. 

492 """ 

493 node_lookup: dict[int, dict[str, Any]] = {} 

494 

495 for node in light_nodes(session, project_id, with_questions=with_questions): 

496 if node.type == "section": 

497 node_lookup[node.id] = { 

498 "id": node.id, 

499 "title": node.title, 

500 "number": node.number.dotted, 

501 "type": "section", 

502 "parent_id": node.parent_id, 

503 "depth": node.depth, 

504 "subsections": [], 

505 "questions": [], 

506 } 

507 else: 

508 node_lookup[node.id] = { 

509 "id": node.id, 

510 "title": node.title, 

511 "number": node.number.dotted, 

512 "type": "question", 

513 "parent_id": node.parent_id, 

514 "depth": node.depth, 

515 } 

516 

517 root_nodes: list[dict] = [] 

518 

519 for node in node_lookup.values(): 

520 parent_id = node["parent_id"] 

521 

522 if parent_id is None: 

523 root_nodes.append(node) 

524 continue 

525 

526 try: 

527 parent = node_lookup[parent_id] 

528 except KeyError: 

529 raise ValueError(f"Parent ID {parent_id} not found in node_lookup") 

530 

531 if node["type"] == "section": 

532 parent["subsections"].append(node) 

533 else: 

534 parent["questions"].append(node) 

535 

536 if len(root_nodes) == 1: 

537 return root_nodes[0] 

538 elif len(root_nodes) == 0: 

539 raise ValueError("No root found in the questionnaire") 

540 else: 

541 raise ValueError("Multiple roots found in the questionnaire") 

542 

543 

544def _check_total_weights( 

545 project: Project, weighting_set_id: Optional[int], session: Session 

546): 

547 """ 

548 Regenerate TotalWeightings if required 

549 @see rfpy.model.questionnaire.TotalWeighting 

550 """ 

551 if not project.total_weights_exist_for(weighting_set_id): 

552 log.info( 

553 "Regenerating TotalWeightings for %s , WeightingSet: %s", 

554 project, 

555 weighting_set_id, 

556 ) 

557 project.save_total_weights(weighting_set_id) 

558 session.flush() 

559 

560 

561def scoring_data(project: Project, scoreset_id: str = "") -> Query: 

562 """ 

563 Returns a list of a score records for the given project and scoring set. 

564 

565 Each dict has keys: score, number, issue_id 

566 

567 """ 

568 session = object_session(project) 

569 assert session is not None 

570 q = ( 

571 session.query( 

572 Score.score, 

573 QuestionInstance.id.label("question_id"), 

574 QuestionInstance.number, 

575 Score.issue_id, 

576 ) 

577 .join(QuestionInstance) 

578 .filter(QuestionInstance.project == project, Score.scoreset_id == scoreset_id) 

579 ) 

580 

581 return q 

582 

583 

584def old_scoring_data( 

585 session: Session, project: Project, weighting_set_id: Optional[int] = None 

586) -> List[Dict[str, Any]]: 

587 """ 

588 Returns a list of a score dicts for the given project and weighting set. 

589 

590 Each dict has keys: score, scoreSet, number, respondent, weight, weightSet 

591 

592 'weight' is the absolute weight taken from TotalWeightings table. 

593 

594 If TotalWeighting records don't exist for the given project/scoreset, they 

595 are regenerated. 

596 

597 N.B. This assumes that TotalWeighting records are *deleted* when weights are edited 

598 - this is currently managed in Java land (March 2015) 

599 """ 

600 

601 _check_total_weights(project, weighting_set_id, session) 

602 

603 tw_sc_join = (QuestionInstance.id == TotalWeighting.question_instance_id) & ( 

604 TotalWeighting.project_id == QuestionInstance.project_id 

605 ) 

606 

607 q = ( 

608 session.query( 

609 Score.score, 

610 Score.scoreset_id.label("scoreSet"), 

611 QuestionInstance.number, 

612 QuestionInstance._weight.label("qiWeight"), 

613 QuestionDefinition.weight.label("qdefWeight"), 

614 Issue.respondent_id.label("respondent"), 

615 Issue.id.label("issueId"), 

616 Issue.label.label("label"), 

617 TotalWeighting.weight.label("weight"), 

618 func.coalesce(WeightingSet.name, "").label("weightSet"), 

619 ) 

620 .join(Issue, Score.issue_id == Issue.id) 

621 .join(QuestionInstance) 

622 .join(QuestionDefinition) 

623 .join(TotalWeighting, tw_sc_join) 

624 .outerjoin(WeightingSet, TotalWeighting.weighting_set_id == WeightingSet.id) 

625 .filter(QuestionInstance.project == project) 

626 ) 

627 

628 return q.all() 

629 

630 

631def participant_notes_query(project: Project) -> Query: 

632 session = object_session(project) 

633 assert session is not None 

634 pq = session.query(Participant.org_id).filter(Participant.project == project) 

635 pn = project.notes_query.filter( 

636 or_(ProjectNote.private == 0, ProjectNote.org_id.in_(pq)) 

637 ).order_by(ProjectNote.note_time.desc()) 

638 return pn 

639 

640 

641def vendor_notes(issue: Issue, user: User) -> Query: 

642 nq = issue.project.notes_query 

643 own_org_id = user.org_id 

644 return nq.filter( 

645 or_( 

646 and_( 

647 ProjectNote.private == True, # noqa 

648 ProjectNote.org_id == own_org_id, 

649 ProjectNote.kind 

650 == "RespondentNote", # Posted by this vendor to himself 

651 ), 

652 and_( 

653 ProjectNote.private == False, # noqa: E712 

654 ProjectNote.kind == "IssuerNote", 

655 or_( 

656 ProjectNote.target_org_id == None, # noqa: E711 # Sent by issuer to all vendors 

657 ProjectNote.target_org_id == own_org_id, # Sent to just this vendor 

658 ), 

659 ), 

660 and_( 

661 ProjectNote.private == False, # noqa: E712 

662 ProjectNote.org_id == own_org_id, 

663 ProjectNote.kind 

664 == "RespondentNote", # Posted by this vendor to the issue 

665 ), 

666 ) 

667 ).order_by(ProjectNote.note_time.desc()) 

668 

669 

670def note(project: Project, note_id: int) -> ProjectNote: 

671 """ 

672 Fetch a ProjectNote by id checking it belongs to the given project but 

673 without checking user visibility 

674 

675 @raises NoResultFound 

676 """ 

677 return project.notes_query.filter(ProjectNote.id == note_id).one() 

678 

679 

680def answer_attachments_q(project: Project, user: User) -> Query: 

681 """ 

682 Returns a list of AAttachment (Answer Attachment) objects from 

683 scoreable issues in the given project 

684 """ 

685 session = object_session(project) 

686 assert session is not None 

687 issue_filter = Issue.scoreable_filter(project) 

688 q = ( 

689 session.query(AAttachment) 

690 .join(Answer) 

691 .join(Issue) 

692 .filter(issue_filter) 

693 .filter(Issue.project_id == project.id) 

694 .options( 

695 joinedload(AAttachment.answer), 

696 joinedload(AAttachment.answer, Answer.question_instance), 

697 subqueryload(AAttachment.answer, Answer.issue), 

698 joinedload(AAttachment.answer, Answer.issue, Issue.respondent), 

699 ) 

700 ) 

701 

702 if user.is_restricted: 

703 q = ( 

704 q.join(QuestionInstance) 

705 .join(QuestionDefinition) 

706 .join(Section) 

707 .join(SectionPermission) 

708 .filter(SectionPermission.user == user) 

709 ) 

710 

711 return q 

712 

713 

714def question_scoresummary( 

715 session: Session, 

716 user: User, 

717 project: Project, 

718 section: Section, 

719 scoreset_id: str = "", 

720) -> Query: 

721 # Questions 

722 questions = ( 

723 session.query( 

724 QuestionInstance.id.label("question_id"), 

725 Issue.id.label("issue_id"), 

726 literal(scoreset_id).label("scoreset_id"), 

727 ) 

728 .join(Section) 

729 .join(Project, Section.project) 

730 .join(Issue) 

731 .outerjoin( 

732 Score, 

733 and_( 

734 Score.scoreset_id == scoreset_id, 

735 QuestionInstance.id == Score.question_instance_id, 

736 Issue.id == Score.issue_id, 

737 ), 

738 ) 

739 .filter( 

740 Issue.project == project, 

741 Issue.scoreable_filter(project), 

742 Section.id == section.id, 

743 ) 

744 ) 

745 

746 if user.is_restricted: 

747 query = questions.outerjoin( 

748 SectionPermission, 

749 and_( 

750 SectionPermission.user == user, 

751 Section.id == SectionPermission.section_id, 

752 ), 

753 ).add_columns( 

754 case( 

755 (SectionPermission.section_id != None, Score.score), # noqa 

756 else_=literal("N/A"), 

757 ).label("score") 

758 ) 

759 else: 

760 query = questions.add_columns(Score.score) 

761 

762 return query 

763 

764 

765def subsection_scoressummary( 

766 session: Session, 

767 user: User, 

768 project: Project, 

769 section: Section, 

770 scoreset_id: str = "", 

771) -> "Subquery": 

772 """ 

773 Subsections 

774 First create a subquery totalling up the counts of questions 

775 contained immediately in each section. 

776 Exclude all sections the score_user does not have permissions on 

777 """ 

778 sub = ( 

779 session.query( 

780 Section.number.label("number"), 

781 Issue.id.label("issue_id"), 

782 func.count(QuestionInstance.id).label("q_count"), 

783 func.count(Score.score).label("q_scored"), 

784 func.coalesce(func.sum(Score.score), 0).label("score"), 

785 ) 

786 .join(Project, Section.project) 

787 .join(Issue) 

788 .outerjoin( 

789 QuestionInstance, 

790 and_( 

791 QuestionInstance.project_id == Project.id, 

792 QuestionInstance.section_id == Section.id, 

793 ), 

794 ) 

795 .outerjoin( 

796 Score, 

797 and_( 

798 Score.scoreset_id == scoreset_id, 

799 QuestionInstance.id == Score.question_instance_id, 

800 Issue.id == Score.issue_id, 

801 ), 

802 ) 

803 .filter(Issue.project_id == project.id, Issue.scoreable_filter(project)) 

804 .group_by(Section.id, Issue.id) 

805 ) 

806 

807 if user.is_restricted: 

808 sub = sub.join( 

809 SectionPermission, 

810 and_( 

811 SectionPermission.user == user, 

812 Section.id == SectionPermission.section_id, 

813 ), 

814 ) 

815 

816 return sub.subquery() 

817 

818 

819def section_scoresummary( 

820 session: Session, 

821 user: User, 

822 project: Project, 

823 section: Section, 

824 sub: Union["Alias", "Subquery"], 

825) -> Query: 

826 """ 

827 create main query of all immediate subsections in this section 

828 and total up the totals from the subsection and child subsections 

829 from the subquery. No rows from the subquery indicates the score_user 

830 does not have access to any subsections, in this case a score of 'N/A' 

831 is returned. 

832 """ 

833 

834 subsections = ( 

835 session.query( 

836 Section.id.label("section_id"), 

837 Issue.id.label("issue_id"), 

838 cast(func.sum(sub.c.q_count), INTEGER).label("question_count"), 

839 cast(func.sum(sub.c.q_scored), INTEGER).label("questions_scored"), 

840 func.coalesce(cast(func.sum(sub.c.score), INTEGER), literal("N/A")).label( 

841 "score" 

842 ), 

843 ) 

844 .join(Project, Section.project) 

845 .join(Issue) 

846 .outerjoin( 

847 sub, 

848 and_(sub.c.number.startswith(Section.number), Issue.id == sub.c.issue_id), 

849 ) 

850 .filter( 

851 Issue.project == project, 

852 Issue.scoreable_filter(project), 

853 Section.parent_id == section.id, 

854 ) 

855 .group_by(Section.id, Issue.id) 

856 ) 

857 

858 return subsections 

859 

860 

861def issues_for_respondent( 

862 session: Session, buyer_user: User, respondent_id: str 

863) -> List[Dict[str, Any]]: 

864 """ 

865 Fetch a list of Issue/Project records for the given respondent_id 

866 which are visible to the given user 

867 """ 

868 cols = ( 

869 Issue.id.label("issue_id"), 

870 Issue.id.label("id"), 

871 Issue.status, 

872 Issue.issue_date, 

873 Issue.accepted_date, 

874 Issue.submitted_date, 

875 Issue.deadline, 

876 Issue.winloss_exposed, 

877 Issue.winloss_expiry, 

878 Project.id.label("project_id"), 

879 Project.title.label("project_title"), 

880 Project.org_id.label("project_owner"), 

881 ) 

882 q = ( 

883 session.query(*cols) 

884 .join(Project) 

885 .join(Participant) 

886 .filter( 

887 Participant.organisation == buyer_user.organisation, 

888 Issue.respondent_id == respondent_id, 

889 ) 

890 .order_by(Issue.issue_date.desc()) 

891 ) 

892 return q.all() 

893 

894 

895def audit_events(organisation: Organisation, event_type: Optional[str] = None) -> Query: 

896 eq = organisation.visible_events.order_by(AuditEvent.id.desc()) 

897 

898 if event_type: 

899 return eq.filter(AuditEvent.event_type == event_type) 

900 else: 

901 return eq 

902 

903 

904def project_audit_events( 

905 organisation: Organisation, project: Project, event_type: Optional[str] = None 

906) -> Query: 

907 ev_query = audit_events(organisation, event_type=event_type) 

908 return ev_query.filter(AuditEvent.project == project) 

909 

910 

911def element_answers(session: Session, user: User, element_id: int) -> List[Row]: 

912 q = ( 

913 session.query( 

914 Answer.id.label("answer_id"), 

915 Answer.answer, 

916 Issue.respondent_id, 

917 Issue.id.label("issue_id"), 

918 Project.id.label("project_id"), 

919 Project.title.label("project_title"), 

920 Project.date_published, 

921 ) 

922 .join(Issue, Issue.id == Answer.issue_id) 

923 .join(Project) 

924 .join(Participant) 

925 .filter( 

926 Answer.element_id == element_id, 

927 Participant.organisation == user.organisation, 

928 Issue.status.in_(("Submitted", "Updateable")), 

929 ) 

930 ) 

931 

932 return q.all() 

933 

934 

935def score_totals_by_project( 

936 session: Session, org_id: str, project_ids: List[int] 

937) -> Query: 

938 project_ids_q = ( 

939 session.query(Participant.project_id) 

940 .filter(Participant.org_id == org_id) 

941 .filter(Participant.project_id.in_(project_ids)) 

942 .distinct() 

943 ) 

944 

945 return ( 

946 session.query( 

947 func.sum(Score.score * TotalWeighting.weight).label("total_weighted_score"), 

948 Project.title.label("project_title"), 

949 Project.org_id.label("project_owner"), 

950 Project.date_published, 

951 Project.id.label("project_id"), 

952 Issue.respondent_id, 

953 ) 

954 .join( 

955 TotalWeighting, 

956 Score.question_instance_id == TotalWeighting.question_instance_id, 

957 ) 

958 .join(Issue) 

959 .join(Project) 

960 .filter( 

961 Score.scoreset_id == "", 

962 TotalWeighting.weighting_set_id == None, # noqa: E711 

963 TotalWeighting.section_id == 0, 

964 Project.id.in_(project_ids_q), 

965 ) 

966 .group_by(Project.id, Score.issue_id) 

967 .order_by(Project.id.desc(), text("total_weighted_score desc")) 

968 ) 

969 

970 

971def question_instance_by_number( 

972 session: Session, project_id: int, qnode_number: str 

973) -> QuestionInstance: 

974 return ( 

975 session.query(QuestionInstance) 

976 .filter( 

977 QuestionInstance.number == qnode_number, 

978 QuestionInstance.project_id == project_id, 

979 ) 

980 .one() 

981 ) 

982 

983 

984def score_gaps( 

985 issue: Issue, 

986 weighting_set_id: Optional[int] = None, 

987 expose_weights: bool = True, 

988 show_gap_value: bool = True, 

989 debug: bool = False, 

990) -> List[Dict[str, Any]]: 

991 tmpl = get_template("sql/score_gap.sql") 

992 sql_script = text( 

993 tmpl.render( 

994 expose_weights=expose_weights, show_gap_value=show_gap_value, debug=debug 

995 ) 

996 ) 

997 

998 params = { 

999 "project_id": issue.project_id, 

1000 "issue_id": issue.id, 

1001 "weighting_set_id": weighting_set_id, 

1002 } 

1003 

1004 res: List[Dict[str, Any]] = [] 

1005 session = object_session(issue) 

1006 

1007 assert session is not None 

1008 

1009 if show_gap_value: 

1010 for q_row in session.execute(sql_script, params): 

1011 q_dict = dict(q_row._mapping) 

1012 q_dict["number"] = from_b36(q_row.number) 

1013 res.append(q_dict) 

1014 else: 

1015 # Don't show a numerical value for the gap, just =, - or + 

1016 for q_row in session.execute(sql_script, params): 

1017 q_dict = dict(q_row._mapping) 

1018 q_dict["number"] = from_b36(q_row.number) 

1019 score_gap = q_dict["score_gap"] 

1020 if score_gap is None: 

1021 q_dict["score_gap"] = "==" 

1022 elif score_gap > 0: 

1023 q_dict["score_gap"] = "+" 

1024 elif score_gap == 0: 

1025 q_dict["score_gap"] = "==" 

1026 else: 

1027 # score_gap < 1 

1028 q_dict["score_gap"] = "-" 

1029 res.append(q_dict) 

1030 

1031 res.sort(key=itemgetter("number")) 

1032 

1033 return res 

1034 

1035 

1036def search( 

1037 session: Session, 

1038 org_id: str, 

1039 search_term: str, 

1040 search_options: Sequence[str], 

1041 project_id: int, 

1042 offset: int, 

1043) -> List[Dict[str, Any]]: 

1044 """ 

1045 Run a search query. 

1046 'search_options' must be a sequence of strings. valid values: 

1047 "answers", "questions", "notes", "scoreComments" 

1048 """ 

1049 available_options = {"answers", "questions", "notes", "scoreComments"} 

1050 

1051 if not set(search_options) <= available_options: 

1052 raise ValueError( 

1053 f"search_options {search_options} contains values not in {available_options}" 

1054 ) 

1055 

1056 tmpl = get_template("sql/evaluator_search.sql") 

1057 sql = tmpl.render(options=set(search_options), project_id=project_id) 

1058 sql_text = text(sql) 

1059 params = { 

1060 "orgId": org_id, 

1061 "search_term": misc.clean_search_term(search_term), 

1062 "offset": int(offset), 

1063 } 

1064 hit_list = [] 

1065 for row in session.execute(sql_text, params=params).fetchall(): 

1066 doc = dict(row._mapping) 

1067 try: 

1068 doc["object_ref"] = from_b36(row.object_ref) 

1069 except TypeError: 

1070 pass 

1071 hit_list.append(doc) 

1072 return hit_list 

1073 

1074 

1075def project_users(user: User, project_id: int, restricted_users_only: bool) -> Query: 

1076 session = object_session(user) 

1077 assert session is not None 

1078 uq = session.query(User).join(Organisation).order_by(Organisation.id, User.id) 

1079 

1080 if user.organisation.is_consultant: 

1081 uq = uq.join(Participant).filter(Participant.project_id == project_id) 

1082 else: 

1083 uq = uq.filter(User.organisation == user.organisation) 

1084 

1085 if restricted_users_only: 

1086 uq = uq.filter(User.type == "restricted") 

1087 

1088 return uq 

1089 

1090 

1091def _ws_weights( 

1092 project: Project, weightset_id: int, parent_section_id: Optional[int] = None 

1093) -> Dict[str, Any]: 

1094 "Get Weights for Questions and Sections for the given weightset_id" 

1095 ws = project.weighting_sets.filter(WeightingSet.id == weightset_id).one() 

1096 

1097 questions = [] 

1098 sections = [] 

1099 

1100 wq = ws.weightings 

1101 

1102 if parent_section_id is not None: 

1103 wq = ( 

1104 wq.outerjoin(Section, Weighting.section_id == Section.id) 

1105 .outerjoin( 

1106 QuestionInstance, Weighting.question_instance_id == QuestionInstance.id 

1107 ) 

1108 .filter( 

1109 or_( 

1110 Section.parent_id == parent_section_id, 

1111 QuestionInstance.section_id == parent_section_id, 

1112 ) 

1113 ) 

1114 ) 

1115 

1116 for w in wq.with_entities( 

1117 Weighting.section_id, Weighting.question_instance_id, Weighting.value 

1118 ): 

1119 if w.section_id: 

1120 sections.append(dict(section_id=w.section_id, weight=w.value)) 

1121 else: 

1122 questions.append(dict(question_id=w.question_instance_id, weight=w.value)) 

1123 

1124 return {"questions": questions, "sections": sections} 

1125 

1126 

1127def _default_weights( 

1128 project: Project, parent_section_id: Optional[int] = None 

1129) -> Dict[str, Any]: 

1130 "Default weights for all sections and questions in the project" 

1131 qiq = project.questions.options( 

1132 lazyload(QuestionInstance.question_def).lazyload(QuestionDefinition.elements) 

1133 ) 

1134 siq = project.sections 

1135 

1136 if parent_section_id is not None: 

1137 qiq = qiq.filter(QuestionInstance.section_id == parent_section_id) 

1138 siq = siq.filter(Section.parent_id == parent_section_id) 

1139 

1140 return { 

1141 "questions": [dict(question_id=q.id, weight=q.weight) for q in qiq], 

1142 "sections": [ 

1143 dict(section_id=s.id, weight=s._weight) 

1144 for s in siq.with_entities(Section.id, Section._weight) 

1145 ], 

1146 } 

1147 

1148 

1149def weightings_dict( 

1150 project: Project, 

1151 weightset_id: Optional[int] = None, 

1152 parent_section_id: Optional[int] = None, 

1153) -> Dict[str, Any]: 

1154 """ 

1155 Get a dictionary of weightings for all sections and 

1156 questions in the given project, 

1157 { 

1158 questions: [{question_id: question_weight}] 

1159 sections: [{section_id: section_weight}] 

1160 }, 

1161 for the given project and weightset_id 

1162 """ 

1163 if weightset_id is None: 

1164 return _default_weights(project, parent_section_id=parent_section_id) 

1165 else: 

1166 return _ws_weights(project, weightset_id, parent_section_id=parent_section_id) 

1167 

1168 

1169def total_weightings_dict( 

1170 project: Project, weightset_id: Optional[int] = None 

1171) -> Dict[str, Any]: 

1172 """ 

1173 Get a dictionary of *total* weightings for each section 

1174 and question in the given project { 

1175 weightset: {id, name} 

1176 questions: {question_id: total_question_weight} 

1177 sections: {section_id: total_section_weight} 

1178 }, 

1179 for the given project and weightset_id 

1180 """ 

1181 q = project.total_weightings.filter( 

1182 TotalWeighting.weighting_set_id == weightset_id 

1183 ).with_entities( 

1184 TotalWeighting.question_instance_id, 

1185 TotalWeighting.section_id, 

1186 TotalWeighting.weight, 

1187 ) 

1188 

1189 res: dict[str, list[dict]] = {"questions": [], "sections": []} 

1190 sections = res["sections"] 

1191 questions = res["questions"] 

1192 

1193 for tw in q: 

1194 if tw.section_id: # zero is the default 

1195 sections.append({"section_id": tw.section_id, "weight": tw.weight}) 

1196 else: 

1197 questions.append( 

1198 {"question_id": tw.question_instance_id, "weight": tw.weight} 

1199 ) 

1200 

1201 return res 

1202 

1203 

1204def sec_total_weighting(section: Section, weightset_id: Optional[int] = None) -> float: 

1205 "Lookup total weighting as a float for the given section and weightset" 

1206 session = object_session(section) 

1207 assert session is not None 

1208 query = ( 

1209 session.query(TotalWeighting.weight) 

1210 .filter(TotalWeighting.section_id == section.id) 

1211 .filter(TotalWeighting.weighting_set_id == weightset_id) 

1212 ) 

1213 tw_value = query.scalar() 

1214 if tw_value is None: 

1215 log.warning(f"Replacing totals of weight set #{weightset_id} for {section}") 

1216 section.project.delete_total_weights(weighting_set_id=weightset_id) 

1217 section.project.save_total_weights(weighting_set_id=weightset_id) 

1218 tw_value = query.scalar() 

1219 return tw_value 

1220 

1221 

1222ElBundle: Bundle = Bundle( 

1223 "element_bundle", 

1224 QElement.id, 

1225 QElement.label, 

1226 QElement.row, 

1227 QElement.col, 

1228 QElement.colspan, 

1229 QElement.mandatory, 

1230 QElement.height, 

1231 QElement.width, 

1232 QElement.regexp, 

1233 QElement.el_type.label("el_type"), 

1234 QElement.rowspan, 

1235 QElement.choices, 

1236 QuestionInstance.id.label("question_id"), 

1237 QuestionInstance.number, 

1238 QuestionDefinition.title, 

1239 single_entity=True, 

1240) 

1241 

1242QAttBundle: Bundle = Bundle( 

1243 "qatt_bundle", 

1244 QAttachment.id, 

1245 QAttachment.element_id, 

1246 QAttachment.size_bytes, 

1247 QAttachment.filename, 

1248 QAttachment.mimetype, 

1249 single_entity=True, 

1250) 

1251 

1252 

1253def response_states( 

1254 issue: Issue, section_id: Optional[int] = None 

1255) -> Dict[int, QuestionResponseState]: 

1256 """ 

1257 Returns a dictionary of QuestionResponseState objects keyed by question_instance_id 

1258 """ 

1259 session = object_session(issue) 

1260 assert session is not None 

1261 QRS = QuestionResponseState 

1262 q = session.query(QRS).join(QuestionInstance).filter(QRS.issue == issue) 

1263 

1264 if section_id is not None: 

1265 q = q.filter(QuestionInstance.section_id == section_id) 

1266 

1267 return {qrs.question_instance_id: qrs for qrs in q} 

1268 

1269 

1270def answered_questions(issue: Issue, section_id: int) -> Iterable[Dict[str, Any]]: 

1271 session = object_session(issue) 

1272 assert session is not None 

1273 

1274 aq = issue.answers.join(QuestionInstance).filter( 

1275 QuestionInstance.section_id == section_id 

1276 ) 

1277 

1278 answer_lookup = {a.element_id: a.answer for a in aq} 

1279 

1280 q_filter = QuestionInstance.section_id == section_id 

1281 atq = ( 

1282 session.query(QAttBundle) 

1283 .join(QElement) 

1284 .join(QuestionDefinition) 

1285 .join(QuestionInstance) 

1286 .filter(QuestionInstance.section_id == section_id) 

1287 ) 

1288 

1289 qatt_lookup = {qa.element_id: qa._asdict() for qa in atq} 

1290 

1291 response_state_lookup = response_states(issue, section_id=section_id) 

1292 

1293 q = ( 

1294 session.query(ElBundle) 

1295 .join(QuestionDefinition) 

1296 .join(QuestionInstance) 

1297 .filter(q_filter) 

1298 .order_by( 

1299 QuestionInstance.number, 

1300 QuestionInstance.position, 

1301 QElement.row, 

1302 QElement.col, 

1303 ) 

1304 ) 

1305 

1306 return iter_quick_questions(q, answer_lookup, qatt_lookup, response_state_lookup) 

1307 

1308 

1309def iter_quick_questions( 

1310 elbundle_query: Query, 

1311 answer_lookup: Dict[int, Any], 

1312 qatt_lookup: Dict[int, Any], 

1313 response_state_lookup: Dict[int, QuestionResponseState], 

1314) -> Iterable[Dict[str, Any]]: 

1315 """ 

1316 Builds question dictionaries from ElBundle rows 

1317 

1318 This is about twice as fast as approaches going via the ORM 

1319 """ 

1320 

1321 answerables = {"TX", "CB", "CR", "CC", "AT"} 

1322 labelled = {"LB", "QA", "CB"} 

1323 current_question = None 

1324 current_question_id = None 

1325 # row_list = None 

1326 current_row = None 

1327 

1328 for el in elbundle_query: 

1329 el_type = el.el_type 

1330 

1331 if el.question_id != current_question_id: 

1332 # starting a new question 

1333 if current_question is not None: 

1334 # if not the first question, yield the previous one 

1335 yield current_question 

1336 

1337 current_question_id = el.question_id 

1338 row_list: list[list[dict]] = [] 

1339 current_row = -1 

1340 current_question = { 

1341 "title": el.title, 

1342 "id": current_question_id, 

1343 "number": el.number.dotted, 

1344 "elements": row_list, 

1345 } 

1346 rsjs = response_state_lookup[current_question_id].as_dict() 

1347 current_question["response_state"] = rsjs 

1348 

1349 if el.row > current_row: 

1350 row_list.append([]) 

1351 

1352 current_row = el.row 

1353 

1354 el_dict = { 

1355 "id": el.id, 

1356 "el_type": el.el_type, 

1357 "colspan": el.colspan, 

1358 "rowspan": el.rowspan, 

1359 } 

1360 

1361 if el_type in labelled: 

1362 el_dict["label"] = el.label 

1363 if el_type == "QA" and el.id in qatt_lookup: 

1364 el_dict["attachment"] = qatt_lookup[el.id] 

1365 

1366 if el_type in answerables: 

1367 el_dict["answer"] = answer_lookup.get(el.id, None) 

1368 

1369 if el_type in ("CR", "CC"): 

1370 el_dict["choices"] = [{"label": c["label"]} for c in el.choices] 

1371 

1372 elif el_type == "TX": 

1373 el_dict["height"] = el.height 

1374 el_dict["width"] = el.width 

1375 el_dict["regexp"] = el.regexp 

1376 

1377 if el_type != "CB": 

1378 el_dict["mandatory"] = el.mandatory 

1379 

1380 row_list[-1].append(el_dict) 

1381 

1382 if current_question is not None: 

1383 yield current_question # don't forget the last one!! 

1384 

1385 

1386def answering_stats(issue: Issue, section: Section) -> List[Dict[str, Any]]: 

1387 QR = QuestionResponseState 

1388 

1389 q = ( 

1390 select(QR.allocated_to, QR.status, func.count(QR.id).label("question_count")) 

1391 .join(Issue) 

1392 .select_from(QR) 

1393 .join(QuestionInstance) 

1394 .join(Section) 

1395 .filter(Section.number.startswith(section.number)) 

1396 .filter(Issue.id == issue.id) 

1397 .group_by(QR.allocated_to, QR.status) 

1398 ) 

1399 session = object_session(issue) 

1400 assert session is not None 

1401 

1402 return [ 

1403 dict( 

1404 status=si.status.name, 

1405 allocated_to=si.allocated_to, 

1406 question_count=si.question_count, 

1407 ) 

1408 for si in session.execute(q) 

1409 ] 

1410 

1411 

1412def issue_watchers(session: Session, issue: Issue) -> Query: 

1413 cols = ( 

1414 User.id.label("user_id"), 

1415 User.email, 

1416 User.fullname, 

1417 IssueWatchList.date_created.label("watching_since"), 

1418 ) 

1419 return ( 

1420 session.query(*cols) 

1421 .outerjoin( 

1422 IssueWatchList, 

1423 and_( 

1424 IssueWatchList.user_id == User.id, IssueWatchList.issue_id == issue.id 

1425 ), 

1426 ) 

1427 .filter(User.organisation == issue.respondent) 

1428 ) 

1429 

1430 

1431def project_watchers(session: Session, project: Project) -> Query: 

1432 cols = ( 

1433 User.id.label("user_id"), 

1434 User.email, 

1435 User.fullname, 

1436 ProjectWatchList.date_created.label("watching_since"), 

1437 ) 

1438 return ( 

1439 session.query(*cols) 

1440 .join(ProjectWatchList) 

1441 .filter(ProjectWatchList.project_id == project.id) 

1442 ) 

1443 

1444 

1445def _question_ids_q( 

1446 session: Session, target_project_id: int, sec_number: Optional[str] = None 

1447) -> Query: 

1448 # IDs of all question definitions in target (destination) issue 

1449 sel = session.query(QuestionInstance.question_def_id).filter( 

1450 QuestionInstance.project_id == target_project_id 

1451 ) 

1452 if sec_number is not None: 

1453 sel = sel.filter(QuestionInstance.number.startswith(str(sec_number))) 

1454 return sel 

1455 

1456 

1457def importable_answers( 

1458 session: Session, target_issue: Issue, sec_number: Optional[str] = None 

1459) -> Query: 

1460 """Get a count of answered questions in source_issue that can be imported into 

1461 target_issue 

1462 """ 

1463 

1464 sel_al = ( 

1465 _question_ids_q(session, target_issue.project_id, sec_number=sec_number) 

1466 .subquery() 

1467 .alias() 

1468 ) 

1469 

1470 q = ( 

1471 session.query( 

1472 Issue.id.label("issue_id"), 

1473 Issue.issue_date, 

1474 Issue.submitted_date, 

1475 Project.title, 

1476 func.count(QuestionResponseState.id).label("question_count"), 

1477 ) 

1478 .join(QuestionResponseState.question_instance) 

1479 .join(Issue) 

1480 .join(Project) 

1481 .join(sel_al, sel_al.c.question_def_id == QuestionInstance.question_def_id) 

1482 .filter( 

1483 Issue.respondent_id == target_issue.respondent_id, 

1484 Issue.id != target_issue.id, 

1485 Issue.status.in_(["Accepted", "Submitted", "Updateable"]), 

1486 QuestionResponseState.status.in_( 

1487 [ResponseStatus.ANSWERED, ResponseStatus.APPROVED] 

1488 ), 

1489 ) 

1490 .group_by(Issue.id) 

1491 .order_by(desc("question_count")) 

1492 ) 

1493 

1494 return q 

1495 

1496 

1497def importable_answer_lookup( 

1498 session: Session, source_issue: Issue, target_project: Project, sec_number: str 

1499) -> Query: 

1500 sel_al = ( 

1501 _question_ids_q(session, target_project.id, sec_number=sec_number) 

1502 .subquery() 

1503 .alias() 

1504 ) 

1505 lq = ( 

1506 session.query( 

1507 Answer.element_id, 

1508 Answer.answer, 

1509 QuestionDefinition.id.label("question_def_id"), 

1510 ) 

1511 .join(QElement, Answer.element_id == QElement.id) 

1512 .join(QuestionDefinition) 

1513 .join(QuestionInstance) 

1514 .join(QuestionResponseState) 

1515 .join(sel_al, sel_al.c.question_def_id == QuestionInstance.question_def_id) 

1516 .filter( 

1517 Answer.issue == source_issue, 

1518 QuestionInstance.project_id == source_issue.project_id, 

1519 QuestionResponseState.issue == source_issue, 

1520 ) 

1521 .filter( 

1522 QuestionResponseState.status.in_( 

1523 [ResponseStatus.ANSWERED, ResponseStatus.APPROVED] 

1524 ) 

1525 ) 

1526 ) 

1527 return lq 

1528 

1529 

1530def latest_event(session: Session) -> Optional[AuditEvent]: 

1531 assert session is not None 

1532 return session.query(AuditEvent).order_by(AuditEvent.id.desc()).first() 

1533 

1534 

1535def category_for_user(session: Session, user: User, category_id: int) -> Category: 

1536 """ 

1537 Fetch the category with given id checking that it belongs to the user's organisation 

1538 Returns NoResultFound if category id or user's org_id are wrong 

1539 """ 

1540 assert session is not None 

1541 q = ( 

1542 select(Category) 

1543 .where(Category.id == category_id) 

1544 .where(Category.organisation == user.organisation) 

1545 ) 

1546 return session.execute(q).scalar_one() 

1547 

1548 

1549def questionnaire_stats(session: Session, project_id: int) -> Dict[str, Any]: 

1550 aec = ( 

1551 session.query(QElement.el_type, func.count(QElement.id)) 

1552 .join(QuestionDefinition) 

1553 .join(QuestionInstance) 

1554 .filter(QuestionInstance.project_id == project_id) 

1555 .group_by(QElement.el_type) 

1556 ) 

1557 

1558 el_counts: dict[str, int] = dict((el_type, count) for el_type, count in aec) 

1559 answerable_count = 0 

1560 

1561 for answerable_type in QElement.answerable_types: 

1562 if answerable_type in el_counts: 

1563 answerable_count += el_counts[answerable_type] 

1564 

1565 el_counts["answerable_elements"] = answerable_count 

1566 qi_count = ( 

1567 session.query(QuestionInstance.id).filter_by(project_id=project_id).count() 

1568 ) 

1569 sec_count = session.query(Section.id).filter_by(project_id=project_id).count() 

1570 

1571 return {"questions": qi_count, "sections": sec_count, "elements": el_counts} 

1572 

1573 

1574def unanswered_mandatory(issue: Issue) -> Query: 

1575 """ 

1576 Return a query fetching QuestionInstance id and number fields for unanswered, mandatory 

1577 questions for the given issue 

1578 """ 

1579 session = object_session(issue) 

1580 assert session is not None 

1581 return ( 

1582 session.query(QuestionInstance.id, QuestionInstance.number) 

1583 .join(QuestionDefinition) 

1584 .join(QElement) 

1585 .join(QuestionResponseState) 

1586 .filter(QElement.mandatory == 1) 

1587 .filter(QuestionInstance.project == issue.project) 

1588 .filter(QuestionResponseState.status == ResponseStatus.NOT_ANSWERED) 

1589 .filter(QuestionResponseState.issue_id == issue.id) 

1590 .distinct() 

1591 ) 

1592 

1593 

1594def edges_for_org_query(session: Session, org_id: str) -> Query: 

1595 return ( 

1596 session.query(Edge) 

1597 .join(RelationshipType) 

1598 .join(Organisation) 

1599 .filter(RelationshipType.org_id == org_id) 

1600 ) 

1601 

1602 

1603def webhooks_for_event(event: AuditEvent) -> Query[WebhookSubscription]: 

1604 session = object_session(event) 

1605 assert session is not None 

1606 q = ( 

1607 session.query(WebhookSubscription) 

1608 .join(Organisation) 

1609 .join(EventOrgACL) 

1610 .filter(EventOrgACL.event_id == event.id) 

1611 .filter(WebhookSubscription.event_type == event.event_type) 

1612 ) 

1613 return q 

1614 

1615 

1616def visible_nodes( 

1617 session: Session, 

1618 section: Section, 

1619 with_questions: bool = True, 

1620 with_ancestors: bool = False, 

1621) -> Query: 

1622 if with_ancestors: 

1623 regex = NumberString.visible_relatives_regex(section.number) 

1624 else: 

1625 regex = f"^{section.number}.{ 0,2} $" 

1626 

1627 q = ( 

1628 session.query( 

1629 Section.id, 

1630 Section.title, 

1631 Section.description, 

1632 literal("section").label("type"), 

1633 Section.parent_id, 

1634 Section.number.label("number"), 

1635 (func.length(Section.number) / 2).label("depth"), 

1636 ) 

1637 .filter(Section.project_id == section.project_id) 

1638 .filter(Section.number.op("REGEXP")(regex)) 

1639 ) 

1640 

1641 if with_questions: 

1642 aq = ( 

1643 session.query( 

1644 QuestionInstance.id, 

1645 QuestionDefinition.title, 

1646 literal("").label("description"), 

1647 literal("question").label("type"), 

1648 QuestionInstance.section_id.label("parent_id"), 

1649 QuestionInstance.number.label("number"), 

1650 (func.length(QuestionInstance.number) / 2).label("depth"), 

1651 ) 

1652 .join(QuestionDefinition) 

1653 .filter(QuestionInstance.project_id == section.project_id) 

1654 .filter(QuestionInstance.section_id == section.id) 

1655 ) 

1656 q = q.union(aq) 

1657 return q.order_by("number") 

1658 

1659 

1660def duplicated_qdefs( 

1661 session: Session, 

1662 destination_project_id: int, 

1663 source_project_id: int, 

1664 src_sections: List[Section], 

1665 src_questions: List[QuestionInstance], 

1666) -> List[QuestionInstance]: 

1667 """ 

1668 Returns a list of QuestionInstance objects belonging to the Project given by 

1669 destination_project_id and whose QuestionDefintion is shared with QuestionInstances in 

1670 src_questions or src_sections 

1671 """ 

1672 # Find QuestionDefinition ids for all questions to be imported from the source project 

1673 condition = [] 

1674 if src_sections: 

1675 regex = "|".join([f"^{s.number}" for s in src_sections]) 

1676 condition.append( 

1677 and_( 

1678 QuestionInstance.number.op("REGEXP")(regex), 

1679 QuestionInstance.project_id == source_project_id, 

1680 ) 

1681 ) 

1682 

1683 if src_questions: 

1684 condition.append(QuestionInstance.id.in_([q.id for q in src_questions])) 

1685 

1686 source_qids = select(QuestionInstance.question_def_id).where(or_(*condition)) 

1687 

1688 # Find QuestionInstances in the destination project that share the same QuestionDefinitions 

1689 # as in the source project 

1690 qi_query = ( 

1691 session.query(QuestionInstance) 

1692 .filter(QuestionInstance.question_def_id.in_(source_qids)) 

1693 .filter(QuestionInstance.project_id == destination_project_id) 

1694 .options(lazyload(QuestionInstance.question_def)) 

1695 ) 

1696 

1697 return qi_query.all()