Coverage for rfpy/api/endpoints/reports/yesnoqual.py: 100%

90 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-24 10:52 +0000

1from io import BytesIO 

2from functools import partial 

3from collections import defaultdict 

4from itertools import groupby 

5from operator import attrgetter 

6 

7import xlsxwriter # type: ignore[import] 

8 

9from rfpy.api import fetch, validate 

10from rfpy.suxint import http 

11from rfpy.auth import perms 

12from rfpy.web.mime import MimeTypes 

13from rfpy.web.response import XAccelResponse 

14from rfpy.model import Answer, QuestionInstance, Issue, Score, questionnaire 

15from rfpy.model.composite import QuestionMeta 

16 

17from .responses import attachment 

18 

19 

@http
def get_project_report_yesno(session, user, project_id) -> XAccelResponse:
    """
    Generate a spreadsheet report of answers to questions with a specific structure of 4 rows:

    1. Question Body
    2. Multiple choice options, e.g. 'Yes; No; Yes, with Qualifications'
    3. Comments box header, e.g. 'Qualifications'
    4. Text input field

    Questions not matching this structure are ignored: only project questions
    whose QuestionMeta.signature equals the hard-coded yes/no signature are
    selected, ordered by QuestionInstance.number.

    The worksheet has a bold header row followed by one row for every
    (question, scoreable issue) pair. Answers or scores that have not been
    recorded are written as None. A weight for which int() raises TypeError
    is reported as 1.

    The user must pass the perms.PROJECT_ACCESS check for the project
    (restricted users are denied) before any data is read.

    Returns the workbook bytes as an XLSX attachment whose file name is the
    first 25 characters of the project title plus '.xlsx'.
    """

    project = fetch.project(session, project_id)
    validate.check(user, perms.PROJECT_ACCESS, project=project, deny_restricted=True)

    # NOTE(review): presumably the QuestionMeta signature shared by every
    # question built with the 4-element layout described above - confirm.
    yes_no_signature = "c2367903b883c29f5007efcb1f751d0b"

    questions = (
        project.questions.join(questionnaire.QuestionDefinition)
        .join(QuestionMeta)
        .filter(QuestionMeta.signature == yes_no_signature)
        .order_by(QuestionInstance.number)
        .all()
    )

    # issue id -> {element id -> Answer}
    answer_map: dict[int, dict] = defaultdict(dict)
    # issue id -> {question instance id -> Score}
    score_map: dict[int, dict] = defaultdict(dict)
    issues: list = []

    # Nothing below is needed (and no further queries are issued) when no
    # question matches the signature; the report then contains only headings.
    if questions:
        q_id_set = {q.id for q in questions}

        answers = (
            session.query(Answer)
            .join(Issue)
            .filter(Issue.project == project, Answer.question_instance_id.in_(q_id_set))
        )
        for a in answers:
            answer_map[a.issue_id][a.element_id] = a

        scores = (
            session.query(Score)
            .join(Issue)
            .filter(
                Issue.project == project,
                # NOTE(review): an empty scoreset_id presumably selects the
                # default score set - confirm.
                Score.scoreset_id == "",
                Score.question_instance_id.in_(q_id_set),
            )
        )
        for s in scores:
            score_map[s.issue_id][s.question_instance_id] = s

        # Loop invariant: read the issue collection once rather than once
        # per question.
        issues = list(project.scoreable_issues)

    buff = BytesIO()
    # The context manager closes the workbook (flushing it into buff) on both
    # the normal and the exceptional path.
    with xlsxwriter.Workbook(buff, {"in_memory": True}) as workbook:
        worksheet = workbook.add_worksheet()

        bold_format = workbook.add_format()
        bold_format.set_bold()

        wrap_format = workbook.add_format()
        wrap_format.set_text_wrap()

        headings = (
            "Question Number",
            "Title",
            "Question Body",
            "Respondent",
            "Yes/No",
            "Comments",
            "Score",
            "Weight",
        )
        for col, heading in enumerate(headings):
            worksheet.write(0, col, heading, bold_format)

        # Row 0 holds the headings; data rows start at 1.
        row = 0
        for instance in questions:
            qdef = instance.question_def
            els = qdef.elements
            label = els[0].label

            try:
                weight = int(instance.weight)
            except TypeError:  # pragma: no cover
                weight = 1

            for issue in issues:
                row += 1
                cell = partial(worksheet.write, row)

                # .get() lookups avoid inserting empty entries into the
                # defaultdicts for issues that have no answers or scores.
                issue_answers = answer_map.get(issue.id, {})

                # Element 1 is the multiple choice, element 3 the text input.
                choice = issue_answers.get(els[1].id)
                yes_no = choice.answer if choice is not None else None

                remark = issue_answers.get(els[3].id)
                comments = remark.answer if remark is not None else None

                score_entry = score_map.get(issue.id, {}).get(instance.id)
                score = score_entry.score if score_entry is not None else None

                cell(0, instance.safe_number)
                cell(1, qdef.title)
                cell(2, label)
                cell(3, issue.respondent.name)
                cell(4, yes_no)
                cell(5, comments, wrap_format)
                cell(6, score)
                cell(7, weight)

    return attachment(buff.getvalue(), MimeTypes.XLSX.value, f"{project.title[:25]}.xlsx")

140 

141 

@http
def get_scoretotals(session, user, project_ids):
    """
    Return total weighted scores per Respondent, grouped by Project, for the
    projects whose ids are given in project_ids.

    The user must hold perms.ISSUE_VIEW_SCORES. Totals are fetched for the
    user's organisation and grouped with itertools.groupby on project_id, so
    one record is produced per run of consecutive rows sharing a project_id.
    NOTE(review): this presumably relies on fetch.score_totals_by_project
    returning rows ordered by project - confirm.

    Each record holds the project id, a list of {"score", "respondent"}
    entries (score rounded to 2 decimal places), and the project title, owner
    and publication date taken from the first row of the group.
    """
    user.check_permission(perms.ISSUE_VIEW_SCORES)
    totals = fetch.score_totals_by_project(session, user.organisation.id, project_ids)

    results = []
    for project_id, group in groupby(totals, key=attrgetter("project_id")):
        # groupby never yields an empty group, so entries[0] always exists.
        entries = list(group)
        first = entries[0]
        results.append(
            {
                "project_id": project_id,
                "scores": [
                    {
                        "score": round(entry.total_weighted_score, ndigits=2),
                        "respondent": entry.respondent_id,
                    }
                    for entry in entries
                ],
                "project_title": first.project_title,
                "owner": first.project_owner,
                "published": first.date_published,
            }
        )
    return results

168 return rows