Coverage for rfpy/api/endpoints/reports/yesnoqual.py: 100%
89 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-31 16:00 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-31 16:00 +0000
1from io import BytesIO
2from functools import partial
3from collections import defaultdict
4from itertools import groupby
5from operator import attrgetter
7import xlsxwriter
9from rfpy.api import fetch, validate
10from rfpy.suxint import http
11from rfpy.auth import perms
12from rfpy.web.mime import MimeTypes
13from rfpy.model import Answer, QuestionInstance, Issue, Score, questionnaire
14from rfpy.model.composite import QuestionMeta
16from .responses import attachment
19@http
20def get_project_report_yesno(session, user, project_id) -> MimeTypes.XLSX:
21 '''
22 Generate a spreadsheet report of answer to questions with a specific structure of 4 rows:
24 1. Question Body
25 2. Multiple choice options, e.g. 'Yes; No; Yes, with Qualifications'
26 3. Comments box header, e.g. 'Qualifications'
27 4. Text input field
29 Questions not matching this structure are ignored
30 '''
32 project = fetch.project(session, project_id)
33 validate.check(user, perms.PROJECT_ACCESS, project=project, deny_restricted=True)
35 yes_no_signature = 'c2367903b883c29f5007efcb1f751d0b'
37 query = (project.questions
38 .join(questionnaire.QuestionDefinition, QuestionMeta)
39 .filter(QuestionMeta.signature == yes_no_signature)
40 .order_by(QuestionInstance.number))
42 questions = query.all()
44 answer_map = defaultdict(dict)
45 score_map = defaultdict(dict)
47 if len(questions) > 0:
49 q_id_set = {q.id for q in questions}
50 answers = session.query(Answer)\
51 .join(Issue)\
52 .filter(
53 Issue.project == project,
54 Answer.question_instance_id.in_(q_id_set))
56 for a in answers:
57 answer_map[a.issue_id][a.element_id] = a
59 scores = session.query(Score)\
60 .join(Issue)\
61 .filter(
62 Issue.project == project,
63 Score.scoreset_id == '',
64 Score.question_instance_id.in_(q_id_set))
65 for s in scores:
66 score_map[s.issue_id][s.question_instance_id] = s
68 buff = BytesIO()
69 workbook = xlsxwriter.Workbook(buff, {'in_memory': True})
70 try:
71 worksheet = workbook.add_worksheet()
73 bold_format = workbook.add_format()
74 bold_format.set_bold()
76 wrap_format = workbook.add_format()
77 wrap_format.set_text_wrap()
79 header = partial(worksheet.write, 0)
80 headings = ('Question Number', 'Title', 'Question Body', 'Respondent',
81 'Yes/No', 'Comments', 'Score', 'Weight')
82 for col, heading in enumerate(headings):
83 header(col, heading, bold_format)
85 if len(questions) > 0:
86 row = 0
88 for instance in questions:
89 qdef = instance.question_def
90 els = instance.question_def.elements
91 label = els[0].label
93 try:
94 weight = int(instance.weight)
95 except TypeError: # pragma: no cover
96 weight = ''
98 for issue in project.scoreable_issues:
99 row = row + 1
100 cell = partial(worksheet.write, row)
101 issue_map = answer_map[issue.id]
102 yes_no = issue_map.get(els[1].id, None)
103 if yes_no is not None:
104 yes_no = yes_no.answer
106 comments = issue_map.get(els[3].id, None)
107 if comments is not None:
108 comments = comments.answer
109 try:
110 score = score_map[issue.id][instance.id].score
111 except KeyError: # pragma: no cover
112 score = None
114 cell(0, instance.safe_number)
115 cell(1, qdef.title)
116 cell(2, label)
117 cell(3, issue.respondent.name)
118 cell(4, yes_no)
119 cell(5, comments, wrap_format)
120 cell(6, score)
121 cell(7, weight)
122 finally:
123 workbook.close()
125 buff.seek(0)
126 return attachment(buff.read(), MimeTypes.XLSX.value, f'{project.title[:25]}.xlsx')
129@http
130def get_scoretotals(session, user, project_ids):
131 '''
132 Get a list of total scores by Project and Respondent for the projects given
133 by the ids in project_ids
134 '''
135 user.check_permission(perms.ISSUE_VIEW_SCORES)
136 get_pid = attrgetter('project_id')
137 rows = []
138 score_totals = fetch.score_totals_by_project(session, user.organisation.id, project_ids)
139 for pid, items in groupby(score_totals, get_pid):
140 record = {'project_id': pid, 'scores': []}
141 for item in items:
142 if 'project_title' not in record:
143 record['project_title'] = item.project_title
144 record['owner'] = item.project_owner
145 record['published'] = item.date_published
146 record['scores'].append(
147 {'score': round(item.total_weighted_score, ndigits=2),
148 'respondent': item.respondent_id})
149 rows.append(record)
150 return rows