Coverage for rfpy/api/endpoints/reports/msword.py: 100%
95 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
1import re
2import logging
3from pathlib import Path
4from itertools import tee
6from docx import Document
7from sqlalchemy.orm import subqueryload
9from rfpy import conf
10from rfpy.auth import perms
11from rfpy.suxint import http
12from rfpy.api import fetch, validate
13from rfpy.model import Answer, QuestionInstance, QElement, Project, User, Issue, Section
14from rfpy.web.response import XAccelTempResponse, XAccelResponse
15from rfpy.web.mime import MimeTypes
18TEMPLATE_DOCX = "assets/q-template.docx"
19log = logging.getLogger(__name__)
22def is_comments_label(label_text):
23 if label_text is None:
24 return False
25 patt = r"(comments|qualifications)"
26 match = re.search(patt, label_text, re.IGNORECASE) is not None
27 return match and len(label_text) < 50
30def render_paragraph_question(doc, question, answers):
31 elements, following = tee(question.question_def.elements)
32 next(following, None)
34 for element in elements:
35 next_el = next(following, None)
36 if is_comments_label(element.label):
37 if next_el.id not in answers:
38 # Don't print comments / quals label or answer
39 # if Not Answered
40 next(elements, None)
41 next(following, None)
42 continue
43 if answers and element.is_answerable:
44 if element.id in answers:
45 answer = answers[element.id].answer
46 else:
47 answer = "Not Answered"
48 doc.add_paragraph(answer, style="Answer")
49 else:
50 doc.add_paragraph(element.label)
53def render_table_question(doc, question, answers):
54 """
55 TODO - doesn't handle multi column tables
56 """
57 col_count = question.question_def.column_count
58 table = doc.add_table(rows=1, cols=col_count)
59 current_row = -1
60 row_cells = None
61 for el in question.question_def.elements:
62 if el.row > current_row:
63 row_cells = table.add_row().cells
64 cell = row_cells[el.col - 1]
65 if el.colspan > 1:
66 try:
67 # fix broken colspans
68 right_col_index = (el.col - 1) + el.colspan
69 if right_col_index > (col_count - 1):
70 right_col_index = col_count - 1
71 other_cell = row_cells[right_col_index]
72 cell.merge(other_cell)
73 except IndexError: # pragma: no cover
74 log.warning(
75 "Fix colspan failed for question instance ID %s", question.id
76 )
77 if answers and el.is_answerable:
78 if el.id in answers:
79 answer = answers[el.id].answer
80 else:
81 answer = "- Not Answered -"
82 cell.add_paragraph(answer, style="Answer")
83 else:
84 cell.add_paragraph(el.label)
85 current_row = el.row
88def _el_answer_lookup(project: Project, issue: Issue, user: User, session):
89 validate.check(
90 user,
91 perms.ISSUE_VIEW_ANSWERS,
92 project=project,
93 issue=issue,
94 section_id=project.section_id,
95 )
97 answer_query = (
98 session.query(Answer.element_id, Answer.answer)
99 .join(QElement)
100 .join(QuestionInstance)
101 .filter(Answer.issue == issue, QuestionInstance.project == project)
102 )
104 return {a.element_id: a for a in answer_query}
107@http
108def get_project_report_msword(
109 session, user, project_id, q_issue_id=None
110) -> XAccelResponse:
111 """
112 Generate an MS Word report for the quesionnaire associated with the given Project ID.
113 If an issue ID is provided the answers are populated for that Issue.
114 """
115 project = fetch.project(session, project_id)
116 validate.check(user, perms.PROJECT_ACCESS, project=project, deny_restricted=True)
118 sections = (
119 fetch.sections(project, user).options(subqueryload(Section.questions)).all()
120 )
122 if q_issue_id is not None:
123 issue = project.get_issue(q_issue_id)
124 answers = _el_answer_lookup(project, issue, user, session)
125 fname = "{0}-{1}.docx".format(project.title[:20], issue.respondent.name[:15])
126 else:
127 answers = {}
128 fname = "{0}.docx".format(project.title[:25])
130 doc_path = Path(__file__).parent.joinpath(TEMPLATE_DOCX)
131 doc = Document(docx=str(doc_path))
133 for section in sections:
134 if section.is_top_level:
135 doc.add_heading(sections[0].title, 0)
136 else:
137 heading_text = "{} {}".format(section.number.dotted, section.title)
138 level = section.number.dotted.count(".") + 1
139 doc.add_heading(heading_text, level)
140 for question in section.questions:
141 q_title = "{} {}".format(question.number.dotted, question.title)
142 doc.add_paragraph(q_title, style="QuestionTitle")
143 if question.question_def.is_tabular():
144 render_table_question(doc, question, answers)
145 else:
146 render_paragraph_question(doc, question, answers)
148 cache_file_name, cache_file_path = conf.CONF.random_cache_file_path()
150 cache_dir = Path(cache_file_path)
151 cache_dir.parent.mkdir(parents=True, exist_ok=True)
152 with open(cache_file_path, "wb+") as fp:
153 doc.save(fp)
155 return XAccelTempResponse(cache_file_name, fname, content_type=MimeTypes.DOCX)