Coverage for rfpy/vendor/api/questionnaire.py: 100%

130 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-24 10:52 +0000

1import logging 

2from typing import List, Optional, Dict 

3from operator import attrgetter 

4from itertools import groupby 

5from collections import defaultdict 

6 

7from sqlalchemy.orm.exc import NoResultFound 

8from sqlalchemy.orm import subqueryload, Session 

9import webob.exc 

10 

11from rfpy.suxint import http 

12from rfpy.api import fetch, update, attachments 

13from ..validation import validate 

14from rfpy.model.exc import ValidationFailure 

15from rfpy.auth import perms 

16from rfpy.web import serial 

17from rfpy.model import Section, Answer, QuestionInstance, User, Project 

18from rfpy.model.questionnaire import NumberString 

19 

20log = logging.getLogger(__name__) 

21 

22 

@http
def get_issue_tree(
    session: Session, effective_user: User, issue_id: int, q_with_questions: bool
) -> serial.Nodes:
    """
    Return the questionnaire structure of the Issue's Project as a nested
    tree of nodes.

    The Issue is loaded and passed through validate() for the effective user
    before the tree is fetched. q_with_questions is forwarded to
    fetch.light_tree as its with_questions argument.
    """
    target_issue = fetch.issue(session, issue_id)
    validate(effective_user, target_issue)

    tree = fetch.light_tree(
        session, target_issue.project_id, with_questions=q_with_questions
    )
    return serial.Nodes(**tree)

35 

36 

@http
def get_issue_question(
    session: Session, effective_user: User, issue_id: int, node_number: NumberString
) -> serial.AnsweredQuestion:
    """
    Return a single question of the Issue's Project, looked up by its number,
    together with the Issue's response state for that question.
    """
    target_issue = fetch.issue(session, issue_id)
    q_instance = fetch.question_instance_by_number(
        session, target_issue.project_id, node_number
    )
    validate(effective_user, target_issue, question=q_instance)

    question_doc = q_instance.single_vendor_dict(target_issue)
    state = target_issue.response_state_for_q(q_instance.id)
    question_doc["response_state"] = state.as_dict()
    return question_doc

47 

48 

@http
def get_issue_sections(
    session: Session, effective_user: User, issue_id: int
) -> List[serial.TreeNode]:
    """
    List all Sections belonging to the Issue's Project as a flat sequence of
    tree nodes, ordered by section number.
    """
    target_issue = fetch.issue(session, issue_id)
    validate(effective_user, target_issue)

    # Only the columns needed to build a TreeNode are selected.
    section_rows = (
        session.query(Section.id, Section.parent_id, Section.number, Section.title)
        .filter(Section.project_id == target_issue.project_id)
        .order_by(Section.number)
    )

    nodes = []
    for row in section_rows:
        node = serial.TreeNode(
            id=row.id,
            parent_id=row.parent_id,
            number=row.number.dotted,
            title=row.title,
        )
        nodes.append(node)
    return nodes

68 

69 

@http
def get_issue_section(
    session: Session, effective_user: User, issue_id: int, section_id: int
) -> serial.FullSection:
    """
    Return one Section as a dict, extended with its direct subsections and
    the answered questions of the given Issue within that Section.
    """
    target_issue = fetch.issue(session, issue_id)
    target_section = fetch.section(session, section_id)
    validate(effective_user, target_issue, section=target_section)

    section_doc = target_section.as_dict()

    child_docs = []
    for child in target_section.subsections:
        child_docs.append(child.as_dict())
    section_doc["subsections"] = child_docs

    section_doc["questions"] = list(fetch.answered_questions(target_issue, section_id))
    return section_doc

81 

82 

@http
def get_issue_section_stats(
    session: Session, effective_user: User, issue_id: int, section_id: int
) -> List[serial.AnswerStats]:
    """
    Return the answering statistics reported by fetch.answering_stats for the
    given Issue and Section, each item validated into serial.AnswerStats.
    """
    target_issue = fetch.issue(session, issue_id)
    target_section = fetch.section(session, section_id)
    validate(effective_user, target_issue, section=target_section)

    stats = []
    for raw_stat in fetch.answering_stats(target_issue, target_section):
        stats.append(serial.AnswerStats.model_validate(raw_stat))
    return stats

94 

95 

