Coverage for rfpy/api/endpoints/reports/msword.py: 100%
96 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-31 16:00 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-31 16:00 +0000
1import re
2import logging
3from pathlib import Path
4from itertools import tee
6from docx import Document
7from sqlalchemy.orm import subqueryload
9from rfpy import conf
10from rfpy.auth import perms
11from rfpy.suxint import http
12from rfpy.api import fetch, validate
13from rfpy.model import Answer, QuestionInstance, QElement, Project, User, Issue
14from rfpy.web.response import XAccelTempResponse
15from rfpy.web.mime import MimeTypes
# Location of the Word template, resolved relative to this module's directory
TEMPLATE_DOCX = 'assets/q-template.docx'

# Module-level logger, named after this module
log = logging.getLogger(__name__)
def is_comments_label(label_text):
    '''
    Return True if the label text looks like a "comments" or "qualifications"
    heading.

    The text must contain either word (case-insensitive) and be shorter than
    50 characters. None is never a comments label.
    '''
    if label_text is None:
        return False
    if re.search(r'(comments|qualifications)', label_text, re.IGNORECASE) is None:
        return False
    return len(label_text) < 50
def render_paragraph_question(doc, question, answers):
    '''
    Render a non-tabular question as a series of paragraphs added to ``doc``.

    One paragraph is produced per element of ``question.question_def.elements``:

    - when ``answers`` is non-empty and the element is answerable, the
      paragraph holds the matching answer text (or 'Not Answered') in the
      "Answer" style;
    - otherwise the paragraph holds the element's label.

    A comments / qualifications label is skipped together with the element
    that follows it when that following element has no entry in ``answers``
    (or when there is no following element at all).

    :param doc: object providing ``add_paragraph(text, style=...)``
    :param question: object whose ``question_def.elements`` is iterable
    :param answers: mapping of element ID to an object with an ``answer``
                    attribute
    '''
    # Two iterators over the same elements; `following` runs one step ahead
    # so each element can be inspected alongside its successor.
    elements, following = tee(question.question_def.elements)
    next(following, None)

    for element in elements:
        next_el = next(following, None)
        if is_comments_label(element.label):
            # next_el is None when the label is the final element; treat that
            # the same as "not answered" rather than dereferencing None.
            if next_el is None or next_el.id not in answers:
                # Don't print comments / quals label or answer
                # if Not Answered: consume the following element from both
                # iterators so they stay in step.
                next(elements, None)
                next(following, None)
                continue
        if answers and element.is_answerable:
            if element.id in answers:
                answer = answers[element.id].answer
            else:
                answer = 'Not Answered'
            doc.add_paragraph(answer, style="Answer")
        else:
            doc.add_paragraph(element.label)
def render_table_question(doc, question, answers):
    '''
    Render a tabular question as a table added to ``doc``.

    TODO - doesn't handle multi column tables

    A new table row is started whenever an element's ``row`` is greater than
    the row of the previous element. Each element is written into the cell at
    its (1-based) ``col``; elements with ``colspan`` > 1 have that cell merged
    with the cells to its right, clamped to the last column of the table.

    When ``answers`` is non-empty and the element is answerable, the cell gets
    the answer text (or '- Not Answered -') in the "Answer" style; otherwise
    it gets the element's label.

    :param doc: object providing ``add_table(rows=..., cols=...)``
    :param question: object with ``id`` and ``question_def`` (which provides
                     ``column_count`` and ``elements``)
    :param answers: mapping of element ID to an object with an ``answer``
                    attribute
    '''
    col_count = question.question_def.column_count
    # NOTE(review): the single row created here is never written to; every
    # element row is appended below. Confirm the leading row is intended.
    table = doc.add_table(rows=1, cols=col_count)
    last_col_index = col_count - 1
    current_row = -1
    row_cells = None
    for el in question.question_def.elements:
        if el.row > current_row:
            row_cells = table.add_row().cells
        left_col_index = el.col - 1
        cell = row_cells[left_col_index]
        if el.colspan > 1:
            try:
                # A span of N columns ends N - 1 cells to the right of where
                # it starts. Clamp to the last column to fix broken colspans.
                right_col_index = min(left_col_index + el.colspan - 1,
                                      last_col_index)
                other_cell = row_cells[right_col_index]
                cell.merge(other_cell)
            except IndexError:  # pragma: no cover
                log.warning('Fix colspan failed for question instance ID %s', question.id)
        if answers and el.is_answerable:
            if el.id in answers:
                answer = answers[el.id].answer
            else:
                answer = '- Not Answered -'
            cell.add_paragraph(answer, style="Answer")
        else:
            cell.add_paragraph(el.label)
        current_row = el.row
def _el_answer_lookup(project: Project, issue: Issue, user: User, session):
    '''
    Build a lookup of the answers given for ``issue``, keyed by element ID.

    The ISSUE_VIEW_ANSWERS permission check is run for ``user`` before the
    query is issued. Each value in the returned dict is a query result row
    carrying ``element_id`` and ``answer``.
    '''
    validate.check(
        user,
        perms.ISSUE_VIEW_ANSWERS,
        project=project,
        issue=issue,
        section_id=project.section_id,
    )

    rows = (
        session.query(Answer.element_id, Answer.answer)
        .join(QElement, QuestionInstance)
        .filter(Answer.issue == issue, QuestionInstance.project == project)
    )

    lookup = {}
    for row in rows:
        lookup[row.element_id] = row
    return lookup
@http
def get_project_report_msword(session, user, project_id, q_issue_id=None) -> MimeTypes.DOCX:
    '''
    Generate an MS Word report for the questionnaire associated with the given Project ID.
    If an issue ID is provided the answers are populated for that Issue.
    '''
    project = fetch.project(session, project_id)
    validate.check(user, perms.PROJECT_ACCESS, project=project, deny_restricted=True)

    sections = fetch.sections(project, user)\
        .options(subqueryload('questions'))\
        .all()

    if q_issue_id is not None:
        issue = project.get_issue(q_issue_id)
        answers = _el_answer_lookup(project, issue, user, session)
        # Download name: truncated project title plus truncated respondent name
        fname = '{0}-{1}.docx'.format(project.title[:20],
                                      issue.respondent.name[:15])
    else:
        # No issue given: render the questionnaire without answers
        answers = {}
        fname = '{0}.docx'.format(project.title[:25])

    # The template document lives alongside this module
    doc_path = Path(__file__).parent.joinpath(TEMPLATE_DOCX)
    doc = Document(docx=doc_path)

    for section in sections:
        if section.is_top_level:
            # Use the title of the section being rendered, not whichever
            # section happens to be first in the list
            doc.add_heading(section.title, 0)
        else:
            heading_text = '{} {}'.format(section.number.dotted, section.title)
            # Heading depth follows the depth of the dotted section number
            level = section.number.dotted.count('.') + 1
            doc.add_heading(heading_text, level)
        for question in section.questions:
            q_title = '{} {}'.format(question.number.dotted, question.title)
            doc.add_paragraph(q_title, style='QuestionTitle')
            if question.question_def.is_tabular():
                render_table_question(doc, question, answers)
            else:
                render_paragraph_question(doc, question, answers)

    cache_file_name, cache_file_path = conf.CONF.random_cache_file_path()

    # Make sure the directory that will hold the cache file exists
    Path(cache_file_path).parent.mkdir(parents=True, exist_ok=True)
    log.debug('Writing MS Word report to %s', cache_file_path)
    with open(cache_file_path, 'wb+') as fp:
        doc.save(fp)

    return XAccelTempResponse(cache_file_name, fname,
                              content_type=MimeTypes.DOCX)