Coverage for rfpy/auth/policy.py: 100%

88 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-24 10:52 +0000

1import abc 

2import logging 

3import webob 

4import webob.exc 

5 

6import jwt 

7from sqlalchemy.orm.exc import NoResultFound 

8 

9from rfpy.api import fetch 

10from rfpy.auth import NotLoggedIn 

11import rfpy.conf 

12 

13 

14log = logging.getLogger(__name__) 

15 

16 

class AbstractIdentityPolicy(metaclass=abc.ABCMeta):  # pragma: no cover
    """Interface implemented by every identity policy in this module.

    The metaclass is declared with the Python 3 ``metaclass=`` keyword so
    that the ``@abc.abstractmethod`` markers below are actually enforced:
    this class, and any subclass that does not override both methods,
    cannot be instantiated. (A ``__metaclass__`` class attribute is
    Python 2 syntax and is silently ignored by Python 3.)
    """

    @abc.abstractmethod
    def identify(self, request):
        """Set remote_user(id) user(object) on the request"""

    @abc.abstractmethod
    def remember(self, request, response):
        """Remember the users identity for subsequent requests"""

29 

30 

def set_user_object(request):
    """Load the user named by ``request.remote_user`` onto ``request.user``.

    The user is looked up with ``fetch.user`` using ``request.session``.

    Raises:
        NotLoggedIn: if ``request.remote_user`` is falsy, or if the lookup
            raises ``NoResultFound`` for that ID (chained as the cause).
    """
    user_id = request.remote_user

    # Nothing to look up without an ID; reject before touching the session.
    if not user_id:
        raise NotLoggedIn("User ID Not Found for Request")

    try:
        request.user = fetch.user(request.session, user_id)
    except NoResultFound as missing:
        log.warning('User ID "%s" not in database ', user_id)
        raise NotLoggedIn(f'User ID "{user_id}" not found ') from missing

41 

42 

class JwtBearerPolicy(AbstractIdentityPolicy):
    """Identify the user from a JWT supplied as an HTTP Bearer token."""

    def identify(self, request: webob.Request):
        """Authenticate ``request`` from its Bearer authorization token.

        On success ``request.remote_user`` is set to the token's
        ``user_id`` claim and ``set_user_object`` is called to attach the
        user object.

        Raises:
            webob.exc.HTTPBadRequest: if ``remote_user`` was already set
                upstream, if the token cannot be decoded or validated, or
                if the decoded token has no ``user_id`` claim.
            webob.exc.HTTPForbidden: if no Bearer authorization is
                present, the token has expired, or its ``token_type`` is
                present and not ``"access"``.
        """
        if request.remote_user is not None:
            msg = "REMOTE_USER illegally set to [%s] upstream"
            raise webob.exc.HTTPBadRequest(msg % request.remote_user)

        if request.authorization is None or request.authorization.authtype != "Bearer":
            raise webob.exc.HTTPForbidden("Bearer Authentication token required")

        token: str = request.authorization.params  # type: ignore
        jwt_doc = self._decode_jwt_token(token)

        # A correctly signed token is not guaranteed to carry "user_id";
        # report that as a client error rather than leaking a KeyError.
        try:
            user_id = jwt_doc["user_id"]
        except KeyError as key_err:
            raise webob.exc.HTTPBadRequest(
                "JWT token is missing the 'user_id' claim"
            ) from key_err

        request.remote_user = user_id
        set_user_object(request)

    def remember(self, request, response):
        """Client is responsible for storing token"""
        pass

    def _decode_jwt_token(self, token: str) -> dict:
        """
        Helper method to decode JWT token and handle exceptions.

        The token is verified with ``rfpy.conf.CONF.crypt_key`` using
        HS256 and must contain the ``exp`` and ``iat`` claims. PyJWT
        errors are translated into webob HTTP exceptions, keeping the
        original error as the cause.

        Returns:
            The decoded token payload.

        Raises:
            webob.exc.HTTPForbidden: expired token, or a ``token_type``
                claim that is present and not ``"access"``.
            webob.exc.HTTPBadRequest: any other PyJWT validation failure.
        """
        # The more specific PyJWT exceptions are listed before
        # InvalidTokenError, which acts as the catch-all handler.
        try:
            decoded = jwt.decode(
                token,
                rfpy.conf.CONF.crypt_key,
                algorithms=["HS256"],
                options={"require": ["exp", "iat"]},
            )
        except jwt.ExpiredSignatureError as expired_err:
            raise webob.exc.HTTPForbidden(
                "Authentication token has expired"
            ) from expired_err
        except jwt.MissingRequiredClaimError as claim_err:
            raise webob.exc.HTTPBadRequest(f"'{claim_err}'") from claim_err
        except jwt.InvalidAlgorithmError as alg_err:
            raise webob.exc.HTTPBadRequest(
                "Invalid algorithm specified in JWT token"
            ) from alg_err
        except jwt.DecodeError as decode_err:
            raise webob.exc.HTTPBadRequest(
                f"Failed to decode JWT token: '{decode_err}'"
            ) from decode_err
        except jwt.InvalidTokenError as token_err:
            raise webob.exc.HTTPBadRequest(
                f"Invalid JWT token: '{token_err}'"
            ) from token_err

        # Verify token type if present
        # For backward compatibility, only check token_type if it exists
        token_type = decoded.get("token_type")
        if token_type is not None and token_type != "access":
            raise webob.exc.HTTPForbidden(
                f"Invalid token type: '{token_type}'. Expected 'access'"
            )

        return decoded

