Coverage for rfpy/auth/policy.py: 66%
70 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-31 16:00 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-31 16:00 +0000
1import abc
2import logging
3import webob
4import webob.exc
6import jwt
7from sqlalchemy.orm.exc import NoResultFound
9from rfpy.api import fetch
10from rfpy.auth import NotLoggedIn
11import rfpy.conf
14log = logging.getLogger(__name__)
17class AbstractIdentityPolicy: # pragma: no cover
19 __metaclass__ = abc.ABCMeta
21 @abc.abstractmethod
22 def identify(self, request):
23 """Set remote_user(id) user(object) on the request"""
24 pass
26 @abc.abstractmethod
27 def remember(self, request, response):
28 """Remember the users identity for subsequent requests"""
29 pass
32def set_user_object(request):
33 user_id = request.remote_user
34 try:
35 if not user_id:
36 raise NotLoggedIn('User ID Not Found for Request')
37 request.user = fetch.user(request.session, user_id)
39 except NoResultFound as nre:
40 log.warning("User ID \"%s\" not in database ", user_id)
41 raise NotLoggedIn("User ID \"%s\" not found ", user_id) from nre
44class JwtBearerPolicy(AbstractIdentityPolicy):
46 def identify(self, request: webob.Request):
48 if request.remote_user is not None:
49 msg = 'REMOTE_USER illegally set to [%s] upstream'
50 raise ValueError(msg % request.remote_user)
52 if request.authorization is None or request.authorization.authtype != "Bearer":
53 raise webob.exc.HTTPForbidden('Bearer Authentication token required')
55 token = request.authorization.params
56 try:
57 jwt_doc = jwt.decode(token, rfpy.conf.CONF.crypt_key, algorithms=["HS256"])
58 except jwt.DecodeError as decode_err:
59 raise webob.exc.HTTPBadRequest(f"Failed to decode JWT token: '{decode_err}'")
61 request.remote_user = jwt_doc['user_id']
62 set_user_object(request)
64 def remember(self, request, response):
65 '''Client is responsible for storing token'''
66 pass
69class PassthroughPolicy(AbstractIdentityPolicy):
71 '''
72 Identity Policies purpose is to identify authenticated
73 users for a given request. For login on we are dealing
74 with unauthenticated users, so this is implementation does
75 nothing.
76 '''
78 def identify(self, request):
79 pass
81 def remember(self, request, response):
82 pass
85class TestHeaderPolicy(AbstractIdentityPolicy):
87 def __init__(self, default_user=None):
88 super().__init__()
89 self.default_user = default_user
91 def identify(self, request):
92 try:
93 if 'RFPY-TEST-USER' in request.headers:
94 user_id = request.headers['RFPY-TEST-USER']
95 log.info('Unsafely Authenticated user [%s] via RFPY-TEST-USER HTTP Header', user_id)
96 else:
97 user_id = request.environ['RFPY-TEST-USER']
98 log.info(
99 'Unsafely Authenticated user [%s] via RFPY-TEST-USER wsgi Environ key', user_id
100 )
101 except KeyError:
102 if self.default_user is None:
103 raise ValueError('RFPY-TEST-USER header not set')
104 user_id = self.default_user
105 log.info(
106 'Unsafely Authenticated user [%s] as default user for TestHeaderPolicy', user_id
107 )
108 request.remote_user = str(user_id)
109 set_user_object(request)
112class FallbackPolicy(AbstractIdentityPolicy):
114 def __init__(self, default_user=None):
115 self.default_user = None
116 self.header_policy = TestHeaderPolicy(default_user=default_user)
117 self.bearer_policy = JwtBearerPolicy()
119 def identify(self, request):
120 try:
121 self.bearer_policy.identify(request)
122 log.warn(f'Authenticated user {request.remote_user} with JWT Bearer policy')
123 except webob.exc.HTTPForbidden:
124 self.header_policy.identify(request)
125 log.warn(f'Authenticated user {request.remote_user} with HTTP Test Header policy')