Coverage for rfpy/model/audit/event.py: 100%
151 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-31 16:00 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-31 16:00 +0000
1import logging
2from enum import Enum
3from typing import List
4from xml.etree import ElementTree as ET
7from sqlalchemy import (Column, Boolean, DateTime, Integer,
8 ForeignKey, event)
9from sqlalchemy import Index, text as sa_text
10from sqlalchemy.orm import relationship, validates, backref
11from sqlalchemy.dialects.mysql import VARCHAR, LONGTEXT, TINYINT
12import sqlalchemy.types
15from rfpy.model.meta import Base
16from rfpy.model.audit import visible
18log = logging.getLogger(__name__)
21class EventKlass:
22 ANSWER = 'ANSWER'
23 BASE = 'BASE'
24 ISSUE = 'ISSUE'
25 ORGANISATION = 'ORGANISATION'
26 PROJECT = 'PROJECT'
27 QUESTION = 'QUESTION'
28 QUESTIONNAIRE = 'QUESTIONNAIRE'
29 ROLE = 'ROLE'
30 SCORE = 'SCORE'
31 SCORE_COMMENT = 'SCORE_COMMENT'
32 USER = 'USER'
33 SECTION = 'SECTION'
36PROJECT_KLASSES = {EventKlass.ISSUE, EventKlass.PROJECT, EventKlass.QUESTION,
37 EventKlass.QUESTIONNAIRE, EventKlass.SCORE, EventKlass.SCORE_COMMENT,
38 EventKlass.SECTION}
41class PropertyChanges(object):
43 def __init__(self):
44 self.root = ET.Element('d')
46 def add(self, property_name, old_value, new_value):
47 change = ET.SubElement(self.root, 'c', name=property_name)
48 ET.SubElement(change, 'old').text = str(old_value)
49 ET.SubElement(change, 'new').text = str(new_value)
51 def as_string(self):
52 return ET.tostring(self.root, encoding="unicode")
55def change_to_json(change):
56 js = dict(name=change.get('name'))
57 for vals in change:
58 js[vals.tag] = vals.text
59 return js
62class Status(Enum):
63 pending = 0
64 done = 1
65 error = 2
68class EventStatusType(sqlalchemy.types.TypeDecorator):
70 impl = TINYINT(1)
72 cache_ok = True
74 def process_bind_param(self, status, dialect):
75 return status.value
77 def process_result_value(self, int_value, dialect):
78 return Status(int_value).name
81class AuditEvent(Base):
83 ''' @DynamicAttrs '''
85 __tablename__ = 'audit_events'
87 public_attrs = ('id', 'event_class', 'event_type', 'timestamp', 'question_id',
88 'project_id', 'issue_id', 'user_id', 'org_id', 'object_id',
89 'changes', 'user')
91 __table_args__ = (
92 Index('event_object_id', 'object_id'),
93 Index('audit_events_user', 'user_id'),
94 Index('audit_events_type', 'type'),
95 Index('audit_events_timestamp', 'timestamp', 'id'),
96 Index('audit_events_question', 'question_id'),
97 Index('audit_events_project', 'project_id'),
98 Index('audit_events_ord', 'org_id'),
99 Index('audit_events_issue', 'issue_id'),
100 Index('audit_events_enqueued', 'enqueued')
101 ) + Base.__table_args__
103 project_id = Column(Integer, nullable=True)
104 issue_id = Column(Integer, nullable=True)
105 question_id = Column(Integer)
106 event_class = Column('class', VARCHAR(length=20))
107 event_type = Column('type', VARCHAR(length=50))
108 object_id = Column(VARCHAR(length=150))
109 user_id = Column(VARCHAR(length=150))
110 org_id = Column(VARCHAR(length=150))
111 private = Column(Boolean,
112 nullable=False, default=0)
113 timestamp = Column(DateTime, nullable=False,
114 server_default=sa_text("CURRENT_TIMESTAMP"))
115 text = Column(LONGTEXT())
116 status = Column(
117 'enqueued',
118 EventStatusType,
119 default=Status.pending,
120 nullable=False,
121 server_default=sa_text("'0'")
122 )
123 session_id = Column(VARCHAR(length=50), nullable=True)
125 project = relationship('Project', backref=backref('events', lazy='dynamic'),
126 primaryjoin=('foreign(AuditEvent.project_id)'
127 '==remote(Project.id)'))
129 issue = relationship('Issue', backref=backref('events', lazy='dynamic'),
130 primaryjoin='foreign(AuditEvent.issue_id)==remote(Issue.id)')
132 organisation = relationship('Organisation',
133 backref=backref('events', lazy='dynamic'),
134 primaryjoin=('foreign(AuditEvent.org_id)'
135 '==remote(Organisation.id)'))
136 user = relationship('User',
137 primaryjoin='foreign(AuditEvent.user_id)==remote(User.id)')
139 def __init__(self, *args, **kwargs):
140 super(AuditEvent, self).__init__(*args, **kwargs)
141 if self.event_class is None:
142 self.event_class = self.event_type.split('_')[0]
144 def __repr__(self):
145 return f'<AuditEvent #{self.id}, Status: {self.status}>'
147 def set_text(self, text): # pragma: no cover
148 # subclasses to specialise
149 self.text = str(text)
151 @validates('object_id')
152 def check_object_id_unicode(self, key, object_id):
153 return str(object_id)
155 @classmethod
156 def create(cls, event_type_name: str, **kwargs) -> 'AuditEvent':
157 '''Creates an instance of the given event type'''
159 if 'user' in kwargs:
160 user = kwargs.pop('user')
161 kwargs['user_id'] = kwargs.get('user_id', user.id)
162 kwargs['org_id'] = kwargs.get('org_id', user.org_id)
164 change_list = kwargs.pop('change_list', None)
166 evt = cls(event_type=event_type_name, **kwargs)
168 if change_list:
169 for prop_name, old_value, new_value in change_list:
170 evt.add_change(prop_name, old_value, new_value)
172 evt.build_acl()
173 return evt
175 @property
176 def changes(self) -> List:
177 '''
178 Parses the xml saved in the text column and returns a json
179 representation of the property changes associated with this event
180 '''
182 if not self.text:
183 return []
184 try:
185 changes_xml = ET.fromstring(self.text)
186 return [change_to_json(c) for c in changes_xml]
187 except Exception:
188 log.exception('Error attemping to parse AuditEvent xml text')
189 return []
191 @changes.setter
192 def changes(self, property_changes):
193 self.text = property_changes.as_string()
195 def add_change(self, prop_name, old_value, new_value):
196 if getattr(self, '_pc', None) is None:
197 self._pc = PropertyChanges()
198 self._pc.add(prop_name, old_value, new_value)
199 self.changes = self._pc
201 def build_acl(self):
202 '''Create EventOrgACL records for this event'''
204 event_type = self.event_type
206 acl_orgid_set = set()
208 if event_type in visible.to_participants:
209 for participant in self.project.participants:
210 acl_orgid_set.add(participant.org_id)
212 if event_type in visible.to_respondent and self.issue is not None:
213 acl_orgid_set.add(self.issue.respondent_id)
215 if event_type in visible.to_initiator:
216 acl_orgid_set.add(self.org_id)
218 for org_id in acl_orgid_set:
219 self.add_to_acl(org_id)
221 return acl_orgid_set
223 def add_to_acl(self, org_id: str):
224 self.acl.append(EventOrgACL(org_id=org_id))
226 def set_done(self):
227 self.status = Status.done
230class EventOrgACL(Base):
231 '''
232 Provides an Access Control List to restrict which organisations
233 can view which events.
234 '''
235 __tablename__ = 'audit_event_orgs'
236 __table_args__ = (
237 Index('audit_event_org_idx', 'org_id', unique=False),
238 ) + Base.__table_args__
240 event_id = Column(Integer, ForeignKey('audit_events.id',
241 ondelete='CASCADE',
242 name='audit_event_orgs_ibfk_1'))
244 org_id = Column(
245 VARCHAR(length=150),
246 ForeignKey('organisations.id', ondelete='CASCADE', onupdate='CASCADE')
247 )
249 event = relationship(AuditEvent, lazy='joined', backref='acl')
250 organisation = relationship('Organisation')
252 def __repr__(self) -> str:
253 return f"<EventACL Event {self.event_id} Org {self.org_id}>"
256@event.listens_for(AuditEvent, 'before_insert', propagate=True)
257def serialize_property_changes(mapper, connection, target):
258 if hasattr(target, '_pc'):
259 target.text = target._pc.as_string()
262@event.listens_for(AuditEvent, 'before_insert', propagate=True)
263def set_status_done_if_no_handler(mapper, connection, target: AuditEvent):
264 from rfpy.jobs.events.action import handler_exists_for
265 if not handler_exists_for(target):
266 target.set_done()
269@event.listens_for(AuditEvent, 'after_insert')
270def enqueue_event_job(mapper, connection, target: AuditEvent):
271 from rfpy.jobs.events.action import handler_exists_for
272 if handler_exists_for(target):
273 from rfpy.jobs.offload import spool_event_handler
274 evt_id = str(target.id).encode('utf8')
275 spool_file = spool_event_handler({'event_id': evt_id})
276 log.info('Event %s spooled to file %s', target.id, spool_file)