Coverage for rfpy/api/endpoints/scoring.py: 81%
159 statements
coverage.py v7.0.1, created at 2022-12-31 16:00 +0000
1'''
2Operations for viewing & assigning scores for vendors' answers
3'''
4from typing import List
5from datetime import datetime
8from sqlalchemy.orm import Session
9from sqlalchemy.orm.exc import NoResultFound
11from rfpy.suxint import http
12from rfpy.model import (
13 ScoreComment, AuditEvent, Score, LiveProject, Issue, User, QuestionInstance
14)
15from rfpy.api import fetch, validate, update
16from rfpy.auth import AuthorizationFailure, perms
17from rfpy.web import serial
@http
def get_project_scores(session, user, project_id,
                       scoreset_id='__default__') -> serial.ScoringData:
    '''
    Returns all scores for a project. Used for analysis in the browser.

    The scoreset_id argument is echoed back in the response; it is not
    passed to fetch.scoring_data().

    @raise AuthorizationFailure - if the user is not a standard user from the buying organisation
    @raise NoResultFound - if the specified project cannot be loaded
    '''
    project = fetch.project(session, project_id)
    validate.check(
        user,
        perms.ISSUE_VIEW_AGREED_SCORES,
        project=project,
        deny_restricted=True,
    )

    # Each row from fetch.scoring_data() is converted to a plain dict
    score_rows = [row._asdict() for row in fetch.scoring_data(project)]
    return {'scoreset_id': scoreset_id, 'scores': score_rows}
@http
def get_question_scores(session, user, question_id, scoreset_id=''):
    '''
    Returns the scores held in the given score set for a single question.

    The permission to check is obtained from
    Score.get_permission_for_scoreset() and validated against the question's
    project and section before any scores are fetched.
    '''
    question = fetch.question(session, question_id)
    section = question.section

    required_perm = Score.get_permission_for_scoreset(user, scoreset_id)
    validate.check(user, required_perm,
                   project=question.project, section_id=section.id)

    # Restrict the score query to this one question
    only_this_question = QuestionInstance.id == question.id
    return fetch.scores(session, section.project, section,
                        scoreset_id, user, only_this_question)
@http
def post_question_score(session: Session, user, question_id, score_doc) -> serial.Id:
    '''
    Create or update a single score for a given question/issue combination

    score_doc is read here for the keys 'score_value' and 'scoreset_id'; it is
    also passed whole to fetch.or_create_score().

    Returns a dict holding the ID of the saved Score record.
    '''
    question = fetch.question(session, question_id)
    project = question.project
    validate.check(user, perms.PROJECT_VIEW_QUESTIONNAIRE,
                   project=project,
                   section_id=question.section_id,
                   deny_restricted=False)

    score, created = fetch.or_create_score(session, project, user, question, score_doc)

    # Check that the user may save into this score set *before* the score
    # value is validated or changed, so a request that is not permitted never
    # modifies the Score record's value.
    score_perm = Score.get_permission_for_scoreset(user,
                                                   score_doc['scoreset_id'],
                                                   to_save=True)
    validate.check(user, score_perm,
                   project=project,
                   section_id=question.section_id,
                   deny_restricted=False)

    # Keep the previous value so the audit event can record the change
    initial_score_value = score.score
    score_value = score_doc['score_value']
    Score.check_score_value(score_value, project)
    score.score = score_value

    # Need the Score record's ID for the audit event record, so flush
    session.flush()
    update.log_score_event(session, score, initial_score_value, created,
                           project, user)
    return {'id': score.id}
@http
def get_project_section_issue_scores(session, user, project_id, section_id, issue_id,
                                     scoreset_id=''):
    '''
    Returns the scores held in the given score set for one issue within a
    section of a project.

    The permission to check is obtained from
    Score.get_permission_for_scoreset() and validated against the project,
    issue and section before any scores are fetched.
    '''
    project = fetch.project(session, project_id)
    section = fetch.section_of_project(project, section_id)
    issue = project.get_issue(issue_id)

    validate.check(
        user,
        Score.get_permission_for_scoreset(user, scoreset_id),
        project=project,
        issue=issue,
        section_id=section.id,
    )

    # Restrict the score query to this one issue
    only_this_issue = Issue.id == issue.id
    return fetch.scores(session, project, section, scoreset_id, user, only_this_issue)
