Coverage for rfpy/vendor/validation.py: 100%

77 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-24 10:52 +0000

1from datetime import datetime 

2 

3from rfpy.auth import AuthorizationFailure, ValidationErrors, vendor_actions, perms 

4 

5 

6def validate( 

7 effective_user, 

8 issue=None, 

9 action=perms.ISSUE_VIEW_ANSWERS, 

10 question=None, 

11 section=None, 

12 response_state=None, 

13 answer=None, 

14 bulk_import=False, 

15): 

16 validator = Validator( 

17 effective_user, 

18 issue=issue, 

19 action=action, 

20 question=question, 

21 section=section, 

22 response_state=response_state, 

23 answer=answer, 

24 bulk_import=bulk_import, 

25 ) 

26 validator.validate() 

27 

28 

29class Validator: 

30 def __init__( 

31 self, 

32 user, 

33 issue=None, 

34 action=perms.ISSUE_VIEW_ANSWERS, 

35 question=None, 

36 section=None, 

37 response_state=None, 

38 answer=None, 

39 bulk_import=False, 

40 ): 

41 self.user = user 

42 self.issue = issue 

43 self.action = action 

44 self.question = question 

45 self.section = section 

46 self.response_state = response_state 

47 self.answer = None 

48 self.bulk_import = bulk_import 

49 self.errors = ValidationErrors(action) 

50 self.now = datetime.now() 

51 

52 def validate(self): 

53 if self.issue is not None: 

54 self.check_visibility() 

55 if self.action: 

56 self.check_user_permissions() 

57 self.check_rules() 

58 self.raise_on_error() 

59 

60 def check_visibility(self): 

61 if not self.issue.respondent_id == self.user.organisation.id: 

62 m = "Not Permitted: {} does not belong to Respondent organisation {}" 

63 self.fail(m, self.user, self.issue.respondent_id) 

64 

65 if ( 

66 self.question is not None 

67 and self.question.project_id != self.issue.project_id 

68 ): 

69 self.fail( 

70 "Question # {} does not belong to Project {}", 

71 self.question.safe_number, 

72 self.question.project_id, 

73 ) 

74 

75 if ( 

76 self.section is not None 

77 and self.section.project_id != self.issue.project_id 

78 ): 

79 self.fail( 

80 f"Section ID {self.section.id} does not belong to Issue ID{self.issue.id}" 

81 ) 

82 

83 def check_user_permissions(self): 

84 if not self.user.has_permission(self.action): 

85 self.fail("User {} lacks permission {}", self.user, self.action) 

86 

87 def check_rules(self): 

88 action = self.action 

89 if action == perms.ISSUE_VIEW_WINLOSS: 

90 self.authorise_winloss() 

91 

92 if action in vendor_actions.ISSUE_ACTIONS: 

93 self.check_issue_rules(self.action, self.issue) 

94 

95 def check_issue_rules(self, action, issue): 

96 if issue is None: 

97 self.fail("Issue not provided, cannot verify permission to {}", action) 

98 return 

99 if action not in vendor_actions.ISSUE_STATUS_ACTIONS[issue.status]: 

100 self.invalid_issue_status() 

101 if action in vendor_actions.ILLEGAL_AFTER_DEADLINE: 

102 if issue.deadline_passed: 

103 act = perms.title_cased(action) 

104 self.fail("Unable to {}, Deadline ({}) has passed", act, issue.deadline) 

105 if action in vendor_actions.ANSWERING_ACTIONS: 

106 self.check_answering_rules(action, issue) 

107 

108 def check_answering_rules(self, action, issue): 

109 if not issue.use_workflow or self.bulk_import: 

110 return # no special rules 

111 

112 if self.response_state is None: 

113 self.fail("response_state not provided - cannot determine permissions") 

114 

115 elif self.response_state.allocated_to != self.user.id: 

116 answer_anything = perms.ANSWER_QUESTIONS_ALLOCATED_TO_ANYONE 

117 if not self.user.has_permission(answer_anything): 

118 m = ( 

119 f"{self.user.id} has not been allocated responsibility" 

120 f" so cannot save this answer without " 

121 f' "{perms.ANSWER_QUESTIONS_ALLOCATED_TO_ANYONE}" permission' 

122 ) 

123 self.fail(m) 

124 

125 def authorise_winloss(self): 

126 issue = self.issue 

127 is_not_exposed = not issue.winloss_exposed 

128 is_expired = ( 

129 issue.winloss_expiry is not None and issue.winloss_expiry < datetime.now() 

130 ) 

131 

132 if is_not_exposed or is_expired: 

133 self.fail("WinLoss Reports not exposed for this Response") 

134 

135 def fail(self, tmpl, *args, **kwargs): 

136 msg = tmpl.format(*args, **kwargs) 

137 self.errors.auth_failure(msg) 

138 

139 def invalid_issue_status(self): 

140 self.errors.invalid_issue_status(self.issue.status) 

141 

142 def raise_on_error(self): 

143 if self.errors.has_errors: 

144 raise AuthorizationFailure(errors=self.errors)