Coverage for rfpy/auth/endpoints.py: 96%
135 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
1"""
2Authentication Endpoints using JWT Access Tokens and Refresh Tokens.
4This module implements authentication endpoints following a common pattern:
61. **Login (`post_login`):**
7 * User provides credentials (e.g., user ID and password).
8 * Server validates credentials against the database.
9 * Upon successful validation:
10 * A short-lived JWT **Access Token** is generated. This token contains user identification and
11 permissions and is signed by the server. It's returned in the response body.
12 * A longer-lived **Refresh Token** (a secure random string) is generated and stored
13 in the database, linked to the user. This token is returned as an `HttpOnly`, `Secure`, `SameSite=Lax` cookie.
14 * The client stores the access token (e.g., in memory) and uses it in the `Authorization: Bearer <token>` header
15 for subsequent API requests.
16 * The browser automatically sends the refresh token cookie on requests to the `/api/auth` path.
182. **Access Token Usage:**
19 * For protected API endpoints, the client sends the access token in the Authorization header.
20 * The server validates the access token's signature and expiration. If valid, the request is processed.
223. **Token Refresh (`post_refresh_token`):**
23 * When the access token expires, the client receives a 401 Unauthorized error.
24 * The client then makes a request to the refresh endpoint (`/api/auth/refresh`).
25 * The browser automatically includes the refresh token cookie in this request.
26 * The server looks up the refresh token in the database, verifies it's not expired or revoked.
27 * If the refresh token is valid, the server generates a *new* short-lived JWT access token and returns it in the response body.
28 * The client replaces its expired access token with the new one and retries the original failed request.
29 * In this implementation the refresh token is *not* rotated on each refresh, for simplicity; protection relies instead on the token's own expiry and on the server's ability to revoke it.
314. **Logout (`post_logout`):**
32 * The client makes a request to the logout endpoint (`/api/auth/logout`).
33 * The server identifies the user (via the access token) and the refresh token (via the cookie).
34 * The server marks the corresponding refresh token(s) as revoked in the database.
35 * The server sends back a response instructing the browser to clear the refresh token cookie (e.g., by setting `Max-Age=0`).
36 * The client discards its access token.
38This approach enhances security by minimizing the exposure of the long-lived refresh token (it's not directly
39handled by client-side JavaScript) while providing a seamless user experience by allowing sessions to be
40extended without requiring frequent re-logins.
41"""
43from datetime import datetime, timedelta, timezone
45import jwt
46import webob
47import secrets
48import webob.exc
49from sqlalchemy import and_
50from sqlalchemy.orm import Session
51from sqlalchemy.orm.exc import NoResultFound
52from cryptography.fernet import InvalidToken
54from rfpy.suxint import http
55from rfpy.api import fetch
56from rfpy import conf
57from rfpy.model.audit import AuditEvent, evt_types
58from rfpy.model.humans import RefreshToken
59from rfpy.mail.schemas import PostmarkSimpleMessage
60from rfpy.mail.postmark import api_headers, send_simple_email
61from rfpy.auth.password import create_signature, gensalt
62from rfpy.utils import decrypt_value, encrypt_value
63from .adaptors import Login, UserDoc, ResetDoc, Token
@http
def post_login(session: Session, login_doc: Login):
    """
    Log a user in with user ID and password.

    On success the response body carries a JWT access token and the response
    sets a ``refresh_token`` cookie (see ``_create_login_response``). Before the
    tokens are issued, an active lockout is enforced or an expired one cleared,
    and the user's login bookkeeping is updated.

    Raises:
        webob.exc.HTTPForbidden: when ``NoResultFound`` is raised during the
            login sequence (the failed attempt is recorded first), or when
            ``_check_and_clear_lockout`` reports that the account is locked.
    """
    try:
        account = fetch.user_by_password(
            session, login_doc.user_id, login_doc.password
        )
        login_time = datetime.now(timezone.utc)

        # Lockout is checked before any login bookkeeping is touched.
        _check_and_clear_lockout(session, account, login_time)
        _update_user_on_login(session, account, login_time)

        jwt_token, jwt_minutes = _generate_access_token(account.id, login_time)
        cookie_value, cookie_max_age = _generate_and_store_refresh_token(
            session, account.id, login_time
        )
        return _create_login_response(
            jwt_token, cookie_value, jwt_minutes, cookie_max_age
        )
    except NoResultFound:
        # Record the failure (counter increment, possible lockout), then answer
        # with the same 403 whether or not the user ID exists.
        _handle_failed_login(session, login_doc.user_id)
        raise webob.exc.HTTPForbidden("No matching user found or invalid password")
