Coverage for rfpy/vendor/api/questionnaire.py: 100%
130 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
1import logging
2from typing import List, Optional, Dict
3from operator import attrgetter
4from itertools import groupby
5from collections import defaultdict
7from sqlalchemy.orm.exc import NoResultFound
8from sqlalchemy.orm import subqueryload, Session
9import webob.exc
11from rfpy.suxint import http
12from rfpy.api import fetch, update, attachments
13from ..validation import validate
14from rfpy.model.exc import ValidationFailure
15from rfpy.auth import perms
16from rfpy.web import serial
17from rfpy.model import Section, Answer, QuestionInstance, User, Project
18from rfpy.model.questionnaire import NumberString
20log = logging.getLogger(__name__)
@http
def get_issue_tree(
    session: Session, effective_user: User, issue_id: int, q_with_questions: bool
) -> serial.Nodes:
    """
    Return the questionnaire of the Issue's Project as a nested node structure.

    The Issue is loaded by issue_id and passed to validate() together with the
    effective user before the tree is fetched. q_with_questions is forwarded to
    fetch.light_tree as its with_questions argument.
    """
    issue = fetch.issue(session, issue_id)
    validate(effective_user, issue)
    tree = fetch.light_tree(
        session,
        issue.project_id,
        with_questions=q_with_questions,
    )
    return serial.Nodes(**tree)
@http
def get_issue_question(
    session: Session, effective_user: User, issue_id: int, node_number: NumberString
) -> serial.AnsweredQuestion:
    """
    Return one question of the Issue's Project, looked up by its number.

    The returned dict is the question's single-vendor representation for this
    Issue, extended with a "response_state" entry holding the Issue's response
    state for that question.
    """
    issue = fetch.issue(session, issue_id)
    instance = fetch.question_instance_by_number(
        session, issue.project_id, node_number
    )
    validate(effective_user, issue, question=instance)

    answered = instance.single_vendor_dict(issue)
    state = issue.response_state_for_q(instance.id)
    answered["response_state"] = state.as_dict()
    return answered
@http
def get_issue_sections(
    session: Session, effective_user: User, issue_id: int
) -> List[serial.TreeNode]:
    """
    List the Sections of the Issue's Project as a flat list of tree nodes.

    Sections are selected by the Issue's project_id and ordered by
    Section.number; each node carries the section id, parent id, dotted number
    and title.
    """
    issue = fetch.issue(session, issue_id)
    validate(effective_user, issue)

    # Only the columns needed for a TreeNode are selected, not whole Sections.
    columns = (Section.id, Section.parent_id, Section.number, Section.title)
    section_rows = (
        session.query(*columns)
        .filter(Section.project_id == issue.project_id)
        .order_by(Section.number)
    )

    nodes = []
    for row in section_rows:
        node = serial.TreeNode(
            id=row.id,
            parent_id=row.parent_id,
            number=row.number.dotted,
            title=row.title,
        )
        nodes.append(node)
    return nodes
@http
def get_issue_section(
    session: Session, effective_user: User, issue_id: int, section_id: int
) -> serial.FullSection:
    """
    Return a single Section with its direct subsections and answered questions.

    The result is the Section's own dict with two extra keys: "subsections"
    (the dict form of each subsection) and "questions" (the answered questions
    fetched for this Issue and section_id).
    """
    issue = fetch.issue(session, issue_id)
    section = fetch.section(session, section_id)
    validate(effective_user, issue, section=section)

    full_section = section.as_dict()

    children = [child.as_dict() for child in section.subsections]
    full_section["subsections"] = children

    # Materialise the iterable returned by fetch.answered_questions.
    questions = list(fetch.answered_questions(issue, section_id))
    full_section["questions"] = questions

    return full_section
@http
def get_issue_section_stats(
    session: Session, effective_user: User, issue_id: int, section_id: int
) -> List[serial.AnswerStats]:
    """
    Return answering statistics for the given Section of the given Issue.

    Each item produced by fetch.answering_stats is validated into a
    serial.AnswerStats model.
    """
    issue = fetch.issue(session, issue_id)
    section = fetch.section(session, section_id)
    validate(effective_user, issue, section=section)

    raw_stats = fetch.answering_stats(issue, section)
    return list(map(serial.AnswerStats.model_validate, raw_stats))
