Coverage for rfpy/model/humans.py: 97%
240 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
1from enum import Enum
2from random import choice
3from string import ascii_uppercase, digits
4from typing import Optional, TYPE_CHECKING
5from datetime import datetime, timedelta, timezone
6import secrets
8from sqlalchemy import (
9 Column,
10 DateTime,
11 Table,
12 types,
13 Integer,
14 ForeignKey,
15 UniqueConstraint,
16 TIMESTAMP,
17 text,
18 Index,
19)
21from sqlalchemy.orm import Mapped, mapped_column, relationship, validates, DynamicMapped
22from sqlalchemy.orm.exc import NoResultFound
23from sqlalchemy.ext.associationproxy import association_proxy, AssociationProxy
24from sqlalchemy.dialects.mysql import VARCHAR, TINYINT, INTEGER, MEDIUMTEXT, CHAR
27from rfpy import conf
28from rfpy.auth import ROLES, LacksPermission, AuthorizationFailure, perms
29from rfpy.model.meta import Base
30from rfpy.model.audit import AuditEvent
32if TYPE_CHECKING:
33 from sqlalchemy.orm import Query
34 from rfpy.model.notify import WebhookSubscription, IssueWatchList, ProjectWatchList
35 from rfpy.model.acl import SectionPermission, ProjectPermission, UserAdminToken
36 from rfpy.model.project import Project
37 from rfpy.model.audit import AuditEvent
38 from rfpy.model.graph import RelationshipType, Edge
39 from rfpy.model.misc import Category
40 from rfpy.model.tags import Tag
# Link table joining organisations to organisation categories (many-to-many).
org_cat_rel = Table(
    "org_cat_rel",
    Base.metadata,
    # Surrogate primary key for each link row.
    Column("id", Integer, primary_key=True),
    # The organisation side of the link; rows follow the organisation when it
    # is deleted or its id is updated.
    Column(
        "org_id",
        VARCHAR(length=50),
        ForeignKey(
            "organisations.id",
            onupdate="CASCADE",
            ondelete="CASCADE",
            name="constr_org",
        ),
        nullable=False,
    ),
    # The category side of the link.
    Column(
        "cat_id",
        INTEGER(),
        ForeignKey("org_categories.id", name="constr_cat"),
        nullable=False,
    ),
    # A given organisation/category pair may only be stored once.
    UniqueConstraint("org_id", "cat_id", name="title"),
    mysql_charset="utf8mb4",
    mysql_engine="InnoDB",
)
# Self-referential link table: each row pairs an organisation (org_id) with
# another organisation acting as its supplier (supplier_id).
organisation_suppliers = Table(
    "organisation_suppliers",
    Base.metadata,
    # Surrogate primary key for each link row.
    Column("id", Integer, primary_key=True),
    # The organisation that owns the supplier entry.
    Column(
        "org_id",
        VARCHAR(length=50),
        ForeignKey("organisations.id", ondelete="CASCADE", onupdate="CASCADE"),
    ),
    # The organisation listed as a supplier.
    Column(
        "supplier_id",
        VARCHAR(length=50),
        ForeignKey("organisations.id", onupdate="CASCADE", ondelete="CASCADE"),
    ),
    mysql_charset="utf8mb4",
    mysql_engine="InnoDB",
)
class OrganisationType(Enum):
    """The kinds of organisation known to the system.

    The integer values are what OrgTypeCol writes to / reads from the
    database, so they must not be renumbered.
    """

    RESPONDENT = 0
    BUYER = 1
    CONSULTANT = 2


# Organisation types that act on the buying side. Kept in agreement with
# Organisation.is_buyside, which treats BUYER and CONSULTANT as buy-side;
# RESPONDENT is deliberately excluded.
BUYSIDE_ORGS = (OrganisationType.BUYER, OrganisationType.CONSULTANT)
class OrgTypeCol(types.TypeDecorator):
    """Column type that stores an ``OrganisationType`` member as its integer value."""

    impl = INTEGER(1)

    cache_ok = True

    def process_bind_param(self, status, dialect):
        """Convert a Python-side value to the integer sent to the database.

        :param status: an ``OrganisationType`` member, a raw integer value of
            one, or ``None``
        :param dialect: the dialect in use (unused)
        :return: the integer value, or ``None`` when ``status`` is ``None``
        :raises ValueError: if a raw value does not match any ``OrganisationType``
        """
        # None must pass through untouched so that NULL binds do not fail
        # with an AttributeError.
        if status is None:
            return None
        if isinstance(status, OrganisationType):
            return status.value
        # Raw integers (for example a column default of 0) are validated
        # against the enum rather than crashing on a missing ``.value``.
        return OrganisationType(status).value

    def process_result_value(self, int_value, dialect):
        """Convert a stored integer back to an ``OrganisationType`` member.

        :return: the matching member, or ``None`` if the stored value does not
            correspond to any member
        """
        try:
            return OrganisationType(int_value)
        except ValueError:
            return None
