Coverage for rfpy/model/acl.py: 100%

73 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-31 16:00 +0000

1import logging 

2 

3from sqlalchemy import (Column, DateTime, text, 

4 Integer, ForeignKey, UniqueConstraint, TIMESTAMP) 

5from sqlalchemy.dialects import mysql 

6from sqlalchemy.orm import relationship, backref, foreign 

7 

8from .meta import Base 

9from rfpy.auth import ROLES 

10from rfpy.model.humans import CustomRole 

11 

12 

# Module-scoped logger; its name is this module's import path (__name__).
log = logging.getLogger(__name__)

14 

15 

class Participant(Base):
    """An organisation's participation in a project, together with its role.

    Mapped to the ``project_org_roles`` table; the unique constraint allows at
    most one row per (project_id, org_id) pair. ``role`` holds either a key of
    the built-in ``ROLES`` mapping or the id of a ``CustomRole`` row (see
    ``custom_role`` and ``role_name``).
    """

    __tablename__ = 'project_org_roles'
    __table_args__ = (
        UniqueConstraint('project_id', 'org_id'),
    ) + Base.__table_args__

    # Rows are removed by the database when the referenced project is deleted.
    project_id = Column(
        mysql.INTEGER(11),
        ForeignKey('projects.id', ondelete="CASCADE"),
        nullable=False
    )

    # Follows the organisation's id if it is updated; empty string by default
    # on the server side.
    org_id = Column(
        mysql.VARCHAR(50),
        ForeignKey('organisations.id', onupdate='CASCADE'),
        nullable=False,
        server_default=text("''")
    )
    # No ForeignKey here: the value may be a ROLES key or a CustomRole id.
    role = Column(mysql.VARCHAR(255), default=None)
    organisation = relationship("Organisation", innerjoin=True,
                                backref='participants')
    project = relationship('Project', back_populates="participants_query")

    # lazy='dynamic' exposes a query object instead of a loaded collection;
    # passive_deletes=True leaves child-row removal to the ON DELETE CASCADE
    # declared on ProjectPermission.project_org_role_id.
    permissions = relationship('ProjectPermission', lazy='dynamic',
                               back_populates='participant',
                               cascade='all, delete',
                               passive_deletes=True)
    # ``role`` carries no database-level foreign key, so the join condition
    # and the "foreign" side are spelled out explicitly.
    custom_role = relationship(
        CustomRole,
        foreign_keys=[role],
        primaryjoin=CustomRole.id == foreign(role)
    )

    def __repr__(self):
        """Return a short description: organisation, project and role."""
        args = (self.org_id, self.project_id, self.role)
        return "%s participant in project %s with role %s" % args

    @property
    def role_permissions(self):
        """Return the ``ROLES`` entry for this participant's role.

        NOTE(review): this is a plain subscript lookup, so a ``role`` value
        that is not a key of ``ROLES`` (e.g. a custom role id) will raise -
        presumably KeyError; confirm the type of ``ROLES``.
        """
        return ROLES[self.role]

    @property
    def role_name(self):
        """Return a display name for this participant's role.

        Built-in roles (keys of ``ROLES``) are returned unchanged; otherwise
        the name of the linked ``CustomRole`` is used. When neither resolves,
        an error is logged and ``None`` is returned.
        """
        if self.role in ROLES:
            return self.role
        if self.custom_role:
            return self.custom_role.name
        # Use the module logger (not the root logger) so the record carries
        # this module's name and honours per-module logging configuration.
        log.error('Participant # %s has no role configured', self.id)
        return None

    def as_dict(self):
        """Return ``{'organisation': <organisation dict>, 'role': <role name>}``."""
        org_dict = self.organisation.as_dict()
        return {'organisation': org_dict, 'role': self.role_name}

69 

70 

class ProjectPermission(Base):
    """Links one user to one ``project_org_roles`` row (a Participant).

    Stored in ``project_permissions``; the unique constraint allows a given
    user to appear at most once per project_org_role_id.
    """

    __tablename__ = "project_permissions"
    __table_args__ = (
        UniqueConstraint("project_org_role_id", "user_id"),
        *Base.__table_args__,
    )

    # Both foreign keys are declared ON DELETE CASCADE.
    project_org_role_id = Column(
        mysql.INTEGER(11),
        ForeignKey("project_org_roles.id", ondelete="CASCADE"),
    )
    user_id = Column(
        mysql.VARCHAR(length=50),
        ForeignKey("users.id", ondelete="CASCADE"),
    )

    user = relationship("User", back_populates="project_permissions")
    participant = relationship("Participant", back_populates="permissions")
    # Read-only shortcut to the project, reached through project_org_roles;
    # also installs a dynamic ``permissions`` backref on Project.
    project = relationship(
        "Project",
        secondary="project_org_roles",
        viewonly=True,
        backref=backref("permissions", lazy="dynamic"),
    )

    def __repr__(self):
        """Return a short description naming the user and the role row."""
        ids = (self.user_id, self.project_org_role_id)
        return "Permission for user %s on project_org_role %s" % ids

