Coverage for rfpy/model/notify.py: 100%

100 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-31 16:00 +0000

1import enum # Don't import Enum here - shadows the SQLAlchemy type 

2from datetime import datetime 

3 

4from sqlalchemy import ( 

5 Column, DateTime, text, Integer, ForeignKey, func, Index 

6) 

7from sqlalchemy.dialects.mysql import INTEGER, VARCHAR, ENUM, LONGTEXT 

8from sqlalchemy.dialects.mysql.types import TINYINT 

9from sqlalchemy.orm import relationship, backref 

10 

11from rfpy.model.meta import Base 

12 

13 

14class ProjectWatchList(Base): 

15 

16 __tablename__ = "project_watch_list" 

17 __table_args__ = ( 

18 Index("_user_project_wl_uc", "user_id", "project_id", unique=True), 

19 {"mysql_engine": "InnoDB", "mysql_charset": "utf8mb4"} 

20 ) 

21 

22 project_id = Column(INTEGER(10), ForeignKey('projects.id', ondelete='CASCADE')) 

23 user_id = Column(VARCHAR(length=50), ForeignKey('users.id', ondelete='CASCADE')) 

24 date_created = Column(DateTime, default=func.now()) 

25 

26 project = relationship('Project', 

27 backref=backref('watch_list', 

28 lazy='dynamic', 

29 cascade='all,delete', 

30 passive_deletes=True)) 

31 user = relationship('User', lazy='joined', back_populates='project_watches') 

32 

33 def __repr__(self): 

34 return f"<ProjectWatchList: [{self.user_id}] watching {self.project}>" 

35 

36 

37class IssueWatchList(Base): 

38 

39 __tablename__ = "issue_watch_list" 

40 __table_args__ = ( 

41 Index("_user_issue_wl_uc", "user_id", "issue_id", unique=True), 

42 {"mysql_engine": "InnoDB", "mysql_charset": "utf8mb4"} 

43 ) 

44 

45 issue_id = Column(INTEGER(10), ForeignKey('issues.id', ondelete='CASCADE')) 

46 user_id = Column(VARCHAR(length=50), ForeignKey('users.id', ondelete='CASCADE')) 

47 date_created = Column(DateTime, default=func.now()) 

48 

49 issue = relationship('Issue', 

50 backref=backref('watch_list', lazy='dynamic', 

51 cascade='all,delete', passive_deletes=True), 

52 ) 

53 user = relationship('User', lazy='joined', 

54 backref=backref('issue_watch_list', 

55 cascade='all,delete', passive_deletes=True) 

56 ) 

57 

58 def __repr__(self): 

59 return 'Issue WatchList: [%s] watching %s' % (self.user_id, self.issue) 

60 

61 

62class EmailNotification(Base): 

63 __tablename__ = 'notification_queue' 

64 __table_args__ = ( 

65 Index("_user_event_em_uc", "user_id", "event_id", unique=True), 

66 {"mysql_engine": "InnoDB", "mysql_charset": "utf8mb4"} 

67 ) 

68 

69 class Status(enum.Enum): 

70 enqueued = 1 

71 sent = 2 

72 delivered = 3 

73 opened = 4 

74 bounced = 5 

75 failed = 6 

76 

77 template_name = Column(VARCHAR(100), nullable=False, default=text("''")) 

78 user_id = Column( 

79 VARCHAR(50), 

80 ForeignKey('users.id', ondelete='SET NULL', onupdate='CASCADE'), 

81 nullable=True 

82 ) 

83 org_id = Column( 

84 VARCHAR(50), 

85 ForeignKey('organisations.id', ondelete='CASCADE', onupdate='CASCADE'), 

86 nullable=True, 

87 server_default=text("''") 

88 ) 

89 event_id = Column(Integer, ForeignKey('audit_events.id', ondelete='CASCADE'), 

90 nullable=False, index=True, server_default=text("'0'")) 

91 email = Column(VARCHAR(50)) 

92 updated_date = Column(DateTime, nullable=False, default=func.now()) 

93 sent_date = Column(DateTime) 

94 delivered_date = Column(DateTime) 

95 opened_date = Column(DateTime) 

96 error = Column(LONGTEXT()) 

97 message_id = Column(VARCHAR(255), index=True) 

98 delivery_status = Column( 

99 ENUM(Status), 

100 default=Status.enqueued.name 

101 ) 

102 

103 organisation = relationship('Organisation') 

104 user = relationship('User') 

105 event = relationship('AuditEvent', 

106 backref=backref('notifications_q', lazy='dynamic')) 

107 

108 def __repr__(self): 

109 return (f'<EmailNotification #{self.id} ({self.delivery_status}) ' 

110 f'for {self.email} Evt {self.event_id}>') 

111 

112 def set_sent(self): 

113 self.delivery_status = self.Status.sent 

114 self.sent_date = datetime.now() 

115 

116 def set_delivered(self): 

117 self.delivery_status = self.Status.delivered 

118 self.delivered_date = datetime.now() 

119 

120 def set_bounced(self): 

121 self.delivery_status = self.Status.bounced 

122 

123 def set_failed(self): 

124 self.delivery_status = self.Status.failed 

125 

126 

127class DeliveryStatus(enum.Enum): 

128 untried = 'untried' 

129 delivered = 'delivered' 

130 failing = 'failing' 

131 aborted = 'aborted' 

132 

133 

134class WebhookSubscription(Base): 

135 __tablename__ = 'webhook_subscriptions' 

136 __table_args__ = ( 

137 {"mysql_engine": "InnoDB", "mysql_charset": "utf8mb4"} 

138 ) 

139 MAX_ATTEMPTS = 4 

140 

141 id = None 

142 org_id = Column( 

143 VARCHAR(50), 

144 ForeignKey('organisations.id', ondelete='CASCADE', onupdate='CASCADE'), 

145 primary_key=True, 

146 nullable=False 

147 ) 

148 event_type = Column(VARCHAR(length=50), nullable=False, primary_key=True) 

149 remote_url = Column(VARCHAR(150), nullable=False) 

150 delivery_status = Column(ENUM(DeliveryStatus), default=DeliveryStatus.untried, nullable=False) 

151 http_header = Column(VARCHAR(100)) 

152 last_delivery = Column(DateTime) 

153 error_message = Column(VARCHAR(100)) 

154 retries = Column(TINYINT(), default=0, nullable=False) 

155 

156 organisation = relationship("Organisation", back_populates="webhook_subscriptions") 

157 

158 def __repr__(self): 

159 return f"<WebhookSubscription #{self.id} for {self.org_id}: {self.remote_url}>" 

160 

161 def set_delivered(self): 

162 self.delivery_status = DeliveryStatus.delivered 

163 self.last_delivery = datetime.now() 

164 self.retries = 0 

165 

166 def set_failed_attempt(self, msg='Attempt to POST to webhook failed'): 

167 self.delivery_status = DeliveryStatus.failing 

168 self.error_message = msg 

169 self.retries += 1 

170 

171 def set_aborted(self): 

172 self.delivery_status = DeliveryStatus.aborted 

173 if not self.error_message: 

174 m = f"Given up trying after {self.MAX_ATTEMPTS} attempts" 

175 self.error_message = m 

176 

177 def reset_status(self): 

178 self.delivery_status = DeliveryStatus.untried 

179 self.retries = 0 

180 self.error_message = None 

181 

182 @property 

183 def max_tries_exceeded(self) -> bool: 

184 return self.retries >= self.MAX_ATTEMPTS 

185 

186 @property 

187 def is_aborted(self) -> bool: 

188 return self.delivery_status == DeliveryStatus.aborted