Coverage for rfpy/model/notify.py: 100%
100 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-31 16:00 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-31 16:00 +0000
1import enum # Don't import Enum here - shadows the SQLAlchemy type
2from datetime import datetime
4from sqlalchemy import (
5 Column, DateTime, text, Integer, ForeignKey, func, Index
6)
7from sqlalchemy.dialects.mysql import INTEGER, VARCHAR, ENUM, LONGTEXT
8from sqlalchemy.dialects.mysql.types import TINYINT
9from sqlalchemy.orm import relationship, backref
11from rfpy.model.meta import Base
class ProjectWatchList(Base):
    """Row linking a user to a project that the user is watching.

    A unique index covers ``(user_id, project_id)``.
    """

    __tablename__ = "project_watch_list"
    __table_args__ = (
        Index("_user_project_wl_uc", "user_id", "project_id", unique=True),
        {"mysql_engine": "InnoDB", "mysql_charset": "utf8mb4"},
    )

    # Both foreign keys are declared with ON DELETE CASCADE.
    project_id = Column(
        INTEGER(10),
        ForeignKey("projects.id", ondelete="CASCADE"),
    )
    user_id = Column(
        VARCHAR(length=50),
        ForeignKey("users.id", ondelete="CASCADE"),
    )
    date_created = Column(DateTime, default=func.now())

    # Adds a dynamic ``watch_list`` backref to Project. passive_deletes=True
    # pairs with the ON DELETE CASCADE declared on project_id above.
    project = relationship(
        "Project",
        backref=backref(
            "watch_list",
            lazy="dynamic",
            cascade="all,delete",
            passive_deletes=True,
        ),
    )
    # The matching ``project_watches`` attribute must be declared on User.
    user = relationship("User", back_populates="project_watches", lazy="joined")

    def __repr__(self):
        """Return a short debug string naming the watcher and the project."""
        watcher, target = self.user_id, self.project
        return f"<ProjectWatchList: [{watcher}] watching {target}>"
class IssueWatchList(Base):
    """Row linking a user to an issue that the user is watching.

    A unique index covers ``(user_id, issue_id)``.
    """

    __tablename__ = "issue_watch_list"
    __table_args__ = (
        Index("_user_issue_wl_uc", "user_id", "issue_id", unique=True),
        {"mysql_engine": "InnoDB", "mysql_charset": "utf8mb4"},
    )

    # Both foreign keys are declared with ON DELETE CASCADE.
    issue_id = Column(
        INTEGER(10),
        ForeignKey("issues.id", ondelete="CASCADE"),
    )
    user_id = Column(
        VARCHAR(length=50),
        ForeignKey("users.id", ondelete="CASCADE"),
    )
    date_created = Column(DateTime, default=func.now())

    # Adds a dynamic ``watch_list`` backref to Issue.
    issue = relationship(
        "Issue",
        backref=backref(
            "watch_list",
            lazy="dynamic",
            cascade="all,delete",
            passive_deletes=True,
        ),
    )
    # Adds an ``issue_watch_list`` backref to User (created here via backref,
    # not via back_populates).
    user = relationship(
        "User",
        lazy="joined",
        backref=backref(
            "issue_watch_list",
            cascade="all,delete",
            passive_deletes=True,
        ),
    )

    def __repr__(self):
        """Return a short debug string naming the watcher and the issue."""
        return f"Issue WatchList: [{self.user_id!s}] watching {self.issue!s}"
class EmailNotification(Base):
    """An email notification stored in the ``notification_queue`` table.

    Each row references an audit event and, optionally, a user and an
    organisation. A unique index covers ``(user_id, event_id)``.
    ``delivery_status`` holds a member of the nested ``Status`` enum.
    """

    __tablename__ = 'notification_queue'
    __table_args__ = (
        Index("_user_event_em_uc", "user_id", "event_id", unique=True),
        {"mysql_engine": "InnoDB", "mysql_charset": "utf8mb4"}
    )

    class Status(enum.Enum):
        """Delivery states recorded in ``delivery_status``."""

        enqueued = 1
        sent = 2
        delivered = 3
        opened = 4
        bounced = 5
        failed = 6

    template_name = Column(VARCHAR(100), nullable=False, default=text("''"))
    # The user reference is set to NULL if the user row is deleted.
    user_id = Column(
        VARCHAR(50),
        ForeignKey('users.id', ondelete='SET NULL', onupdate='CASCADE'),
        nullable=True
    )
    org_id = Column(
        VARCHAR(50),
        ForeignKey('organisations.id', ondelete='CASCADE', onupdate='CASCADE'),
        nullable=True,
        server_default=text("''")
    )
    event_id = Column(Integer, ForeignKey('audit_events.id', ondelete='CASCADE'),
                      nullable=False, index=True, server_default=text("'0'"))
    email = Column(VARCHAR(50))
    updated_date = Column(DateTime, nullable=False, default=func.now())
    sent_date = Column(DateTime)
    delivered_date = Column(DateTime)
    opened_date = Column(DateTime)
    error = Column(LONGTEXT())
    message_id = Column(VARCHAR(255), index=True)
    # Default to the enum member itself (not its ``.name`` string) so the
    # in-memory value after a flush has the same type as the values assigned
    # by the set_* methods below. The value stored in the database is the same.
    delivery_status = Column(
        ENUM(Status),
        default=Status.enqueued
    )

    organisation = relationship('Organisation')
    user = relationship('User')
    # Adds a dynamic ``notifications_q`` backref to AuditEvent.
    event = relationship('AuditEvent',
                         backref=backref('notifications_q', lazy='dynamic'))

    def __repr__(self):
        """Return a debug string with id, status, recipient and event id."""
        return (f'<EmailNotification #{self.id} ({self.delivery_status}) '
                f'for {self.email} Evt {self.event_id}>')

    def set_sent(self):
        """Mark as sent and stamp ``sent_date`` with the local naive time."""
        self.delivery_status = self.Status.sent
        self.sent_date = datetime.now()

    def set_delivered(self):
        """Mark as delivered and stamp ``delivered_date`` (local naive time)."""
        self.delivery_status = self.Status.delivered
        self.delivered_date = datetime.now()

    def set_bounced(self):
        """Mark as bounced; no timestamp column is updated."""
        self.delivery_status = self.Status.bounced

    def set_failed(self):
        """Mark as failed; no timestamp column is updated."""
        self.delivery_status = self.Status.failed
