Coverage for rfpy/model/humans.py: 97%

240 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-24 10:52 +0000

1from enum import Enum 

2from random import choice 

3from string import ascii_uppercase, digits 

4from typing import Optional, TYPE_CHECKING 

5from datetime import datetime, timedelta, timezone 

6import secrets 

7 

8from sqlalchemy import ( 

9 Column, 

10 DateTime, 

11 Table, 

12 types, 

13 Integer, 

14 ForeignKey, 

15 UniqueConstraint, 

16 TIMESTAMP, 

17 text, 

18 Index, 

19) 

20 

21from sqlalchemy.orm import Mapped, mapped_column, relationship, validates, DynamicMapped 

22from sqlalchemy.orm.exc import NoResultFound 

23from sqlalchemy.ext.associationproxy import association_proxy, AssociationProxy 

24from sqlalchemy.dialects.mysql import VARCHAR, TINYINT, INTEGER, MEDIUMTEXT, CHAR 

25 

26 

27from rfpy import conf 

28from rfpy.auth import ROLES, LacksPermission, AuthorizationFailure, perms 

29from rfpy.model.meta import Base 

30from rfpy.model.audit import AuditEvent 

31 

32if TYPE_CHECKING: 

33 from sqlalchemy.orm import Query 

34 from rfpy.model.notify import WebhookSubscription, IssueWatchList, ProjectWatchList 

35 from rfpy.model.acl import SectionPermission, ProjectPermission, UserAdminToken 

36 from rfpy.model.project import Project 

37 from rfpy.model.audit import AuditEvent 

38 from rfpy.model.graph import RelationshipType, Edge 

39 from rfpy.model.misc import Category 

40 from rfpy.model.tags import Tag 

41 

42 

# Association table between organisations and organisation categories.
# It is the `secondary` table of Organisation.organisation_categories.
org_cat_rel = Table(
    "org_cat_rel",
    Base.metadata,
    Column("id", Integer, primary_key=True),
    Column(
        "org_id",
        VARCHAR(length=50),
        ForeignKey(
            "organisations.id",
            name="constr_org",
            onupdate="CASCADE",
            ondelete="CASCADE",
        ),
        nullable=False,
    ),
    Column(
        "cat_id",
        INTEGER(),
        ForeignKey("org_categories.id", name="constr_cat"),
        nullable=False,
    ),
    # A given (organisation, category) pair may be stored only once.
    UniqueConstraint("org_id", "cat_id", name="title"),
    mysql_charset="utf8mb4",
    mysql_engine="InnoDB",
)

68 

# Self-referential association table: both columns point at organisations.id.
# It is the `secondary` table of Organisation.suppliers (backref "buyers").
organisation_suppliers = Table(
    "organisation_suppliers",
    Base.metadata,
    Column("id", Integer, primary_key=True),
    # The organisation that holds the supplier in its address book.
    Column(
        "org_id",
        VARCHAR(length=50),
        ForeignKey("organisations.id", ondelete="CASCADE", onupdate="CASCADE"),
    ),
    # The organisation listed as a supplier.
    Column(
        "supplier_id",
        VARCHAR(length=50),
        ForeignKey("organisations.id", ondelete="CASCADE", onupdate="CASCADE"),
    ),
    mysql_charset="utf8mb4",
    mysql_engine="InnoDB",
)

86 

87 

class OrganisationType(Enum):
    """Closed set of organisation kinds.

    The integer values are what OrgTypeCol writes to, and reads from, the
    database, so they must not be renumbered.
    """

    RESPONDENT = 0
    BUYER = 1
    CONSULTANT = 2

92 

93 

# Organisation types grouped as "buy side".
# NOTE(review): Organisation.is_buyside checks for BUYER/CONSULTANT, whereas
# this tuple holds BUYER/RESPONDENT - confirm which pairing is intended.
BUYSIDE_ORGS = (
    OrganisationType.BUYER,
    OrganisationType.RESPONDENT,
)

95 

96 