def _lock_user_account(session: Session, user):
    """
    Lock ``user`` for the configured lockout duration and audit the lock.

    ``locked_until`` is set to the current UTC time plus
    ``conf.CONF.lockout_duration_minutes``; the consecutive-failure counter is
    zeroed so counting starts afresh once the lock ends.
    """
    lock_window = timedelta(minutes=conf.CONF.lockout_duration_minutes)
    # Timezone-aware timestamp, so later comparisons are against aware values.
    user.locked_until = datetime.now(timezone.utc) + lock_window
    user.failed_login_attempts = 0
    AuditEvent.create(session, evt_types.USER_ACCOUNT_LOCKED, user=user)
    # ToDo - send email to user about lockout
def _handle_failed_login(session: Session, user_id: str):
    """
    Record a failed login for ``user_id``.

    For an existing user both failure counters are incremented, a
    USER_LOGIN_FAILED audit event is created, the account is locked once the
    consecutive-failure count reaches ``conf.CONF.max_failed_login_attempts``,
    and the session is committed. For an unknown user ID only
    ``fetch.dummy_hash`` is called.
    """
    try:
        account = fetch.user(session, user_id)

        consecutive_failures = account.failed_login_attempts + 1
        account.failed_login_attempts = consecutive_failures
        account.total_failed_logins = account.total_failed_logins + 1

        AuditEvent.create(session, evt_types.USER_LOGIN_FAILED, user=account)

        if consecutive_failures >= conf.CONF.max_failed_login_attempts:
            _lock_user_account(session, account)

        # Persist explicitly: the caller raises straight after this returns.
        session.commit()
    except NoResultFound:
        # Unknown user: nothing is written, but a dummy hash is computed to
        # mitigate timing attacks that probe for valid user IDs.
        fetch.dummy_hash(session)
# Plain-text body of the password reset email. The literal placeholders
# <user_id>, <hostname> and <key> are filled in with str.replace() by
# post_password_emailkey; each placeholder occurs twice.
RESET_MESSAGE = """
A password reset has been requested for user ID <user_id> at <hostname>.

To reset your password follow this link:

https://<hostname>/vue/#/reset?key=<key>&user=<user_id>

Key: <key>

"""
@http
def post_password_emailkey(session: Session, user_doc: UserDoc):
    """
    Email a password reset link to the user identified by ``user_doc.user_id``.

    The reset key is the encrypted string ``"<salt>:<user id>"``; it is placed
    into ``RESET_MESSAGE`` together with the user ID and the configured web app
    hostname, and the message is sent through Postmark.

    Raises:
        webob.exc.HTTPBadRequest: if the user has no email address.
    """
    user = fetch.user(session, user_doc.user_id)
    if user.email is None:
        raise webob.exc.HTTPBadRequest("User has no email address")

    # A fresh salt makes every issued key distinct, even for the same user.
    reset_key = encrypt_value(f"{gensalt()}:{user.id}")

    # Substitution order matches the template's original fill-in order.
    substitutions = (
        ("<user_id>", user.id),
        ("<key>", reset_key),
        ("<hostname>", conf.CONF.webapp_hostname),
    )
    body = RESET_MESSAGE
    for placeholder, value in substitutions:
        body = body.replace(placeholder, value)

    message = PostmarkSimpleMessage(
        To=user.email,
        From="[email protected]",
        Subject="PostRFP Email Reset",
        TextBody=body,
        Tag="Testing",
    )
    send_simple_email(
        message.model_dump(),
        api_headers(conf.CONF.postmark_api_key),
    )