class ConsultantClientRelationship(Base):
    """Association row pairing a consultant organisation with a client organisation.

    The row is identified by the (consultant_id, client_id) pair; both columns
    reference ``organisations.id``.
    """

    __tablename__ = "consultant_orgs"
    __table_args__ = (Index("cons_org_client_fk", "client_id"), *Base.__table_args__)

    # No surrogate id on this table: the two foreign keys form the primary key.
    id = None  # type: ignore

    consultant_id: Mapped[str] = mapped_column(
        VARCHAR(length=50),
        ForeignKey("organisations.id", onupdate="CASCADE", ondelete="CASCADE"),
        primary_key=True,
    )
    client_id: Mapped[str] = mapped_column(
        VARCHAR(length=50),
        ForeignKey("organisations.id", onupdate="CASCADE", ondelete="CASCADE"),
        primary_key=True,
    )

    # Both relationships target Organisation, so each names its own foreign key.
    consultant: Mapped["Organisation"] = relationship(
        "Organisation",
        backref="consultant_org_clients",
        foreign_keys="ConsultantClientRelationship.consultant_id",
    )
    client: Mapped["Organisation"] = relationship(
        "Organisation",
        backref="consultant_org_consultants",
        foreign_keys="ConsultantClientRelationship.client_id",
    )

    def __init__(self, client=None, consultant=None):
        """Create a link row.

        This constructor is invoked when values are appended to the 'clients'
        or 'consultants' attributes of Organisation.
        """
        self.consultant = consultant
        self.client = client

    def __repr__(self):
        return f"<Consultant: {self.consultant_id} , Client: {self.client_id} >"
