Coverage for rfpy/auth/endpoints.py: 96%

135 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-24 10:52 +0000

1""" 

2Authentication Endpoints using JWT Access Tokens and Refresh Tokens. 

3 

4This module implements authentication endpoints following a common pattern: 

5 

61. **Login (`post_login`):** 

7 * User provides credentials (e.g., user ID and password). 

8 * Server validates credentials against the database. 

9 * Upon successful validation: 

10 * A short-lived JWT **Access Token** is generated. This token contains user identification and  

11 permissions and is signed by the server. It's returned in the response body. 

12 * A longer-lived **Refresh Token** (a secure random string) is generated and stored 

13 in the database, linked to the user. This token is returned as an `HttpOnly`, `Secure`, `SameSite=Lax` cookie. 

14 * The client stores the access token (e.g., in memory) and uses it in the `Authorization: Bearer <token>` header  

15 for subsequent API requests. 

16 * The browser automatically sends the refresh token cookie on requests to the `/api/auth` path. 

17 

182. **Access Token Usage:** 

19 * For protected API endpoints, the client sends the access token in the Authorization header. 

20 * The server validates the access token's signature and expiration. If valid, the request is processed. 

21 

223. **Token Refresh (`post_refresh_token`):** 

23 * When the access token expires, the client receives a 401 Unauthorized error. 

24 * The client then makes a request to the refresh endpoint (`/api/auth/refresh`). 

25 * The browser automatically includes the refresh token cookie in this request. 

26 * The server looks up the refresh token in the database, verifies it's not expired or revoked. 

27 * If the refresh token is valid, the server generates a *new* short-lived JWT access token and returns it in the response body. 

28 * The client replaces its expired access token with the new one and retries the original failed request. 

29 * The refresh token itself is typically *not* rotated with every refresh for simplicity, but relies on its own expiry and the ability to revoke it. 

30 

314. **Logout (`post_logout`):** 

32 * The client makes a request to the logout endpoint (`/api/auth/logout`). 

33 * The server identifies the user (via the access token) and the refresh token (via the cookie). 

34 * The server marks the corresponding refresh token(s) as revoked in the database. 

35 * The server sends back a response instructing the browser to clear the refresh token cookie (e.g., by setting `Max-Age=0`). 

36 * The client discards its access token. 

37 

38This approach enhances security by minimizing the exposure of the long-lived refresh token (it's not directly  

39handled by client-side JavaScript) while providing a seamless user experience by allowing sessions to be  

40extended without requiring frequent re-logins. 

41""" 

42 

43from datetime import datetime, timedelta, timezone 

44 

45import jwt 

46import webob 

47import secrets 

48import webob.exc 

49from sqlalchemy import and_ 

50from sqlalchemy.orm import Session 

51from sqlalchemy.orm.exc import NoResultFound 

52from cryptography.fernet import InvalidToken 

53 

54from rfpy.suxint import http 

55from rfpy.api import fetch 

56from rfpy import conf 

57from rfpy.model.audit import AuditEvent, evt_types 

58from rfpy.model.humans import RefreshToken 

59from rfpy.mail.schemas import PostmarkSimpleMessage 

60from rfpy.mail.postmark import api_headers, send_simple_email 

61from rfpy.auth.password import create_signature, gensalt 

62from rfpy.utils import decrypt_value, encrypt_value 

63from .adaptors import Login, UserDoc, ResetDoc, Token 

64 

65 

66 

67@http 

68def post_login(session: Session, login_doc: Login): 

69 """ 

70 Authenticate user, handle lockout, return access token in body, refresh token as cookie. 

71 """ 

72 try: 

73 user = fetch.user_by_password(session, login_doc.user_id, login_doc.password) 

74 now = datetime.now(timezone.utc) 

75 

76 _check_and_clear_lockout(session, user, now) 

77 _update_user_on_login(session, user, now) 

78 

