Coverage for rfpy/auth/endpoints.py: 45%
44 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-31 16:00 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-31 16:00 +0000
2from cryptography.fernet import InvalidToken
3from rfpy.mail.schemas import PostmarkSimpleMessage
4from rfpy.mail.postmark import api_headers, send_simple_email
5from rfpy.auth.password import create_signature, gensalt
6from rfpy.utils import decrypt_value, encrypt_value
7import jwt
8from sqlalchemy.orm import Session
9from sqlalchemy.orm.exc import NoResultFound
10import webob
12from rfpy.suxint import http
13from rfpy.api import fetch
14from rfpy import conf
16from .adaptors import Token, Login, UserDoc, ResetDoc
@http
def post_login(session: Session, login_doc: Login) -> Token:
    '''
    Authenticate the user with the given user_id via the password provided.

    The user is looked up with fetch.user_by_password; on success a JWT is
    signed (HS256, keyed with conf.CONF.crypt_key) whose only claim is the
    user's id.

    Returns a dict of the form {"token": <jwt>}. The JWT token can be provided
    in the Bearer Authentication header.

    Raises webob.exc.HTTPForbidden when the lookup raises NoResultFound.
    '''
    # Guard only the lookup: NoResultFound is the signal for a failed
    # credential match, and errors from token signing should not be at risk
    # of being relabelled as a login failure.
    try:
        user = fetch.user_by_password(session, login_doc.user_id, login_doc.password)
    except NoResultFound as exc:
        # Chain explicitly so the original lookup failure stays attached.
        raise webob.exc.HTTPForbidden('No matching user found') from exc

    jwt_token = jwt.encode({"user_id": user.id}, conf.CONF.crypt_key, algorithm="HS256")
    return {"token": jwt_token}
# Plain-text body template for the password reset email. The <user_id>,
# <key> and <hostname> placeholders are substituted with str.replace by
# post_password_emailkey before the message is sent.
RESET_MESSAGE = '''
A password reset has been requested for user ID <user_id> at <hostname>.

To reset your password follow this link:

https://<hostname>/vue/#/reset?key=<key>&user=<user_id>

Key: <key>

'''
@http
def post_password_emailkey(session: Session, user_doc: UserDoc):
    '''
    Email a password reset link to the user identified by user_doc.user_id.

    A freshly salted "<salt>:<user id>" string is encrypted and substituted,
    together with the user id and the configured web app hostname, into
    RESET_MESSAGE. The resulting text is sent to the user's email address
    through the Postmark helpers.
    '''
    account = fetch.user(session, user_doc.user_id)
    reset_key = encrypt_value(f"{gensalt()}:{account.id}")

    # Fill the template one placeholder at a time: user id, key, hostname.
    body = RESET_MESSAGE.replace("<user_id>", account.id)
    body = body.replace("<key>", reset_key)
    body = body.replace("<hostname>", conf.CONF.webapp_hostname)

    # NOTE(review): the From address looks redacted in this copy of the
    # source - confirm the real sender value.
    message = PostmarkSimpleMessage(
        To=account.email,
        From="[email protected]",
        Subject="PostRFP Email Reset",
        TextBody=body,
        Tag='Testing',
    )
    send_simple_email(message.dict(), api_headers(conf.CONF.postmark_api_key))
@http
def post_password_update(session: Session, reset_doc: ResetDoc):
    '''
    Reset the user's password using an emailed key value to authenticate the user.

    The key is decrypted with max_age=600 (presumably seconds, i.e. ten
    minutes - confirm against decrypt_value) and is expected to hold the
    "<salt>:<user_id>" string built by post_password_emailkey. The embedded
    user id must equal the id of the user loaded for reset_doc.user_id
    before the new password signature is stored on the user.

    Raises ValueError if the key cannot be decrypted (expired or malformed)
    or holds no "salt:user_id" pair, and webob.exc.HTTPForbidden if the key
    was issued for a different user.
    '''
    user = fetch.user(session, reset_doc.user_id)
    try:
        decrypted_value = decrypt_value(reset_doc.key.encode('utf8'), max_age=600)
    except InvalidToken as exc:
        raise ValueError("Invalid key value - expired or malformed") from exc

    # Split on the first colon only, so a user id that itself contains ':'
    # is compared whole instead of breaking a two-way unpack.
    # NOTE(review): this assumes the salt never contains ':' - confirm
    # against gensalt.
    _salt, separator, key_user_id = decrypted_value.partition(':')
    if not separator:
        # A payload without the separator was not produced by the reset
        # email flow; report it the same way as an undecryptable key.
        raise ValueError("Invalid key value - expired or malformed")
    if key_user_id != user.id:
        raise webob.exc.HTTPForbidden("User ID doesn't match key")

    user.password = create_signature(reset_doc.new_password)