class Organisation(Base):
    """An organisation: the base class of the polymorphic organisation hierarchy.

    The concrete subclass loaded for a row is selected by the ``type`` column
    (see ``__mapper_args__``).
    """

    __tablename__ = "organisations"

    __mapper_args__ = {"polymorphic_on": "type", "polymorphic_identity": None}

    public_attrs = "id,name,public,is_consultant".split(",")

    id: Mapped[str] = mapped_column(VARCHAR(length=50), primary_key=True)  # type: ignore

    name: Mapped[str] = mapped_column(VARCHAR(length=50), nullable=False)
    address: Mapped[Optional[str]] = mapped_column(VARCHAR(length=255), nullable=True)
    public: Mapped[bool] = mapped_column(TINYINT(1), default=0, nullable=True)
    type: Mapped[OrganisationType] = mapped_column(
        OrgTypeCol, nullable=False, default=0, server_default=text("'0'")
    )
    password_expiry: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    domain_name: Mapped[Optional[str]] = mapped_column(VARCHAR(length=256), nullable=True)

    users: Mapped[list["User"]] = relationship(
        "User",
        back_populates="organisation",
        cascade="all,delete",
        passive_deletes=True,
    )

    organisation_categories: Mapped[list["OrganisationCategory"]] = relationship(
        "OrganisationCategory", secondary=org_cat_rel
    )

    custom_roles: Mapped[list["CustomRole"]] = relationship(
        "CustomRole", back_populates="organisation"
    )

    # These form the "Private Address Book"
    suppliers: DynamicMapped["Organisation"] = relationship(
        "Organisation",
        secondary="organisation_suppliers",
        primaryjoin="Organisation.id==organisation_suppliers.c.org_id",
        secondaryjoin="Organisation.id==organisation_suppliers.c.supplier_id",
        backref="buyers",
        lazy="dynamic",
    )

    # Read-only view of the audit events linked to this organisation through
    # the audit_event_orgs table.
    visible_events: DynamicMapped["AuditEvent"] = relationship(
        "AuditEvent",
        primaryjoin="Organisation.id==EventOrgACL.org_id",
        secondaryjoin="EventOrgACL.event_id==AuditEvent.id",
        secondary="audit_event_orgs",
        lazy="dynamic",
        viewonly=True,
    )

    webhook_subscriptions: DynamicMapped["WebhookSubscription"] = relationship(
        "WebhookSubscription",
        back_populates="organisation",
        cascade="all, delete",
        passive_deletes=True,
        lazy="dynamic",
    )

    projects: DynamicMapped["Project"] = relationship(
        "Project",
        back_populates="owner_org",
        cascade="all, delete",
        passive_deletes=True,
        lazy="dynamic",
    )
    relationship_types: DynamicMapped["RelationshipType"] = relationship(
        "RelationshipType",
        back_populates="organisation",
        cascade="all,delete",
        passive_deletes=True,
        lazy="dynamic",
    )
    # Edges for which this organisation is the "from" end.
    lower_edges: Mapped[list["Edge"]] = relationship(
        "Edge",
        primaryjoin="Organisation.id==Edge.from_org_id",
        back_populates="from_org",
        cascade="all, delete",
    )
    # Edges for which this organisation is the "to" end.
    higher_edges: Mapped[list["Edge"]] = relationship(
        "Edge",
        primaryjoin="Organisation.id==Edge.to_org_id",
        back_populates="to_org",
        cascade="all, delete",
    )

    categories: DynamicMapped["Category"] = relationship(
        "Category", back_populates="organisation"
    )

    tags: DynamicMapped["Tag"] = relationship(
        "Tag", back_populates="organisation", lazy="dynamic"
    )

    events: DynamicMapped[AuditEvent] = relationship(
        "AuditEvent",
        lazy="dynamic",
        back_populates="organisation",
        cascade_backrefs=False,
        primaryjoin=("foreign(AuditEvent.org_id)" "==Organisation.id"),
    )

    def __init__(self, id_name) -> None:
        """Create an organisation whose id and name are both ``id_name``."""
        self.id = id_name
        self.name = id_name

    def __repr__(self) -> str:
        return f"Organisation: {self.name}"

    @property
    def is_consultant(self) -> bool:
        """True when this organisation's type is CONSULTANT."""
        return self.type == OrganisationType.CONSULTANT

    @property
    def is_buyside(self) -> bool:
        """True when this organisation's type is BUYER or CONSULTANT."""
        return self.type in (OrganisationType.BUYER, OrganisationType.CONSULTANT)

    def has_supplier(self, supplier_org) -> bool:
        """Report whether ``supplier_org`` is among this organisation's suppliers.

        :param supplier_org: object whose ``id`` is compared with supplier ids
        :return: True if at least one matching supplier row exists
        """
        # first() rather than one(): nothing in organisation_suppliers prevents
        # the same supplier being linked twice, and one() would then raise
        # MultipleResultsFound instead of answering True.
        match = self.suppliers.filter(Organisation.id == supplier_org.id).first()
        return match is not None
class RespondentOrganisation(Organisation):
    """Organisation subclass mapped to rows whose type is RESPONDENT."""

    __mapper_args__ = {"polymorphic_identity": OrganisationType.RESPONDENT}
class BuyerOrganisation(Organisation):
    """Organisation subclass mapped to rows whose type is BUYER."""

    __mapper_args__ = {"polymorphic_identity": OrganisationType.BUYER}

    # Proxies the ``consultant`` attribute of each ConsultantClientRelationship
    # in ``consultant_org_consultants``, so the elements are Organisation
    # objects, not the association rows themselves. Appending an organisation
    # builds the association row through ``creator``.
    consultants: AssociationProxy[list["Organisation"]] = association_proxy(
        "consultant_org_consultants",
        "consultant",
        creator=lambda org: ConsultantClientRelationship(consultant=org),
    )