79 access_token, access_expiry_minutes = _generate_access_token(user.id, now) 

80 refresh_token_value, refresh_max_age = _generate_and_store_refresh_token( 

81 session, user.id, now 

82 ) 

83 

84 return _create_login_response( 

85 access_token, refresh_token_value, access_expiry_minutes, refresh_max_age 

86 ) 

87 

88 except NoResultFound: 

89 # Handle failed login attempt (increment counter, potentially lock) 

90 _handle_failed_login(session, login_doc.user_id) 

91 # Reraise as HTTPForbidden regardless of whether user existed 

92 raise webob.exc.HTTPForbidden("No matching user found or invalid password") 

93 

94 

95def _lock_user_account(session: Session, user): 

96 """Locks the user account and creates an audit event.""" 

97 # Make sure to use timezone-aware datetime when setting locked_until 

98 user.locked_until = datetime.now(timezone.utc) + timedelta( 

99 minutes=conf.CONF.lockout_duration_minutes 

100 ) 

101 user.failed_login_attempts = 0 # Reset counter after locking 

102 AuditEvent.create( 

103 session, 

104 evt_types.USER_ACCOUNT_LOCKED, 

105 user=user, 

106 ) 

107 # ToDo - send email to user about lockout 

108 

109 

110def _handle_failed_login(session: Session, user_id: str): 

111 """ 

112 Handle a failed login attempt: increment counters, log event, potentially lock account. 

113 """ 

114 try: 

115 failed_user = fetch.user(session, user_id) 

116 failed_user.failed_login_attempts += 1 

117 failed_user.total_failed_logins += 1 

118 

119 AuditEvent.create(session, evt_types.USER_LOGIN_FAILED, user=failed_user) 

120 

121 if failed_user.failed_login_attempts >= conf.CONF.max_failed_login_attempts: 

122 _lock_user_account(session, failed_user) 

123 

124 session.commit() # Commit changes for the found user 

125 

126 except NoResultFound: 

127 # If the user is not found, perform dummy hash to mitigate timing attacks. 

128 # No database writes occur in this case. 

129 fetch.dummy_hash(session) 

130 

131 

132RESET_MESSAGE = """ 

133A password reset has been requested for user ID <user_id> at <hostname>. 

134 

135To reset your password follow this link: 

136 

137https://<hostname>/vue/#/reset?key=<key>&user=<user_id> 

138 

139Key: <key> 

140 

141""" 

142 

143 

144@http 

145def post_password_emailkey(session: Session, user_doc: UserDoc): 

146 """ 

147 Trigger a password reset email to be sent to the user with the provided user_id. 

148 

149 The generated email will provide a URL link for the user to reset their password. 

150 """ 

151 user = fetch.user(session, user_doc.user_id) 

152 if user.email is None: 

153 raise webob.exc.HTTPBadRequest("User has no email address") 

154 

155 salt = gensalt() 

156 key_str = f"{salt}:{user.id}" 

157 hash_key = encrypt_value(key_str) 

158 msg = ( 

159 RESET_MESSAGE.replace("<user_id>", user.id) 

160 .replace("<key>", hash_key) 

161 .replace("<hostname>", conf.CONF.webapp_hostname) 

162 ) 

163 

164 model = PostmarkSimpleMessage( 

165 To=user.email, 

166 From="[email protected]", 

167 Subject="PostRFP Email Reset", 

168 TextBody=msg, 

169 Tag="Testing", 

170 ).model_dump() 

171 headers = api_headers(conf.CONF.postmark_api_key) 

172 send_simple_email(model, headers) 

173 

174 

175@http 

176def post_password_update(session: Session, reset_doc: ResetDoc): 

177 """ 

178 Reset the user's password using an emailed key value to authenticate the user. 

179 """ 

180 

181 user = fetch.user(session, reset_doc.user_id) 

182 try: 

183 decrypted_value = decrypt_value(reset_doc.key.encode("utf8"), max_age=600) 