class OrgTypeCol(types.TypeDecorator):
    """Column type that persists an OrganisationType member as its integer value."""

    impl = INTEGER(1)

    cache_ok = True

    def process_bind_param(self, status, dialect):
        """Convert a Python-side value into the integer sent to the database.

        Accepts an OrganisationType member, a raw integer (for example the
        ``default=0`` declared on the Organisation.type column) or None,
        which is passed through unchanged.

        Raises ValueError if a raw value does not match any member.
        """
        if status is None:
            return None
        if isinstance(status, OrganisationType):
            return status.value
        # Round-trip raw values through the enum so unknown integers are rejected
        return OrganisationType(status).value

    def process_result_value(self, int_value, dialect):
        """Convert a stored integer back into an OrganisationType member.

        Returns None when the stored value does not match any member.
        """
        try:
            return OrganisationType(int_value)
        except ValueError:
            return None

110 

111 

class ConsultantClientRelationship(Base):
    """Association row pairing a consultant organisation with a client organisation.

    The primary key is the composite (consultant_id, client_id).
    """

    __tablename__ = "consultant_orgs"
    __table_args__ = (
        Index("cons_org_client_fk", "client_id"),
        *Base.__table_args__,
    )

    # Override the `id` attribute; the key is the composite below.
    id = None  # type: ignore

    consultant_id: Mapped[str] = mapped_column(
        VARCHAR(length=50),
        ForeignKey("organisations.id", onupdate="CASCADE", ondelete="CASCADE"),
        primary_key=True,
    )
    client_id: Mapped[str] = mapped_column(
        VARCHAR(length=50),
        ForeignKey("organisations.id", onupdate="CASCADE", ondelete="CASCADE"),
        primary_key=True,
    )

    # Both relationships target Organisation, so each names its own foreign key.
    consultant: Mapped["Organisation"] = relationship(
        "Organisation",
        backref="consultant_org_clients",
        foreign_keys="ConsultantClientRelationship.consultant_id",
    )
    client: Mapped["Organisation"] = relationship(
        "Organisation",
        backref="consultant_org_consultants",
        foreign_keys="ConsultantClientRelationship.client_id",
    )

    def __init__(self, client=None, consultant=None):
        """Create a link row from either side of the relationship.

        Invoked by the association proxy creators when a value is appended to
        the 'clients' or 'consultants' attribute of an Organisation.
        """
        self.consultant = consultant
        self.client = client

    def __repr__(self):
        return f"<Consultant: {self.consultant_id} , Client: {self.client_id} >"

147 

148 