class ConsultantOrganisation(Organisation):
    """Organisation subclass mapped to rows whose type is CONSULTANT."""

    __mapper_args__ = {"polymorphic_identity": OrganisationType.CONSULTANT}

    # Appending an organisation to ``clients`` calls ``creator``, which builds a
    # ConsultantClientRelationship with that organisation as the client.
    # NOTE(review): the consultant side is presumably filled in by the
    # ``consultant_org_clients`` backref when the row joins that collection.
    clients: AssociationProxy[list["BuyerOrganisation"]] = association_proxy(
        "consultant_org_clients",
        "client",
        creator=lambda org: ConsultantClientRelationship(client=org),
    )

    def add_client(self, client_organisation) -> None:
        """Append ``client_organisation`` to this consultant's clients."""
        client_proxy = self.clients
        client_proxy.append(client_organisation)
def custom_perms(session, user_id) -> "Query":
    """Build a query for the permission ids a user holds through custom roles.

    :param session: session used to create the query
    :param user_id: id of the user whose role assignments are followed
    :return: an unevaluated query yielding ``permission_id`` rows
    """
    permission_ids = session.query(CustomRolePermission.permission_id)
    # permission -> role -> the user's role assignments
    through_roles = permission_ids.join(CustomRole)
    through_assignments = through_roles.join(
        UserRole, CustomRole.id == UserRole.role_id
    )
    return through_assignments.filter(UserRole.user_id == user_id)
class RefreshToken(Base):
    """A refresh token issued to a user, with issue/expiry times and a revoked flag."""

    __tablename__ = "refresh_tokens"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    # Owning user; token rows are removed by the database when the user is deleted.
    user_id: Mapped[str] = mapped_column(
        VARCHAR(length=50),
        ForeignKey("users.id", ondelete="CASCADE"),
        index=True,
        nullable=False,
    )
    # The token value itself; unique across all rows.
    token: Mapped[str] = mapped_column(VARCHAR(length=255), unique=True, nullable=False)
    issued_at: Mapped[datetime] = mapped_column(DateTime, nullable=False)
    expires_at: Mapped[datetime] = mapped_column(DateTime, nullable=False)
    revoked: Mapped[bool] = mapped_column(types.Boolean, nullable=False, default=False)

    user: Mapped["User"] = relationship("User", back_populates="refresh_tokens")

    def __repr__(self):
        return f"<RefreshToken user_id={self.user_id} expires_at={self.expires_at} revoked={self.revoked}>"
