Coverage for rfpy/api/endpoints/reports/yesnoqual.py: 100%

89 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-31 16:00 +0000

1from io import BytesIO 

2from functools import partial 

3from collections import defaultdict 

4from itertools import groupby 

5from operator import attrgetter 

6 

7import xlsxwriter 

8 

9from rfpy.api import fetch, validate 

10from rfpy.suxint import http 

11from rfpy.auth import perms 

12from rfpy.web.mime import MimeTypes 

13from rfpy.model import Answer, QuestionInstance, Issue, Score, questionnaire 

14from rfpy.model.composite import QuestionMeta 

15 

16from .responses import attachment 

17 

18 

@http
def get_project_report_yesno(session, user, project_id) -> MimeTypes.XLSX:
    '''
    Build an XLSX spreadsheet with one row per (question, scoreable issue)
    pair for the questions of a project that have this four-element layout:

    1. Question Body
    2. Multiple choice options, e.g. 'Yes; No; Yes, with Qualifications'
    3. Comments box header, e.g. 'Qualifications'
    4. Text input field

    Questions with any other structure are left out of the report.

    The workbook is assembled in memory and returned through `attachment`,
    named after the first 25 characters of the project title.
    '''
    project = fetch.project(session, project_id)
    validate.check(user, perms.PROJECT_ACCESS, project=project, deny_restricted=True)

    # Value matched against QuestionMeta.signature to select the questions.
    # NOTE(review): presumably a digest of the four-element layout described
    # in the docstring - confirm where signatures are generated.
    yes_no_signature = 'c2367903b883c29f5007efcb1f751d0b'

    questions = (
        project.questions
        .join(questionnaire.QuestionDefinition, QuestionMeta)
        .filter(QuestionMeta.signature == yes_no_signature)
        .order_by(QuestionInstance.number)
        .all()
    )

    # issue id -> {element id: Answer}
    answers_by_issue = defaultdict(dict)
    # issue id -> {question instance id: Score}
    scores_by_issue = defaultdict(dict)

    # With no matching questions there is nothing to look up, so skip both queries.
    if questions:
        question_ids = {question.id for question in questions}

        answer_query = (
            session.query(Answer)
            .join(Issue)
            .filter(
                Issue.project == project,
                Answer.question_instance_id.in_(question_ids))
        )
        for answer in answer_query:
            answers_by_issue[answer.issue_id][answer.element_id] = answer

        # Only scores whose scoreset_id is the empty string are reported.
        score_query = (
            session.query(Score)
            .join(Issue)
            .filter(
                Issue.project == project,
                Score.scoreset_id == '',
                Score.question_instance_id.in_(question_ids))
        )
        for score_record in score_query:
            by_question = scores_by_issue[score_record.issue_id]
            by_question[score_record.question_instance_id] = score_record

    output = BytesIO()
    workbook = xlsxwriter.Workbook(output, {'in_memory': True})
    try:
        worksheet = workbook.add_worksheet()
        bold_format = workbook.add_format({'bold': True})
        wrap_format = workbook.add_format({'text_wrap': True})

        # Row 0 holds the column titles.
        headings = ('Question Number', 'Title', 'Question Body', 'Respondent',
                    'Yes/No', 'Comments', 'Score', 'Weight')
        for column, heading in enumerate(headings):
            worksheet.write(0, column, heading, bold_format)

        # Data rows start at 1 and run on continuously across all questions.
        row = 0
        for instance in questions:
            qdef = instance.question_def
            elements = qdef.elements
            label = elements[0].label

            try:
                weight = int(instance.weight)
            except TypeError:  # pragma: no cover
                weight = ''

            for issue in project.scoreable_issues:
                row += 1
                issue_answers = answers_by_issue[issue.id]

                # Element 1 is the multiple choice, element 3 the free text box.
                choice = issue_answers.get(elements[1].id)
                yes_no = None if choice is None else choice.answer

                remark = issue_answers.get(elements[3].id)
                comments = None if remark is None else remark.answer

                score_record = scores_by_issue[issue.id].get(instance.id)
                score = None if score_record is None else score_record.score

                worksheet.write(row, 0, instance.safe_number)
                worksheet.write(row, 1, qdef.title)
                worksheet.write(row, 2, label)
                worksheet.write(row, 3, issue.respondent.name)
                worksheet.write(row, 4, yes_no)
                worksheet.write(row, 5, comments, wrap_format)
                worksheet.write(row, 6, score)
                worksheet.write(row, 7, weight)
    finally:
        # Closing the workbook is what writes the file into the buffer.
        workbook.close()

    return attachment(output.getvalue(), MimeTypes.XLSX.value, f'{project.title[:25]}.xlsx')

127 

128 

@http
def get_scoretotals(session, user, project_ids):
    '''
    Return total scores grouped by project for the projects listed in
    project_ids.

    The result is a list with one dict per project, holding 'project_id',
    'scores', 'project_title', 'owner' and 'published'. 'scores' is a list of
    {'score', 'respondent'} dicts, with each score rounded to 2 decimal places.

    The user must hold the ISSUE_VIEW_SCORES permission.
    '''
    user.check_permission(perms.ISSUE_VIEW_SCORES)
    totals = fetch.score_totals_by_project(session, user.organisation.id, project_ids)

    # groupby only merges adjacent rows.
    # NOTE(review): presumably the fetch returns rows ordered by project_id - confirm.
    report = []
    for project_id, group in groupby(totals, attrgetter('project_id')):
        members = list(group)
        # groupby never yields an empty group, so a first row always exists;
        # the project-level fields are taken from it.
        first = members[0]
        report.append({
            'project_id': project_id,
            'scores': [
                {'score': round(member.total_weighted_score, ndigits=2),
                 'respondent': member.respondent_id}
                for member in members
            ],
            'project_title': first.project_title,
            'owner': first.project_owner,
            'published': first.date_published,
        })
    return report