Coverage for rfpy/model/humans.py: 100%
210 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-31 16:00 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-31 16:00 +0000
1from enum import Enum
2from random import choice
3from string import ascii_uppercase, digits
5from sqlalchemy import (
6 Column, DateTime, Table, types, Integer, ForeignKey, UniqueConstraint, TIMESTAMP, text, Index
7)
9from sqlalchemy.orm import deferred, relationship, validates
10from sqlalchemy.orm.exc import NoResultFound
11from sqlalchemy.ext.associationproxy import association_proxy
12from sqlalchemy.dialects.mysql import VARCHAR, TINYINT, INTEGER, MEDIUMTEXT, CHAR
13from sqlalchemy.sql.elements import literal_column
14from sqlalchemy.sql.sqltypes import JSON
17from rfpy.auth import ROLES, LacksPermission, AuthorizationFailure, perms
18from rfpy.model.meta import Base
21org_cat_rel = Table('org_cat_rel', Base.metadata,
22 Column('id', Integer, primary_key=True),
23 Column(
24 'org_id',
25 VARCHAR(50),
26 ForeignKey(
27 'organisations.id',
28 name='constr_org',
29 ondelete='CASCADE',
30 onupdate='CASCADE'
31 ),
32 nullable=False
33 ),
34 Column(
35 'cat_id',
36 INTEGER(display_width=11),
37 ForeignKey('org_categories.id', name='constr_cat'),
38 nullable=False
39 ),
40 UniqueConstraint('org_id', 'cat_id', name='title'),
41 mysql_engine='InnoDB', mysql_charset='utf8mb4')
43organisation_suppliers = Table(
44 'organisation_suppliers',
45 Base.metadata,
46 Column('id', Integer, primary_key=True),
47 Column(
48 'org_id', VARCHAR(50),
49 ForeignKey(
50 'organisations.id',
51 onupdate='CASCADE',
52 ondelete='CASCADE'
53 )
54 ),
55 Column(
56 'supplier_id',
57 VARCHAR(50),
58 ForeignKey(
59 'organisations.id',
60 ondelete='CASCADE',
61 onupdate='CASCADE'
62 ),
63 ),
64 mysql_engine='InnoDB',
65 mysql_charset='utf8mb4'
66)
69class OrganisationType(Enum):
70 RESPONDENT = 0
71 BUYER = 1
72 CONSULTANT = 2
75BUYSIDE_ORGS = (OrganisationType.BUYER, OrganisationType.RESPONDENT)
78class OrgTypeCol(types.TypeDecorator):
80 impl = INTEGER(1)
82 cache_ok = True
84 def process_bind_param(self, status, dialect):
85 return status.value
87 def process_result_value(self, int_value, dialect):
88 try:
89 return OrganisationType(int_value)
90 except ValueError:
91 return None
94class ConsultantClientRelationship(Base):
95 __tablename__ = 'consultant_orgs'
96 __table_args__ = (
97 Index('cons_org_client_fk', 'client_id'),
98 ) + Base.__table_args__
100 id = None
101 consultant_id = Column(
102 VARCHAR(length=50),
103 ForeignKey('organisations.id', ondelete='CASCADE', onupdate='CASCADE'),
104 primary_key=True
105 )
106 client_id = Column(
107 VARCHAR(length=50),
108 ForeignKey('organisations.id', ondelete='CASCADE', onupdate='CASCADE'),
109 primary_key=True
110 )
111 consultant = relationship('Organisation',
112 foreign_keys='ConsultantClientRelationship.consultant_id',
113 backref='consultant_org_clients')
114 client = relationship('Organisation', foreign_keys='ConsultantClientRelationship.client_id',
115 backref='consultant_org_consultants')
117 def __repr__(self):
118 return '<Consultant: %s , Client: %s >' % (self.consultant_id, self.client_id)
120 def __init__(self, client=None, consultant=None):
121 '''Constructor is called when values are appended to the 'clients' or 'consultants'
122 attributes of Organisation
123 '''
124 self.consultant = consultant
125 self.client = client
128class Organisation(Base):
129 __tablename__ = 'organisations'
131 __mapper_args__ = {
132 "polymorphic_on": 'type',
133 "polymorphic_identity": None
134 }
136 public_attrs = 'id,name,public,is_consultant'.split(',')
137 id = Column(VARCHAR(length=50), primary_key=True)
138 name = Column(VARCHAR(length=50), nullable=False)
139 address = Column(VARCHAR(length=255))
140 public = Column(TINYINT(1), default=0)
141 type = Column(OrgTypeCol, nullable=False, default=0, server_default=text("'0'"))
142 password_expiry = Column(Integer, nullable=False, default=0)
144 users = relationship('User', back_populates='organisation',
145 cascade='all,delete', passive_deletes=True)
147 organisation_categories = relationship('OrganisationCategory', secondary=org_cat_rel)
149 # These form the "Private Address Book"
150 suppliers = relationship('Organisation', secondary='organisation_suppliers',
151 primaryjoin='Organisation.id==organisation_suppliers.c.org_id',
152 secondaryjoin='Organisation.id==organisation_suppliers.c.supplier_id',
153 backref="buyers", lazy='dynamic')
155 consultants = relationship(
156 "Organisation",
157 secondary="consultant_orgs",
158 primaryjoin="Organisation.id == consultant_orgs.c.client_id",
159 secondaryjoin="Organisation.id == consultant_orgs.c.consultant_id",
160 viewonly=True
161 )
163 visible_events = relationship(
164 'AuditEvent',
165 primaryjoin='Organisation.id==EventOrgACL.org_id',
166 secondaryjoin='EventOrgACL.event_id==AuditEvent.id',
167 secondary='audit_event_orgs',
168 lazy='dynamic',
169 viewonly=True
170 )
172 webhook_subscriptions = relationship(
173 "WebhookSubscription", back_populates="organisation",
174 cascade="all, delete",
175 passive_deletes=True,
176 lazy='dynamic'
177 )
179 domain_name = Column(VARCHAR(length=256), nullable=True)
181 def __init__(self, id_name):
182 self.id = id_name
183 self.name = id_name
185 def __repr__(self):
186 return 'Organisation: {0}'.format(self.name)
188 @property
189 def is_consultant(self):
190 return self.type == OrganisationType.CONSULTANT
192 @property
193 def is_buyside(self):
194 return self.type in (OrganisationType.BUYER, OrganisationType.CONSULTANT)
196 def has_supplier(self, supplier_org):
197 try:
198 self.suppliers.filter(Organisation.id == supplier_org.id).one()
199 return True
200 except NoResultFound:
201 return False
204class RespondentOrganisation(Organisation):
205 __mapper_args__ = {
206 "polymorphic_identity": OrganisationType.RESPONDENT
207 }
210class BuyerOrganisation(Organisation):
211 __mapper_args__ = {
212 "polymorphic_identity": OrganisationType.BUYER
213 }
214 consultants = association_proxy('consultant_org_consultants', 'consultant',
215 creator=lambda org:
216 ConsultantClientRelationship(consultant=org))
219class ConsultantOrganisation(Organisation):
220 __mapper_args__ = {
221 "polymorphic_identity": OrganisationType.CONSULTANT
222 }
223 # Weird SQLAlchemy magic here - these two constructors seem to aggregate
224 # to call __init__ on ConsultantClientRelationship
225 clients = association_proxy('consultant_org_clients', 'client',
226 creator=lambda org: ConsultantClientRelationship(client=org))
228 def add_client(self, client_organisation):
229 co = ConsultantClientRelationship(consultant=self, client=client_organisation)
230 self.consultant_org_clients.append(co)
233def custom_perms(session, user_id):
234 return session.query(CustomRolePermission.permission_id)\
235 .join(CustomRole)\
236 .join(UserRole, CustomRole.id == UserRole.role_id)\
237 .filter(UserRole.user_id == user_id)
240class User(Base):
241 __tablename__ = 'users'
243 id = Column(VARCHAR(50), primary_key=True)
244 org_id = Column(VARCHAR(50),
245 ForeignKey('organisations.id', ondelete="CASCADE", onupdate='CASCADE'),
246 index=True)
247 fullname = Column(VARCHAR(50), nullable=False)
248 email = Column(VARCHAR(255))
249 password = Column(VARCHAR(255))
250 locale = Column(VARCHAR(length=10))
251 previous_login_date = Column(DateTime)
252 password_set_date = Column(
253 TIMESTAMP, nullable=False, server_default=text("CURRENT_TIMESTAMP")
254 )
255 type = Column(VARCHAR(length=10), nullable=False,
256 default="standard", server_default=text("'standard'"))
258 # role = Column(VARCHAR(length=255), nullable=False)
260 # custom_permissions = deferred(Column(JSON))
262 organisation = relationship('Organisation', uselist=False, back_populates='users')
264 custom_roles = relationship('CustomRole', secondary='user_roles',
265 secondaryjoin='UserRole.role_id==CustomRole.name',
266 viewonly=True)
268 roles = relationship('UserRole', back_populates='user',
269 cascade='all, delete', passive_deletes=True)
271 roles_q = relationship('UserRole', lazy='dynamic', viewonly=True)
273 tokens = relationship('UserAdminToken', back_populates='user',
274 cascade='all, delete', passive_deletes=True)
276 section_permissions = relationship('SectionPermission', back_populates='user',
277 cascade='all,delete', passive_deletes=True,
278 lazy='dynamic')
280 project_permissions = relationship('ProjectPermission', back_populates='user',
281 cascade='all, delete', passive_deletes=True)
283 project_watches = relationship('ProjectWatchList', back_populates='user',
284 cascade='all,delete', passive_deletes=True)
286 def __init__(self, user_id, *args, **kwargs):
287 super(User, self).__init__(*args, **kwargs)
288 self.id = user_id
289 self.fullname = user_id
291 def __repr__(self):
292 return '<User: %s (%s) of %s>' % (self.id, self.fullname, self.org_id)
294 @property
295 def custom_permissions(self):
297 cq = custom_perms(self._instance_session, self.id)
298 return {cp[0] for cp in cq}
300 @property
301 def is_restricted(self):
302 return self.type == 'restricted'
304 @is_restricted.setter
305 def is_restricted(self, restricted_bool):
306 self.type = 'restricted' if restricted_bool else 'standard'
308 def has_permission(self, permission):
309 '''
310 May be called multiple times in the life of one http request
311 so attempting to optimise. Database stored custom roles are
312 sometimes in use, but all efforts are made to avoid querying the Database
313 '''
314 if 'Administrator' in self.role_ids:
315 return True
316 if permission in self.builtin_permissions:
317 return True
318 return permission in self.custom_permissions
320 def check_permission(self, permission):
321 '''
322 @raise LacksPermission if the user doesn't have the given permission
323 '''
324 if not self.has_permission(permission):
325 raise LacksPermission(permission, self.id)
327 def check_is_standard(self):
328 if self.is_restricted:
329 raise AuthorizationFailure('Action not permitted for Domain Expert users')
331 @property
332 def builtin_permissions(self):
333 if hasattr(self, '_builtin_perms'):
334 return self._builtin_perms
336 # cache not populated, so build it
337 self._builtin_perms = pm = set()
338 # Add permissions for 'built-in roles'
339 for role_id in self.role_ids & ROLES.keys():
340 pm.update(ROLES[role_id])
341 return pm
343 @property
344 def all_permissions(self):
345 # set union syntax
346 return self.custom_permissions | self.builtin_permissions
348 @property
349 def sorted_permissions(self):
350 return sorted(self.all_permissions)
352 def add_role(self, role_id):
353 if role_id not in ROLES:
354 if role_id not in {cr.id for cr in self.organisation.custom_roles}:
355 raise KeyError('Role "%s" not found' % role_id)
356 role = UserRole(role_id=role_id)
357 self.roles.append(role)
359 @property
360 def role_ids(self):
361 try:
362 return self._role_ids
363 except AttributeError:
364 self._role_ids = set(r[0] for r in self.roles_q.with_entities(literal_column('role_id')))
365 return self._role_ids
367 def is_in_role(self, role_id):
368 return role_id in self.role_ids
370 def is_administrator(self):
371 return self.is_in_role('Administrator')
373 def can_view_section_id(self, section_id: int) -> bool:
374 if self.is_restricted:
375 clause = text(f'section_id={section_id}')
376 if self.section_permissions.filter(clause).count() != 1:
377 return False
378 return True
381class UserRole(Base):
382 __tablename__ = 'user_roles'
383 __table_args__ = (
384 UniqueConstraint('user_id', 'role_id', name='unique_user_role'),
385 ) + Base.__table_args__
387 user_id = Column(
388 VARCHAR(length=50),
389 ForeignKey('users.id', ondelete='CASCADE', name='constr_user'),
390 nullable=False
391 )
392 role_id = Column(VARCHAR(length=255), server_default=None)
394 user = relationship('User', back_populates='roles', viewonly=True)
396 def __repr__(self):
397 return f"<UserRole {self.user_id} - {self.role_id}>"
400def munged_id(sqla_context):
401 '''
402 id is a string formed by joining name, org_id and a random string,
403 e.g. Self Scorer/Thomas Murray#TK94SL
404 '''
405 name = sqla_context.current_parameters['name']
406 org_id = sqla_context.current_parameters['org_id']
407 chars = ascii_uppercase + digits
408 ran_string = [choice(chars) for c in range(5)]
409 return '%s/%s#%s' % (name, org_id, ''.join(ran_string))
412class CustomRole(Base):
413 __tablename__ = 'roles'
415 __table_args__ = (
416 UniqueConstraint('name', 'org_id', name='role_names'),
417 ) + Base.__table_args__
419 id = Column(VARCHAR(length=255), nullable=False,
420 primary_key=True, default=munged_id)
421 name = Column(VARCHAR(length=255), nullable=False)
422 description = Column(MEDIUMTEXT(), nullable=False)
423 type = Column(CHAR(length=1), nullable=False, server_default=text("'U'"))
424 org_id = Column(
425 VARCHAR(length=50),
426 ForeignKey('organisations.id', onupdate='CASCADE', ondelete='CASCADE'),
427 nullable=False,
428 )
430 organisation = relationship(Organisation, backref='custom_roles')
433class CustomRolePermission(Base):
434 __tablename__ = 'role_permissions'
436 id = None # don't provide an ID column - not in original schema
438 role_id = Column(VARCHAR(length=255),
439 ForeignKey('roles.id', ondelete='CASCADE', name='role_reference'),
440 nullable=False,
441 primary_key=True)
443 permission_id = Column(VARCHAR(length=255), nullable=False, primary_key=True)
444 role = relationship(CustomRole, backref='permissions')
446 def __repr__(self):
447 return '%s (Role: %s)' % (self.permission_id, self.role_id)
449 @validates('permission_id')
450 def known_permission(self, key, perm_id):
451 if perm_id not in perms.ALL_PERMISSIONS:
452 raise ValueError('"%s" is not a recognised permission' % perm_id)
453 return perm_id
456class OrganisationCategory(Base):
457 __tablename__ = 'org_categories'
458 __table_args__ = (
459 UniqueConstraint('title', name='title'),
460 ) + Base.__table_args__
462 title = Column(VARCHAR(255), nullable=False, unique=True)
463 description = Column(MEDIUMTEXT())
465 def __repr__(self):
466 return f"<Organisation Category: {self.title}>"
469class FailedLoginAttempt(Base):
470 __tablename__ = 'failed_login_attempts'
471 user_id = Column(VARCHAR(length=50), ForeignKey('users.id'), nullable=False)
472 timestamp = Column(DateTime, index=True, nullable=False)
473 ip_address = Column(VARCHAR(50), nullable=False)
476__all__ = ['CustomRolePermission', 'CustomRole', 'UserRole', 'User', 'Organisation',
477 'ConsultantOrganisation', 'OrganisationType', 'OrganisationCategory',
478 'FailedLoginAttempt']