Coverage for rfpy/model/helpers.py: 100%

41 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-24 10:52 +0000

1from functools import wraps 

2from typing import Set, TYPE_CHECKING, Iterable 

3 

4from pydantic import BaseModel 

5 

6from rfpy.auth import LacksPermission, AuthorizationFailure 

7 

8if TYPE_CHECKING: 

9 from rfpy.model import AuditEvent 

10 from rfpy.model.meta import Base 

11 

12 

class ensure:
    """
    Method decorator that authorises ``user`` before the wrapped method runs.

    The decorated callable is invoked as ``func(this, user, *args, **kwargs)``.
    Before delegating to it, the wrapper checks in order:

    - ``user.has_permission(perm)``; raises LacksPermission when it is falsy
    - when ``is_participant`` is set, that ``user.organisation`` is contained in
      ``this.project.participants``; raises AuthorizationFailure otherwise
    - when ``is_vendor`` is set, that ``user.organisation`` equals
      ``this.respondent``; raises AuthorizationFailure otherwise

    At least one of ``is_vendor`` / ``is_participant`` must be truthy, otherwise
    the constructor raises ValueError.
    """

    def __init__(self, perm, is_vendor=False, is_participant=False):
        self.perm = perm
        if not (is_vendor or is_participant):
            raise ValueError("either is_vendor or is_participant must be True")
        self.is_participant = is_participant
        self.is_vendor = is_vendor

    def __call__(self, func):
        @wraps(func)
        def wrapped(this, user, *args, **kwargs):
            # this == self == Issue instance
            if not user.has_permission(self.perm):
                raise LacksPermission(self.perm, user.id)

            if (
                self.is_participant
                and user.organisation not in this.project.participants
            ):
                m = (
                    "%s does not belong to a participant "
                    + "organisation so cannot perform this action"
                )
                # One-element tuple so the operand is always treated as a single
                # value, whatever type ``user`` happens to be.
                raise AuthorizationFailure(message=m % (user,))

            if self.is_vendor and user.organisation != this.respondent:
                m = (
                    "%s does not belong to respondent %s "
                    + "so cannot perform this action"
                )
                raise AuthorizationFailure(message=m % (user, this.respondent))

            # Extra positional/keyword arguments are passed through untouched so
            # methods taking more than (self, user) can also be guarded.
            return func(this, user, *args, **kwargs)

        return wrapped

48 

49 

def validate_section_children(
    session,
    node_type: type,
    provided_id_set: Set[int],
    found_node_ids: Set[int],
    current_child_ids: Set[int],
    delete_orphans=False,
):
    """
    Validate the set of provided ids. Ensure:
    - that the set of provided_ids matches that of the found_ids
    - that any missing ids (ie present in current_child_ids but missing in provided_id_set) are
      only provided if delete_orphans is True

    Raises ValueError when either condition is violated; returns None otherwise.
    ``session`` is accepted but not used by this function. ``node_type`` is only
    used for its ``__name__`` in the error messages.
    """
    # Compare the sets themselves rather than their sizes: two sets of equal
    # length can still contain different ids, which a len() check would miss.
    if found_node_ids != provided_id_set:
        alien = provided_id_set - found_node_ids
        m = f"Some provided {node_type.__name__} ids not found in this project: {alien}"
        raise ValueError(m)

    potential_orphans = current_child_ids - provided_id_set
    if potential_orphans and not delete_orphans:
        m = f"{node_type.__name__} ids {potential_orphans} exist in the current section but were "
        m += "not provided in the provided ID list. Will not delete because delete_orphans is false"
        raise ValueError(m)

74 

75 

def audited_patch(
    db_model: "Base",
    serial_doc: "BaseModel",
    audit_event: "AuditEvent",
    patch_keys: Iterable[str],
):
    """
    Copy the attributes named in patch_keys from serial_doc onto db_model.

    For each key the current value on db_model is compared with the value on
    serial_doc. Only when they differ is the change passed to
    audit_event.add_change(key, old, new) and the new value assigned to db_model.
    """
    for field_name in patch_keys:
        # Read the existing value first, then the incoming one.
        previous, incoming = getattr(db_model, field_name), getattr(serial_doc, field_name)
        if previous != incoming:
            audit_event.add_change(field_name, previous, incoming)
            setattr(db_model, field_name, incoming)

90 setattr(db_model, k, new)