Coverage for rfpy/model/humans.py: 100%

210 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-31 16:00 +0000

1from enum import Enum 

2from random import choice 

3from string import ascii_uppercase, digits 

4 

5from sqlalchemy import ( 

6 Column, DateTime, Table, types, Integer, ForeignKey, UniqueConstraint, TIMESTAMP, text, Index 

7) 

8 

9from sqlalchemy.orm import deferred, relationship, validates 

10from sqlalchemy.orm.exc import NoResultFound 

11from sqlalchemy.ext.associationproxy import association_proxy 

12from sqlalchemy.dialects.mysql import VARCHAR, TINYINT, INTEGER, MEDIUMTEXT, CHAR 

13from sqlalchemy.sql.elements import literal_column 

14from sqlalchemy.sql.sqltypes import JSON 

15 

16 

17from rfpy.auth import ROLES, LacksPermission, AuthorizationFailure, perms 

18from rfpy.model.meta import Base 

19 

20 

# Association table joining organisations to organisation categories; it is
# used as the ``secondary`` of Organisation.organisation_categories.
org_cat_rel = Table(
    'org_cat_rel',
    Base.metadata,
    Column('id', Integer, primary_key=True),
    Column(
        'org_id',
        VARCHAR(50),
        # Renaming or deleting an organisation propagates to its links.
        ForeignKey('organisations.id', name='constr_org',
                   onupdate='CASCADE', ondelete='CASCADE'),
        nullable=False,
    ),
    Column(
        'cat_id',
        INTEGER(display_width=11),
        ForeignKey('org_categories.id', name='constr_cat'),
        nullable=False,
    ),
    # At most one row per (organisation, category) pair.
    UniqueConstraint('org_id', 'cat_id', name='title'),
    mysql_engine='InnoDB',
    mysql_charset='utf8mb4',
)

42 

# Self-referential association table: each row links an organisation
# (org_id) to another organisation acting as its supplier (supplier_id).
# It backs Organisation.suppliers and the "buyers" backref.
organisation_suppliers = Table(
    'organisation_suppliers',
    Base.metadata,
    Column('id', Integer, primary_key=True),
    Column(
        'org_id',
        VARCHAR(50),
        ForeignKey('organisations.id', ondelete='CASCADE', onupdate='CASCADE'),
    ),
    Column(
        'supplier_id',
        VARCHAR(50),
        ForeignKey('organisations.id', onupdate='CASCADE', ondelete='CASCADE'),
    ),
    mysql_engine='InnoDB',
    mysql_charset='utf8mb4',
)

67 

68 

class OrganisationType(Enum):
    """Kinds of organisation; the integer values are what is stored in the
    ``organisations.type`` column (see OrgTypeCol)."""

    RESPONDENT = 0
    BUYER = 1
    CONSULTANT = 2


# Organisation types regarded as "buy side". This mirrors the membership test
# used by Organisation.is_buyside (BUYER or CONSULTANT). The previous value
# listed RESPONDENT instead of CONSULTANT, which contradicted that property.
BUYSIDE_ORGS = (OrganisationType.BUYER, OrganisationType.CONSULTANT)

76 

77 

class OrgTypeCol(types.TypeDecorator):
    """Column type that stores an OrganisationType member as its integer value."""

    impl = INTEGER(1)

    cache_ok = True

    def process_bind_param(self, status, dialect):
        """Convert an OrganisationType (or raw integer) to the stored integer.

        ``None`` is passed through so NULL can be bound without raising
        AttributeError. Plain integers are accepted as well because
        Organisation.type declares ``default=0``; they are validated by
        round-tripping through the enum.

        @raise ValueError if a raw value does not match any OrganisationType
        """
        if status is None:
            return None
        if isinstance(status, OrganisationType):
            return status.value
        return OrganisationType(status).value

    def process_result_value(self, int_value, dialect):
        """Convert a stored integer to an OrganisationType member.

        Values that do not correspond to a member (including NULL) yield None.
        """
        try:
            return OrganisationType(int_value)
        except ValueError:
            return None

92 

93 

