Coverage for rfpy/auth/policy.py: 100%
88 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
1import abc
2import logging
3import webob
4import webob.exc
6import jwt
7from sqlalchemy.orm.exc import NoResultFound
9from rfpy.api import fetch
10from rfpy.auth import NotLoggedIn
11import rfpy.conf
14log = logging.getLogger(__name__)
class AbstractIdentityPolicy(abc.ABC):  # pragma: no cover
    """Interface implemented by every identity policy in this module.

    Deriving from ``abc.ABC`` (rather than assigning the Python 2 style
    ``__metaclass__`` attribute, which Python 3 ignores) makes the
    ``@abc.abstractmethod`` markers effective: a subclass that omits
    ``identify`` or ``remember`` cannot be instantiated.
    """

    @abc.abstractmethod
    def identify(self, request):
        """Set remote_user (id) and user (object) on the request."""

    @abc.abstractmethod
    def remember(self, request, response):
        """Remember the user's identity for subsequent requests."""
def set_user_object(request):
    """Attach the user identified by ``request.remote_user`` to the request.

    The object returned by ``fetch.user`` is stored on ``request.user``.

    Raises:
        NotLoggedIn: if ``request.remote_user`` is empty, or if the lookup
            raises ``NoResultFound`` for that id.
    """
    remote_id = request.remote_user
    if not remote_id:
        raise NotLoggedIn("User ID Not Found for Request")

    try:
        found_user = fetch.user(request.session, remote_id)
    except NoResultFound as missing:
        log.warning('User ID "%s" not in database ', remote_id)
        raise NotLoggedIn(f'User ID "{remote_id}" not found ') from missing

    request.user = found_user
class JwtBearerPolicy(AbstractIdentityPolicy):
    """Identify the caller from a JWT sent as an HTTP Bearer credential."""

    def identify(self, request: webob.Request):
        """Authenticate ``request`` from its Bearer authorization token.

        On success ``request.remote_user`` is set to the token's ``user_id``
        claim and ``set_user_object`` is called to populate ``request.user``.

        Raises:
            webob.exc.HTTPBadRequest: if ``remote_user`` is already set when
                this policy runs, if the token cannot be decoded or
                validated, or if it carries no ``user_id`` claim.
            webob.exc.HTTPForbidden: if no Bearer credentials are present, the
                token has expired, or its ``token_type`` is not ``access``.
        """
        if request.remote_user is not None:
            msg = "REMOTE_USER illegally set to [%s] upstream"
            raise webob.exc.HTTPBadRequest(msg % request.remote_user)

        if request.authorization is None or request.authorization.authtype != "Bearer":
            raise webob.exc.HTTPForbidden("Bearer Authentication token required")

        token: str = request.authorization.params  # type: ignore
        jwt_doc = self._decode_jwt_token(token)

        # A validly signed token may still lack the claim we need; report that
        # as a client error instead of letting a bare KeyError escape.
        try:
            user_id = jwt_doc["user_id"]
        except KeyError as key_err:
            raise webob.exc.HTTPBadRequest(
                "JWT token is missing the 'user_id' claim"
            ) from key_err

        request.remote_user = user_id
        set_user_object(request)

    def remember(self, request, response):
        """Client is responsible for storing token"""
        pass

    def _decode_jwt_token(self, token: str) -> dict:
        """
        Helper method to decode JWT token and handle exceptions.

        Decodes ``token`` with ``rfpy.conf.CONF.crypt_key`` using HS256 and
        requires the ``exp`` and ``iat`` claims. Each PyJWT error is
        translated to a webob HTTP exception, chained to the original error.

        Returns:
            The decoded claims.

        Raises:
            webob.exc.HTTPForbidden: expired token, or ``token_type`` present
                and not equal to ``access``.
            webob.exc.HTTPBadRequest: any other PyJWT validation failure.
        """
        # Keep the try body to the single call that can raise PyJWT errors.
        try:
            decoded = jwt.decode(
                token,
                rfpy.conf.CONF.crypt_key,
                algorithms=["HS256"],
                options={"require": ["exp", "iat"]},
            )
        except jwt.ExpiredSignatureError as expired_err:
            raise webob.exc.HTTPForbidden(
                "Authentication token has expired"
            ) from expired_err
        except jwt.MissingRequiredClaimError as claim_err:
            raise webob.exc.HTTPBadRequest(f"'{claim_err}'") from claim_err
        except jwt.InvalidAlgorithmError as alg_err:
            raise webob.exc.HTTPBadRequest(
                "Invalid algorithm specified in JWT token"
            ) from alg_err
        except jwt.DecodeError as decode_err:
            raise webob.exc.HTTPBadRequest(
                f"Failed to decode JWT token: '{decode_err}'"
            ) from decode_err
        except jwt.InvalidTokenError as token_err:
            # Catch-all for remaining PyJWT validation errors; must stay last
            # because the exceptions above are handled more specifically.
            raise webob.exc.HTTPBadRequest(
                f"Invalid JWT token: '{token_err}'"
            ) from token_err

        # Verify token type if present
        # For backward compatibility, only check token_type if it exists
        token_type = decoded.get("token_type")
        if token_type is not None and token_type != "access":
            raise webob.exc.HTTPForbidden(
                f"Invalid token type: '{token_type}'. Expected 'access'"
            )

        return decoded