class User(Base):
    """A user account, with its roles, permissions, tokens and watch lists.

    Permissions come from two places: built-in roles (looked up in ``ROLES``)
    and custom roles stored in the database (see ``custom_permissions``).
    """

    __tablename__ = "users"

    id: Mapped[str] = mapped_column(VARCHAR(length=50), primary_key=True)  # type: ignore
    org_id: Mapped[str] = mapped_column(
        VARCHAR(length=50),
        ForeignKey("organisations.id", ondelete="CASCADE", onupdate="CASCADE"),
        index=True,
        nullable=True,
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime,
        nullable=False,
        server_default=text("CURRENT_TIMESTAMP"),
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime,
        nullable=False,
        server_default=text("CURRENT_TIMESTAMP"),
        server_onupdate=text("CURRENT_TIMESTAMP"),
    )
    fullname: Mapped[str] = mapped_column(VARCHAR(length=50), nullable=False)
    email: Mapped[Optional[str]] = mapped_column(VARCHAR(length=255), nullable=True)
    password: Mapped[Optional[str]] = mapped_column(VARCHAR(length=255), nullable=True)
    locale: Mapped[Optional[str]] = mapped_column(VARCHAR(length=10), nullable=True)
    previous_login_date: Mapped[Optional[datetime]] = mapped_column(
        DateTime, nullable=True, server_default=text("NULL")
    )
    password_set_date: Mapped[datetime] = mapped_column(
        TIMESTAMP, nullable=False, server_default=text("CURRENT_TIMESTAMP")
    )
    is_active: Mapped[bool] = mapped_column(types.Boolean, default=True, nullable=False)
    failed_login_attempts: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
    total_failed_logins: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
    locked_until: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)

    # "standard" or "restricted"; see the is_restricted property.
    type: Mapped[str] = mapped_column(
        VARCHAR(length=10),
        nullable=False,
        default="standard",
        server_default=text("'standard'"),
    )

    organisation: Mapped["Organisation"] = relationship(
        "Organisation", uselist=False, back_populates="users"
    )

    # Read-only view of the CustomRole rows assigned to this user. The join is
    # on CustomRole.id because that is what user_roles.role_id holds: add_role
    # validates role ids against CustomRole.id and custom_perms joins on it.
    custom_roles: Mapped[list["CustomRole"]] = relationship(
        "CustomRole",
        secondary="user_roles",
        secondaryjoin="UserRole.role_id==CustomRole.id",
        viewonly=True,
    )

    role_list: Mapped[list["UserRole"]] = relationship(
        "UserRole", back_populates="user", cascade="all, delete", passive_deletes=True
    )

    # The role ids of role_list; appending a role id creates the UserRole row.
    roles: AssociationProxy[list[str]] = association_proxy(
        "role_list", "role_id", creator=lambda r: UserRole(role_id=r)
    )

    # Query-style (dynamic) read-only access to the same UserRole rows.
    roles_q: DynamicMapped["UserRole"] = relationship(
        "UserRole", lazy="dynamic", viewonly=True
    )

    tokens: Mapped[list["UserAdminToken"]] = relationship(
        "UserAdminToken",
        back_populates="user",
        cascade="all, delete",
        passive_deletes=True,
    )

    refresh_tokens: Mapped[list["RefreshToken"]] = relationship(
        "RefreshToken",
        back_populates="user",
        cascade="all, delete-orphan",
        passive_deletes=True,
    )

    section_permissions: DynamicMapped["SectionPermission"] = relationship(
        "SectionPermission",
        back_populates="user",
        cascade="all,delete",
        passive_deletes=True,
        lazy="dynamic",
    )

    project_permissions: Mapped[list["ProjectPermission"]] = relationship(
        "ProjectPermission",
        back_populates="user",
        cascade="all, delete",
        passive_deletes=True,
    )

    project_watches: Mapped[list["ProjectWatchList"]] = relationship(
        "ProjectWatchList",
        back_populates="user",
        cascade="all,delete",
        passive_deletes=True,
    )

    issue_watches: Mapped[list["IssueWatchList"]] = relationship(
        "IssueWatchList",
        back_populates="user",
        cascade="all,delete",
        passive_deletes=True,
    )

    events: DynamicMapped[AuditEvent] = relationship(
        "AuditEvent",
        lazy="dynamic",
        back_populates="user",
        cascade_backrefs=False,
        primaryjoin="foreign(AuditEvent.user_id)==User.id",
    )

    def __init__(self, user_id, *args, **kwargs):
        """Create a user with the given id.

        ``fullname`` defaults to ``user_id``; an explicit ``fullname`` keyword
        argument is respected rather than overwritten.
        """
        super(User, self).__init__(*args, **kwargs)
        self.id = user_id
        if "fullname" not in kwargs:
            self.fullname = user_id

    def __repr__(self):
        return f"<User: {self.id} ({self.fullname}) of {self.org_id}>"

    @property
    def custom_permissions(self):
        """Set of permission ids granted through custom roles (queries the database)."""
        cq = custom_perms(self._instance_session, self.id)
        return {cp[0] for cp in cq}

    @property
    def is_restricted(self):
        """True when the user's type is "restricted"."""
        return self.type == "restricted"

    @is_restricted.setter
    def is_restricted(self, restricted_bool):
        self.type = "restricted" if restricted_bool else "standard"

    def has_permission(self, permission):
        """
        May be called multiple times in the life of one http request
        so attempting to optimise. Database stored custom roles are
        sometimes in use, but all efforts are made to avoid querying the Database:
        the Administrator role and the built-in permissions are checked first and
        custom permissions are only consulted when those do not match.
        """
        if "Administrator" in self.roles:
            return True
        if permission in self.builtin_permissions:
            return True
        return permission in self.custom_permissions

    def check_permission(self, permission):
        """
        @raise LacksPermission if the user doesn't have the given permission
        """
        if not self.has_permission(permission):
            raise LacksPermission(permission, self.id)

    def check_is_standard(self):
        """
        @raise AuthorizationFailure if the user is restricted
        """
        if self.is_restricted:
            raise AuthorizationFailure("Action not permitted for Domain Expert users")

    @property
    def builtin_permissions(self):
        """Set of permissions granted by the user's built-in roles.

        The set is computed once and cached on the instance as
        ``_builtin_perms``; add_role discards the cache.
        """
        if hasattr(self, "_builtin_perms"):
            return self._builtin_perms

        # cache not populated, so build it
        self._builtin_perms = pm = set()
        # Add permissions for 'built-in roles'
        for role_id in self.roles & ROLES.keys():
            pm.update(ROLES[role_id])
        return pm

    @property
    def all_permissions(self) -> set[str]:
        """Union of custom and built-in permissions."""
        return self.custom_permissions | self.builtin_permissions

    @property
    def sorted_permissions(self) -> list[str]:
        """All permissions, sorted."""
        return sorted(self.all_permissions)

    def add_role(self, role_id: str) -> None:
        """Assign a role to the user.

        :param role_id: a built-in role id (a key of ``ROLES``) or the id of a
            custom role belonging to the user's organisation
        :raises KeyError: if the role is neither built-in nor a custom role of
            the user's organisation (including when the user has no organisation)
        """
        if role_id not in ROLES:
            org = self.organisation
            # A user without an organisation has no custom roles to draw on.
            custom_ids = {cr.id for cr in org.custom_roles} if org is not None else set()
            if role_id not in custom_ids:
                raise KeyError('Role "%s" not found' % role_id)
        self.roles.append(role_id)
        # The cached built-in permission set was computed from the old roles.
        if hasattr(self, "_builtin_perms"):
            del self._builtin_perms

    def is_in_role(self, role_id: str) -> bool:
        """True if ``role_id`` is among the user's roles."""
        return role_id in self.roles

    def is_administrator(self) -> bool:
        """True if the user holds the "Administrator" role."""
        return self.is_in_role("Administrator")

    def can_view_section_id(self, section_id: int) -> bool:
        """Report whether the user may view the given section.

        Unrestricted users may view any section. A restricted user may view a
        section only when exactly one of their section permissions matches it.
        """
        if not self.is_restricted:
            return True
        # Bound parameter rather than string interpolation, so the value is
        # never spliced into the SQL text.
        clause = text("section_id = :section_id").bindparams(section_id=section_id)
        return self.section_permissions.filter(clause).count() == 1

    def create_refresh_token(self, session):
        """Create a new refresh token for this user

        The token row is added to ``session`` (not committed here) and expires
        ``conf.CONF.jwt_refresh_token_expiry_hours`` hours from now.

        :return: the generated token value
        """
        now = datetime.now(timezone.utc)
        expires_at = now + timedelta(hours=conf.CONF.jwt_refresh_token_expiry_hours)

        token_value = secrets.token_urlsafe(32)
        refresh_token = RefreshToken(
            user_id=self.id,
            token=token_value,
            issued_at=now,
            expires_at=expires_at,
            revoked=False,
        )

        session.add(refresh_token)
        return token_value

    def revoke_all_refresh_tokens(self, session):
        """Revoke all refresh tokens for this user"""
        session.query(RefreshToken).filter(
            RefreshToken.user_id == self.id
        ).update({"revoked": True})
