Coverage for rfpy/web/middleware.py: 100%

32 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-24 10:52 +0000

1import logging 

2import json 

3import os 

4 

5from webob import Response 

6from webob.static import DirectoryApp 

7from webob.exc import HTTPTemporaryRedirect 

8 

9from rfpy.utils import json_default 

10from rfpy import conf 

11from rfpy.templates import get_template 

12from .request import HttpRequest 

13from .response import X_ACCEL_HEADER 

14from rfpy.mail.stub import MAILBOX, clear_mailbox 

15from rfpy.model.audit import AuditEvent, Status as EvtStatus 

16 

17log = logging.getLogger(__name__) 

18 

19 

20class DevMiddleware(object): # pragma: no cover 

21 def __init__(self, app, session_factory): 

22 self.app = app 

23 self.session_factory = session_factory 

24 self.json_app = self.static_app(conf.CONF.cache_dir) 

25 self.attachment_app = self.static_app(conf.CONF.attachments_dir) 

26 

27 log.warning(" Initialised DevMiddleware - for DEVELOPMENT ONLY! \n") 

28 log.info("cache_dir : %s " % conf.CONF.cache_dir) 

29 log.info("attachments_dir: %s" % conf.CONF.attachments_dir) 

30 

31 def static_app(self, dir_path): 

32 # the last bit of the path for attachments and cache directories is the databasename 

33 # nginx is configured to serve the next directory up (attachments/ or cache/) 

34 # so use the parent directory 

35 return DirectoryApp(dir_path.parent) 

36 

37 def __call__(self, environ, start_response): 

38 request = HttpRequest(environ) 

39 preemptive_response = self.process_request(request) 

40 

41 if preemptive_response is not None: 

42 return preemptive_response(environ, start_response) 

43 

44 app_response = request.get_response(self.app) 

45 

46 updated_response = self.process_response(request, app_response) 

47 if updated_response is not None: 

48 return updated_response(environ, start_response) 

49 

50 return app_response(environ, start_response) 

51 

52 def process_request(self, request): 

53 under_path = request.path_info.strip("/").replace("/", "_") 

54 

55 if hasattr(self, under_path): 

56 # route 'GET blah/bloo' to self.blah_bloo(request) 

57 return getattr(self, under_path)(request) 

58 

59 def process_response(self, request, response): 

60 if X_ACCEL_HEADER in response.headers: 

61 return self.xaccel(request, response) 

62 

63 if response.status_int == 401: # Unauthorised 

64 return self.logout(request) 

65 

66 # Anything-goes CORS configuration 

67 response.headers.update( 

68 { 

69 "Access-Control-Allow-Origin": "*", 

70 "Access-Control-Allow-Methods": "GET, POST, DELETE, PUT, PATCH, OPTIONS", 

71 "Access-Control-Allow-Headers": "Content-Type, api_key, Authorization", 

72 } 

73 ) 

74 

75 def logout(self, request): 

76 """This catches requests to /logout - usually handled by java app""" 

77 return HTTPTemporaryRedirect(location="/become") 

78 

79 def auth_logout(self, request): 

80 return self.logout(request) 

81 

82 def test_rollback(self, _request): 

83 """ 

84 Rollback SAVEPOINT session when running multiple requests 

85 within a nested transaction 

86 """ 

87 from rfpy import tools 

88 

89 if tools.TEST_SESSION is not None: 

90 if tools.TEST_SESSION.get_transaction() is not None: 

91 log.info( 

92 "Rolling back transaction. Nested: %s", 

93 tools.TEST_SESSION.get_transaction().nested, 

94 ) 

95 tools.TEST_SESSION.rollback() 

96 tools.TEST_SESSION.close() 

97 tools.TEST_SESSION = None 

98 

99 # Clear the mailbox of outbound emails 

100 clear_mailbox() 

101 return Response("ok") 

102 

103 def test_mailbox(self, request): 

104 """ 

105 Inspect contents of debug mailbox 

106 """ 

107 if request.prefers_json: 

108 emails = json.dumps(list(reversed(MAILBOX)), default=json_default) 

109 return Response(emails, charset="UTF-8", content_type="application/json") 

110 else: 

111 tmpl = get_template("tools/debug_emails.html") 

112 html = tmpl.render(emails=reversed(MAILBOX), request=request, user=None) 

113 return Response(html) 

114 

115 def test_process_events(self, _request): 

116 """Run background events processor - for testing without 

117 background process or thread 

118 """ 

119 from rfpy.jobs.events import handle_event 

120 

121 session = self.session_factory() 

122 evt_count = 0 

123 for evt in session.query(AuditEvent).filter( 

124 AuditEvent.status == EvtStatus.pending 

125 ): 

126 handle_event(evt, session) 

127 evt_count += 1 

128 

129 ev_data = json.dumps({"evt_count": evt_count}) 

130 return Response(ev_data, charset="UTF-8", content_type="application/json") 

131 

132 def xaccel(self, request, response): 

133 """Mimic NGINX's X-Accel-Redirect functionality""" 

134 

135 fpath = response.headers["X-Accel-Redirect"] 

136 request.path_info = fpath 

137 # virtual_path is the path used address the relevant 

138 # 'location{' block in nginx config 

139 virtual_path = request.path_info_pop() 

140 

141 if virtual_path == "cache": 

142 app = self.json_app 

143 else: 

144 app = self.attachment_app 

145 

146 new_response = request.get_response(app) 

147 

148 if new_response.status_int == 404: 

149 log.error( 

150 "File %s not found for request %s" % os.path.join(app.path, fpath), 

151 request, 

152 ) 

153 return new_response 

154 

155 new_response.content_type = response.content_type 

156 new_response.content_disposition = response.content_disposition 

157 

158 new_response.cache_control = "no-store" 

159 

160 return new_response 

161 

162 def config(self, request): 

163 """Show config information for server - db name etc""" 

164 tmpl = get_template("tools/conf_info.html") 

165 html = tmpl.render(conf=conf.CONF, request=request, user=None) 

166 return Response(html) 

167 

168 

169class DispatchingMiddleware: 

170 """Combine multiple applications as a single WSGI application. 

171 Requests are dispatched to an application based on the path it is 

172 mounted under. 

173 

174 :param app: The WSGI application to dispatch to if the request 

175 doesn't match a mounted path. 

176 :param mounts: Maps path prefixes to applications for dispatching. 

177 """ 

178 

179 def __init__(self, app, mounts=None): 

180 self.app = app 

181 self.mounts = mounts or {} 

182 

183 def __call__(self, environ, start_response): 

184 script = environ.get("PATH_INFO", "") 

185 path_info = "" 

186 

187 while "/" in script: 

188 if script in self.mounts: 

189 app = self.mounts[script] 

190 break 

191 

192 script, last_item = script.rsplit("/", 1) 

193 path_info = f"/{last_item}{path_info}" 

194 else: 

195 app = self.mounts.get(script, self.app) 

196 

197 original_script_name = environ.get("SCRIPT_NAME", "") 

198 environ["SCRIPT_NAME"] = original_script_name + script 

199 environ["PATH_INFO"] = path_info 

200 return app(environ, start_response)