@http
def post_issue_question_answers(
    session: Session,
    effective_user: User,
    issue_id: int,
    question_id: int,
    answers_doc: serial.ElementAnswerList,
) -> None:
    """
    Save the answers submitted for a single question of an Issue.

    An empty answers document is rejected with HTTP 400 before anything is
    loaded from the database.

    Note that Attachments cannot be added using this operation. Use operation
    post_issue_answer_attachment for that purpose.
    """
    submitted = answers_doc.root
    if len(submitted) == 0:
        raise webob.exc.HTTPBadRequest("No answers provided")

    issue = fetch.issue(session, issue_id)
    question = fetch.question(session, question_id)
    response_state = issue.response_state_for_q(question_id)

    validate(
        effective_user,
        issue=issue,
        action=perms.ISSUE_SAVE_QUESTION_RESPONSE,
        question=question,
        response_state=response_state,
    )

    # Map each question element id to the answer text supplied for it.
    answer_lookup = {}
    for element_answer in submitted:
        answer_lookup[element_answer.element_id] = element_answer.answer

    update.save_answers(session, effective_user, question, answer_lookup, issue)
@http
def get_issue_answers_search(
    session: Session, effective_user: User, issue_id: int, search_term: str
) -> Dict[str, List[Dict[str, str]]]:
    """
    Search the text of answers in the given issue for the search term.

    The term is matched literally and case-sensitively as a substring: any
    "%" or "_" in search_term is escaped so it is not treated as a SQL LIKE
    wildcard.

    Returns a dict with a single "matches" key holding one entry per matching
    question, each with the question's dotted "number" and its "question_id".
    """
    issue = fetch.issue(session, issue_id)
    validate(effective_user, issue=issue, action=perms.ISSUE_VIEW_ANSWERS)

    # NB use of utf8mb4_bin collation - all searches case insensitive otherwise
    answer_text = Answer.answer.collate("utf8mb4_bin")

    q = (
        session.query(QuestionInstance.id, QuestionInstance.number)
        .join(Answer)
        .filter(Answer.issue_id == issue_id)
        # contains() renders LIKE '%' || term || '%'; autoescape stops
        # user-supplied "%" / "_" from acting as wildcards.
        .filter(answer_text.contains(search_term, autoescape=True))
        .group_by(QuestionInstance.id)
    )

    matches = [{"number": row.number.dotted, "question_id": row.id} for row in q]
    return {"matches": matches}
@http
def post_issue_answers_replace(
    session: Session,
    effective_user: User,
    issue_id: int,
    replace_doc: serial.TextReplace,
) -> serial.Count:
    """
    Replace every occurrence of a search term in the answers of an Issue.

    Answers of the Issue whose text contains replace_doc.search_term
    (literal, case-sensitive match) are grouped by question. The term is
    replaced with replace_doc.replace_term and the new answers are saved one
    question at a time through update.save_answers.

    Returns a serial.Count whose count is the number of questions for which
    answers were saved.
    """
    issue = fetch.issue(session, issue_id)
    validate(
        effective_user,
        issue=issue,
        bulk_import=True,
        action=perms.ISSUE_SAVE_QUESTION_RESPONSE,
    )

    replace_term = replace_doc.replace_term
    search_term = replace_doc.search_term
    # NOTE(review): an empty search_term would match every answer and make
    # str.replace insert replace_term between all characters - confirm that
    # serial.TextReplace rejects it.

    # NB use of utf8mb4_bin collation - all searches case insensitive otherwise
    answer_text = Answer.answer.collate("utf8mb4_bin")
    matching_answers = (
        session.query(Answer)
        .options(subqueryload(Answer.question_instance))
        .filter(
            Answer.issue_id == issue_id,
            # autoescape keeps "%" / "_" in the term literal, so the rows
            # selected here are exactly those str.replace() will change.
            answer_text.contains(search_term, autoescape=True),
        )
    )

    # question instance -> {element id: answer text after replacement}
    q_lookup: dict[QuestionInstance, dict[int, str]] = defaultdict(dict)
    a: Answer
    for a in matching_answers:
        q_lookup[a.question_instance][a.element_id] = a.answer.replace(
            search_term, replace_term
        )

    # Saving starts only after the query above has been fully consumed.
    for qi, answer_lookup in q_lookup.items():
        update.save_answers(session, effective_user, qi, answer_lookup, issue)

    return serial.Count(description="answers_updated_count", count=len(q_lookup))
