Coverage for rfpy/model/helpers.py: 100%
41 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
1from functools import wraps
2from typing import Set, TYPE_CHECKING, Iterable
4from pydantic import BaseModel
6from rfpy.auth import LacksPermission, AuthorizationFailure
8if TYPE_CHECKING:
9 from rfpy.model import AuditEvent
10 from rfpy.model.meta import Base
class ensure:
    """
    Decorator for methods called as ``method(this, user)`` which checks that
    the user may perform the action before the method body runs.

    The checks, in order:
    - the user must hold the permission ``perm`` (else LacksPermission)
    - if ``is_participant``: the user's organisation must be one of
      ``this.project.participants`` (else AuthorizationFailure)
    - if ``is_vendor``: the user's organisation must equal ``this.respondent``
      (else AuthorizationFailure)

    At least one of ``is_vendor`` / ``is_participant`` must be truthy,
    otherwise a ValueError is raised when the decorator is built.
    """

    def __init__(self, perm, is_vendor=False, is_participant=False):
        # A decorator with neither organisation check enabled is rejected up front
        if not is_vendor and not is_participant:
            raise ValueError("either is_vendor or is_participant must be True")
        self.perm = perm
        self.is_vendor = is_vendor
        self.is_participant = is_participant

    def __call__(self, func):
        @wraps(func)
        def wrapped(this, user):
            # "this" is the "self" of the decorated method (an Issue instance)
            if not user.has_permission(self.perm):
                raise LacksPermission(self.perm, user.id)

            if self.is_participant:
                if user.organisation not in this.project.participants:
                    template = (
                        "%s does not belong to a participant "
                        "organisation so cannot perform this action"
                    )
                    raise AuthorizationFailure(message=template % user)

            if self.is_vendor:
                if user.organisation != this.respondent:
                    template = (
                        "%s does not belong to respondent %s "
                        "so cannot perform this action"
                    )
                    raise AuthorizationFailure(
                        message=template % (user, this.respondent)
                    )

            # All checks passed: delegate to the decorated method
            return func(this, user)

        return wrapped
def validate_section_children(
    session,
    node_type: type,
    provided_id_set: Set[int],
    found_node_ids: Set[int],
    current_child_ids: Set[int],
    delete_orphans=False,
):
    """
    Validate the set of provided ids. Ensure:
    - that the set of provided_ids matches that of the found_ids
    - that any missing ids (ie present in current_child_ids but missing in provided_id_set) are
      only provided if delete_orphans is True

    node_type is only used for its __name__ in error messages; session is accepted
    but not used by this function.

    Returns None when both checks pass.
    Raises ValueError if the provided and found id sets differ, or if ids would be
    orphaned while delete_orphans is false.
    """
    # Compare the sets themselves rather than their sizes: two different sets of
    # equal length must not pass validation.
    if found_node_ids != provided_id_set:
        alien = provided_id_set - found_node_ids
        m = f"Some provided {node_type.__name__} ids not found in this project: {alien}"
        raise ValueError(m)

    # Ids currently in the section which the caller did not list would be dropped
    potential_orphans = current_child_ids - provided_id_set
    if potential_orphans and not delete_orphans:
        m = f"{node_type.__name__} ids {potential_orphans} exist in the current section but were "
        m += "not provided in the provided ID list. Will not delete because delete_orphans is false"
        raise ValueError(m)
def audited_patch(
    db_model: "Base",
    serial_doc: "BaseModel",
    audit_event: "AuditEvent",
    patch_keys: Iterable[str],
):
    """
    Copy the attributes named in patch_keys from serial_doc onto db_model.

    Only attributes whose value differs are written; each such change is first
    reported via audit_event.add_change(key, old_value, new_value).
    """
    for attr_name in patch_keys:
        current_value = getattr(db_model, attr_name)
        incoming_value = getattr(serial_doc, attr_name)
        has_changed = current_value != incoming_value
        if has_changed:
            # Record the change before applying it to the model
            audit_event.add_change(attr_name, current_value, incoming_value)
            setattr(db_model, attr_name, incoming_value)