Coverage for rfpy/auth/endpoints.py: 45%

44 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-31 16:00 +0000

1 

2from cryptography.fernet import InvalidToken 

3from rfpy.mail.schemas import PostmarkSimpleMessage 

4from rfpy.mail.postmark import api_headers, send_simple_email 

5from rfpy.auth.password import create_signature, gensalt 

6from rfpy.utils import decrypt_value, encrypt_value 

7import jwt 

8from sqlalchemy.orm import Session 

9from sqlalchemy.orm.exc import NoResultFound 

10import webob 

11 

12from rfpy.suxint import http 

13from rfpy.api import fetch 

14from rfpy import conf 

15 

16from .adaptors import Token, Login, UserDoc, ResetDoc 

17 

18 

@http
def post_login(session: Session, login_doc: Login) -> Token:
    '''
    Authenticate the user identified by login_doc.user_id using the password provided.

    The user is looked up with fetch.user_by_password. On success a JWT carrying the
    user's ID is signed (HS256) with the configured crypt_key and returned under the
    "token" key. The token can be provided in the Bearer Authentication header.

    Raises HTTPForbidden if the lookup raises NoResultFound.
    '''
    try:
        account = fetch.user_by_password(
            session, login_doc.user_id, login_doc.password
        )
        claims = {"user_id": account.id}
        return {
            "token": jwt.encode(claims, conf.CONF.crypt_key, algorithm="HS256")
        }
    except NoResultFound:
        # Report a failed lookup as 403 rather than letting it propagate.
        raise webob.exc.HTTPForbidden('No matching user found')

39 

40 

# Plain-text body template for the password reset email.
# post_password_emailkey fills it in by literal string replacement of the
# placeholders <user_id>, <key> and <hostname>; whitespace here is sent as-is.
RESET_MESSAGE = '''
A password reset has been requested for user ID <user_id> at <hostname>.

To reset your password follow this link:

https://<hostname>/vue/#/reset?key=<key>&user=<user_id>

Key: <key>

'''

51 

52 

@http
def post_password_emailkey(session: Session, user_doc: UserDoc):
    '''
    Send a password reset email to the user identified by user_doc.user_id.

    A freshly generated salt is joined to the user's ID ("<salt>:<user id>") and
    encrypted to form the reset key. The key, the user ID and the configured web app
    hostname are substituted into RESET_MESSAGE, which contains a URL link for the
    user to reset their password, and the result is sent to the user's email
    address through Postmark.
    '''
    user = fetch.user(session, user_doc.user_id)
    hash_key = encrypt_value(f"{gensalt()}:{user.id}")

    # Placeholders are substituted one after another, in this order.
    # NOTE(review): the values are inserted verbatim into the link's query string
    # (no percent-encoding) - confirm that user IDs cannot contain characters
    # such as '&', '#' or '+'.
    substitutions = (
        ("<user_id>", user.id),
        ("<key>", hash_key),
        ("<hostname>", conf.CONF.webapp_hostname),
    )
    body = RESET_MESSAGE
    for placeholder, value in substitutions:
        body = body.replace(placeholder, value)

    # NOTE(review): the From literal looks like an email-obfuscation placeholder
    # in this view of the file - confirm the real sender address.
    message = PostmarkSimpleMessage(
        To=user.email,
        From="[email protected]",
        Subject="PostRFP Email Reset",
        TextBody=body,
        Tag='Testing'
    )
    send_simple_email(message.dict(), api_headers(conf.CONF.postmark_api_key))

78 

79 

@http
def post_password_update(session: Session, reset_doc: ResetDoc):
    '''
    Reset the user's password using an emailed key value to authenticate the user.

    The user is fetched by reset_doc.user_id and reset_doc.key is decrypted
    (max_age=600 is passed to decrypt_value - presumably seconds; confirm against
    rfpy.utils). The decrypted value is expected to look like "<salt>:<user id>";
    the user ID part must equal the fetched user's ID before the password is
    replaced with a signature of reset_doc.new_password.

    Raises ValueError if the key cannot be decrypted or contains no ':' separator.
    Raises HTTPForbidden if the user ID inside the key differs from the user's ID.
    '''
    user = fetch.user(session, reset_doc.user_id)
    try:
        decrypted_value = decrypt_value(reset_doc.key.encode('utf8'), max_age=600)
    except InvalidToken:
        raise ValueError("Invalid key value - expired or malformed")

    # Split at the FIRST colon only: everything after it is the user ID, so an ID
    # that itself contains ':' no longer fails with an unpacking error.
    _salt, separator, hashed_user_id = decrypted_value.partition(':')
    if not separator:
        # No colon at all: the decrypted payload is not in the expected format.
        raise ValueError("Invalid key value - expired or malformed")
    if hashed_user_id != user.id:
        raise webob.exc.HTTPForbidden("User ID doesn't match key")

    user.password = create_signature(reset_doc.new_password)

95 

96 user.password = create_signature(reset_doc.new_password)