class Organisation(Base):
    """Mapped class for the "organisations" table.

    Rows are loaded polymorphically on the ``type`` column; the subclasses in
    this module register one identity per OrganisationType member.
    """

    __tablename__ = "organisations"

    __mapper_args__ = {"polymorphic_on": "type", "polymorphic_identity": None}

    public_attrs = ["id", "name", "public", "is_consultant"]

    id: Mapped[str] = mapped_column(VARCHAR(length=50), primary_key=True)  # type: ignore

    name: Mapped[str] = mapped_column(VARCHAR(length=50), nullable=False)
    address: Mapped[Optional[str]] = mapped_column(VARCHAR(length=255), nullable=True)
    public: Mapped[bool] = mapped_column(TINYINT(1), default=0, nullable=True)
    type: Mapped[OrganisationType] = mapped_column(
        OrgTypeCol, nullable=False, default=0, server_default=text("'0'")
    )
    password_expiry: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    domain_name: Mapped[Optional[str]] = mapped_column(VARCHAR(length=256), nullable=True)

    users: Mapped[list["User"]] = relationship(
        "User",
        back_populates="organisation",
        cascade="all,delete",
        passive_deletes=True,
    )

    organisation_categories: Mapped[list["OrganisationCategory"]] = relationship(
        "OrganisationCategory",
        secondary=org_cat_rel,
    )

    custom_roles: Mapped[list["CustomRole"]] = relationship(
        "CustomRole",
        back_populates="organisation",
    )

    # These form the "Private Address Book"
    suppliers: DynamicMapped["Organisation"] = relationship(
        "Organisation",
        secondary="organisation_suppliers",
        primaryjoin="Organisation.id==organisation_suppliers.c.org_id",
        secondaryjoin="Organisation.id==organisation_suppliers.c.supplier_id",
        backref="buyers",
        lazy="dynamic",
    )

    # Read-only view of the audit events linked through audit_event_orgs.
    visible_events: DynamicMapped["AuditEvent"] = relationship(
        "AuditEvent",
        secondary="audit_event_orgs",
        primaryjoin="Organisation.id==EventOrgACL.org_id",
        secondaryjoin="EventOrgACL.event_id==AuditEvent.id",
        lazy="dynamic",
        viewonly=True,
    )

    webhook_subscriptions: DynamicMapped["WebhookSubscription"] = relationship(
        "WebhookSubscription",
        back_populates="organisation",
        cascade="all, delete",
        passive_deletes=True,
        lazy="dynamic",
    )

    projects: DynamicMapped["Project"] = relationship(
        "Project",
        back_populates="owner_org",
        cascade="all, delete",
        passive_deletes=True,
        lazy="dynamic",
    )
    relationship_types: DynamicMapped["RelationshipType"] = relationship(
        "RelationshipType",
        back_populates="organisation",
        cascade="all,delete",
        passive_deletes=True,
        lazy="dynamic",
    )

    # Edges where this organisation is the "from" end.
    lower_edges: Mapped[list["Edge"]] = relationship(
        "Edge",
        primaryjoin="Organisation.id==Edge.from_org_id",
        back_populates="from_org",
        cascade="all, delete",
    )
    # Edges where this organisation is the "to" end.
    higher_edges: Mapped[list["Edge"]] = relationship(
        "Edge",
        primaryjoin="Organisation.id==Edge.to_org_id",
        back_populates="to_org",
        cascade="all, delete",
    )

    categories: DynamicMapped["Category"] = relationship(
        "Category",
        back_populates="organisation",
    )

    tags: DynamicMapped["Tag"] = relationship(
        "Tag",
        back_populates="organisation",
        lazy="dynamic",
    )

    # AuditEvent.org_id is marked foreign() explicitly in the join condition.
    events: DynamicMapped[AuditEvent] = relationship(
        "AuditEvent",
        primaryjoin="foreign(AuditEvent.org_id)==Organisation.id",
        back_populates="organisation",
        cascade_backrefs=False,
        lazy="dynamic",
    )

    def __init__(self, id_name) -> None:
        """Use ``id_name`` as both the primary key and the display name."""
        self.id = id_name
        self.name = id_name

    def __repr__(self) -> str:
        return f"Organisation: {self.name}"

    @property
    def is_consultant(self) -> bool:
        """True when this organisation's type is CONSULTANT."""
        return self.type == OrganisationType.CONSULTANT

    @property
    def is_buyside(self) -> bool:
        """True when this organisation's type is BUYER or CONSULTANT."""
        buyside_types = (OrganisationType.BUYER, OrganisationType.CONSULTANT)
        return self.type in buyside_types

    def has_supplier(self, supplier_org) -> bool:
        """Report whether ``supplier_org`` is among this organisation's suppliers."""
        matching = self.suppliers.filter(Organisation.id == supplier_org.id)
        try:
            matching.one()
        except NoResultFound:
            return False
        return True

273 

274 

class RespondentOrganisation(Organisation):
    """Organisation subclass loaded for rows whose type is RESPONDENT."""

    __mapper_args__ = {"polymorphic_identity": OrganisationType.RESPONDENT}

277 

278 

class BuyerOrganisation(Organisation):
    """Organisation subclass loaded for rows whose type is BUYER."""

    __mapper_args__ = {"polymorphic_identity": OrganisationType.BUYER}

    # Proxies the `consultant` attribute of each ConsultantClientRelationship
    # row reachable through the "consultant_org_consultants" backref, so the
    # collection yields organisations rather than the link rows themselves.
    # Appending an organisation creates the link row via `creator`.
    consultants: AssociationProxy[list["Organisation"]] = association_proxy(
        "consultant_org_consultants",
        "consultant",
        creator=lambda org: ConsultantClientRelationship(consultant=org),
    )

288 

289 

