This repository has been archived by the owner on Mar 8, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathwsgi_static_middleware.py
118 lines (93 loc) · 3.65 KB
/
wsgi_static_middleware.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import os
import time
import mimetypes
from wsgiref.headers import Headers
# Search File
def is_accessible(abs_file_path):
return (
os.path.exists(abs_file_path) and
os.path.isfile(abs_file_path) and
os.access(abs_file_path, os.R_OK)
)
def search_file(relative_file_path, dirs):
for d in dirs:
if not os.path.isabs(d):
d = os.path.abspath(d) + os.sep
file = os.path.join(d, relative_file_path)
if is_accessible(file):
return file
# Header utils
def get_content_length(filename):
stats = os.stat(filename)
return str(stats.st_size)
def generate_last_modified():
last_modified = time.strftime("%a, %d %b %Y %H:%M:%sS GMT", time.gmtime())
return last_modified
def get_content_type(mimetype, charset):
if mimetype.startswith('text/') or mimetype == 'application/javascript':
mimetype += '; charset={}'.format(charset)
return mimetype
# Response body iterator
def _iter_and_close(file_obj, block_size, charset):
"""Yield file contents by block then close the file."""
while True:
try:
block = file_obj.read(block_size)
if block:
if isinstance(block, bytes):
yield block
else:
yield block.encode(charset)
else:
raise StopIteration
except StopIteration:
file_obj.close()
break
def _get_body(filename, method, block_size, charset):
if method == 'HEAD':
return [b'']
return _iter_and_close(open(filename), block_size, charset)
# View functions
def static_file_view(env, start_response, filename, block_size, charset):
method = env['REQUEST_METHOD'].upper()
if method not in ('HEAD', 'GET'):
start_response('405 METHOD NOT ALLOWED',
[('Content-Type', 'text/plain; UTF-8')])
return [b'']
mimetype, encoding = mimetypes.guess_type(filename)
headers = Headers([])
headers.add_header('Content-Encodings', encoding)
headers.add_header('Content-Type', get_content_type(mimetype, charset))
headers.add_header('Content-Length', get_content_length(filename))
headers.add_header('Last-Modified', generate_last_modified())
headers.add_header("Accept-Ranges", "bytes")
start_response('200 OK', headers.items())
return _get_body(filename, method, block_size, charset)
def http404(env, start_response):
start_response('404 Not Found',
[('Content-type', 'text/plain; charset=utf-8')])
return [b'404 Not Found']
# Middleware class
class StaticMiddleware:
def __init__(self, app, static_root, static_dirs=None,
block_size=16*4096, charset='UTF-8'):
self.app = app
self.static_root = static_root.lstrip('/').rstrip('/')
if static_dirs is None:
static_dirs = [os.path.join(os.path.abspath('.'), 'static')]
self.static_dirs = static_dirs
self.charset = charset
self.block_size = block_size
def __call__(self, env, start_response):
path = env['PATH_INFO'].lstrip('/')
if path.startswith(self.static_root):
relative_file_path = '/'.join(path.split('/')[1:])
return self.handle(env, start_response, relative_file_path)
return self.app(env, start_response)
def handle(self, env, start_response, filename):
abs_file_path = search_file(filename, self.static_dirs)
if abs_file_path:
return static_file_view(env, start_response, abs_file_path,
self.block_size, self.charset)
else:
return http404(env, start_response)