@http
def post_password_update(session: Session, reset_doc: ResetDoc):
    """
    Reset the user's password using an emailed key value to authenticate the user.

    The key is expected to be the encrypted ``"<salt>:<user id>"`` string that
    ``post_password_emailkey`` issues. It is accepted only when it decrypts
    successfully (``decrypt_value`` is called with ``max_age=600`` - presumably
    seconds, i.e. a ten minute validity window; confirm against
    ``rfpy.utils``) and the user ID embedded in it equals the ID of the user
    being updated.

    Raises:
        webob.exc.HTTPForbidden: if the key cannot be decrypted, does not have
            the ``salt:user_id`` form, or was issued for a different user.
    """
    user = fetch.user(session, reset_doc.user_id)
    try:
        decrypted_value = decrypt_value(reset_doc.key.encode("utf8"), max_age=600)
    except InvalidToken as exc:
        raise webob.exc.HTTPForbidden(
            "Invalid key value - expired or malformed"
        ) from exc

    # Split on the first colon only. A plain split(":") with two-target
    # unpacking raises ValueError (an unhandled server error) when the payload
    # has no colon or when the user ID itself contains one.
    _salt, separator, key_user_id = decrypted_value.partition(":")
    if not separator:
        raise webob.exc.HTTPForbidden("Invalid key value - expired or malformed")
    if key_user_id != user.id:
        raise webob.exc.HTTPForbidden("User ID doesn't match key")

    user.password = create_signature(reset_doc.new_password)
@http
def post_refresh_token(session: Session, request: webob.Request):
    """
    Exchange a valid refresh token cookie for a new access token.

    The ``refresh_token`` cookie value is looked up among the stored refresh
    tokens that are neither revoked nor expired. When one matches, a new JWT
    access token is issued for its user via ``_generate_access_token`` - the
    same helper the login endpoint uses, so both endpoints produce identical
    claims and expiry. The refresh token itself is left unchanged.

    Returns:
        A JSON ``webob.Response`` with the serialized ``Token`` on success, or
        an ``HTTPForbidden`` response that also deletes the cookie when the
        token is unknown, revoked or expired.

    Raises:
        webob.exc.HTTPForbidden: if the request carries no refresh token cookie.
    """
    refresh_token_value = request.cookies.get("refresh_token")
    if not refresh_token_value:
        raise webob.exc.HTTPForbidden("No refresh token provided")

    # One timestamp serves both the expiry filter and the new token's claims.
    now = datetime.now(timezone.utc)

    refresh_token = (
        session.query(RefreshToken)
        .filter(
            and_(
                RefreshToken.token == refresh_token_value,
                RefreshToken.revoked == False,  # noqa: E712
                RefreshToken.expires_at > now,
            )
        )
        .first()
    )

    response: webob.Response

    if not refresh_token:
        # The 403 is returned rather than raised, after delete_cookie() has
        # been called on it, so the client is told to drop the stale cookie.
        response = webob.exc.HTTPForbidden("Invalid or expired refresh token")
        response.delete_cookie("refresh_token", path="/api/auth")
        return response

    user = fetch.user(session, refresh_token.user_id)

    # Reuse the login helper instead of duplicating the JWT construction.
    access_token, access_expiry_minutes = _generate_access_token(user.id, now)

    token = Token(
        access_token=access_token,
        token_type="Bearer",
        expires_in=access_expiry_minutes * 60,
    )

    # Serialize with Pydantic's model_dump_json() rather than json.dumps().
    response = webob.Response(
        token.model_dump_json(),
        content_type="application/json",
        charset="utf-8",
    )

    return response
@http
def post_logout(session: Session, request: webob.Request):
    """
    Log the requesting user out: revoke refresh token(s) and clear the cookie.

    If the request carries a ``refresh_token`` cookie, only the stored token
    matching both that value and the requesting user is marked revoked. Without
    a cookie, every refresh token belonging to the user is revoked. A
    USER_LOGGED_OUT audit event is created, and the ``{"result": "ok"}`` JSON
    response deletes the cookie on path ``/api/auth``.
    """
    current_user = request.user
    cookie_value = request.cookies.get("refresh_token")

    # Always scope the update to the requesting user; narrow it to the single
    # cookie token when one was supplied.
    criteria = [RefreshToken.user_id == current_user.id]
    if cookie_value:
        criteria.insert(0, RefreshToken.token == cookie_value)
    session.query(RefreshToken).filter(*criteria).update({"revoked": True})

    AuditEvent.create(session, evt_types.USER_LOGGED_OUT, user=current_user)

    # webob serializes json_body itself.
    reply = webob.Response(
        json_body={"result": "ok"},
        content_type="application/json",
        charset="utf-8",
    )
    reply.delete_cookie("refresh_token", path="/api/auth")
    return reply