class ConsultantOrganisation(Organisation):
    """Organisation subclass loaded for rows whose type is CONSULTANT."""

    __mapper_args__ = {"polymorphic_identity": OrganisationType.CONSULTANT}

    # Weird SQLAlchemy magic here - this creator and the one on
    # BuyerOrganisation.consultants both end up calling
    # ConsultantClientRelationship.__init__
    clients: AssociationProxy[list["BuyerOrganisation"]] = association_proxy(
        "consultant_org_clients",
        "client",
        creator=lambda client_org: ConsultantClientRelationship(client=client_org),
    )

    def add_client(self, client_organisation) -> None:
        """Append ``client_organisation`` to this consultant's clients."""
        self.clients.append(client_organisation)

302 

303 

def custom_perms(session, user_id) -> "Query":
    """Build a query for the custom-role permission ids held by a user.

    Joins role_permissions -> roles -> user_roles and filters on ``user_id``.
    The query is returned unexecuted; each result row is a one-element tuple
    containing a permission_id.
    """
    permission_ids = session.query(CustomRolePermission.permission_id)
    via_user_roles = permission_ids.join(CustomRole).join(
        UserRole, CustomRole.id == UserRole.role_id
    )
    return via_user_roles.filter(UserRole.user_id == user_id)

311 

class RefreshToken(Base):
    """A refresh token issued to a user, with issue/expiry times and a revoked flag."""

    __tablename__ = "refresh_tokens"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    # Rows are removed by the database when the owning user is deleted.
    user_id: Mapped[str] = mapped_column(
        VARCHAR(length=50),
        ForeignKey("users.id", ondelete="CASCADE"),
        index=True,
        nullable=False,
    )
    token: Mapped[str] = mapped_column(VARCHAR(length=255), unique=True, nullable=False)
    issued_at: Mapped[datetime] = mapped_column(DateTime, nullable=False)
    expires_at: Mapped[datetime] = mapped_column(DateTime, nullable=False)
    revoked: Mapped[bool] = mapped_column(types.Boolean, nullable=False, default=False)

    user: Mapped["User"] = relationship("User", back_populates="refresh_tokens")

    def __repr__(self):
        return f"<RefreshToken user_id={self.user_id} expires_at={self.expires_at} revoked={self.revoked}>"

331 

332 

333 