96 

97 

class PassthroughPolicy(AbstractIdentityPolicy):
    """
    No-op identity policy.

    The purpose of an identity policy is to identify the authenticated
    user for a given request. During login we are dealing with
    unauthenticated users, so this implementation does nothing.
    """

    def identify(self, request):
        """Leave the request untouched."""
        return None

    def remember(self, request, response):
        """Nothing to remember for this policy."""
        return None

111 

112 

class DevHeaderPolicy(AbstractIdentityPolicy):
    """Identity policy that trusts a caller-supplied ``RFPY-TEST-USER`` value.

    The user ID is read from the HTTP header of that name, else from the
    WSGI environ key of that name, else from ``default_user``. Every
    successful identification is logged as "Unsafely Authenticated".
    """

    def __init__(self, default_user=None):
        super().__init__()
        self.default_user = default_user

    def identify(self, request):
        """Set ``request.remote_user`` and load the matching user object.

        Raises:
            ValueError: if neither the header nor the environ key is
                present and no default user was configured.
        """
        key = "RFPY-TEST-USER"
        try:
            if key in request.headers:
                user_id = request.headers[key]
                message = (
                    "Unsafely Authenticated user [%s] via RFPY-TEST-USER HTTP Header"
                )
            else:
                # A KeyError here means the value was supplied nowhere.
                user_id = request.environ[key]
                message = (
                    "Unsafely Authenticated user [%s] via RFPY-TEST-USER wsgi Environ key"
                )
        except KeyError:
            if self.default_user is None:
                raise ValueError("RFPY-TEST-USER header not set")
            user_id = self.default_user
            message = (
                "Unsafely Authenticated user [%s] as default user for DevHeaderPolicy"
            )

        log.info(message, user_id)
        request.remote_user = str(user_id)
        set_user_object(request)

    def remember(self, request, response):
        """Nothing is stored for subsequent requests."""
        return None

145 

146 

class FallbackPolicy(AbstractIdentityPolicy):
    """Try JWT Bearer authentication first, then fall back to DevHeaderPolicy.

    The fallback is taken only when the bearer policy raises
    ``webob.exc.HTTPForbidden``; any other exception from the bearer
    policy propagates to the caller.
    """

    def __init__(self, default_user=None):
        """Build the two delegate policies.

        Args:
            default_user: user ID handed to the DevHeaderPolicy delegate
                as its default; also kept on ``self.default_user``.
        """
        super().__init__()
        # Keep the value that was actually passed in (it was previously
        # discarded and the attribute was always None).
        self.default_user = default_user
        self.header_policy = DevHeaderPolicy(default_user=default_user)
        self.bearer_policy = JwtBearerPolicy()

    def identify(self, request):
        """Identify the request's user via the bearer or header policy."""
        try:
            self.bearer_policy.identify(request)
        except webob.exc.HTTPForbidden:
            self.header_policy.identify(request)
            log.warning(
                "Authenticated user %s with HTTP Test Header policy",
                request.remote_user,
            )
        else:
            log.warning(
                "Authenticated user %s with JWT Bearer policy",
                request.remote_user,
            )

    def remember(self, request, response):
        """Nothing is stored for subsequent requests."""
        pass