Coverage for rfpy/model/audit/event.py: 100%

151 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-31 16:00 +0000

1import logging 

2from enum import Enum 

3from typing import List 

4from xml.etree import ElementTree as ET 

5 

6 

7from sqlalchemy import (Column, Boolean, DateTime, Integer, 

8 ForeignKey, event) 

9from sqlalchemy import Index, text as sa_text 

10from sqlalchemy.orm import relationship, validates, backref 

11from sqlalchemy.dialects.mysql import VARCHAR, LONGTEXT, TINYINT 

12import sqlalchemy.types 

13 

14 

15from rfpy.model.meta import Base 

16from rfpy.model.audit import visible 

17 

18log = logging.getLogger(__name__) 

19 

20 

class EventKlass:
    '''
    Namespace of string constants naming the classes of audit event.

    Each constant's value is identical to its attribute name.
    '''

    ANSWER = 'ANSWER'
    BASE = 'BASE'
    ISSUE = 'ISSUE'
    ORGANISATION = 'ORGANISATION'
    PROJECT = 'PROJECT'
    QUESTION = 'QUESTION'
    QUESTIONNAIRE = 'QUESTIONNAIRE'
    ROLE = 'ROLE'
    SCORE = 'SCORE'
    SCORE_COMMENT = 'SCORE_COMMENT'
    USER = 'USER'
    SECTION = 'SECTION'

34 

35 

# Subset of EventKlass values grouped under the "project" heading.
# NOTE(review): consumers of this set are not visible in this module - confirm usage.
PROJECT_KLASSES = {
    EventKlass.ISSUE,
    EventKlass.PROJECT,
    EventKlass.QUESTION,
    EventKlass.QUESTIONNAIRE,
    EventKlass.SCORE,
    EventKlass.SCORE_COMMENT,
    EventKlass.SECTION,
}

39 

40 

class PropertyChanges(object):
    '''
    Accumulates property changes as an XML tree.

    The tree has a root ``<d>`` element holding one ``<c name="...">``
    child per recorded change; each ``<c>`` contains an ``<old>`` and a
    ``<new>`` element.
    '''

    def __init__(self):
        self.root = ET.Element('d')

    def add(self, property_name, old_value, new_value):
        '''
        Record one change.

        Both values are stored as ``str(value)``, so ``None`` is saved as
        the text ``'None'``.
        '''
        change = ET.SubElement(self.root, 'c', name=property_name)
        ET.SubElement(change, 'old').text = str(old_value)
        ET.SubElement(change, 'new').text = str(new_value)

    def as_string(self):
        '''Return the accumulated changes serialised as an XML string.'''
        return ET.tostring(self.root, encoding="unicode")

53 

54 

def change_to_json(change):
    '''
    Convert a single change element into a plain dict.

    The result holds ``name`` (the element's ``name`` attribute, or None
    if absent) plus one entry per child element, keyed by the child's tag
    and valued with its text.
    '''
    js = {'name': change.get('name')}
    js.update((child.tag, child.text) for child in change)
    return js

60 

61 

class Status(Enum):
    '''Status values for an audit event, with their integer codes.'''

    pending = 0
    done = 1
    error = 2

66 

67 

class EventStatusType(sqlalchemy.types.TypeDecorator):
    '''
    Column type that stores a ``Status`` as its integer value in a
    TINYINT(1) and returns the status *name* (a string) when read.
    '''

    impl = TINYINT(1)

    cache_ok = True

    def process_bind_param(self, status, dialect):
        '''
        Convert an outgoing value to the integer stored in the database.

        Accepts a ``Status`` member, a status name (the form returned by
        ``process_result_value``) or the raw integer code. ``None`` is
        passed through unchanged.

        Raises KeyError for an unknown name and ValueError for an unknown
        integer code.
        '''
        if status is None:
            return None
        if isinstance(status, Status):
            return status.value
        if isinstance(status, str):
            # Values read back from the column are names, so allow them
            # to be written or compared without manual conversion.
            return Status[status].value
        return Status(status).value

    def process_result_value(self, int_value, dialect):
        '''Convert a stored integer to the status name; None stays None.'''
        if int_value is None:
            return None
        return Status(int_value).name

79 

80 