class User(Base):
    """Mapped class for the "users" table.

    Holds login/lockout state, the user's roles and the permission helpers
    built on top of them.
    """

    __tablename__ = "users"

    id: Mapped[str] = mapped_column(VARCHAR(length=50), primary_key=True)  # type: ignore
    org_id: Mapped[str] = mapped_column(
        VARCHAR(length=50),
        ForeignKey("organisations.id", ondelete="CASCADE", onupdate="CASCADE"),
        index=True,
        nullable=True,
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime,
        nullable=False,
        server_default=text("CURRENT_TIMESTAMP"),
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime,
        nullable=False,
        server_default=text("CURRENT_TIMESTAMP"),
        server_onupdate=text("CURRENT_TIMESTAMP"),
    )
    fullname: Mapped[str] = mapped_column(VARCHAR(length=50), nullable=False)
    email: Mapped[Optional[str]] = mapped_column(VARCHAR(length=255), nullable=True)
    password: Mapped[Optional[str]] = mapped_column(VARCHAR(length=255), nullable=True)
    locale: Mapped[Optional[str]] = mapped_column(VARCHAR(length=10), nullable=True)
    previous_login_date: Mapped[Optional[datetime]] = mapped_column(
        DateTime, nullable=True, server_default=text("NULL")
    )
    password_set_date: Mapped[datetime] = mapped_column(
        TIMESTAMP, nullable=False, server_default=text("CURRENT_TIMESTAMP")
    )
    is_active: Mapped[bool] = mapped_column(types.Boolean, default=True, nullable=False)
    failed_login_attempts: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
    total_failed_logins: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
    locked_until: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)

    # "standard" or "restricted"; see the is_restricted property below.
    type: Mapped[str] = mapped_column(
        VARCHAR(length=10),
        nullable=False,
        default="standard",
        server_default=text("'standard'"),
    )

    organisation: Mapped["Organisation"] = relationship(
        "Organisation", uselist=False, back_populates="users"
    )

    # NOTE(review): this joins user_roles.role_id to CustomRole.name, whereas
    # custom_perms() and add_role() match role ids against CustomRole.id -
    # confirm which column is intended.
    custom_roles: Mapped[list["CustomRole"]] = relationship(
        "CustomRole",
        secondary="user_roles",
        secondaryjoin="UserRole.role_id==CustomRole.name",
        viewonly=True,
    )

    role_list: Mapped[list["UserRole"]] = relationship(
        "UserRole", back_populates="user", cascade="all, delete", passive_deletes=True
    )

    # Exposes role_list as a list of role id strings; appending a string
    # creates the underlying UserRole row.
    roles: AssociationProxy[list[str]] = association_proxy(
        "role_list", "role_id", creator=lambda r: UserRole(role_id=r)
    )

    roles_q: Mapped[list["UserRole"]] = relationship(
        "UserRole", lazy="dynamic", viewonly=True
    )

    tokens: Mapped[list["UserAdminToken"]] = relationship(
        "UserAdminToken",
        back_populates="user",
        cascade="all, delete",
        passive_deletes=True,
    )

    refresh_tokens: Mapped[list["RefreshToken"]] = relationship(
        "RefreshToken",
        back_populates="user",
        cascade="all, delete-orphan",
        passive_deletes=True,
    )

    section_permissions: DynamicMapped["SectionPermission"] = relationship(
        "SectionPermission",
        back_populates="user",
        cascade="all,delete",
        passive_deletes=True,
        lazy="dynamic",
    )

    project_permissions: Mapped[list["ProjectPermission"]] = relationship(
        "ProjectPermission",
        back_populates="user",
        cascade="all, delete",
        passive_deletes=True,
    )

    project_watches: Mapped[list["ProjectWatchList"]] = relationship(
        "ProjectWatchList",
        back_populates="user",
        cascade="all,delete",
        passive_deletes=True,
    )

    issue_watches: Mapped[list["IssueWatchList"]] = relationship(
        "IssueWatchList",
        back_populates="user",
        cascade="all,delete",
        passive_deletes=True,
    )

    events: DynamicMapped[AuditEvent] = relationship(
        "AuditEvent",
        lazy="dynamic",
        back_populates="user",
        cascade_backrefs=False,
        primaryjoin="foreign(AuditEvent.user_id)==User.id",
    )

    def __init__(self, user_id, *args, **kwargs):
        """Use ``user_id`` as both the primary key and the initial full name."""
        super().__init__(*args, **kwargs)
        self.id = user_id
        self.fullname = user_id

    def __repr__(self):
        return f"<User: {self.id} ({self.fullname}) of {self.org_id}>"

    @property
    def custom_permissions(self):
        """Set of permission ids granted through custom roles.

        Runs the custom_perms() query on every access.
        """
        # presumably _instance_session is supplied by Base - verify
        cq = custom_perms(self._instance_session, self.id)
        return {cp[0] for cp in cq}

    @property
    def is_restricted(self):
        """True when the user's type is "restricted"."""
        return self.type == "restricted"

    @is_restricted.setter
    def is_restricted(self, restricted_bool):
        self.type = "restricted" if restricted_bool else "standard"

    def has_permission(self, permission):
        """
        May be called multiple times in the life of one http request
        so attempting to optimise. Database stored custom roles are
        sometimes in use, but all efforts are made to avoid querying the Database
        """
        if "Administrator" in self.roles:
            return True
        if permission in self.builtin_permissions:
            return True
        # Only now fall back to the query-backed custom permissions
        return permission in self.custom_permissions

    def check_permission(self, permission):
        """
        @raise LacksPermission if the user doesn't have the given permission
        """
        if not self.has_permission(permission):
            raise LacksPermission(permission, self.id)

    def check_is_standard(self):
        """
        @raise AuthorizationFailure if the user is restricted
        """
        if self.is_restricted:
            raise AuthorizationFailure("Action not permitted for Domain Expert users")

    @property
    def builtin_permissions(self):
        """Set of permissions granted by the user's built-in roles.

        The set is computed once and memoised on the instance as
        ``_builtin_perms``; add_role() clears that cache.
        """
        if hasattr(self, "_builtin_perms"):
            return self._builtin_perms

        # cache not populated, so build it
        self._builtin_perms = pm = set()
        # Add permissions for 'built-in roles'
        for role_id in self.roles & ROLES.keys():
            pm.update(ROLES[role_id])
        return pm

    @property
    def all_permissions(self) -> set[str]:
        """Union of the custom and built-in permissions."""
        return self.custom_permissions | self.builtin_permissions

    @property
    def sorted_permissions(self) -> list[str]:
        """All permissions as a sorted list."""
        return sorted(self.all_permissions)

    def add_role(self, role_id: str) -> None:
        """Grant ``role_id`` to this user.

        The role must be a key of ROLES or the id of one of the custom roles
        of the user's organisation.

        @raise KeyError if the role is not recognised
        """
        if role_id not in ROLES:
            if role_id not in {cr.id for cr in self.organisation.custom_roles}:
                raise KeyError('Role "%s" not found' % role_id)
        self.roles.append(role_id)
        # Drop the memoised built-in permissions so the new role is reflected
        # the next time builtin_permissions is read.
        if hasattr(self, "_builtin_perms"):
            del self._builtin_perms

    def is_in_role(self, role_id: str):
        """True when ``role_id`` is among the user's roles."""
        return role_id in self.roles

    def is_administrator(self):
        """True when the user holds the "Administrator" role."""
        return self.is_in_role("Administrator")

    def can_view_section_id(self, section_id: int) -> bool:
        """Report whether the user may view the section with ``section_id``.

        Unrestricted users may view any section. A restricted user needs
        exactly one matching row in section_permissions.
        """
        if not self.is_restricted:
            return True
        # Bind the value as a parameter instead of formatting it into the SQL text
        clause = text("section_id = :section_id").bindparams(section_id=section_id)
        return self.section_permissions.filter(clause).count() == 1

    def create_refresh_token(self, session):
        """Create a new refresh token for this user

        The RefreshToken row is added to ``session`` (not committed here) and
        the token string is returned. Expiry is taken from
        conf.CONF.jwt_refresh_token_expiry_hours.
        """
        now = datetime.now(timezone.utc)
        expires_at = now + timedelta(hours=conf.CONF.jwt_refresh_token_expiry_hours)

        token_value = secrets.token_urlsafe(32)
        refresh_token = RefreshToken(
            user_id=self.id,
            token=token_value,
            issued_at=now,
            expires_at=expires_at,
            revoked=False,
        )

        session.add(refresh_token)
        return token_value

    def revoke_all_refresh_tokens(self, session):
        """Revoke all refresh tokens for this user"""
        # Bulk UPDATE issued through the query; not committed here
        session.query(RefreshToken).filter(
            RefreshToken.user_id == self.id
        ).update({"revoked": True})