class ConsultantClientRelationship(Base):
    """Association object pairing a consultant organisation with a client
    organisation. The pair (consultant_id, client_id) is the primary key."""

    __tablename__ = 'consultant_orgs'
    __table_args__ = (
        Index('cons_org_client_fk', 'client_id'),
    ) + Base.__table_args__

    # NOTE(review): presumably suppresses an id column inherited from Base,
    # as the comment on CustomRolePermission.id in this file suggests.
    id = None

    consultant_id = Column(
        VARCHAR(length=50),
        ForeignKey('organisations.id', onupdate='CASCADE', ondelete='CASCADE'),
        primary_key=True,
    )
    client_id = Column(
        VARCHAR(length=50),
        ForeignKey('organisations.id', onupdate='CASCADE', ondelete='CASCADE'),
        primary_key=True,
    )

    # Both relationships target Organisation, so each names its own foreign key.
    consultant = relationship(
        'Organisation',
        foreign_keys='ConsultantClientRelationship.consultant_id',
        backref='consultant_org_clients',
    )
    client = relationship(
        'Organisation',
        foreign_keys='ConsultantClientRelationship.client_id',
        backref='consultant_org_consultants',
    )

    def __init__(self, client=None, consultant=None):
        '''Invoked by the association proxy creators on BuyerOrganisation.consultants
        and ConsultantOrganisation.clients when an organisation is appended.
        '''
        self.consultant = consultant
        self.client = client

    def __repr__(self):
        ids = (self.consultant_id, self.client_id)
        return '<Consultant: %s , Client: %s >' % ids

126 

127 

class Organisation(Base):
    """Row in the ``organisations`` table.

    Mapped polymorphically on the ``type`` column: this base class has a
    polymorphic identity of None and the subclasses in this module each
    declare one OrganisationType member as theirs.
    """

    __tablename__ = 'organisations'

    __mapper_args__ = {
        "polymorphic_on": 'type',
        "polymorphic_identity": None,
    }

    public_attrs = 'id,name,public,is_consultant'.split(',')

    # --- columns ---------------------------------------------------------
    id = Column(VARCHAR(length=50), primary_key=True)
    name = Column(VARCHAR(length=50), nullable=False)
    address = Column(VARCHAR(length=255))
    public = Column(TINYINT(1), default=0)
    type = Column(
        OrgTypeCol,
        nullable=False,
        default=0,
        server_default=text("'0'"),
    )
    password_expiry = Column(Integer, nullable=False, default=0)
    domain_name = Column(VARCHAR(length=256), nullable=True)

    # --- relationships ---------------------------------------------------
    users = relationship(
        'User',
        back_populates='organisation',
        cascade='all,delete',
        passive_deletes=True,
    )

    organisation_categories = relationship(
        'OrganisationCategory',
        secondary=org_cat_rel,
    )

    # These form the "Private Address Book"
    suppliers = relationship(
        'Organisation',
        secondary='organisation_suppliers',
        primaryjoin='Organisation.id==organisation_suppliers.c.org_id',
        secondaryjoin='Organisation.id==organisation_suppliers.c.supplier_id',
        backref="buyers",
        lazy='dynamic',
    )

    # Read-only view of the consultant organisations linked to this one
    # through the consultant_orgs table.
    consultants = relationship(
        "Organisation",
        secondary="consultant_orgs",
        primaryjoin="Organisation.id == consultant_orgs.c.client_id",
        secondaryjoin="Organisation.id == consultant_orgs.c.consultant_id",
        viewonly=True,
    )

    # Read-only, query-returning view of audit events reachable through
    # the audit_event_orgs table.
    visible_events = relationship(
        'AuditEvent',
        secondary='audit_event_orgs',
        primaryjoin='Organisation.id==EventOrgACL.org_id',
        secondaryjoin='EventOrgACL.event_id==AuditEvent.id',
        viewonly=True,
        lazy='dynamic',
    )

    webhook_subscriptions = relationship(
        "WebhookSubscription",
        back_populates="organisation",
        cascade="all, delete",
        passive_deletes=True,
        lazy='dynamic',
    )

    def __init__(self, id_name):
        """Create an organisation whose id and name are both ``id_name``."""
        self.id = self.name = id_name

    def __repr__(self):
        return 'Organisation: {0}'.format(self.name)

    @property
    def is_consultant(self):
        """True when this organisation's type is CONSULTANT."""
        return self.type == OrganisationType.CONSULTANT

    @property
    def is_buyside(self):
        """True when this organisation's type is BUYER or CONSULTANT."""
        buyside_types = (OrganisationType.BUYER, OrganisationType.CONSULTANT)
        return self.type in buyside_types

    def has_supplier(self, supplier_org):
        """Report whether ``supplier_org`` is among this organisation's suppliers.

        Looks the supplier up by id in the dynamic ``suppliers`` query and
        treats NoResultFound as "not a supplier".
        """
        try:
            self.suppliers.filter(Organisation.id == supplier_org.id).one()
        except NoResultFound:
            return False
        else:
            return True

202 

203 