93 

94 

class SectionPermission(Base):
    """Links a user and a section to a ``project_permissions`` row.

    Stored in ``section_permissions``; the combination
    (project_permissions_id, user_id, section_id) is unique.
    """

    __tablename__ = "section_permissions"
    __table_args__ = (
        UniqueConstraint("project_permissions_id", "user_id", "section_id"),
        *Base.__table_args__,
    )

    # All three foreign keys are declared ON DELETE CASCADE.
    section_id = Column(
        mysql.INTEGER(11),
        ForeignKey("sections.id", ondelete="CASCADE"),
    )
    user_id = Column(
        mysql.VARCHAR(length=50),
        ForeignKey("users.id", ondelete="CASCADE"),
    )
    project_permissions_id = Column(
        mysql.INTEGER(11),
        ForeignKey("project_permissions.id", ondelete="CASCADE"),
        nullable=False,
    )

    # Read-only link to the section; adds a ``permissions`` backref on Section.
    section = relationship(
        "Section",
        backref=backref("permissions"),
        viewonly=True,
    )
    user = relationship("User", back_populates="section_permissions")
    # Adds a ``section_permissions`` backref on ProjectPermission.
    project_permission = relationship(
        "ProjectPermission",
        backref=backref("section_permissions"),
    )

    def __repr__(self):
        """Return a short description naming the user and the section."""
        ids = (self.user_id, self.section_id)
        return "Permission for user %s on section %s" % ids

115 

116 

class TokenExpired(Exception):
    """Exception type for expired tokens.

    NOTE(review): nothing in this module raises it; presumably raised by
    token-validation code elsewhere - confirm at the raise sites.
    """

119 

120 

class UserAdminToken(Base):
    """Base mapping for rows of the ``user_admin_tokens`` table.

    The ``kind`` column is the polymorphic discriminator: subclasses register
    their own ``polymorphic_identity`` values, while this class is the
    identity used for rows whose ``kind`` is NULL/None.
    """

    __tablename__ = "user_admin_tokens"
    __mapper_args__ = dict(polymorphic_on="kind", polymorphic_identity=None)

    # NOTE(review): presumably blanks out an ``id`` column inherited from Base
    # so that ``token`` is the only primary key - confirm against .meta.Base.
    id = None
    token = Column(mysql.VARCHAR(length=255), primary_key=True)
    user_id = Column(
        mysql.VARCHAR(length=50),
        ForeignKey("users.id", ondelete="CASCADE"),
    )
    email_address = Column(mysql.VARCHAR(length=100))
    buyer_org_id = Column("buyer_org_id", mysql.VARCHAR(length=100))
    manager_id = Column(mysql.VARCHAR(length=50))
    issue_id = Column(Integer, ForeignKey("issues.id", ondelete="CASCADE"))
    kind = Column(mysql.VARCHAR(length=50))
    # Filled in by the database server at insert time.
    created_date = Column(
        TIMESTAMP,
        nullable=False,
        server_default=text("CURRENT_TIMESTAMP"),
    )
    expiry_date = Column(DateTime, nullable=False)

    user = relationship("User", back_populates="tokens")
    issue = relationship("Issue")

    def __repr__(self):
        """Return the token kind (or 'Undefined' when unset) and its expiry."""
        label = self.kind or "Undefined"
        return "%s Token expires: %s" % (label, self.expiry_date)

146 

147 

class LoginToken(UserAdminToken):
    """UserAdminToken variant registered under the identity 'COLLAB_LOGIN'."""

    __mapper_args__ = dict(polymorphic_identity="COLLAB_LOGIN")

    def __repr__(self):
        """Return a short description naming the user and the expiry date."""
        details = (self.user_id, self.expiry_date)
        return "Login Token for user: %s, expires: %s" % details

155 

156 

class UserRegistrationToken(UserAdminToken):
    """UserAdminToken variant registered under the identity 'USER_REGISTRATION'."""

    __mapper_args__ = dict(polymorphic_identity="USER_REGISTRATION")

    def __repr__(self):
        """Return a short description naming the email address and expiry."""
        details = (self.email_address, self.expiry_date)
        return "User Registration for %s, expires: %s" % details

164 

165 

class IssueRegistrationToken(UserAdminToken):
    """UserAdminToken variant registered under the identity 'ISSUE_REGISTRATION'."""

    __mapper_args__ = dict(polymorphic_identity="ISSUE_REGISTRATION")

    def __repr__(self):
        """Return a short description naming the email address and expiry."""
        details = (self.email_address, self.expiry_date)
        return "Issue Registration for %s, expires: %s" % details

173 

174 

class PasswordResetToken(UserAdminToken):
    """UserAdminToken variant registered under the identity 'PASSWORD_RESET'."""

    __mapper_args__ = dict(polymorphic_identity="PASSWORD_RESET")

    def __repr__(self):
        """Return a short description naming the user and the expiry date."""
        details = (self.user_id, self.expiry_date)
        return "Password Reset for user: %s, expires: %s" % details