class AuditEvent(Base):
    '''
    ORM model mapped to the ``audit_events`` table.

    Property changes attached to an event are kept as XML in the ``text``
    column and exposed as a list of dicts through ``changes``.

    @DynamicAttrs
    '''

    __tablename__ = 'audit_events'

    public_attrs = ('id', 'event_class', 'event_type', 'timestamp', 'question_id',
                    'project_id', 'issue_id', 'user_id', 'org_id', 'object_id',
                    'changes', 'user')

    __table_args__ = (
        Index('event_object_id', 'object_id'),
        Index('audit_events_user', 'user_id'),
        Index('audit_events_type', 'type'),
        Index('audit_events_timestamp', 'timestamp', 'id'),
        Index('audit_events_question', 'question_id'),
        Index('audit_events_project', 'project_id'),
        Index('audit_events_ord', 'org_id'),
        Index('audit_events_issue', 'issue_id'),
        Index('audit_events_enqueued', 'enqueued')
    ) + Base.__table_args__

    project_id = Column(Integer, nullable=True)
    issue_id = Column(Integer, nullable=True)
    question_id = Column(Integer)
    # Database column names differ from the attribute names here:
    # event_class -> 'class', event_type -> 'type'.
    event_class = Column('class', VARCHAR(length=20))
    event_type = Column('type', VARCHAR(length=50))
    object_id = Column(VARCHAR(length=150))
    user_id = Column(VARCHAR(length=150))
    org_id = Column(VARCHAR(length=150))
    private = Column(Boolean,
                     nullable=False, default=0)
    timestamp = Column(DateTime, nullable=False,
                       server_default=sa_text("CURRENT_TIMESTAMP"))
    # XML produced by PropertyChanges.as_string(); parsed by ``changes``.
    text = Column(LONGTEXT())
    # Stored in the 'enqueued' column via EventStatusType.
    status = Column(
        'enqueued',
        EventStatusType,
        default=Status.pending,
        nullable=False,
        server_default=sa_text("'0'")
    )
    session_id = Column(VARCHAR(length=50), nullable=True)

    # The relationships below join on plain id columns (no ForeignKey is
    # declared on them), hence the explicit foreign()/remote() annotations.
    project = relationship('Project', backref=backref('events', lazy='dynamic'),
                           primaryjoin=('foreign(AuditEvent.project_id)'
                                        '==remote(Project.id)'))

    issue = relationship('Issue', backref=backref('events', lazy='dynamic'),
                         primaryjoin='foreign(AuditEvent.issue_id)==remote(Issue.id)')

    organisation = relationship('Organisation',
                                backref=backref('events', lazy='dynamic'),
                                primaryjoin=('foreign(AuditEvent.org_id)'
                                             '==remote(Organisation.id)'))
    user = relationship('User',
                        primaryjoin='foreign(AuditEvent.user_id)==remote(User.id)')

    def __init__(self, *args, **kwargs):
        '''
        Build the event; if no ``event_class`` was supplied it is derived
        from ``event_type`` as the text before the first underscore.
        '''
        super().__init__(*args, **kwargs)
        if self.event_class is None:
            # NOTE(review): a prefix split can never yield a class that itself
            # contains an underscore (e.g. EventKlass.SCORE_COMMENT) - confirm
            # callers pass event_class explicitly for such event types.
            self.event_class = self.event_type.split('_')[0]

    def __repr__(self):
        return f'<AuditEvent #{self.id}, Status: {self.status}>'

    def set_text(self, text):  # pragma: no cover
        '''Store ``str(text)`` in the text column.'''
        # subclasses to specialise
        self.text = str(text)

    @validates('object_id')
    def check_object_id_unicode(self, key, object_id):
        '''Coerce any assigned object_id to ``str``.'''
        return str(object_id)

    @classmethod
    def create(cls, event_type_name: str, **kwargs) -> 'AuditEvent':
        '''
        Creates an instance of the given event type.

        Recognised keyword arguments (all others go to the constructor):

        user -- object with ``id`` and ``org_id``; used to fill ``user_id``
            and ``org_id`` unless those were passed explicitly.
        change_list -- iterable of (prop_name, old_value, new_value)
            tuples, each recorded through ``add_change``.

        ``build_acl`` is called on the new event before it is returned.
        '''
        if 'user' in kwargs:
            user = kwargs.pop('user')
            kwargs.setdefault('user_id', user.id)
            kwargs.setdefault('org_id', user.org_id)

        change_list = kwargs.pop('change_list', None)

        evt = cls(event_type=event_type_name, **kwargs)

        if change_list:
            for prop_name, old_value, new_value in change_list:
                evt.add_change(prop_name, old_value, new_value)

        evt.build_acl()
        return evt

    @property
    def changes(self) -> List:
        '''
        Parses the xml saved in the text column and returns a json
        representation of the property changes associated with this event.

        Returns an empty list when there is no text or when parsing fails
        (the failure is logged).
        '''
        if not self.text:
            return []
        try:
            changes_xml = ET.fromstring(self.text)
            return [change_to_json(c) for c in changes_xml]
        except Exception:
            # Deliberately best-effort: unreadable XML is logged, not raised.
            log.exception('Error attemping to parse AuditEvent xml text')
            return []

    @changes.setter
    def changes(self, property_changes):
        '''Serialise a PropertyChanges-like object into the text column.'''
        self.text = property_changes.as_string()

    def add_change(self, prop_name, old_value, new_value):
        '''
        Record one property change and refresh the text column.

        The PropertyChanges accumulator is created on first use and kept
        on the instance as ``_pc``.
        '''
        if getattr(self, '_pc', None) is None:
            self._pc = PropertyChanges()
        self._pc.add(prop_name, old_value, new_value)
        self.changes = self._pc

    def build_acl(self):
        '''
        Create EventOrgACL records for this event.

        Organisation ids are collected according to which sets in
        ``visible`` contain this event's type, then one ACL entry is added
        per distinct id. Returns the set of organisation ids.
        '''
        event_type = self.event_type

        acl_orgid_set = set()

        # Guard against a missing project, as the respondent branch below
        # already does for a missing issue.
        if event_type in visible.to_participants and self.project is not None:
            for participant in self.project.participants:
                acl_orgid_set.add(participant.org_id)

        if event_type in visible.to_respondent and self.issue is not None:
            acl_orgid_set.add(self.issue.respondent_id)

        if event_type in visible.to_initiator:
            acl_orgid_set.add(self.org_id)

        for org_id in acl_orgid_set:
            self.add_to_acl(org_id)

        return acl_orgid_set

    def add_to_acl(self, org_id: str):
        '''Append an EventOrgACL for ``org_id`` to this event's ``acl``.'''
        # ``acl`` is the backref declared on EventOrgACL.event.
        self.acl.append(EventOrgACL(org_id=org_id))

    def set_done(self):
        '''Set the status to ``Status.done``.'''
        self.status = Status.done