class PassthroughPolicy(AbstractIdentityPolicy):
    """
    No-op identity policy.

    An identity policy's purpose is to identify the authenticated user
    for a given request. During login we are dealing with
    unauthenticated users, so this implementation does nothing.
    """

    def identify(self, request):
        """Leave the request untouched."""
        return None

    def remember(self, request, response):
        """Store nothing on the response."""
        return None
class DevHeaderPolicy(AbstractIdentityPolicy):
    """Policy that trusts a caller-supplied user id without verification.

    The id is read from the ``RFPY-TEST-USER`` HTTP header, otherwise from
    the WSGI environ key of the same name, otherwise from ``default_user``.
    The log messages emitted here describe this authentication as unsafe.
    """

    def __init__(self, default_user=None):
        super().__init__()
        self.default_user = default_user

    def identify(self, request):
        """Set ``request.remote_user`` from the test header, environ or default.

        Raises:
            ValueError: if neither the header nor the environ key is present
                and no ``default_user`` was configured.
        """
        key = "RFPY-TEST-USER"
        if key in request.headers:
            test_user = request.headers[key]
            log.info(
                "Unsafely Authenticated user [%s] via RFPY-TEST-USER HTTP Header",
                test_user,
            )
        else:
            try:
                test_user = request.environ[key]
            except KeyError:
                # Neither header nor environ key: use the configured default.
                if self.default_user is None:
                    raise ValueError("RFPY-TEST-USER header not set")
                test_user = self.default_user
                log.info(
                    "Unsafely Authenticated user [%s] as default user for DevHeaderPolicy",
                    test_user,
                )
            else:
                log.info(
                    "Unsafely Authenticated user [%s] via RFPY-TEST-USER wsgi Environ key",
                    test_user,
                )

        # remote_user is always stored as a string, whatever the source.
        request.remote_user = str(test_user)
        set_user_object(request)

    def remember(self, request, response):
        """Nothing is stored between requests."""
        return None
class FallbackPolicy(AbstractIdentityPolicy):
    """Try JWT Bearer identification first, then fall back to DevHeaderPolicy.

    The fallback is taken only when the bearer policy raises
    ``webob.exc.HTTPForbidden``; any other exception propagates unchanged.
    """

    def __init__(self, default_user=None):
        super().__init__()
        # Store the argument itself; this attribute was previously always
        # set to None regardless of what the caller passed.
        self.default_user = default_user
        self.header_policy = DevHeaderPolicy(default_user=default_user)
        self.bearer_policy = JwtBearerPolicy()

    def identify(self, request):
        """Identify the user via the bearer policy, else via the header policy."""
        try:
            self.bearer_policy.identify(request)
        except webob.exc.HTTPForbidden:
            self.header_policy.identify(request)
            log.warning(
                "Authenticated user %s with HTTP Test Header policy",
                request.remote_user,
            )
        else:
            log.warning(
                "Authenticated user %s with JWT Bearer policy",
                request.remote_user,
            )

    def remember(self, request, response):
        """Nothing is stored between requests."""
        pass