Coverage for rfpy/api/endpoints/reports/msword.py: 100%

95 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-24 10:52 +0000

1import re 

2import logging 

3from pathlib import Path 

4from itertools import tee 

5 

6from docx import Document 

7from sqlalchemy.orm import subqueryload 

8 

9from rfpy import conf 

10from rfpy.auth import perms 

11from rfpy.suxint import http 

12from rfpy.api import fetch, validate 

13from rfpy.model import Answer, QuestionInstance, QElement, Project, User, Issue, Section 

14from rfpy.web.response import XAccelTempResponse, XAccelResponse 

15from rfpy.web.mime import MimeTypes 

16 

17 

18TEMPLATE_DOCX = "assets/q-template.docx" 

19log = logging.getLogger(__name__) 

20 

21 

def is_comments_label(label_text):
    """
    Report whether a label looks like a short comments / qualifications prompt.

    Returns True only when label_text is not None, contains "comments" or
    "qualifications" (case-insensitive search) and is shorter than 50
    characters. Returns False otherwise.
    """
    if label_text is None:
        return False
    if re.search(r"(comments|qualifications)", label_text, re.IGNORECASE) is None:
        return False
    # Long labels that merely mention the keyword are not treated as prompts.
    return len(label_text) < 50

28 

29 

def render_paragraph_question(doc, question, answers):
    """
    Render a non-tabular question as a sequence of paragraphs appended to doc.

    Each element of question.question_def.elements becomes one paragraph:
    the answer text (style "Answer") when answers is non-empty and the
    element is answerable, otherwise the element's label.

    A comments / qualifications label (as judged by is_comments_label) is
    dropped, together with the element that follows it, when that following
    element has no entry in answers. A comments label that is the final
    element has nothing following it and is dropped as well.

    answers maps element ID to an object exposing an ``answer`` attribute.
    """
    elements, following = tee(question.question_def.elements)
    # Keep the second iterator one step ahead so it yields the "next" element.
    next(following, None)

    for element in elements:
        next_el = next(following, None)
        if is_comments_label(element.label):
            # next_el is None when the label is the last element; without the
            # None check next_el.id would raise AttributeError.
            if next_el is None or next_el.id not in answers:
                # Don't print comments / quals label or answer
                # if Not Answered
                next(elements, None)
                next(following, None)
                continue
        if answers and element.is_answerable:
            if element.id in answers:
                answer = answers[element.id].answer
            else:
                answer = "Not Answered"
            doc.add_paragraph(answer, style="Answer")
        else:
            doc.add_paragraph(element.label)

51 

52 

def render_table_question(doc, question, answers):
    """
    Render a tabular question as a table appended to doc.

    A table with question.question_def.column_count columns is created and a
    new table row is appended each time an element's ``row`` value increases.
    Each element is written to the cell at its 1-based ``col`` position: the
    answer text (style "Answer") when answers is non-empty and the element is
    answerable, otherwise the element's label.

    An element with ``colspan`` > 1 is merged with the cells to its right so
    that it covers ``colspan`` columns; the span is clamped to the last
    column of the table.

    answers maps element ID to an object exposing an ``answer`` attribute.

    TODO - doesn't handle multi column tables
    """
    col_count = question.question_def.column_count
    # NOTE(review): the table is created with one row that is never written
    # to, because every element row is appended via add_row() - confirm the
    # blank leading row is intended.
    table = doc.add_table(rows=1, cols=col_count)
    last_col_index = col_count - 1
    current_row = -1
    row_cells = None
    for el in question.question_def.elements:
        if el.row > current_row:
            row_cells = table.add_row().cells
        left_col_index = el.col - 1
        cell = row_cells[left_col_index]
        if el.colspan > 1:
            try:
                # The right-most cell of the span is colspan - 1 columns to
                # the right of the starting cell. Clamp it to the table width
                # to fix broken colspans.
                right_col_index = min(
                    left_col_index + el.colspan - 1, last_col_index
                )
                # Nothing to merge when the span collapses to a single cell.
                if right_col_index > left_col_index:
                    cell.merge(row_cells[right_col_index])
            except IndexError:  # pragma: no cover
                log.warning(
                    "Fix colspan failed for question instance ID %s", question.id
                )
        if answers and el.is_answerable:
            if el.id in answers:
                answer = answers[el.id].answer
            else:
                answer = "- Not Answered -"
            cell.add_paragraph(answer, style="Answer")
        else:
            cell.add_paragraph(el.label)
        current_row = el.row

86 

87 

def _el_answer_lookup(project: Project, issue: Issue, user: User, session):
    """
    Build a lookup of the answers given for issue, keyed by element ID.

    validate.check is called first with perms.ISSUE_VIEW_ANSWERS for the
    given user, project and issue. Each value in the returned dict is a
    query result row exposing ``element_id`` and ``answer``.
    """
    validate.check(
        user,
        perms.ISSUE_VIEW_ANSWERS,
        project=project,
        issue=issue,
        section_id=project.section_id,
    )

    rows = session.query(Answer.element_id, Answer.answer)
    rows = rows.join(QElement).join(QuestionInstance)
    # Restrict to answers for this issue on questions belonging to this project.
    rows = rows.filter(Answer.issue == issue, QuestionInstance.project == project)

    lookup = {}
    for row in rows:
        lookup[row.element_id] = row
    return lookup

105 

106 

@http
def get_project_report_msword(
    session, user, project_id, q_issue_id=None
) -> XAccelResponse:
    """
    Generate an MS Word report for the questionnaire associated with the given Project ID.
    If an issue ID is provided the answers are populated for that Issue.

    The document is built from the TEMPLATE_DOCX template located next to
    this module, saved to the file path supplied by
    conf.CONF.random_cache_file_path() and returned as an XAccelTempResponse.
    The download name is derived from the project title, plus the respondent
    name when an issue is given.
    """
    project = fetch.project(session, project_id)
    validate.check(user, perms.PROJECT_ACCESS, project=project, deny_restricted=True)

    sections = (
        fetch.sections(project, user).options(subqueryload(Section.questions)).all()
    )

    if q_issue_id is not None:
        issue = project.get_issue(q_issue_id)
        answers = _el_answer_lookup(project, issue, user, session)
        fname = "{0}-{1}.docx".format(project.title[:20], issue.respondent.name[:15])
    else:
        # No issue: render the bare questionnaire without answers.
        answers = {}
        fname = "{0}.docx".format(project.title[:25])

    doc_path = Path(__file__).parent.joinpath(TEMPLATE_DOCX)
    doc = Document(docx=str(doc_path))

    for section in sections:
        if section.is_top_level:
            # Use the section being rendered rather than assuming the
            # top-level section is the first entry in the list.
            doc.add_heading(section.title, 0)
        else:
            heading_text = "{} {}".format(section.number.dotted, section.title)
            # Heading depth follows the section number: "1" -> 1, "1.2" -> 2.
            level = section.number.dotted.count(".") + 1
            doc.add_heading(heading_text, level)
        for question in section.questions:
            q_title = "{} {}".format(question.number.dotted, question.title)
            doc.add_paragraph(q_title, style="QuestionTitle")
            if question.question_def.is_tabular():
                render_table_question(doc, question, answers)
            else:
                render_paragraph_question(doc, question, answers)

    cache_file_name, cache_file_path = conf.CONF.random_cache_file_path()

    # Make sure the directory that will hold the generated file exists.
    Path(cache_file_path).parent.mkdir(parents=True, exist_ok=True)
    with open(cache_file_path, "wb+") as fp:
        doc.save(fp)

    return XAccelTempResponse(cache_file_name, fname, content_type=MimeTypes.DOCX)