Coverage for rfpy/web/response.py: 100%
34 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
1import os
2import logging
4from webob import Response
6from rfpy import conf
7from rfpy.api import attachments
9log = logging.getLogger(__name__)
12X_ACCEL_HEADER = "x-accel-redirect"
15def quote_header_value(value):
16 """Quote an http header value"""
17 return '"%s"' % value.replace("\\", "\\\\").replace('"', '\\"')
20def file_size(file_path):
21 return os.stat(file_path).st_size
24class XAccelResponse(Response):
25 def set_x_accel(self):
26 rel_path = os.path.join(
27 "/", self.download_type, conf.CONF.db_name, self.file_path
28 )
29 self.headers.add(X_ACCEL_HEADER, str(rel_path))
32class XAccelAttachmentResponse(XAccelResponse):
33 """Webob Response subclass for X-Accel-Redirect to an attachment"""
35 download_type = "attachments"
37 def __init__(self, attachment, **kwargs):
38 super(XAccelAttachmentResponse, self).__init__(**kwargs)
39 self.content_type = attachment.mimetype
40 self.file_path = attachments.get_sub_path(attachment)
41 att_name = quote_header_value(attachment.filename)
42 self.content_disposition = "attachment; filename=%s" % att_name
43 self.set_x_accel()
46class XAccelTempResponse(XAccelResponse):
47 """
48 Webob Response for serving a randomly named file via nginx's X-Accel-Redirect
49 mechanism.
50 """
52 download_type = "cache"
54 def __init__(self, temp_file_name, real_file_name, **kwargs):
55 super(XAccelTempResponse, self).__init__(**kwargs)
56 self.file_path = str(temp_file_name)
57 full_path = os.path.join(conf.CONF.cache_dir, temp_file_name)
58 self.content_length = file_size(full_path)
59 quoted_name = quote_header_value(real_file_name)
60 self.content_disposition = f"attachment; filename={quoted_name}"
61 self.set_x_accel()