@http
def delete_issue_answer_element(
    session: Session, effective_user: User, issue_id: int, element_id: int
) -> None:
    """
    Delete the Issue's answer to a single question element.

    If the answer has an attachment, the attachment row is deleted and its
    file is removed from disc first. A missing file is logged as a warning
    and does not stop the answer itself from being deleted.
    """
    issue = fetch.issue(session, issue_id)
    qelement = fetch.qelement(session, element_id)
    question_instance = qelement.get_question_instance(issue.project_id)
    response_state = issue.response_state_for_q(question_instance.id)

    validate(
        effective_user,
        issue=issue,
        action=perms.ISSUE_SAVE_QUESTION_RESPONSE,
        question=question_instance,
        response_state=response_state,
    )

    answer = qelement.get_answer(issue)
    attachment = answer.attachment
    if attachment is not None:
        try:
            session.delete(attachment)
            attachments.delete_from_disc(attachment)
        except FileNotFoundError:
            # Best effort: the file is already gone, so only record the fact.
            # Logger.warn is deprecated; warning() emits the same message.
            log.warning(
                "Could not delete attachment id %s filename %s - File Not Found",
                attachment.id,
                attachment.filename,
            )

    session.delete(answer)
@http
def get_issue_answerimport(
    session: Session,
    effective_user: User,
    issue_id: int,
    node_number: Optional[NumberString] = None,
) -> serial.ImportableAnswersList:
    """
    Report the answers that are available for import into the given Issue.

    The Issue identified by issue_id is the import target. When node_number
    is provided it is passed to fetch.importable_answers as sec_number to
    restrict the results to that section.
    """
    target_issue = fetch.issue(session, issue_id)
    validate(effective_user, target_issue, action=perms.ISSUE_VIEW_ANSWERS)

    importable = fetch.importable_answers(
        session, target_issue, sec_number=node_number
    )
    validated = [serial.ImportableAnswers.model_validate(item) for item in importable]
    return serial.ImportableAnswersList(root=validated)
@http
def post_issue_answerimport(
    session: Session,
    effective_user: User,
    issue_id: int,
    import_answers_doc: serial.ImportAnswers,
) -> serial.AnswerImportResult:
    """
    Import answers from source_issue_id to issue_id optionally filtered by
    section_number.

    Answers of the source Issue are grouped by question definition id and
    saved, question by question, against the matching questions of the target
    Issue's Project.

    Returns a serial.AnswerImportResult with three lists: the dotted numbers
    of questions whose answers were imported, the dotted numbers of questions
    left unchanged, and an error message for each question whose answers
    failed validation.

    Raises ValueError if the source Issue cannot be found.
    """
    target_issue = fetch.issue(session, issue_id)
    validate(
        effective_user,
        target_issue,
        action=perms.ISSUE_SAVE_QUESTION_RESPONSE,
        bulk_import=True,
    )

    try:
        source_issue = fetch.issue(session, import_answers_doc.source_issue_id)
    except NoResultFound as exc:
        m = f"Source Issue with ID {import_answers_doc.source_issue_id} not found"
        # Chain the original error so the lookup failure stays in the traceback.
        raise ValueError(m) from exc
    validate(effective_user, source_issue, action=perms.ISSUE_VIEW_ANSWERS)

    proj: Project = target_issue.project

    rows = fetch.importable_answer_lookup(
        session, source_issue, proj, import_answers_doc.section_number
    )

    # create an individual answer lookup dict for each question, keyed by q def id.
    # Grouped with a defaultdict rather than itertools.groupby so the result
    # does not depend on rows arriving ordered by question_def_id.
    by_qid = defaultdict(dict)
    for row in rows:
        by_qid[row.question_def_id][row.element_id] = row.answer

    imported, errors, unchanged = [], [], []
    target_questions = proj.questions.filter(
        QuestionInstance.question_def_id.in_(list(by_qid))
    ).order_by(QuestionInstance.number)

    for q in target_questions:
        try:
            lookup = by_qid[q.question_def_id]
            # A truthy result from save_answers is recorded as an import.
            if update.save_answers(
                session, effective_user, q, lookup, target_issue, imported=True
            ):
                imported.append(q.number.dotted)
            else:
                unchanged.append(q.number.dotted)
        except ValidationFailure as ve:
            # NOTE(review): uses q.number, not q.number.dotted as in the
            # other two lists - confirm this is intended.
            errors.append(f"{q.number}: {ve.message}, {ve.errors_list}")

    return serial.AnswerImportResult(
        imported=imported, errors=errors, unchanged=unchanged
    )