Coverage for rfpy/model/notify.py: 87%

105 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-24 10:52 +0000

1import enum # Don't import Enum here - shadows the SQLAlchemy type 

2from datetime import datetime 

3from typing import Optional, TYPE_CHECKING 

4 

5from sqlalchemy import DateTime, text, Integer, ForeignKey, func, Index 

6from sqlalchemy.dialects.mysql import INTEGER, VARCHAR, ENUM, LONGTEXT 

7from sqlalchemy.dialects.mysql.types import TINYINT 

8from sqlalchemy.orm import Mapped, mapped_column, relationship 

9 

10from rfpy.model.meta import Base 

11 

12if TYPE_CHECKING: 

13 from rfpy.model.project import Project 

14 from rfpy.model.issue import Issue 

15 

16 

class ProjectWatchList(Base):
    """Row linking a user to a project they are watching.

    A unique index covers the ``(user_id, project_id)`` pair, and both
    foreign keys are declared with ``ON DELETE CASCADE``.
    """

    __tablename__ = "project_watch_list"
    __table_args__ = (
        Index("_user_project_wl_uc", "user_id", "project_id", unique=True),
        {"mysql_engine": "InnoDB", "mysql_charset": "utf8mb4"},
    )

    # Foreign key to the watched project.
    project_id: Mapped[Optional[int]] = mapped_column(
        INTEGER(10),
        ForeignKey("projects.id", ondelete="CASCADE"),
        nullable=True,
    )
    # Foreign key to the watching user.
    user_id: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=50),
        ForeignKey("users.id", ondelete="CASCADE"),
        nullable=True,
    )
    # Defaults to the SQL now() function when no value is supplied.
    date_created: Mapped[Optional[datetime]] = mapped_column(
        DateTime,
        default=func.now(),
        nullable=True,
    )

    project: Mapped["Project"] = relationship(
        "Project",
        back_populates="watch_list",
    )
    # lazy="joined": the user is fetched in the same SELECT via a JOIN.
    user = relationship(
        "User",
        lazy="joined",
        back_populates="project_watches",
    )

    def __repr__(self) -> str:
        """Return a debug string naming the watcher and the watched project."""
        watcher, watched = self.user_id, self.project
        return f"<ProjectWatchList: [{watcher}] watching {watched}>"

37 

38 

class IssueWatchList(Base):
    """Row linking a user to an issue they are watching.

    A unique index covers the ``(user_id, issue_id)`` pair, and both foreign
    keys are declared with ``ON DELETE CASCADE``.
    """

    __tablename__ = "issue_watch_list"
    __table_args__ = (
        Index("_user_issue_wl_uc", "user_id", "issue_id", unique=True),
        {"mysql_engine": "InnoDB", "mysql_charset": "utf8mb4"},
    )

    # Foreign key to the watched issue.
    issue_id: Mapped[int] = mapped_column(
        INTEGER(10),
        ForeignKey("issues.id", ondelete="CASCADE"),
    )
    # Foreign key to the watching user.
    user_id: Mapped[str] = mapped_column(
        VARCHAR(length=50),
        ForeignKey("users.id", ondelete="CASCADE"),
    )
    # Defaults to the SQL now() function when no value is supplied.
    date_created: Mapped[datetime] = mapped_column(
        DateTime,
        default=func.now(),
    )

    issue: Mapped["Issue"] = relationship("Issue", back_populates="watch_list")
    # lazy="joined": the user is fetched in the same SELECT via a JOIN.
    user = relationship("User", lazy="joined", back_populates="issue_watches")

    def __repr__(self) -> str:
        """Return a debug string naming the watcher and the watched issue."""
        watcher, watched = self.user_id, self.issue
        return f"Issue WatchList: [{watcher}] watching {watched}"

66 

67 

class EmailNotification(Base):
    """Queued email notification for one user about one audit event.

    Rows live in the ``notification_queue`` table. A unique index covers
    ``(user_id, event_id)``. ``delivery_status`` records where the message
    is in its lifecycle and is advanced by the ``set_*`` helpers below.
    """

    __tablename__ = "notification_queue"
    __table_args__ = (
        Index("_user_event_em_uc", "user_id", "event_id", unique=True),
        {"mysql_engine": "InnoDB", "mysql_charset": "utf8mb4"},
    )

    class Status(enum.Enum):
        """Lifecycle states stored in the ``delivery_status`` column."""

        enqueued = 1
        sent = 2
        delivered = 3
        opened = 4
        bounced = 5
        failed = 6

    template_name: Mapped[str] = mapped_column(
        VARCHAR(length=100), nullable=False, default=text("''")
    )
    # SET NULL on delete: the notification row outlives its user.
    user_id: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=50),
        ForeignKey("users.id", ondelete="SET NULL", onupdate="CASCADE"),
        nullable=True,
    )
    org_id: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=50),
        ForeignKey("organisations.id", ondelete="CASCADE", onupdate="CASCADE"),
        nullable=True,
        server_default=text("''"),
    )
    event_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey("audit_events.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
        server_default=text("'0'"),
    )
    email: Mapped[Optional[str]] = mapped_column(VARCHAR(length=50))
    updated_date: Mapped[datetime] = mapped_column(
        DateTime, nullable=False, default=func.now()
    )
    sent_date: Mapped[Optional[datetime]] = mapped_column(DateTime)
    delivered_date: Mapped[Optional[datetime]] = mapped_column(DateTime)
    opened_date: Mapped[Optional[datetime]] = mapped_column(DateTime)
    error: Mapped[Optional[str]] = mapped_column(LONGTEXT())
    message_id: Mapped[Optional[str]] = mapped_column(VARCHAR(length=255), index=True)
    # The column is nullable, so the annotation is Optional to match.
    delivery_status: Mapped[Optional[Status]] = mapped_column(
        ENUM(Status), default=Status.enqueued.name, nullable=True
    )

    organisation = relationship("Organisation")
    user = relationship("User")
    event = relationship("AuditEvent", back_populates="notifications")

    def __repr__(self) -> str:
        """Return a debug string with id, status, recipient and event id."""
        return (
            f"<EmailNotification #{self.id} ({self.delivery_status}) "
            f"for {self.email} Evt {self.event_id}>"
        )

    def set_sent(self) -> None:
        """Mark the notification as sent and stamp ``sent_date`` with now."""
        self.delivery_status = self.Status.sent
        self.sent_date = datetime.now()

    def set_delivered(self) -> None:
        """Mark the notification as delivered and stamp ``delivered_date``.

        Only attributes mapped on this model are touched. ``last_delivery``
        and ``retries`` are WebhookSubscription columns and are deliberately
        not assigned here.
        """
        self.delivery_status = self.Status.delivered
        self.delivered_date = datetime.now()

    def set_bounced(self) -> None:
        """Mark the notification as bounced."""
        self.delivery_status = self.Status.bounced

    def set_failed(self) -> None:
        """Mark the notification as failed."""
        self.delivery_status = self.Status.failed