@http
def post_question_score_comment(session: Session, user, question_id, score_doc):
    '''
    Add a single score comment, updating the score value too when one is given.

    score_doc is read here for the keys 'score_value', 'scoreset_id',
    'comment' and 'issue_id'; it is also passed whole to
    fetch.or_create_score(). A 'score_value' of None leaves the score value
    untouched and a 'comment' of None adds no comment.
    '''
    question = fetch.question(session, question_id)
    validate.check(user, perms.PROJECT_VIEW_QUESTIONNAIRE,
                   project=question.project,
                   section_id=question.section_id)
    project = question.project

    score, created = fetch.or_create_score(session, project, user, question, score_doc)

    # Only ask for the "to_save" permission when a score value is supplied
    new_value = score_doc['score_value']
    saving_score = new_value is not None
    required_perm = Score.get_permission_for_scoreset(
        user, score_doc['scoreset_id'], to_save=saving_score)
    validate.check(user, required_perm, issue=score.issue,
                   section_id=question.section_id, project=question.project)

    if saving_score:
        previous_value = score.score
        Score.check_score_value(new_value, project)
        score.score = new_value
        # Need the Score record's ID for the audit event record, so flush
        session.flush()
        update.log_score_event(session, score, previous_value, created,
                               project, user)

    comment_text = score_doc['comment']
    if comment_text is None:
        return

    # make the comment and add to database
    comment = ScoreComment(score=score, comment_time=datetime.utcnow(),
                           user_id=user.id, comment_text=comment_text)
    session.add(comment)
    # Flush so that comment.id is available for the audit event
    session.flush()

    event = AuditEvent.create('SCORE_COMMENT_ADDED',
                              object_id=comment.id,
                              user=user,
                              project=project,
                              issue_id=score_doc['issue_id'],
                              question_id=question.id)
    event.add_change('Comment', '', comment.comment_text)
    session.add(event)
def check_autoscore_permissions(project, initiating_user, target_user):
    '''
    Verify that initiating_user may set autoscores for target_user in project.

    After the two checks below pass, check_permission() is called on
    target_user (ISSUE_SAVE_SCORES) and then on initiating_user
    (ISSUE_SAVE_AGREED_SCORES).

    @raise ValueError - if the project is not multiscored
    @raise AuthorizationFailure - if target_user's organisation is not one of
        the project's participants
    '''
    if not project.multiscored:
        raise ValueError('Project must be using Multiple Score Sets')

    is_participant = target_user.organisation in project.participants
    if not is_participant:
        raise AuthorizationFailure(
            f'User {target_user.id} not a participant in project {project.id}'
        )

    target_user.check_permission(perms.ISSUE_SAVE_SCORES)
    initiating_user.check_permission(perms.ISSUE_SAVE_AGREED_SCORES)
@http
def get_project_calcautoscores(session, user, project_id, target_user):
    '''
    Returns, as a list, the autoscores generated for target_user in the given
    project. This function itself adds nothing to the session.
    '''
    project: LiveProject = fetch.project(session, project_id)
    check_autoscore_permissions(project, user, target_user)
    generated = project.generate_autoscores(session, target_user)
    # Only the values of the generated mapping are returned
    return [*generated.values()]
@http
def post_project_calcautoscores(session, user, project_id, target_user):
    '''
    Autoscores are not calculated for User Score Sets. This method allows autoscores
    to be set for the target_user's score set in the current project.

    Existing scores whose value already matches the autoscore are skipped,
    other existing scores are updated, and missing scores are created. An
    audit event is logged (with autoscore=True) for every updated or created
    score.
    '''
    project = fetch.project(session, project_id)
    check_autoscore_permissions(project, user, target_user)
    existing_scores = project.scores_dict()
    generated = project.generate_autoscores(session, target_user)
    new_scores = []

    for auto_key, auto in generated.items():

        if auto_key not in existing_scores:
            # No score saved yet under this key: create one
            score = Score(question_instance_id=auto['question_id'],
                          scoreset_id=auto['scoreset_id'],
                          issue_id=auto['issue_id'],
                          score=auto['score'])
            new_scores.append(score)
            session.add(score)
            continue

        score = existing_scores[auto_key]
        previous_value = score.score
        unchanged = (previous_value is not None
                     and int(previous_value) == int(auto.get('score', -1)))
        if unchanged:
            continue

        score.score = auto['score']
        update.log_score_event(session, score, previous_value, False,
                               project, target_user, autoscore=True)

    # Need ID values for newly created score objects
    session.flush()
    for score in new_scores:
        update.log_score_event(session, score, None, True, project, target_user, autoscore=True)
@http
def get_section_scoresummaries(
        session, user, section_id, scoreset_id) -> serial.ScoreSummary:
    """
    A report summarising scores for all submitted Issues for a Section.

    For each subsection a summary of the number of questions and the number
    of scored questions is provided. This data can be used to calculate a
    percentage completion for each subsection. A score total for that
    subsection is also provided.

    For each question within the current section a single score value is
    provided.

    A user with the permission ISSUE_VIEW_AGREED_SCORES can see other users'
    score sets.
    """
    section = fetch.section(session, section_id)
    project = section.project

    validate.check(
        user,
        Score.get_permission_for_scoreset(user, scoreset_id),
        project=project,
        section_id=section.id,
    )

    # The subsection summary is an input to the section summary
    subsection_summary = fetch.subsection_scoressummary(
        session, user, project, section, scoreset_id)
    subsections = fetch.section_scoresummary(
        session, user, project, section, subsection_summary)
    questions = fetch.question_scoresummary(
        session, user, project, section, scoreset_id)

    return {'subsections': subsections, 'questions': questions}