class RespondentOrganisation(Organisation):
    """Organisation subclass mapped to rows whose type is RESPONDENT."""

    __mapper_args__ = {"polymorphic_identity": OrganisationType.RESPONDENT}

208 

209 

class BuyerOrganisation(Organisation):
    """Organisation subclass mapped to rows whose type is BUYER."""

    __mapper_args__ = {"polymorphic_identity": OrganisationType.BUYER}

    # Proxies the 'consultant' end of each ConsultantClientRelationship held
    # in consultant_org_consultants. Appending an organisation wraps it in a
    # new ConsultantClientRelationship with that organisation as consultant.
    consultants = association_proxy(
        'consultant_org_consultants',
        'consultant',
        creator=lambda consultant_org: ConsultantClientRelationship(
            consultant=consultant_org
        ),
    )

217 

218 

class ConsultantOrganisation(Organisation):
    """Organisation subclass mapped to rows whose type is CONSULTANT."""

    __mapper_args__ = {"polymorphic_identity": OrganisationType.CONSULTANT}

    # Weird SQLAlchemy magic here - the creator below and the one on
    # BuyerOrganisation.consultants both end up calling __init__ on
    # ConsultantClientRelationship.
    clients = association_proxy(
        'consultant_org_clients',
        'client',
        creator=lambda client_org: ConsultantClientRelationship(client=client_org),
    )

    def add_client(self, client_organisation):
        """Link ``client_organisation`` to this consultant as a client.

        Builds a ConsultantClientRelationship for the pair and appends it to
        the ``consultant_org_clients`` collection.
        """
        # NOTE(review): assigning consultant=self in the constructor may already
        # add the link to consultant_org_clients via the backref - confirm
        # whether the explicit append below duplicates it in memory.
        link = ConsultantClientRelationship(
            consultant=self,
            client=client_organisation,
        )
        self.consultant_org_clients.append(link)

231 

232 

def custom_perms(session, user_id):
    """Build a query for the permission ids granted to a user by custom roles.

    Joins role permissions to their CustomRole and on to the UserRole rows
    whose role_id equals the custom role's id, filtered to ``user_id``.
    The query is returned unexecuted; each result row holds one permission_id.
    """
    permission_query = session.query(CustomRolePermission.permission_id)
    permission_query = permission_query.join(CustomRole)
    permission_query = permission_query.join(
        UserRole, CustomRole.id == UserRole.role_id
    )
    return permission_query.filter(UserRole.user_id == user_id)

238 

239 