184 except InvalidToken: 

185 raise webob.exc.HTTPForbidden("Invalid key value - expired or malformed") 

186 _salt, hashed_user_id = decrypted_value.split(":") 

187 if hashed_user_id != user.id: 

188 raise webob.exc.HTTPForbidden("User ID doesn't match key") 

189 

190 user.password = create_signature(reset_doc.new_password) 

191 

192 

193@http 

194def post_refresh_token(session: Session, request: webob.Request): 

195 """ 

196 Exchange a valid refresh token cookie for a new access token. 

197 """ 

198 # Get refresh token from cookie 

199 refresh_token_value = request.cookies.get("refresh_token") 

200 

201 if not refresh_token_value: 

202 raise webob.exc.HTTPForbidden("No refresh token provided") 

203 

204 # Find the refresh token in the database 

205 refresh_token = ( 

206 session.query(RefreshToken) 

207 .filter( 

208 and_( 

209 RefreshToken.token == refresh_token_value, 

210 RefreshToken.revoked == False, # noqa E712 

211 RefreshToken.expires_at > datetime.now(timezone.utc), 

212 ) 

213 ) 

214 .first() 

215 ) 

216 

217 response: webob.Response 

218 

219 if not refresh_token: 

220 # If token is invalid, clear the cookie 

221 response = webob.exc.HTTPForbidden("Invalid or expired refresh token") 

222 response.delete_cookie("refresh_token", path="/api/auth") 

223 return response 

224 

225 # Get the user 

226 user = fetch.user(session, refresh_token.user_id) 

227 

228 # Generate new access token 

229 now = datetime.now(timezone.utc) 

230 jwt_expiry_minutes = max(5, conf.CONF.jwt_expiry_minutes) 

231 access_expiry = now + timedelta(minutes=jwt_expiry_minutes) 

232 

233 auth_doc = { 

234 "user_id": user.id, 

235 "exp": int(access_expiry.timestamp()), 

236 "iat": int(now.timestamp()), 

237 "token_type": "access", 

238 } 

239 access_token = jwt.encode(auth_doc, conf.CONF.crypt_key, algorithm="HS256") 

240 

241 # Create Token object 

242 token = Token( 

243 access_token=access_token, 

244 token_type="Bearer", 

245 expires_in=jwt_expiry_minutes * 60, 

246 ) 

247 

248 # Create Response using Pydantic's serialization 

249 response = webob.Response( 

250 token.model_dump_json(), # Use model_dump_json() instead of json.dumps() 

251 content_type="application/json", 

252 charset="utf-8", 

253 ) 

254 

255 return response 

256 

257 

258@http 

259def post_logout(session: Session, request: webob.Request): 

260 """ 

261 Logout by revoking refresh token and clearing the cookie. 

262 """ 

263 user_id = request.user.id 

264 refresh_token_value = request.cookies.get("refresh_token") 

265 

266 if refresh_token_value: 

267 # Revoke the specific token from the cookie 

268 session.query(RefreshToken).filter( 

269 and_( 

270 RefreshToken.token == refresh_token_value, 

271 RefreshToken.user_id == user_id, 

272 ) 

273 ).update({"revoked": True}) 

274 else: 

275 # If no specific token provided, revoke all tokens for the user 

276 session.query(RefreshToken).filter(RefreshToken.user_id == user_id).update( 

277 {"revoked": True} 

278 ) 

279 

280 AuditEvent.create(session, evt_types.USER_LOGGED_OUT, user=request.user) 

281 

282 # Create response using dict 

283 result = {"result": "ok"} 

284 

285 # Use webob's json_body feature instead of manual serialization 

286 response = webob.Response( 

287 json_body=result, # webob will serialize this for us 

288 content_type="application/json", 

289 charset="utf-8", 

290 ) 

291 

292 # Clear the cookie 

293 response.delete_cookie("refresh_token", path="/api/auth") 