class UserRole(Base):
    """Assignment of a role id to a user; each (user_id, role_id) pair is unique."""

    __tablename__ = "user_roles"
    __table_args__ = (
        UniqueConstraint("user_id", "role_id", name="unique_user_role"),
        *Base.__table_args__,
    )

    # Assignments are removed by the database when the user is deleted.
    user_id: Mapped[str] = mapped_column(
        VARCHAR(length=50),
        ForeignKey("users.id", name="constr_user", ondelete="CASCADE"),
        nullable=False,
    )
    # Plain string column with no foreign key.
    role_id: Mapped[str] = mapped_column(
        VARCHAR(length=255),
        nullable=True,
    )

    user: Mapped["User"] = relationship(
        "User", viewonly=True, back_populates="role_list"
    )

    def __repr__(self):
        return f"<UserRole {self.user_id} - {self.role_id}>"
def munged_id(sqla_context):
    """
    Build an id of the form "<name>/<org_id>#<suffix>" from the parameters
    of the statement being executed.

    The suffix is five characters drawn at random from upper-case ASCII
    letters and digits, e.g. Self Scorer/Thomas Murray#TK94S
    """
    params = sqla_context.current_parameters
    role_name = params["name"]
    owner_org = params["org_id"]
    alphabet = ascii_uppercase + digits
    suffix = "".join(choice(alphabet) for _ in range(5))
    return f"{role_name}/{owner_org}#{suffix}"