def _check_and_clear_lockout(session: Session, user, now: datetime):
    """
    Enforce an active account lockout, or clear one that has expired.

    Does nothing when ``user.locked_until`` is ``None``. When the lock has
    passed, ``locked_until`` is reset to ``None`` and a USER_ACCOUNT_UNLOCKED
    audit event is created.

    Raises:
        webob.exc.HTTPForbidden: if the lock is still in force at ``now``.
    """
    locked_until = user.locked_until
    if locked_until is None:
        return

    # A naive stored value is treated as UTC so it can be compared with the
    # timezone-aware ``now``.
    if locked_until.tzinfo is None:
        locked_until = locked_until.replace(tzinfo=timezone.utc)

    if locked_until > now:
        raise webob.exc.HTTPForbidden("Account is locked")

    # The lockout has expired: lift it and leave an audit trail.
    user.locked_until = None
    AuditEvent.create(session, evt_types.USER_ACCOUNT_UNLOCKED, user=user)
def _generate_access_token(user_id: str, now: datetime) -> tuple[str, int]:
    """
    Build a signed JWT access token for ``user_id``.

    The lifetime is ``conf.CONF.jwt_expiry_minutes`` with a floor of five
    minutes. The token is signed with ``conf.CONF.crypt_key`` using HS256.

    Returns:
        ``(encoded_token, lifetime_in_minutes)``.
    """
    lifetime_minutes = max(5, conf.CONF.jwt_expiry_minutes)
    expires_at = now + timedelta(minutes=lifetime_minutes)
    claims = dict(
        user_id=user_id,
        exp=int(expires_at.timestamp()),
        iat=int(now.timestamp()),
        token_type="access",
    )
    encoded = jwt.encode(claims, conf.CONF.crypt_key, algorithm="HS256")
    return encoded, lifetime_minutes
def _generate_and_store_refresh_token(
    session: Session, user_id: str, now: datetime
) -> tuple[str, int]:
    """
    Create a random refresh token for ``user_id`` and add it to the session.

    The token expires ``conf.CONF.jwt_refresh_token_expiry_hours`` after
    ``now``. The new ``RefreshToken`` row is only added to the session here;
    this function does not commit.

    Returns:
        ``(token_value, max_age_in_seconds)``; the second item is the same
        lifetime expressed in whole seconds.
    """
    lifetime_hours = conf.CONF.jwt_refresh_token_expiry_hours
    token_value = secrets.token_urlsafe(32)

    session.add(
        RefreshToken(
            user_id=user_id,
            token=token_value,
            issued_at=now,
            expires_at=now + timedelta(hours=lifetime_hours),
            revoked=False,
        )
    )
    return token_value, int(lifetime_hours * 3600)
def _update_user_on_login(session: Session, user, now: datetime):
    """
    Apply the bookkeeping for a successful login.

    Zeroes the consecutive-failure counter if it is above zero, stamps
    ``previous_login_date`` with ``now`` when the user object has that
    attribute, and creates a USER_LOGGED_IN audit event.
    """
    # A successful login ends any run of consecutive failures.
    if user.failed_login_attempts > 0:
        user.failed_login_attempts = 0

    # Only stamp the login time on user objects that carry the field.
    if hasattr(user, "previous_login_date"):
        user.previous_login_date = now

    AuditEvent.create(session, evt_types.USER_LOGGED_IN, user=user)
def _create_login_response(
    access_token: str,
    refresh_token_value: str,
    access_expiry_minutes: int,
    refresh_max_age: int,
):
    """
    Assemble the HTTP response for a successful login.

    The JSON body is a serialized ``Token`` (bearer access token plus its
    lifetime in seconds). The refresh token travels only in the
    ``refresh_token`` cookie, which is set HttpOnly, Secure and SameSite=Lax on
    path ``/api/auth`` with ``refresh_max_age`` as its max age.
    """
    body = Token(
        access_token=access_token,
        token_type="Bearer",
        expires_in=access_expiry_minutes * 60,  # minutes -> seconds
    ).model_dump_json()

    response = webob.Response(
        body,
        content_type="application/json",
        charset="utf-8",
    )

    cookie_settings = dict(
        max_age=refresh_max_age,
        path="/api/auth",
        httponly=True,
        secure=True,
        samesite="Lax",
    )
    response.set_cookie("refresh_token", refresh_token_value, **cookie_settings)
    return response