557 

558 

class UserRole(Base):
    """Assignment of a role id to a user; each (user_id, role_id) pair is unique."""

    __tablename__ = "user_roles"
    __table_args__ = (
        UniqueConstraint("user_id", "role_id", name="unique_user_role"),
        *Base.__table_args__,
    )

    user_id: Mapped[str] = mapped_column(
        VARCHAR(length=50),
        ForeignKey("users.id", name="constr_user", ondelete="CASCADE"),
        nullable=False,
    )
    # Plain string column with no foreign key declared on it.
    role_id: Mapped[str] = mapped_column(VARCHAR(length=255), nullable=True)

    user: Mapped["User"] = relationship(
        "User",
        back_populates="role_list",
        viewonly=True,
    )

    def __repr__(self):
        return f"<UserRole {self.user_id} - {self.role_id}>"

581 

582 

def munged_id(sqla_context):
    """
    Column default that builds the id of a CustomRole row.

    The id joins the row's name, its org_id and a five character random
    suffix of uppercase letters and digits,
    e.g. Self Scorer/Thomas Murray#TK94S
    """
    params = sqla_context.current_parameters
    name, org_id = params["name"], params["org_id"]
    alphabet = ascii_uppercase + digits
    suffix = "".join(choice(alphabet) for _ in range(5))
    return "%s/%s#%s" % (name, org_id, suffix)

593 

594 