142 

143 

@enum.unique
class DeliveryStatus(enum.Enum):
    """Delivery states used by ``WebhookSubscription.delivery_status``.

    Each member's value is the same string as its name.
    """

    # Initial state; also restored by WebhookSubscription.reset_status().
    untried = "untried"
    # Set by WebhookSubscription.set_delivered().
    delivered = "delivered"
    # Set by WebhookSubscription.set_failed_attempt().
    failing = "failing"
    # Set by WebhookSubscription.set_aborted().
    aborted = "aborted"

149 

150 

class WebhookSubscription(Base):
    """
    Represents a webhook subscription for an organisation. Each subscription
    is for a specific event type and remote URL. The delivery status is used
    to track the success of the last delivery attempt.

    The primary key is the composite ``(org_id, event_type)``. ``retries``
    counts failed attempts; once it reaches ``MAX_ATTEMPTS`` the subscription
    is marked aborted.
    """

    __tablename__ = "webhook_subscriptions"
    __table_args__ = {"mysql_engine": "InnoDB", "mysql_charset": "utf8mb4"}
    # Number of failed attempts at which the subscription is aborted.
    MAX_ATTEMPTS = 4

    # Presumably masks an ``id`` column inherited from Base, since this table
    # is keyed on (org_id, event_type) - verify against rfpy.model.meta.
    id = None  # type: ignore

    org_id: Mapped[str] = mapped_column(
        VARCHAR(length=50),
        ForeignKey("organisations.id", ondelete="CASCADE", onupdate="CASCADE"),
        primary_key=True,
        nullable=False,
    )
    event_type: Mapped[str] = mapped_column(
        VARCHAR(length=50), nullable=False, primary_key=True
    )
    remote_url: Mapped[str] = mapped_column(VARCHAR(length=150), nullable=False)
    delivery_status: Mapped[DeliveryStatus] = mapped_column(
        ENUM(DeliveryStatus), default=DeliveryStatus.untried, nullable=False
    )
    http_header: Mapped[Optional[str]] = mapped_column(VARCHAR(length=100), nullable=True)
    last_delivery: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)
    error_message: Mapped[Optional[str]] = mapped_column(VARCHAR(length=100), nullable=True)
    retries: Mapped[int] = mapped_column(TINYINT(), default=0, nullable=False)

    organisation = relationship("Organisation", back_populates="webhook_subscriptions")

    def __repr__(self) -> str:
        """Return a debug string with event type, organisation and URL.

        ``id`` is set to ``None`` on this class, so the event type (part of
        the primary key) is shown instead of an id.
        """
        return (
            f"<WebhookSubscription {self.event_type} "
            f"for {self.org_id}: {self.remote_url}>"
        )

    def set_delivered(self) -> None:
        """Record a successful delivery and reset the retry counter."""
        self.delivery_status = DeliveryStatus.delivered
        self.last_delivery = datetime.now()
        self.retries = 0

    def set_failed_attempt(self, msg: str) -> None:
        """Record one failed delivery attempt.

        Stores ``msg`` as the error message, increments ``retries`` and
        aborts the subscription once ``MAX_ATTEMPTS`` is reached.
        """
        self.delivery_status = DeliveryStatus.failing
        self.error_message = msg
        # The column default (0) is only applied at INSERT time, so an
        # unflushed instance still has retries=None; treat that as zero.
        self.retries = (self.retries or 0) + 1
        if self.retries >= self.MAX_ATTEMPTS:
            self.set_aborted()

    def set_aborted(self) -> None:
        """Mark the subscription aborted, keeping any existing error message."""
        self.delivery_status = DeliveryStatus.aborted
        if not self.error_message:
            m = f"Given up trying after {self.MAX_ATTEMPTS} attempts"
            self.error_message = m

    def reset_status(self) -> None:
        """Return the subscription to its untried state with no error."""
        self.delivery_status = DeliveryStatus.untried
        self.retries = 0
        self.error_message = None

    @property
    def max_tries_exceeded(self) -> bool:
        """True when ``retries`` has reached ``MAX_ATTEMPTS``."""
        # retries may be None before the first flush; treat it as zero.
        return (self.retries or 0) >= self.MAX_ATTEMPTS

    @property
    def is_aborted(self) -> bool:
        """True when the delivery status is ``DeliveryStatus.aborted``."""
        return self.delivery_status == DeliveryStatus.aborted