228 

229 

class EventOrgACL(Base):
    '''
    Provides an Access Control List to restrict which organisations
    can view which events.

    Mapped to the ``audit_event_orgs`` table; each row links one audit
    event to one organisation.
    '''
    __tablename__ = 'audit_event_orgs'
    __table_args__ = (
        Index('audit_event_org_idx', 'org_id', unique=False),
    ) + Base.__table_args__

    # Both foreign keys are declared with ON DELETE CASCADE.
    event_id = Column(Integer, ForeignKey('audit_events.id',
                                          ondelete='CASCADE',
                                          name='audit_event_orgs_ibfk_1'))

    org_id = Column(
        VARCHAR(length=150),
        ForeignKey('organisations.id', ondelete='CASCADE', onupdate='CASCADE')
    )

    # The backref exposes the ACL entries on the event as ``AuditEvent.acl``.
    event = relationship(AuditEvent, lazy='joined', backref='acl')
    organisation = relationship('Organisation')

    def __repr__(self) -> str:
        return f"<EventACL Event {self.event_id} Org {self.org_id}>"

254 

255 

@event.listens_for(AuditEvent, 'before_insert', propagate=True)
def serialize_property_changes(mapper, connection, target):
    '''
    Before an AuditEvent is inserted, write any accumulated property
    changes into its ``text`` column.
    '''
    # Same "unset" test as AuditEvent.add_change: an absent or None ``_pc``
    # means no changes were recorded, so there is nothing to serialise.
    property_changes = getattr(target, '_pc', None)
    if property_changes is not None:
        target.text = property_changes.as_string()

260 

261 

@event.listens_for(AuditEvent, 'before_insert', propagate=True)
def set_status_done_if_no_handler(mapper, connection, target: AuditEvent):
    '''
    Before insert, mark the event as done when ``handler_exists_for``
    reports no handler for it.
    '''
    # Imported at call time rather than module level
    # (presumably to avoid a circular import - TODO confirm).
    from rfpy.jobs.events.action import handler_exists_for
    if not handler_exists_for(target):
        target.set_done()

267 

268 

# NOTE(review): unlike the two before_insert listeners on AuditEvent, this one
# is registered without propagate=True, so it presumably does not fire for
# mapped subclasses of AuditEvent - confirm that this is intended.
@event.listens_for(AuditEvent, 'after_insert')
def enqueue_event_job(mapper, connection, target: AuditEvent):
    '''
    After insert, spool a job for the event when ``handler_exists_for``
    reports a handler for it, and log the spool file.
    '''
    # Imported at call time rather than module level
    # (presumably to avoid a circular import - TODO confirm).
    from rfpy.jobs.events.action import handler_exists_for
    if handler_exists_for(target):
        from rfpy.jobs.offload import spool_event_handler
        # The event id is passed to the spooler as UTF-8 encoded bytes.
        evt_id = str(target.id).encode('utf8')
        spool_file = spool_event_handler({'event_id': evt_id})
        log.info('Event %s spooled to file %s', target.id, spool_file)