Coverage for rfpy/api/endpoints/reports/msword.py: 100%

96 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-31 16:00 +0000

1import re 

2import logging 

3from pathlib import Path 

4from itertools import tee 

5 

6from docx import Document 

7from sqlalchemy.orm import subqueryload 

8 

9from rfpy import conf 

10from rfpy.auth import perms 

11from rfpy.suxint import http 

12from rfpy.api import fetch, validate 

13from rfpy.model import Answer, QuestionInstance, QElement, Project, User, Issue 

14from rfpy.web.response import XAccelTempResponse 

15from rfpy.web.mime import MimeTypes 

16 

17 

# Path of the Word template, relative to the directory containing this module.
TEMPLATE_DOCX = 'assets/q-template.docx'

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

20 

21 

def is_comments_label(label_text):
    '''
    Tell whether ``label_text`` looks like a short "comments" or
    "qualifications" label.

    Returns True only when the text contains "comments" or "qualifications"
    (matched case-insensitively) and is fewer than 50 characters long.
    ``None`` is never a comments label.
    '''
    if label_text is None:
        return False
    if re.search(r'(comments|qualifications)', label_text, re.IGNORECASE) is None:
        return False
    return len(label_text) < 50

28 

29 

def render_paragraph_question(doc, question, answers):
    '''
    Render a non-tabular question as a sequence of paragraphs appended to
    ``doc``.

    Each element of ``question.question_def.elements`` is written either as
    its answer (style "Answer") or as its label. An answer is written only
    when ``answers`` is non-empty and the element is answerable; an
    answerable element without an entry in ``answers`` is written as
    'Not Answered'.

    A comments / qualifications label (see ``is_comments_label``) whose
    following element has no entry in ``answers`` is dropped together with
    that following element. Note that when ``answers`` is empty this applies
    to every such label.

    :param doc: document object exposing ``add_paragraph(text, style=...)``
    :param question: object exposing ``question_def.elements``
    :param answers: mapping of element ID to an object with an ``answer``
                    attribute
    '''
    # Two independent iterators over the same elements; ``following`` is
    # kept one step ahead so the next element can be inspected.
    elements, following = tee(question.question_def.elements)
    next(following, None)

    for element in elements:
        next_el = next(following, None)
        # ``next_el`` is None when ``element`` is the last one. In that case
        # there is nothing to look ahead to, so the label is rendered
        # normally instead of failing on ``next_el.id``.
        if (is_comments_label(element.label)
                and next_el is not None
                and next_el.id not in answers):
            # Don't print comments / quals label or answer
            # if Not Answered. Consume the following element from both
            # iterators so they stay in step.
            next(elements, None)
            next(following, None)
            continue
        if answers and element.is_answerable:
            if element.id in answers:
                answer = answers[element.id].answer
            else:
                answer = 'Not Answered'
            doc.add_paragraph(answer, style="Answer")
        else:
            doc.add_paragraph(element.label)

52 

53 

def render_table_question(doc, question, answers):
    '''
    Render a tabular question as a table appended to ``doc``.

    A new table row is started whenever an element's ``row`` is greater than
    the row of the previous element. Each element is written into the cell
    at its (1-based) ``col``; an element with ``colspan`` > 1 has its cell
    merged with the cells to its right, clamped to the last column of the
    table so that over-long colspans do not fail.

    An answer (style "Answer") is written only when ``answers`` is non-empty
    and the element is answerable; an answerable element without an entry in
    ``answers`` is written as '- Not Answered -'. Otherwise the element's
    label is written.

    TODO - doesn't handle multi column tables

    :param doc: document object exposing ``add_table(rows=..., cols=...)``
    :param question: object exposing ``id``, ``question_def.column_count``
                     and ``question_def.elements``
    :param answers: mapping of element ID to an object with an ``answer``
                    attribute
    '''
    col_count = question.question_def.column_count
    # NOTE(review): the table is created with one row and a further row is
    # added for the first element, so the initial row looks like it is left
    # empty - confirm this is intended.
    table = doc.add_table(rows=1, cols=col_count)
    last_col_index = col_count - 1
    current_row = -1
    row_cells = None
    for el in question.question_def.elements:
        if el.row > current_row:
            row_cells = table.add_row().cells
        left_col_index = el.col - 1
        cell = row_cells[left_col_index]
        if el.colspan > 1:
            try:
                # A span of N columns starting at index L ends at index
                # L + N - 1. Clamp to the last column to fix broken colspans.
                right_col_index = min(left_col_index + el.colspan - 1,
                                      last_col_index)
                # Nothing to merge if clamping left only the cell itself.
                if right_col_index > left_col_index:
                    cell.merge(row_cells[right_col_index])
            except IndexError:  # pragma: no cover
                log.warning('Fix colspan failed for question instance ID %s', question.id)
        if answers and el.is_answerable:
            if el.id in answers:
                answer = answers[el.id].answer
            else:
                answer = '- Not Answered -'
            cell.add_paragraph(answer, style="Answer")
        else:
            cell.add_paragraph(el.label)
        current_row = el.row