@http
def post_issue_question_answers(
    session: Session,
    effective_user: User,
    issue_id: int,
    question_id: int,
    answers_doc: serial.ElementAnswerList,
) -> None:
    """
    Save the submitted answers for one question of an Issue.

    Attachments cannot be added with this operation; use operation
    post_issue_answer_attachment for that purpose.

    Raises webob.exc.HTTPBadRequest when the submitted document contains no
    answers.
    """
    submitted = answers_doc.root
    if len(submitted) == 0:
        raise webob.exc.HTTPBadRequest("No answers provided")

    target_issue = fetch.issue(session, issue_id)
    target_question = fetch.question(session, question_id)
    state = target_issue.response_state_for_q(question_id)

    validate(
        effective_user,
        issue=target_issue,
        action=perms.ISSUE_SAVE_QUESTION_RESPONSE,
        question=target_question,
        response_state=state,
    )

    # Map each element id to its submitted answer text; later duplicates win.
    answers_by_element = {}
    for item in submitted:
        answers_by_element[item.element_id] = item.answer

    update.save_answers(
        session, effective_user, target_question, answers_by_element, target_issue
    )

129 

130 

@http
def get_issue_answers_search(
    session: Session, effective_user: User, issue_id: int, search_term: str
) -> Dict[str, List[Dict[str, str]]]:
    """
    Search the text of answers in the given issue for the search term.

    The term is matched literally and case-sensitively as a substring of the
    answer text. SQL LIKE wildcards ("%" and "_") occurring in the term are
    escaped so they match themselves rather than acting as wildcards.

    Returns a dict with a single "matches" key holding one entry per matching
    question, each with the question's dotted "number" and its "question_id".
    """
    issue = fetch.issue(session, issue_id)
    validate(effective_user, issue=issue, action=perms.ISSUE_VIEW_ANSWERS)

    # NB use of utf8mb4_bin collation - all searches case insensitive otherwise
    answer_text = Answer.answer.collate("utf8mb4_bin")

    q = (
        session.query(QuestionInstance.id, QuestionInstance.number)
        .join(Answer)
        .filter(Answer.issue_id == issue_id)
        # contains() renders LIKE '%term%'; autoescape neutralises any
        # wildcard characters supplied by the user.
        .filter(answer_text.contains(search_term, autoescape=True))
        .group_by(QuestionInstance.id)
    )

    matches = [{"number": row.number.dotted, "question_id": row.id} for row in q]
    return {"matches": matches}

153 

154 

@http
def post_issue_answers_replace(
    session: Session,
    effective_user: User,
    issue_id: int,
    replace_doc: serial.TextReplace,
) -> serial.Count:
    """
    Replace every occurrence of a search term in the answers of an Issue.

    Answers whose text contains replace_doc.search_term (literal,
    case-sensitive match) are rewritten with str.replace and saved per
    question via update.save_answers.

    Returns a serial.Count whose count is the number of questions that had at
    least one matching answer.

    Raises webob.exc.HTTPBadRequest if the search term is empty.
    """
    issue = fetch.issue(session, issue_id)
    validate(
        effective_user,
        issue=issue,
        bulk_import=True,
        action=perms.ISSUE_SAVE_QUESTION_RESPONSE,
    )

    replace_term = replace_doc.replace_term
    search_term = replace_doc.search_term

    # An empty term would match every answer, and str.replace("", x) inserts
    # x between every character, so refuse it outright.
    if not search_term:
        raise webob.exc.HTTPBadRequest("No search term provided")

    q_lookup: dict[QuestionInstance, dict[int, str]] = defaultdict(dict)
    a: Answer

    # NB use of utf8mb4_bin collation - all searches case insensitive otherwise
    answer_text = Answer.answer.collate("utf8mb4_bin")

    # autoescape makes the SQL match literal, in agreement with the literal
    # str.replace below; unescaped "%" / "_" would select answers that the
    # replacement then leaves untouched.
    matching_answers = (
        session.query(Answer)
        .options(subqueryload(Answer.question_instance))
        .filter(
            Answer.issue_id == issue_id,
            answer_text.contains(search_term, autoescape=True),
        )
    )
    for a in matching_answers:
        q_lookup[a.question_instance][a.element_id] = a.answer

    for qi, answer_lookup in q_lookup.items():
        replaced = {
            el_id: text.replace(search_term, replace_term)
            for el_id, text in answer_lookup.items()
        }
        update.save_answers(session, effective_user, qi, replaced, issue)

    return serial.Count(description="answers_updated_count", count=len(q_lookup))

195 

196 