@http
def get_project_scoresets(session, user,
                          project_id, scoreset_id='') -> List[serial.ScoreSets]:
    '''
    Returns a list of scoresets that the user can view.

    Each entry is a dict with the keys 'scoreset_id' and 'fullname'. The
    scoreset_id argument is not used by this function.
    '''
    project = fetch.project(session, project_id)

    if not user.has_permission(perms.ISSUE_VIEW_AGREED_SCORES):
        # Without that permission only the user's own score set is returned
        user.check_permission(perms.ISSUE_VIEW_SCORES)
        return [{'scoreset_id': user.id, 'fullname': user.fullname}]

    # VIEW_AGREED_SCORE permission allows a user to view other user's score sets
    scoreset_query = (
        session.query(Score.scoreset_id, User.fullname)
        .join(Issue)
        .outerjoin(User, Score.scoreset_id == User.id)
        .filter(Issue.project == project, Score.scoreset_id != "")
        .distinct()
    )

    scoresets = [row._asdict() for row in scoreset_query]
    # The agreed set is excluded by the filter above, so add it explicitly
    scoresets.append({'scoreset_id': '__default__',
                      'fullname': 'Agreed Scoring Set'})
    return scoresets
@http
def get_question_issue_comments(session, user, question_id, issue_id, scoreset_id=''):
    '''
    Returns the comments attached to the score for a question/issue pair in
    the given score set, each converted with as_dict().

    An empty list is returned when no matching Score record exists.
    '''
    question = fetch.question(session, question_id)
    validate.check(user, perms.PROJECT_VIEW_QUESTIONNAIRE,
                   project=question.project,
                   section_id=question.section_id)
    issue = question.project.get_issue(issue_id)

    required_perm = Score.get_permission_for_scoreset(user, scoreset_id)
    validate.check(user, required_perm, issue=issue,
                   section_id=question.section_id, project=question.project)

    score_query = session.query(Score).filter(
        Score.question_instance_id == question.id,
        Score.issue_id == issue.id,
        Score.scoreset_id == scoreset_id,
    )
    try:
        score = score_query.one()
    except NoResultFound:
        # No score saved yet, so there can be no comments
        return []

    return [comment.as_dict() for comment in score.comments]
@http
def post_section_scoreset_scores(session: Session, user, section_id, scoreset_id,
                                 section_score_docs) -> List[serial.Id]:
    '''
    Bulk create/update scores

    section_score_docs is iterated as a sequence of dicts, each read for the
    keys 'question_id' and 'scores'; every entry of 'scores' is read for the
    keys 'issue_id' and 'score_value'. All scores are saved into the score set
    given by scoreset_id.

    Returns the IDs of the saved Score records, in the order they were
    processed.

    NOTE(review): the return annotation is List[serial.Id] but bare Score IDs
    are returned, whereas post_question_score returns {'id': ...} - confirm
    which shape callers expect.
    '''
    section = fetch.section(session, section_id)
    project = section.project
    validate.check(user, perms.PROJECT_VIEW_QUESTIONNAIRE,
                   project=project,
                   section_id=section_id,
                   deny_restricted=False)

    score_perm = Score.get_permission_for_scoreset(
        user, scoreset_id, to_save=True)
    validate.check(user, score_perm, project=project,
                   section_id=section_id, deny_restricted=False)

    # (score, initial_score_value, created) for every score that was set
    pending = []
    for doc in section_score_docs:
        question = fetch.question_of_section(
            session, section_id, doc['question_id'])
        for submitted in doc['scores']:
            issue = project.get_issue(submitted['issue_id'])
            score_doc = {
                'issue_id': issue.id,
                'score_value': submitted['score_value'],
                'scoreset_id': scoreset_id,
            }
            score, created = fetch.or_create_score(
                session, project, user, question, score_doc)
            initial_score_value = score.score
            score_value = score_doc['score_value']
            Score.check_score_value(score_value, project)
            score.score = score_value
            pending.append((score, initial_score_value, created))

    # Need the Score records' IDs for the audit event records. A single flush
    # covers every pending score, so there is no need to flush per item.
    session.flush()

    score_ids = []
    for score, initial_score_value, created in pending:
        update.log_score_event(session, score, initial_score_value,
                               created, project, user)
        score_ids.append(score.id)
    return score_ids