Coverage for rfpy/api/endpoints/reports/yesnoqual.py: 100%
90 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
1from io import BytesIO
2from functools import partial
3from collections import defaultdict
4from itertools import groupby
5from operator import attrgetter
7import xlsxwriter # type: ignore[import]
9from rfpy.api import fetch, validate
10from rfpy.suxint import http
11from rfpy.auth import perms
12from rfpy.web.mime import MimeTypes
13from rfpy.web.response import XAccelResponse
14from rfpy.model import Answer, QuestionInstance, Issue, Score, questionnaire
15from rfpy.model.composite import QuestionMeta
17from .responses import attachment
@http
def get_project_report_yesno(session, user, project_id) -> XAccelResponse:
    """
    Build an XLSX report of "Yes/No with qualifications" answers for a project.

    Only questions whose definition follows this four-element layout are
    reported:

    1. Question Body
    2. Multiple choice options, e.g. 'Yes; No; Yes, with Qualifications'
    3. Comments box header, e.g. 'Qualifications'
    4. Text input field

    Matching questions are selected by comparing QuestionMeta.signature with a
    fixed signature value; all other questions are left out of the report.

    The worksheet holds one header row followed by one row for every
    (question, scoreable issue) pair, with questions ordered by
    QuestionInstance.number. The workbook is returned as an attachment named
    after the first 25 characters of the project title.

    The caller must pass the perms.PROJECT_ACCESS check performed by
    validate.check (presumably raising when access is denied -- confirm
    against rfpy.api.validate).
    """
    project = fetch.project(session, project_id)
    validate.check(user, perms.PROJECT_ACCESS, project=project, deny_restricted=True)

    yes_no_signature = "c2367903b883c29f5007efcb1f751d0b"

    questions = (
        project.questions.join(questionnaire.QuestionDefinition)
        .join(QuestionMeta)
        .filter(QuestionMeta.signature == yes_no_signature)
        .order_by(QuestionInstance.number)
        .all()
    )

    # issue id -> {element id -> Answer}
    answer_map: dict[int, dict] = defaultdict(dict)
    # issue id -> {question instance id -> Score}
    score_map: dict[int, dict] = defaultdict(dict)

    # Skip both lookups entirely when no question matched the signature.
    if questions:
        q_id_set = {question.id for question in questions}

        answer_query = (
            session.query(Answer)
            .join(Issue)
            .filter(Issue.project == project, Answer.question_instance_id.in_(q_id_set))
        )
        for answer in answer_query:
            answer_map[answer.issue_id][answer.element_id] = answer

        # Only scores whose scoreset_id is the empty string are considered.
        score_query = (
            session.query(Score)
            .join(Issue)
            .filter(
                Issue.project == project,
                Score.scoreset_id == "",
                Score.question_instance_id.in_(q_id_set),
            )
        )
        for score_record in score_query:
            score_map[score_record.issue_id][score_record.question_instance_id] = score_record

    buff = BytesIO()
    workbook = xlsxwriter.Workbook(buff, {"in_memory": True})
    try:
        worksheet = workbook.add_worksheet()
        bold_format = workbook.add_format({"bold": True})
        wrap_format = workbook.add_format({"text_wrap": True})

        headings = (
            "Question Number",
            "Title",
            "Question Body",
            "Respondent",
            "Yes/No",
            "Comments",
            "Score",
            "Weight",
        )
        for col, heading in enumerate(headings):
            worksheet.write(0, col, heading, bold_format)

        # Row 0 is the header; data rows start directly below it.
        next_row = 1
        for instance in questions:
            qdef = instance.question_def
            els = qdef.elements
            label = els[0].label

            try:
                weight = int(instance.weight)
            except TypeError:  # pragma: no cover
                # int() rejected the stored weight (e.g. None); use 1 instead.
                weight = 1

            for issue in project.scoreable_issues:
                issue_answers = answer_map[issue.id]

                # Element 1 carries the multiple choice (Yes/No) answer.
                choice = issue_answers.get(els[1].id)
                yes_no = choice.answer if choice is not None else None

                # Element 3 carries the free-text comments answer.
                remark = issue_answers.get(els[3].id)
                comments = remark.answer if remark is not None else None

                try:
                    score = score_map[issue.id][instance.id].score
                except KeyError:  # pragma: no cover
                    # No score recorded for this issue/question pair.
                    score = None

                worksheet.write(next_row, 0, instance.safe_number)
                worksheet.write(next_row, 1, qdef.title)
                worksheet.write(next_row, 2, label)
                worksheet.write(next_row, 3, issue.respondent.name)
                worksheet.write(next_row, 4, yes_no)
                worksheet.write(next_row, 5, comments, wrap_format)
                worksheet.write(next_row, 6, score)
                worksheet.write(next_row, 7, weight)
                next_row += 1
    finally:
        # Closing the workbook is what writes the XLSX data into buff.
        workbook.close()

    return attachment(buff.getvalue(), MimeTypes.XLSX.value, f"{project.title[:25]}.xlsx")
@http
def get_scoretotals(session, user, project_ids):
    """
    Return total scores per Project and Respondent for the given projects.

    The user must hold perms.ISSUE_VIEW_SCORES. Score totals are fetched for
    the user's organisation and the ids in project_ids, then grouped by
    consecutive runs of equal project_id.

    Returns a list with one dict per group, holding "project_id", "scores"
    (a list of {"score", "respondent"} dicts with the weighted total rounded
    to 2 decimal places), "project_title", "owner" and "published".
    """
    user.check_permission(perms.ISSUE_VIEW_SCORES)

    totals = fetch.score_totals_by_project(session, user.organisation.id, project_ids)

    report = []
    for project_id, group in groupby(totals, key=attrgetter("project_id")):
        entry = {"project_id": project_id, "scores": []}
        for position, total in enumerate(group):
            if position == 0:
                # Project-level fields are copied from the first row of the group.
                entry["project_title"] = total.project_title
                entry["owner"] = total.project_owner
                entry["published"] = total.date_published
            respondent_score = {
                "score": round(total.total_weighted_score, ndigits=2),
                "respondent": total.respondent_id,
            }
            entry["scores"].append(respondent_score)
        report.append(entry)
    return report