85 

86 

def _el_answer_lookup(project: Project, issue: Issue, user: User, session):
    '''
    Build a lookup of the answers recorded for ``issue`` within ``project``,
    keyed by element ID.

    The ISSUE_VIEW_ANSWERS permission check is run for ``user`` before the
    query is issued. Each value in the returned dict is a query row exposing
    ``element_id`` and ``answer``.
    '''
    validate.check(
        user,
        perms.ISSUE_VIEW_ANSWERS,
        project=project,
        issue=issue,
        section_id=project.section_id,
    )

    rows = (
        session.query(Answer.element_id, Answer.answer)
        .join(QElement, QuestionInstance)
        .filter(Answer.issue == issue, QuestionInstance.project == project)
    )

    lookup = {}
    for row in rows:
        lookup[row.element_id] = row
    return lookup

99 

100 

@http
def get_project_report_msword(session, user, project_id, q_issue_id=None) -> MimeTypes.DOCX:
    '''
    Generate an MS Word report for the questionnaire associated with the given Project ID.
    If an issue ID is provided the answers are populated for that Issue.

    The document is built from the TEMPLATE_DOCX template, written to a
    cache file obtained from the application configuration and returned as
    an XAccelTempResponse with a download name derived from the (truncated)
    project title and, when an issue is given, the respondent name.
    '''
    project = fetch.project(session, project_id)
    validate.check(user, perms.PROJECT_ACCESS, project=project, deny_restricted=True)

    sections = fetch.sections(project, user)\
        .options(subqueryload('questions'))\
        .all()

    if q_issue_id is not None:
        issue = project.get_issue(q_issue_id)
        answers = _el_answer_lookup(project, issue, user, session)
        fname = '{0}-{1}.docx'.format(project.title[:20],
                                      issue.respondent.name[:15])
    else:
        # No issue requested: render the questionnaire without answers.
        answers = {}
        fname = '{0}.docx'.format(project.title[:25])

    doc_path = Path(__file__).parent.joinpath(TEMPLATE_DOCX)
    doc = Document(docx=doc_path)

    for section in sections:
        if section.is_top_level:
            # Use the title of the section being rendered rather than
            # assuming the top-level section is always first in the list.
            doc.add_heading(section.title, 0)
        else:
            heading_text = '{} {}'.format(section.number.dotted, section.title)
            # Heading depth follows the depth of the dotted section number.
            level = section.number.dotted.count('.') + 1
            doc.add_heading(heading_text, level)
        for question in section.questions:
            q_title = '{} {}'.format(question.number.dotted, question.title)
            doc.add_paragraph(q_title, style='QuestionTitle')
            if question.question_def.is_tabular():
                render_table_question(doc, question, answers)
            else:
                render_paragraph_question(doc, question, answers)

    cache_file_name, cache_file_path = conf.CONF.random_cache_file_path()

    # Make sure the directory that will hold the cache file exists.
    Path(cache_file_path).parent.mkdir(parents=True, exist_ok=True)
    log.debug('Writing MS Word report to cache file %s', cache_file_path)
    with open(cache_file_path, 'wb+') as fp:
        doc.save(fp)

    return XAccelTempResponse(cache_file_name, fname,
                              content_type=MimeTypes.DOCX)