Coverage for rfpy/auth/policy.py: 66%

70 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-31 16:00 +0000

1import abc 

2import logging 

3import webob 

4import webob.exc 

5 

6import jwt 

7from sqlalchemy.orm.exc import NoResultFound 

8 

9from rfpy.api import fetch 

10from rfpy.auth import NotLoggedIn 

11import rfpy.conf 

12 

13 

# Module-level logger shared by every policy in this file; named after the module.
log = logging.getLogger(__name__)

15 

16 

class AbstractIdentityPolicy:  # pragma: no cover
    """Interface implemented by the identity policies in this module.

    NOTE(review): ``__metaclass__`` is the Python 2 spelling and is ignored by
    Python 3, so the abstract methods below are not enforced: this class, and
    subclasses that omit one of the methods, can still be instantiated.
    Switching to ``metaclass=abc.ABCMeta`` would start rejecting every
    subclass that does not override both methods.
    """

    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def identify(self, request):
        """Set remote_user (id) and user (object) on the request."""

    @abc.abstractmethod
    def remember(self, request, response):
        """Remember the user's identity for subsequent requests."""

30 

31 

def set_user_object(request):
    """Load the user named by ``request.remote_user`` onto ``request.user``.

    The user is looked up with ``fetch.user`` using ``request.session``.

    Raises:
        NotLoggedIn: if ``request.remote_user`` is empty, or if the lookup
            raises ``NoResultFound``.
    """
    user_id = request.remote_user
    # Guard first: this check cannot raise NoResultFound, so it does not
    # belong inside the try block.
    if not user_id:
        raise NotLoggedIn('User ID Not Found for Request')

    try:
        request.user = fetch.user(request.session, user_id)
    except NoResultFound as nre:
        log.warning("User ID \"%s\" not in database ", user_id)
        # Interpolate explicitly. Logging calls apply %-formatting to their
        # extra arguments, but a plain exception constructor just stores them,
        # which would leave a literal "%s" in the message (assumes NotLoggedIn
        # does no formatting of its own - confirm against rfpy.auth).
        raise NotLoggedIn("User ID \"%s\" not found " % user_id) from nre

42 

43 

class JwtBearerPolicy(AbstractIdentityPolicy):
    """Identify the user from a JWT supplied as an HTTP Bearer token."""

    def identify(self, request: webob.Request):
        """Authenticate the request from its Bearer authorization token.

        On success, sets ``request.remote_user`` to the token's ``user_id``
        claim and then calls ``set_user_object(request)``.

        Raises:
            ValueError: if ``request.remote_user`` is already set.
            webob.exc.HTTPForbidden: if the request carries no Bearer
                authorization.
            webob.exc.HTTPBadRequest: if the token fails to decode or
                validate, or carries no ``user_id`` claim.
        """
        if request.remote_user is not None:
            msg = 'REMOTE_USER illegally set to [%s] upstream'
            raise ValueError(msg % request.remote_user)

        auth = request.authorization
        if auth is None or auth.authtype != "Bearer":
            raise webob.exc.HTTPForbidden('Bearer Authentication token required')

        try:
            jwt_doc = jwt.decode(auth.params, rfpy.conf.CONF.crypt_key, algorithms=["HS256"])
        except jwt.InvalidTokenError as token_err:
            # InvalidTokenError is the base class of DecodeError and of the
            # claim-validation errors (e.g. an expired signature), so every
            # token failure becomes a client error instead of escaping.
            raise webob.exc.HTTPBadRequest(
                f"Failed to decode JWT token: '{token_err}'"
            ) from token_err

        try:
            request.remote_user = jwt_doc['user_id']
        except KeyError as key_err:
            # A validly signed token without the expected claim is still a
            # malformed request, not a server fault.
            raise webob.exc.HTTPBadRequest("JWT token has no 'user_id' claim") from key_err

        set_user_object(request)

    def remember(self, request, response):
        """Do nothing: the client is responsible for storing the token."""

67 

68 

class PassthroughPolicy(AbstractIdentityPolicy):
    """
    No-op identity policy.

    An identity policy's purpose is to identify the authenticated user for a
    given request. At login we are dealing with unauthenticated users, so
    this implementation does nothing.
    """

    def identify(self, request):
        """Leave the request untouched."""
        return None

    def remember(self, request, response):
        """Nothing to remember."""
        return None

83 

84 

class TestHeaderPolicy(AbstractIdentityPolicy):
    """Take the user id from a test header, the WSGI environ, or a default.

    Performs no authentication, as its own log messages state ("Unsafely
    Authenticated").
    """

    def __init__(self, default_user=None):
        super().__init__()
        self.default_user = default_user

    def identify(self, request):
        """Resolve a user id and attach the matching user to the request.

        Lookup order: the ``RFPY-TEST-USER`` HTTP header, then the
        ``RFPY-TEST-USER`` WSGI environ key, then ``self.default_user``.
        The result is stored as a string in ``request.remote_user`` before
        ``set_user_object(request)`` is called.

        Raises:
            ValueError: if neither source is present and no default user
                was configured.
        """
        source_key = 'RFPY-TEST-USER'

        if source_key in request.headers:
            user_id = request.headers[source_key]
            log.info('Unsafely Authenticated user [%s] via RFPY-TEST-USER HTTP Header', user_id)
        else:
            try:
                user_id = request.environ[source_key]
            except KeyError:
                # Neither the header nor the environ key exists: use the default.
                if self.default_user is None:
                    raise ValueError('RFPY-TEST-USER header not set')
                user_id = self.default_user
                log.info(
                    'Unsafely Authenticated user [%s] as default user for TestHeaderPolicy',
                    user_id,
                )
            else:
                log.info(
                    'Unsafely Authenticated user [%s] via RFPY-TEST-USER wsgi Environ key',
                    user_id,
                )

        request.remote_user = str(user_id)
        set_user_object(request)

110 

111 

class FallbackPolicy(AbstractIdentityPolicy):
    """Try JWT Bearer identification first, then fall back to TestHeaderPolicy."""

    def __init__(self, default_user=None):
        """Build both delegate policies.

        Args:
            default_user: forwarded to ``TestHeaderPolicy`` as the user id to
                use when a request names no test user.
        """
        # Keep the value the caller supplied, matching what is handed to the
        # header policy, rather than discarding it.
        self.default_user = default_user
        self.header_policy = TestHeaderPolicy(default_user=default_user)
        self.bearer_policy = JwtBearerPolicy()

    def identify(self, request):
        """Identify the user via the bearer policy, else the header policy.

        Only ``webob.exc.HTTPForbidden`` from the bearer policy triggers the
        fallback; any other exception propagates to the caller.
        """
        try:
            self.bearer_policy.identify(request)
            # Logger.warn is a deprecated alias that was removed in Python
            # 3.13; use warning() with lazy %-style arguments.
            log.warning('Authenticated user %s with JWT Bearer policy', request.remote_user)
        except webob.exc.HTTPForbidden:
            self.header_policy.identify(request)
            log.warning(
                'Authenticated user %s with HTTP Test Header policy', request.remote_user
            )