class DeliveryStatus(enum.Enum):
    """Delivery states used by ``WebhookSubscription.delivery_status``.

    Every member's value is the same string as its name.
    """

    # Initial state, also restored by WebhookSubscription.reset_status().
    untried = 'untried'

    # Set by WebhookSubscription.set_delivered().
    delivered = 'delivered'

    # Set by WebhookSubscription.set_failed_attempt().
    failing = 'failing'

    # Set by WebhookSubscription.set_aborted().
    aborted = 'aborted'
class WebhookSubscription(Base):
    """An organisation's webhook subscription for one event type.

    The primary key is the composite ``(org_id, event_type)``. The class
    tracks the outcome of delivery attempts in ``delivery_status``,
    ``retries``, ``last_delivery`` and ``error_message``.
    """

    __tablename__ = 'webhook_subscriptions'
    __table_args__ = (
        {"mysql_engine": "InnoDB", "mysql_charset": "utf8mb4"}
    )
    # Threshold used by ``max_tries_exceeded`` and quoted in the abort message.
    MAX_ATTEMPTS = 4

    # NOTE(review): presumably masks an ``id`` column inherited from Base,
    # since this table uses a composite primary key - confirm against
    # rfpy.model.meta.
    id = None
    org_id = Column(
        VARCHAR(50),
        ForeignKey('organisations.id', ondelete='CASCADE', onupdate='CASCADE'),
        primary_key=True,
        nullable=False
    )
    event_type = Column(VARCHAR(length=50), nullable=False, primary_key=True)
    remote_url = Column(VARCHAR(150), nullable=False)
    delivery_status = Column(ENUM(DeliveryStatus), default=DeliveryStatus.untried, nullable=False)
    http_header = Column(VARCHAR(100))
    last_delivery = Column(DateTime)
    error_message = Column(VARCHAR(100))
    retries = Column(TINYINT(), default=0, nullable=False)

    organisation = relationship("Organisation", back_populates="webhook_subscriptions")

    def __repr__(self):
        """Return a debug string with the organisation id and remote URL."""
        # NOTE(review): ``id`` is None at class level, so unless an instance
        # sets it this renders as "#None".
        return f"<WebhookSubscription #{self.id} for {self.org_id}: {self.remote_url}>"

    def set_delivered(self):
        """Record a successful delivery and reset the retry counter."""
        self.delivery_status = DeliveryStatus.delivered
        self.last_delivery = datetime.now()
        self.retries = 0

    def set_failed_attempt(self, msg='Attempt to POST to webhook failed'):
        """Record one failed delivery attempt.

        :param msg: text stored in ``error_message``.
        """
        self.delivery_status = DeliveryStatus.failing
        self.error_message = msg
        # The column default (0) is only applied at INSERT time, so an
        # instance that has not been flushed yet still has ``retries`` of
        # None; treat that as zero instead of raising TypeError.
        self.retries = (self.retries or 0) + 1

    def set_aborted(self):
        """Mark the subscription as aborted.

        An existing ``error_message`` is kept; otherwise a generic message
        quoting ``MAX_ATTEMPTS`` is stored.
        """
        self.delivery_status = DeliveryStatus.aborted
        if not self.error_message:
            m = f"Given up trying after {self.MAX_ATTEMPTS} attempts"
            self.error_message = m

    def reset_status(self):
        """Return to the untried state, clearing retries and error message."""
        self.delivery_status = DeliveryStatus.untried
        self.retries = 0
        self.error_message = None

    @property
    def max_tries_exceeded(self) -> bool:
        """True once ``retries`` has reached ``MAX_ATTEMPTS``."""
        # ``retries`` is None on an unflushed instance; count that as zero.
        return (self.retries or 0) >= self.MAX_ATTEMPTS

    @property
    def is_aborted(self) -> bool:
        """True when ``delivery_status`` is ``DeliveryStatus.aborted``."""
        return self.delivery_status == DeliveryStatus.aborted