@http
def delete_issue_answer_element(
    session: Session, effective_user: User, issue_id: int, element_id: int
) -> None:
    """
    Delete the Issue's answer to a single question element.

    If the answer has an attachment, the attachment record is deleted and its
    file is removed from disc as well. A missing file is logged as a warning
    and does not stop the answer itself from being deleted.
    """
    issue = fetch.issue(session, issue_id)
    qelement = fetch.qelement(session, element_id)
    question_instance = qelement.get_question_instance(issue.project_id)
    response_state = issue.response_state_for_q(question_instance.id)

    validate(
        effective_user,
        issue=issue,
        action=perms.ISSUE_SAVE_QUESTION_RESPONSE,
        question=question_instance,
        response_state=response_state,
    )

    answer = qelement.get_answer(issue)
    if answer.attachment is not None:
        try:
            session.delete(answer.attachment)
            attachments.delete_from_disc(answer.attachment)
        except FileNotFoundError:
            # Logger.warn is a deprecated alias; warning() is the supported
            # spelling. Lazy %-args render the same message as before.
            log.warning(
                "Could not delete attachment id %s filename %s - File Not Found",
                answer.attachment.id,
                answer.attachment.filename,
            )

    session.delete(answer)

227 

228 

@http
def get_issue_answerimport(
    session: Session,
    effective_user: User,
    issue_id: int,
    node_number: Optional[NumberString] = None,
) -> serial.ImportableAnswersList:
    """
    List the answers that could be imported into the Issue identified by
    issue_id.

    When node_number is provided it is passed to fetch.importable_answers as
    the section number used to restrict the results.
    """
    issue = fetch.issue(session, issue_id)
    validate(effective_user, issue, action=perms.ISSUE_VIEW_ANSWERS)

    candidates = fetch.importable_answers(session, issue, sec_number=node_number)

    validated = []
    for candidate in candidates:
        validated.append(serial.ImportableAnswers.model_validate(candidate))
    return serial.ImportableAnswersList(root=validated)

249 

250 

@http
def post_issue_answerimport(
    session: Session,
    effective_user: User,
    issue_id: int,
    import_answers_doc: serial.ImportAnswers,
) -> serial.AnswerImportResult:
    """
    Import answers from source_issue_id to issue_id optionally filtered by section_number.

    Each question in the target Issue's Project whose question definition has
    answers in the source Issue is saved via update.save_answers. Its dotted
    number is reported under "imported" when save_answers returns a truthy
    value and under "unchanged" otherwise. A ValidationFailure raised for a
    question is recorded under "errors" and the import continues.

    Raises ValueError if the source Issue cannot be found.
    """

    target_issue = fetch.issue(session, issue_id)
    validate(
        effective_user,
        target_issue,
        action=perms.ISSUE_SAVE_QUESTION_RESPONSE,
        bulk_import=True,
    )

    try:
        source_issue = fetch.issue(session, import_answers_doc.source_issue_id)
    except NoResultFound as exc:
        m = f"Source Issue with ID {import_answers_doc.source_issue_id} not found"
        # Chain explicitly so the original lookup failure stays in the traceback.
        raise ValueError(m) from exc
    validate(effective_user, source_issue, action=perms.ISSUE_VIEW_ANSWERS)

    proj: Project = target_issue.project

    rows = fetch.importable_answer_lookup(
        session, source_issue, proj, import_answers_doc.section_number
    )

    # create an individual answer lookup dict for each question, keyed by q def id.
    # Accumulating per key (rather than itertools.groupby) does not depend on
    # rows arriving sorted by question_def_id: groupby only groups adjacent
    # rows, so out-of-order rows would overwrite earlier groups.
    by_qid = {}
    for row in rows:
        by_qid.setdefault(row.question_def_id, {})[row.element_id] = row.answer
    src_id_set = by_qid.keys()

    imported, errors, unchanged = [], [], []
    for q in proj.questions.filter(
        QuestionInstance.question_def_id.in_(src_id_set)
    ).order_by(QuestionInstance.number):
        try:
            lookup = by_qid[q.question_def_id]
            if update.save_answers(
                session, effective_user, q, lookup, target_issue, imported=True
            ):
                imported.append(q.number.dotted)
            else:
                unchanged.append(q.number.dotted)
        except ValidationFailure as ve:
            errors.append(f"{q.number}: {ve.message}, {ve.errors_list}")

    return serial.AnswerImportResult(
        imported=imported, errors=errors, unchanged=unchanged
    )

305 )