class CustomRole(Base):
    """A role defined by an organisation; role names are unique per organisation."""

    __tablename__ = "roles"

    __table_args__ = (
        UniqueConstraint("name", "org_id", name="role_names"),
        *Base.__table_args__,
    )

    # String primary key, generated by munged_id when not supplied.
    id: Mapped[str] = mapped_column(  # type: ignore
        VARCHAR(length=255), primary_key=True, nullable=False, default=munged_id
    )
    name: Mapped[str] = mapped_column(VARCHAR(length=255), nullable=False)
    description: Mapped[str] = mapped_column(MEDIUMTEXT(), nullable=False)
    # Single-character type code; the database default is 'U'.
    type: Mapped[str] = mapped_column(
        CHAR(length=1), server_default=text("'U'"), nullable=False
    )
    # Owning organisation; roles follow it on delete/update.
    org_id: Mapped[str] = mapped_column(
        VARCHAR(length=50),
        ForeignKey("organisations.id", ondelete="CASCADE", onupdate="CASCADE"),
        nullable=False,
    )

    organisation: Mapped["Organisation"] = relationship(
        Organisation, back_populates="custom_roles"
    )
    permissions: Mapped[list["CustomRolePermission"]] = relationship(
        "CustomRolePermission",
        back_populates="role",
        passive_deletes=True,
        cascade="all, delete",
    )

    def __repr__(self):
        return f"<CustomRole {self.id} - {self.name}>"
class CustomRolePermission(Base):
    """A single permission granted by a custom role.

    Rows are identified by the (role_id, permission_id) pair.
    """

    __tablename__ = "role_permissions"

    # No surrogate id: the two columns below form the primary key.
    id = None  # type: ignore

    role_id: Mapped[str] = mapped_column(
        VARCHAR(length=255),
        ForeignKey("roles.id", name="role_reference", ondelete="CASCADE"),
        primary_key=True,
        nullable=False,
    )

    permission_id: Mapped[str] = mapped_column(
        VARCHAR(length=255), primary_key=True, nullable=False
    )
    role: Mapped["CustomRole"] = relationship(CustomRole, back_populates="permissions")

    def __repr__(self):
        return f"{self.permission_id} (Role: {self.role_id})"

    @validates("permission_id")
    def known_permission(self, key, perm_id):
        """Accept only permission ids listed in ``perms.ALL_PERMISSIONS``.

        @raise ValueError if perm_id is not a recognised permission
        """
        if perm_id in perms.ALL_PERMISSIONS:
            return perm_id
        raise ValueError('"%s" is not a recognised permission' % perm_id)
class OrganisationCategory(Base):
    """A category that organisations can be assigned to; titles are unique."""

    __tablename__ = "org_categories"
    __table_args__ = (UniqueConstraint("title", name="title"), *Base.__table_args__)

    title: Mapped[str] = mapped_column(VARCHAR(length=255), unique=True, nullable=False)
    description: Mapped[Optional[str]] = mapped_column(MEDIUMTEXT(), nullable=True)

    def __repr__(self):
        return f"<Organisation Category: {self.title}>"
class FailedLoginAttempt(Base):
    """Record of one failed login: which user, when, and from which IP address."""

    __tablename__ = "failed_login_attempts"

    user_id: Mapped[str] = mapped_column(
        VARCHAR(length=50), ForeignKey("users.id"), nullable=False
    )
    # Indexed column holding the time of the attempt.
    timestamp: Mapped[datetime] = mapped_column(DateTime, nullable=False, index=True)
    ip_address: Mapped[str] = mapped_column(VARCHAR(length=50), nullable=False)

    def __repr__(self):
        return f"<FailedLoginAttempt {self.user_id} at {self.timestamp}>"
# Names exported by a star-import of this module. Every mapped class defined
# here is listed, including RefreshToken, the Respondent/Buyer organisation
# subclasses and the consultant/client association class.
__all__ = [
    "CustomRolePermission",
    "CustomRole",
    "UserRole",
    "User",
    "RefreshToken",
    "Organisation",
    "RespondentOrganisation",
    "BuyerOrganisation",
    "ConsultantOrganisation",
    "ConsultantClientRelationship",
    "OrganisationType",
    "OrganisationCategory",
    "FailedLoginAttempt",
]