Coverage for rfpy/web/middleware.py: 100%
32 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
1import logging
2import json
3import os
5from webob import Response
6from webob.static import DirectoryApp
7from webob.exc import HTTPTemporaryRedirect
9from rfpy.utils import json_default
10from rfpy import conf
11from rfpy.templates import get_template
12from .request import HttpRequest
13from .response import X_ACCEL_HEADER
14from rfpy.mail.stub import MAILBOX, clear_mailbox
15from rfpy.model.audit import AuditEvent, Status as EvtStatus
17log = logging.getLogger(__name__)
20class DevMiddleware(object): # pragma: no cover
21 def __init__(self, app, session_factory):
22 self.app = app
23 self.session_factory = session_factory
24 self.json_app = self.static_app(conf.CONF.cache_dir)
25 self.attachment_app = self.static_app(conf.CONF.attachments_dir)
27 log.warning(" Initialised DevMiddleware - for DEVELOPMENT ONLY! \n")
28 log.info("cache_dir : %s " % conf.CONF.cache_dir)
29 log.info("attachments_dir: %s" % conf.CONF.attachments_dir)
31 def static_app(self, dir_path):
32 # the last bit of the path for attachments and cache directories is the databasename
33 # nginx is configured to serve the next directory up (attachments/ or cache/)
34 # so use the parent directory
35 return DirectoryApp(dir_path.parent)
37 def __call__(self, environ, start_response):
38 request = HttpRequest(environ)
39 preemptive_response = self.process_request(request)
41 if preemptive_response is not None:
42 return preemptive_response(environ, start_response)
44 app_response = request.get_response(self.app)
46 updated_response = self.process_response(request, app_response)
47 if updated_response is not None:
48 return updated_response(environ, start_response)
50 return app_response(environ, start_response)
52 def process_request(self, request):
53 under_path = request.path_info.strip("/").replace("/", "_")
55 if hasattr(self, under_path):
56 # route 'GET blah/bloo' to self.blah_bloo(request)
57 return getattr(self, under_path)(request)
59 def process_response(self, request, response):
60 if X_ACCEL_HEADER in response.headers:
61 return self.xaccel(request, response)
63 if response.status_int == 401: # Unauthorised
64 return self.logout(request)
66 # Anything-goes CORS configuration
67 response.headers.update(
68 {
69 "Access-Control-Allow-Origin": "*",
70 "Access-Control-Allow-Methods": "GET, POST, DELETE, PUT, PATCH, OPTIONS",
71 "Access-Control-Allow-Headers": "Content-Type, api_key, Authorization",
72 }
73 )
75 def logout(self, request):
76 """This catches requests to /logout - usually handled by java app"""
77 return HTTPTemporaryRedirect(location="/become")
79 def auth_logout(self, request):
80 return self.logout(request)
82 def test_rollback(self, _request):
83 """
84 Rollback SAVEPOINT session when running multiple requests
85 within a nested transaction
86 """
87 from rfpy import tools
89 if tools.TEST_SESSION is not None:
90 if tools.TEST_SESSION.get_transaction() is not None:
91 log.info(
92 "Rolling back transaction. Nested: %s",
93 tools.TEST_SESSION.get_transaction().nested,
94 )
95 tools.TEST_SESSION.rollback()
96 tools.TEST_SESSION.close()
97 tools.TEST_SESSION = None
99 # Clear the mailbox of outbound emails
100 clear_mailbox()
101 return Response("ok")
103 def test_mailbox(self, request):
104 """
105 Inspect contents of debug mailbox
106 """
107 if request.prefers_json:
108 emails = json.dumps(list(reversed(MAILBOX)), default=json_default)
109 return Response(emails, charset="UTF-8", content_type="application/json")
110 else:
111 tmpl = get_template("tools/debug_emails.html")
112 html = tmpl.render(emails=reversed(MAILBOX), request=request, user=None)
113 return Response(html)
115 def test_process_events(self, _request):
116 """Run background events processor - for testing without
117 background process or thread
118 """
119 from rfpy.jobs.events import handle_event
121 session = self.session_factory()
122 evt_count = 0
123 for evt in session.query(AuditEvent).filter(
124 AuditEvent.status == EvtStatus.pending
125 ):
126 handle_event(evt, session)
127 evt_count += 1
129 ev_data = json.dumps({"evt_count": evt_count})
130 return Response(ev_data, charset="UTF-8", content_type="application/json")
132 def xaccel(self, request, response):
133 """Mimic NGINX's X-Accel-Redirect functionality"""
135 fpath = response.headers["X-Accel-Redirect"]
136 request.path_info = fpath
137 # virtual_path is the path used address the relevant
138 # 'location{' block in nginx config
139 virtual_path = request.path_info_pop()
141 if virtual_path == "cache":
142 app = self.json_app
143 else:
144 app = self.attachment_app
146 new_response = request.get_response(app)
148 if new_response.status_int == 404:
149 log.error(
150 "File %s not found for request %s" % os.path.join(app.path, fpath),
151 request,
152 )
153 return new_response
155 new_response.content_type = response.content_type
156 new_response.content_disposition = response.content_disposition
158 new_response.cache_control = "no-store"
160 return new_response
162 def config(self, request):
163 """Show config information for server - db name etc"""
164 tmpl = get_template("tools/conf_info.html")
165 html = tmpl.render(conf=conf.CONF, request=request, user=None)
166 return Response(html)
169class DispatchingMiddleware:
170 """Combine multiple applications as a single WSGI application.
171 Requests are dispatched to an application based on the path it is
172 mounted under.
174 :param app: The WSGI application to dispatch to if the request
175 doesn't match a mounted path.
176 :param mounts: Maps path prefixes to applications for dispatching.
177 """
179 def __init__(self, app, mounts=None):
180 self.app = app
181 self.mounts = mounts or {}
183 def __call__(self, environ, start_response):
184 script = environ.get("PATH_INFO", "")
185 path_info = ""
187 while "/" in script:
188 if script in self.mounts:
189 app = self.mounts[script]
190 break
192 script, last_item = script.rsplit("/", 1)
193 path_info = f"/{last_item}{path_info}"
194 else:
195 app = self.mounts.get(script, self.app)
197 original_script_name = environ.get("SCRIPT_NAME", "")
198 environ["SCRIPT_NAME"] = original_script_name + script
199 environ["PATH_INFO"] = path_info
200 return app(environ, start_response)