class CustomRole(Base):
    """An organisation-defined role; names are unique within an organisation."""

    __tablename__ = "roles"

    __table_args__ = (
        UniqueConstraint("name", "org_id", name="role_names"),
        *Base.__table_args__,
    )

    # When no id is supplied, munged_id derives one from name and org_id.
    id: Mapped[str] = mapped_column(  # type: ignore
        VARCHAR(length=255),
        primary_key=True,
        nullable=False,
        default=munged_id,
    )
    name: Mapped[str] = mapped_column(VARCHAR(length=255), nullable=False)
    description: Mapped[str] = mapped_column(MEDIUMTEXT(), nullable=False)
    type: Mapped[str] = mapped_column(
        CHAR(length=1),
        nullable=False,
        server_default=text("'U'"),
    )
    org_id: Mapped[str] = mapped_column(
        VARCHAR(length=50),
        ForeignKey("organisations.id", ondelete="CASCADE", onupdate="CASCADE"),
        nullable=False,
    )

    organisation: Mapped["Organisation"] = relationship(
        Organisation,
        back_populates="custom_roles",
    )
    permissions: Mapped[list["CustomRolePermission"]] = relationship(
        "CustomRolePermission",
        back_populates="role",
        cascade="all, delete",
        passive_deletes=True,
    )

    def __repr__(self):
        return f"<CustomRole {self.id} - {self.name}>"

628 

629 

class CustomRolePermission(Base):
    """A single permission granted by a custom role.

    The primary key is the composite (role_id, permission_id).
    """

    __tablename__ = "role_permissions"

    # Override the `id` attribute; the key is the composite below.
    id = None  # type: ignore

    role_id: Mapped[str] = mapped_column(
        VARCHAR(length=255),
        ForeignKey("roles.id", name="role_reference", ondelete="CASCADE"),
        primary_key=True,
        nullable=False,
    )

    permission_id: Mapped[str] = mapped_column(
        VARCHAR(length=255),
        primary_key=True,
        nullable=False,
    )
    role: Mapped["CustomRole"] = relationship(CustomRole, back_populates="permissions")

    def __repr__(self):
        return f"{self.permission_id} (Role: {self.role_id})"

    @validates("permission_id")
    def known_permission(self, key, perm_id):
        """Reject permission ids that are not listed in perms.ALL_PERMISSIONS.

        @raise ValueError if ``perm_id`` is not a recognised permission
        """
        if perm_id in perms.ALL_PERMISSIONS:
            return perm_id
        raise ValueError('"%s" is not a recognised permission' % perm_id)

655 

656 

class OrganisationCategory(Base):
    """A category that organisations can be assigned to, identified by a unique title."""

    __tablename__ = "org_categories"
    # NOTE(review): uniqueness of `title` is declared both here and through
    # unique=True on the column below - confirm whether both are needed.
    __table_args__ = (
        UniqueConstraint("title", name="title"),
        *Base.__table_args__,
    )

    title: Mapped[str] = mapped_column(VARCHAR(length=255), unique=True, nullable=False)
    description: Mapped[Optional[str]] = mapped_column(MEDIUMTEXT(), nullable=True)

    def __repr__(self):
        return f"<Organisation Category: {self.title}>"

666 

667 

class FailedLoginAttempt(Base):
    """Record of one failed login: the user, the time and the source IP address."""

    __tablename__ = "failed_login_attempts"

    user_id: Mapped[str] = mapped_column(
        VARCHAR(length=50),
        ForeignKey("users.id"),
        nullable=False,
    )
    timestamp: Mapped[datetime] = mapped_column(DateTime, nullable=False, index=True)
    ip_address: Mapped[str] = mapped_column(VARCHAR(length=50), nullable=False)

    def __repr__(self):
        return f"<FailedLoginAttempt {self.user_id} at {self.timestamp}>"

678 

679 

# Names exported by `from <this module> import *`.
# The mapped classes that were previously missing from the list
# (BuyerOrganisation, RespondentOrganisation, RefreshToken and
# ConsultantClientRelationship) are included alongside the original entries.
__all__ = [
    "CustomRolePermission",
    "CustomRole",
    "UserRole",
    "User",
    "RefreshToken",
    "Organisation",
    "RespondentOrganisation",
    "BuyerOrganisation",
    "ConsultantOrganisation",
    "ConsultantClientRelationship",
    "OrganisationType",
    "OrganisationCategory",
    "FailedLoginAttempt",
]