Coverage for rfpy/vendor/validation.py: 100%
77 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
1from datetime import datetime
3from rfpy.auth import AuthorizationFailure, ValidationErrors, vendor_actions, perms
6def validate(
7 effective_user,
8 issue=None,
9 action=perms.ISSUE_VIEW_ANSWERS,
10 question=None,
11 section=None,
12 response_state=None,
13 answer=None,
14 bulk_import=False,
15):
16 validator = Validator(
17 effective_user,
18 issue=issue,
19 action=action,
20 question=question,
21 section=section,
22 response_state=response_state,
23 answer=answer,
24 bulk_import=bulk_import,
25 )
26 validator.validate()
29class Validator:
30 def __init__(
31 self,
32 user,
33 issue=None,
34 action=perms.ISSUE_VIEW_ANSWERS,
35 question=None,
36 section=None,
37 response_state=None,
38 answer=None,
39 bulk_import=False,
40 ):
41 self.user = user
42 self.issue = issue
43 self.action = action
44 self.question = question
45 self.section = section
46 self.response_state = response_state
47 self.answer = None
48 self.bulk_import = bulk_import
49 self.errors = ValidationErrors(action)
50 self.now = datetime.now()
52 def validate(self):
53 if self.issue is not None:
54 self.check_visibility()
55 if self.action:
56 self.check_user_permissions()
57 self.check_rules()
58 self.raise_on_error()
60 def check_visibility(self):
61 if not self.issue.respondent_id == self.user.organisation.id:
62 m = "Not Permitted: {} does not belong to Respondent organisation {}"
63 self.fail(m, self.user, self.issue.respondent_id)
65 if (
66 self.question is not None
67 and self.question.project_id != self.issue.project_id
68 ):
69 self.fail(
70 "Question # {} does not belong to Project {}",
71 self.question.safe_number,
72 self.question.project_id,
73 )
75 if (
76 self.section is not None
77 and self.section.project_id != self.issue.project_id
78 ):
79 self.fail(
80 f"Section ID {self.section.id} does not belong to Issue ID{self.issue.id}"
81 )
83 def check_user_permissions(self):
84 if not self.user.has_permission(self.action):
85 self.fail("User {} lacks permission {}", self.user, self.action)
87 def check_rules(self):
88 action = self.action
89 if action == perms.ISSUE_VIEW_WINLOSS:
90 self.authorise_winloss()
92 if action in vendor_actions.ISSUE_ACTIONS:
93 self.check_issue_rules(self.action, self.issue)
95 def check_issue_rules(self, action, issue):
96 if issue is None:
97 self.fail("Issue not provided, cannot verify permission to {}", action)
98 return
99 if action not in vendor_actions.ISSUE_STATUS_ACTIONS[issue.status]:
100 self.invalid_issue_status()
101 if action in vendor_actions.ILLEGAL_AFTER_DEADLINE:
102 if issue.deadline_passed:
103 act = perms.title_cased(action)
104 self.fail("Unable to {}, Deadline ({}) has passed", act, issue.deadline)
105 if action in vendor_actions.ANSWERING_ACTIONS:
106 self.check_answering_rules(action, issue)
108 def check_answering_rules(self, action, issue):
109 if not issue.use_workflow or self.bulk_import:
110 return # no special rules
112 if self.response_state is None:
113 self.fail("response_state not provided - cannot determine permissions")
115 elif self.response_state.allocated_to != self.user.id:
116 answer_anything = perms.ANSWER_QUESTIONS_ALLOCATED_TO_ANYONE
117 if not self.user.has_permission(answer_anything):
118 m = (
119 f"{self.user.id} has not been allocated responsibility"
120 f" so cannot save this answer without "
121 f' "{perms.ANSWER_QUESTIONS_ALLOCATED_TO_ANYONE}" permission'
122 )
123 self.fail(m)
125 def authorise_winloss(self):
126 issue = self.issue
127 is_not_exposed = not issue.winloss_exposed
128 is_expired = (
129 issue.winloss_expiry is not None and issue.winloss_expiry < datetime.now()
130 )
132 if is_not_exposed or is_expired:
133 self.fail("WinLoss Reports not exposed for this Response")
135 def fail(self, tmpl, *args, **kwargs):
136 msg = tmpl.format(*args, **kwargs)
137 self.errors.auth_failure(msg)
139 def invalid_issue_status(self):
140 self.errors.invalid_issue_status(self.issue.status)
142 def raise_on_error(self):
143 if self.errors.has_errors:
144 raise AuthorizationFailure(errors=self.errors)