Coverage for rfpy/web/response.py: 100%

34 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-24 10:52 +0000

1import os 

2import logging 

3 

4from webob import Response 

5 

6from rfpy import conf 

7from rfpy.api import attachments 

8 

# Name of the response header that carries the internal redirect path.
X_ACCEL_HEADER = "x-accel-redirect"

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

13 

14 

def quote_header_value(value):
    """Return *value* as an HTTP quoted-string for use in a header.

    Backslashes and double quotes are backslash-escaped and the result is
    wrapped in double quotes.  ASCII control characters other than tab
    (notably CR and LF) are dropped: they cannot be represented inside a
    quoted-string, and letting them through would allow a crafted value to
    break out of the header line.
    """
    # Keep tab and every character that is not an ASCII control character.
    printable = "".join(
        ch for ch in value if ch == "\t" or (ch >= " " and ch != "\x7f")
    )
    # Escape the backslash first so the escapes added for quotes survive.
    escaped = printable.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'

18 

19 

def file_size(file_path):
    """Return the size in bytes of the file at *file_path*."""
    return os.path.getsize(file_path)

22 

23 

class XAccelResponse(Response):
    """Webob Response base class that adds an X-Accel-Redirect header.

    ``set_x_accel`` reads ``self.download_type`` and ``self.file_path``,
    so both must be set before it is called.
    """

    def set_x_accel(self):
        """Add the X-Accel-Redirect header built from this response's path."""
        path_parts = (
            "/",
            self.download_type,
            conf.CONF.db_name,
            self.file_path,
        )
        redirect_target = str(os.path.join(*path_parts))
        self.headers.add(X_ACCEL_HEADER, redirect_target)

30 

31 

class XAccelAttachmentResponse(XAccelResponse):
    """Webob Response subclass that X-Accel-Redirects to an attachment."""

    download_type = "attachments"

    def __init__(self, attachment, **kwargs):
        """Build the response headers from *attachment*.

        Extra keyword arguments are passed through to the parent
        constructor unchanged.
        """
        super().__init__(**kwargs)
        self.content_type = attachment.mimetype
        self.file_path = attachments.get_sub_path(attachment)
        quoted_filename = quote_header_value(attachment.filename)
        self.content_disposition = f"attachment; filename={quoted_filename}"
        self.set_x_accel()

44 

45 

class XAccelTempResponse(XAccelResponse):
    """
    Webob Response that serves a randomly named file through nginx's
    X-Accel-Redirect mechanism, offered to the client as *real_file_name*.
    """

    download_type = "cache"

    def __init__(self, temp_file_name, real_file_name, **kwargs):
        """Build the response headers for the file *temp_file_name*.

        Extra keyword arguments are passed through to the parent
        constructor unchanged.
        """
        super().__init__(**kwargs)
        self.file_path = str(temp_file_name)
        # Content-Length is read from the file inside the configured cache dir.
        self.content_length = file_size(
            os.path.join(conf.CONF.cache_dir, temp_file_name)
        )
        self.content_disposition = (
            "attachment; filename=" + quote_header_value(real_file_name)
        )
        self.set_x_accel()