294 

295 return response 

296 

297 

298def _check_and_clear_lockout(session: Session, user, now: datetime): 

299 """Checks if user is locked out, clears if expired, raises if still locked.""" 

300 if user.locked_until is not None: 

301 # Ensure comparison is timezone-aware 

302 if user.locked_until.tzinfo is None: 

303 user_locked_until = user.locked_until.replace(tzinfo=timezone.utc) 

304 else: 

305 user_locked_until = user.locked_until 

306 

307 if user_locked_until > now: 

308 raise webob.exc.HTTPForbidden("Account is locked") 

309 else: 

310 # Lockout expired, clear it and log event 

311 user.locked_until = None 

312 AuditEvent.create( 

313 session, 

314 evt_types.USER_ACCOUNT_UNLOCKED, 

315 user=user, 

316 ) 

317 

318 

319def _generate_access_token(user_id: str, now: datetime) -> tuple[str, int]: 

320 """Generates JWT access token string and expiry minutes.""" 

321 jwt_expiry_minutes = max(5, conf.CONF.jwt_expiry_minutes) 

322 access_expiry = now + timedelta(minutes=jwt_expiry_minutes) 

323 auth_doc = { 

324 "user_id": user_id, 

325 "exp": int(access_expiry.timestamp()), 

326 "iat": int(now.timestamp()), 

327 "token_type": "access", 

328 } 

329 access_token = jwt.encode(auth_doc, conf.CONF.crypt_key, algorithm="HS256") 

330 return access_token, jwt_expiry_minutes 

331 

332 

333def _generate_and_store_refresh_token( 

334 session: Session, user_id: str, now: datetime 

335) -> tuple[str, int]: 

336 """Generates, stores, and returns refresh token value and max_age seconds.""" 

337 refresh_expiry = now + timedelta(hours=conf.CONF.jwt_refresh_token_expiry_hours) 

338 refresh_token_value = secrets.token_urlsafe(32) 

339 max_age = int(conf.CONF.jwt_refresh_token_expiry_hours * 3600) 

340 

341 # Store refresh token in database 

342 refresh_token_record = RefreshToken( 

343 user_id=user_id, 

344 token=refresh_token_value, 

345 issued_at=now, 

346 expires_at=refresh_expiry, 

347 revoked=False, 

348 ) 

349 session.add(refresh_token_record) 

350 return refresh_token_value, max_age 

351 

352 

353def _update_user_on_login(session: Session, user, now: datetime): 

354 """Updates user state and logs audit event on successful login.""" 

355 # Reset failed attempts count on successful login 

356 if user.failed_login_attempts > 0: 

357 user.failed_login_attempts = 0 

358 

359 # Update user's last login time if the field exists 

360 if hasattr(user, "previous_login_date"): 

361 user.previous_login_date = now 

362 

363 # Create audit event for successful login 

364 AuditEvent.create(session, evt_types.USER_LOGGED_IN, user=user) 

365 

366 

367def _create_login_response( 

368 access_token: str, 

369 refresh_token_value: str, 

370 access_expiry_minutes: int, 

371 refresh_max_age: int, 

372): 

373 """Creates the webob.Response for a successful login.""" 

374 # Create Token object for response body 

375 token_payload = Token( 

376 access_token=access_token, 

377 token_type="Bearer", 

378 expires_in=access_expiry_minutes * 60, 

379 ) 

380 

381 # Create Response using Pydantic's serialization 

382 response = webob.Response( 

383 token_payload.model_dump_json(), 

384 content_type="application/json", 

385 charset="utf-8", 

386 ) 

387 

388 # Set refresh token as HttpOnly cookie 

389 response.set_cookie( 

390 "refresh_token", 

391 refresh_token_value, 

392 max_age=refresh_max_age, 

393 path="/api/auth", 

394 httponly=True, 

395 secure=True, 

396 samesite="Lax", 

397 ) 

398 return response