class User(Base):
    """Row in the ``users`` table, linked to one Organisation through org_id.

    Permissions come from two sources: built-in roles defined in
    rfpy.auth.ROLES and custom roles stored in the database. Role ids and
    built-in permissions are cached on the instance (``_role_ids`` and
    ``_builtin_perms``) and the caches are dropped whenever add_role is called.
    """

    __tablename__ = 'users'

    id = Column(VARCHAR(50), primary_key=True)
    org_id = Column(
        VARCHAR(50),
        ForeignKey('organisations.id', ondelete="CASCADE", onupdate='CASCADE'),
        index=True,
    )
    fullname = Column(VARCHAR(50), nullable=False)
    email = Column(VARCHAR(255))
    password = Column(VARCHAR(255))
    locale = Column(VARCHAR(length=10))
    previous_login_date = Column(DateTime)
    password_set_date = Column(
        TIMESTAMP, nullable=False, server_default=text("CURRENT_TIMESTAMP")
    )
    # Either 'standard' or 'restricted' - see the is_restricted property.
    type = Column(VARCHAR(length=10), nullable=False,
                  default="standard", server_default=text("'standard'"))

    # role = Column(VARCHAR(length=255), nullable=False)

    # custom_permissions = deferred(Column(JSON))

    organisation = relationship('Organisation', uselist=False, back_populates='users')

    # NOTE(review): this joins on CustomRole.name, whereas custom_perms() and
    # add_role() in this module match UserRole.role_id against CustomRole.id -
    # confirm which is intended.
    custom_roles = relationship('CustomRole', secondary='user_roles',
                                secondaryjoin='UserRole.role_id==CustomRole.name',
                                viewonly=True)

    roles = relationship('UserRole', back_populates='user',
                         cascade='all, delete', passive_deletes=True)

    # Read-only, query-returning twin of ``roles``; used by role_ids.
    roles_q = relationship('UserRole', lazy='dynamic', viewonly=True)

    tokens = relationship('UserAdminToken', back_populates='user',
                          cascade='all, delete', passive_deletes=True)

    section_permissions = relationship('SectionPermission', back_populates='user',
                                       cascade='all,delete', passive_deletes=True,
                                       lazy='dynamic')

    project_permissions = relationship('ProjectPermission', back_populates='user',
                                       cascade='all, delete', passive_deletes=True)

    project_watches = relationship('ProjectWatchList', back_populates='user',
                                   cascade='all,delete', passive_deletes=True)

    def __init__(self, user_id, *args, **kwargs):
        """Create a user whose id and fullname both start out as ``user_id``.

        Remaining arguments are forwarded to the Base constructor.
        """
        super().__init__(*args, **kwargs)
        self.id = user_id
        self.fullname = user_id

    def __repr__(self):
        return '<User: %s (%s) of %s>' % (self.id, self.fullname, self.org_id)

    @property
    def custom_permissions(self):
        """Set of permission ids granted through custom roles.

        Runs the custom_perms() query on every access; it is not cached.
        """
        cq = custom_perms(self._instance_session, self.id)
        return {cp[0] for cp in cq}

    @property
    def is_restricted(self):
        """True when the user's type is 'restricted'."""
        return self.type == 'restricted'

    @is_restricted.setter
    def is_restricted(self, restricted_bool):
        self.type = 'restricted' if restricted_bool else 'standard'

    def has_permission(self, permission):
        '''
        May be called multiple times in the life of one http request
        so attempting to optimise. Database stored custom roles are
        sometimes in use, but all efforts are made to avoid querying the Database

        Checks run cheapest first: the Administrator role, then the cached
        built-in permissions, and only then the custom permissions query.
        '''
        if 'Administrator' in self.role_ids:
            return True
        if permission in self.builtin_permissions:
            return True
        return permission in self.custom_permissions

    def check_permission(self, permission):
        '''
        @raise LacksPermission if the user doesn't have the given permission
        '''
        if not self.has_permission(permission):
            raise LacksPermission(permission, self.id)

    def check_is_standard(self):
        '''
        @raise AuthorizationFailure if the user is a restricted user
        '''
        if self.is_restricted:
            raise AuthorizationFailure('Action not permitted for Domain Expert users')

    @property
    def builtin_permissions(self):
        """Set of permissions granted by the user's built-in roles (cached)."""
        if hasattr(self, '_builtin_perms'):
            return self._builtin_perms

        # cache not populated, so build it
        self._builtin_perms = pm = set()
        # Add permissions for 'built-in roles'
        for role_id in self.role_ids & ROLES.keys():
            pm.update(ROLES[role_id])
        return pm

    @property
    def all_permissions(self):
        """Union of the custom and built-in permission sets."""
        return self.custom_permissions | self.builtin_permissions

    @property
    def sorted_permissions(self):
        """all_permissions as a sorted list."""
        return sorted(self.all_permissions)

    def add_role(self, role_id):
        '''
        Attach the role identified by role_id to this user.

        The role must be either a built-in role (a key of ROLES) or the id of
        one of the organisation's custom roles.

        @raise KeyError if role_id is neither
        '''
        if role_id not in ROLES:
            if role_id not in {cr.id for cr in self.organisation.custom_roles}:
                raise KeyError('Role "%s" not found' % role_id)
        role = UserRole(role_id=role_id)
        self.roles.append(role)
        # Drop the per-instance caches so later permission checks take the
        # new role into account rather than returning stale results.
        for cached_attr in ('_role_ids', '_builtin_perms'):
            self.__dict__.pop(cached_attr, None)

    @property
    def role_ids(self):
        """Set of role ids held by this user; queried once, then cached."""
        try:
            return self._role_ids
        except AttributeError:
            rows = self.roles_q.with_entities(literal_column('role_id'))
            self._role_ids = {r[0] for r in rows}
            return self._role_ids

    def is_in_role(self, role_id):
        return role_id in self.role_ids

    def is_administrator(self):
        return self.is_in_role('Administrator')

    def can_view_section_id(self, section_id: int) -> bool:
        """Report whether the user may view the section with this id.

        Unrestricted users may view every section. A restricted user needs
        exactly one matching row in section_permissions.
        """
        if not self.is_restricted:
            return True
        # Bind the id as a parameter instead of interpolating it into SQL text.
        clause = text('section_id = :section_id').bindparams(section_id=section_id)
        return self.section_permissions.filter(clause).count() == 1

379 

380 

class UserRole(Base):
    """Row in ``user_roles`` recording that a user holds a role.

    Each (user_id, role_id) pair may appear only once.
    """

    __tablename__ = 'user_roles'
    __table_args__ = (
        UniqueConstraint('user_id', 'role_id', name='unique_user_role'),
    ) + Base.__table_args__

    user_id = Column(
        VARCHAR(length=50),
        ForeignKey('users.id', name='constr_user', ondelete='CASCADE'),
        nullable=False,
    )
    # No foreign key is declared on role_id in this module.
    role_id = Column(VARCHAR(length=255), server_default=None)

    user = relationship('User', viewonly=True, back_populates='roles')

    def __repr__(self):
        return f"<UserRole {self.user_id} - {self.role_id}>"

398 

399 

def munged_id(sqla_context):
    '''
    Build an id string of the form "<name>/<org_id>#<suffix>", where name and
    org_id are read from the context's current_parameters and suffix is five
    random characters drawn from uppercase ASCII letters and digits,
    e.g. Self Scorer/Thomas Murray#TK94S
    '''
    params = sqla_context.current_parameters
    role_name = params['name']
    organisation_id = params['org_id']
    alphabet = ascii_uppercase + digits
    suffix = ''.join(choice(alphabet) for _ in range(5))
    return '%s/%s#%s' % (role_name, organisation_id, suffix)

410 

411 

class CustomRole(Base):
    """Row in the ``roles`` table: a role defined for one organisation.

    Role names are unique within an organisation. When no id is supplied,
    one is generated by munged_id from the name and org_id.
    """

    __tablename__ = 'roles'

    __table_args__ = (
        UniqueConstraint('name', 'org_id', name='role_names'),
    ) + Base.__table_args__

    id = Column(
        VARCHAR(length=255),
        primary_key=True,
        nullable=False,
        default=munged_id,
    )
    name = Column(VARCHAR(length=255), nullable=False)
    description = Column(MEDIUMTEXT(), nullable=False)
    type = Column(CHAR(length=1), nullable=False, server_default=text("'U'"))
    org_id = Column(
        VARCHAR(length=50),
        ForeignKey('organisations.id', ondelete='CASCADE', onupdate='CASCADE'),
        nullable=False,
    )

    # Also exposes Organisation.custom_roles through the backref.
    organisation = relationship(Organisation, backref='custom_roles')

431 

432 

class CustomRolePermission(Base):
    """Row in ``role_permissions``: one permission granted by a custom role.

    The pair (role_id, permission_id) is the primary key.
    """

    __tablename__ = 'role_permissions'

    id = None  # don't provide an ID column - not in original schema

    role_id = Column(
        VARCHAR(length=255),
        ForeignKey('roles.id', name='role_reference', ondelete='CASCADE'),
        primary_key=True,
        nullable=False,
    )
    permission_id = Column(VARCHAR(length=255), primary_key=True, nullable=False)

    # Also exposes CustomRole.permissions through the backref.
    role = relationship(CustomRole, backref='permissions')

    def __repr__(self):
        return '%s (Role: %s)' % (self.permission_id, self.role_id)

    @validates('permission_id')
    def known_permission(self, key, perm_id):
        """Accept only permission ids listed in perms.ALL_PERMISSIONS.

        @raise ValueError for any other value
        """
        if perm_id in perms.ALL_PERMISSIONS:
            return perm_id
        raise ValueError('"%s" is not a recognised permission' % perm_id)

454 

455 

class OrganisationCategory(Base):
    """Row in ``org_categories``: a uniquely titled category that
    organisations can be linked to via org_cat_rel."""

    __tablename__ = 'org_categories'
    __table_args__ = (
        UniqueConstraint('title', name='title'),
    ) + Base.__table_args__

    # Uniqueness is declared both here and by the named constraint above.
    title = Column(VARCHAR(255), unique=True, nullable=False)
    description = Column(MEDIUMTEXT())

    def __repr__(self):
        return f"<Organisation Category: {self.title}>"

467 

468 

class FailedLoginAttempt(Base):
    """Row in ``failed_login_attempts``: the user id, time and IP address
    recorded for one failed login."""

    __tablename__ = 'failed_login_attempts'

    user_id = Column(
        VARCHAR(length=50),
        ForeignKey('users.id'),
        nullable=False,
    )
    # Indexed column.
    timestamp = Column(DateTime, nullable=False, index=True)
    ip_address = Column(VARCHAR(50), nullable=False)

474 

475 

# Names exported by ``from rfpy.model.humans import *``.
__all__ = [
    'CustomRolePermission',
    'CustomRole',
    'UserRole',
    'User',
    'Organisation',
    'ConsultantOrganisation',
    'OrganisationType',
    'OrganisationCategory',
    'FailedLoginAttempt',
]