jpayne@68
|
1 """Miscellaneous WSGI-related Utilities"""
|
jpayne@68
|
2
|
jpayne@68
|
3 import posixpath
|
jpayne@68
|
4
|
jpayne@68
|
5 __all__ = [
|
jpayne@68
|
6 'FileWrapper', 'guess_scheme', 'application_uri', 'request_uri',
|
jpayne@68
|
7 'shift_path_info', 'setup_testing_defaults',
|
jpayne@68
|
8 ]
|
jpayne@68
|
9
|
jpayne@68
|
10
|
jpayne@68
|
11 class FileWrapper:
|
jpayne@68
|
12 """Wrapper to convert file-like objects to iterables"""
|
jpayne@68
|
13
|
jpayne@68
|
14 def __init__(self, filelike, blksize=8192):
|
jpayne@68
|
15 self.filelike = filelike
|
jpayne@68
|
16 self.blksize = blksize
|
jpayne@68
|
17 if hasattr(filelike,'close'):
|
jpayne@68
|
18 self.close = filelike.close
|
jpayne@68
|
19
|
jpayne@68
|
20 def __getitem__(self,key):
|
jpayne@68
|
21 import warnings
|
jpayne@68
|
22 warnings.warn(
|
jpayne@68
|
23 "FileWrapper's __getitem__ method ignores 'key' parameter. "
|
jpayne@68
|
24 "Use iterator protocol instead.",
|
jpayne@68
|
25 DeprecationWarning,
|
jpayne@68
|
26 stacklevel=2
|
jpayne@68
|
27 )
|
jpayne@68
|
28 data = self.filelike.read(self.blksize)
|
jpayne@68
|
29 if data:
|
jpayne@68
|
30 return data
|
jpayne@68
|
31 raise IndexError
|
jpayne@68
|
32
|
jpayne@68
|
33 def __iter__(self):
|
jpayne@68
|
34 return self
|
jpayne@68
|
35
|
jpayne@68
|
36 def __next__(self):
|
jpayne@68
|
37 data = self.filelike.read(self.blksize)
|
jpayne@68
|
38 if data:
|
jpayne@68
|
39 return data
|
jpayne@68
|
40 raise StopIteration
|
jpayne@68
|
41
|
jpayne@68
|
42 def guess_scheme(environ):
|
jpayne@68
|
43 """Return a guess for whether 'wsgi.url_scheme' should be 'http' or 'https'
|
jpayne@68
|
44 """
|
jpayne@68
|
45 if environ.get("HTTPS") in ('yes','on','1'):
|
jpayne@68
|
46 return 'https'
|
jpayne@68
|
47 else:
|
jpayne@68
|
48 return 'http'
|
jpayne@68
|
49
|
jpayne@68
|
50 def application_uri(environ):
|
jpayne@68
|
51 """Return the application's base URI (no PATH_INFO or QUERY_STRING)"""
|
jpayne@68
|
52 url = environ['wsgi.url_scheme']+'://'
|
jpayne@68
|
53 from urllib.parse import quote
|
jpayne@68
|
54
|
jpayne@68
|
55 if environ.get('HTTP_HOST'):
|
jpayne@68
|
56 url += environ['HTTP_HOST']
|
jpayne@68
|
57 else:
|
jpayne@68
|
58 url += environ['SERVER_NAME']
|
jpayne@68
|
59
|
jpayne@68
|
60 if environ['wsgi.url_scheme'] == 'https':
|
jpayne@68
|
61 if environ['SERVER_PORT'] != '443':
|
jpayne@68
|
62 url += ':' + environ['SERVER_PORT']
|
jpayne@68
|
63 else:
|
jpayne@68
|
64 if environ['SERVER_PORT'] != '80':
|
jpayne@68
|
65 url += ':' + environ['SERVER_PORT']
|
jpayne@68
|
66
|
jpayne@68
|
67 url += quote(environ.get('SCRIPT_NAME') or '/', encoding='latin1')
|
jpayne@68
|
68 return url
|
jpayne@68
|
69
|
jpayne@68
|
70 def request_uri(environ, include_query=True):
|
jpayne@68
|
71 """Return the full request URI, optionally including the query string"""
|
jpayne@68
|
72 url = application_uri(environ)
|
jpayne@68
|
73 from urllib.parse import quote
|
jpayne@68
|
74 path_info = quote(environ.get('PATH_INFO',''), safe='/;=,', encoding='latin1')
|
jpayne@68
|
75 if not environ.get('SCRIPT_NAME'):
|
jpayne@68
|
76 url += path_info[1:]
|
jpayne@68
|
77 else:
|
jpayne@68
|
78 url += path_info
|
jpayne@68
|
79 if include_query and environ.get('QUERY_STRING'):
|
jpayne@68
|
80 url += '?' + environ['QUERY_STRING']
|
jpayne@68
|
81 return url
|
jpayne@68
|
82
|
jpayne@68
|
83 def shift_path_info(environ):
|
jpayne@68
|
84 """Shift a name from PATH_INFO to SCRIPT_NAME, returning it
|
jpayne@68
|
85
|
jpayne@68
|
86 If there are no remaining path segments in PATH_INFO, return None.
|
jpayne@68
|
87 Note: 'environ' is modified in-place; use a copy if you need to keep
|
jpayne@68
|
88 the original PATH_INFO or SCRIPT_NAME.
|
jpayne@68
|
89
|
jpayne@68
|
90 Note: when PATH_INFO is just a '/', this returns '' and appends a trailing
|
jpayne@68
|
91 '/' to SCRIPT_NAME, even though empty path segments are normally ignored,
|
jpayne@68
|
92 and SCRIPT_NAME doesn't normally end in a '/'. This is intentional
|
jpayne@68
|
93 behavior, to ensure that an application can tell the difference between
|
jpayne@68
|
94 '/x' and '/x/' when traversing to objects.
|
jpayne@68
|
95 """
|
jpayne@68
|
96 path_info = environ.get('PATH_INFO','')
|
jpayne@68
|
97 if not path_info:
|
jpayne@68
|
98 return None
|
jpayne@68
|
99
|
jpayne@68
|
100 path_parts = path_info.split('/')
|
jpayne@68
|
101 path_parts[1:-1] = [p for p in path_parts[1:-1] if p and p != '.']
|
jpayne@68
|
102 name = path_parts[1]
|
jpayne@68
|
103 del path_parts[1]
|
jpayne@68
|
104
|
jpayne@68
|
105 script_name = environ.get('SCRIPT_NAME','')
|
jpayne@68
|
106 script_name = posixpath.normpath(script_name+'/'+name)
|
jpayne@68
|
107 if script_name.endswith('/'):
|
jpayne@68
|
108 script_name = script_name[:-1]
|
jpayne@68
|
109 if not name and not script_name.endswith('/'):
|
jpayne@68
|
110 script_name += '/'
|
jpayne@68
|
111
|
jpayne@68
|
112 environ['SCRIPT_NAME'] = script_name
|
jpayne@68
|
113 environ['PATH_INFO'] = '/'.join(path_parts)
|
jpayne@68
|
114
|
jpayne@68
|
115 # Special case: '/.' on PATH_INFO doesn't get stripped,
|
jpayne@68
|
116 # because we don't strip the last element of PATH_INFO
|
jpayne@68
|
117 # if there's only one path part left. Instead of fixing this
|
jpayne@68
|
118 # above, we fix it here so that PATH_INFO gets normalized to
|
jpayne@68
|
119 # an empty string in the environ.
|
jpayne@68
|
120 if name=='.':
|
jpayne@68
|
121 name = None
|
jpayne@68
|
122 return name
|
jpayne@68
|
123
|
jpayne@68
|
124 def setup_testing_defaults(environ):
|
jpayne@68
|
125 """Update 'environ' with trivial defaults for testing purposes
|
jpayne@68
|
126
|
jpayne@68
|
127 This adds various parameters required for WSGI, including HTTP_HOST,
|
jpayne@68
|
128 SERVER_NAME, SERVER_PORT, REQUEST_METHOD, SCRIPT_NAME, PATH_INFO,
|
jpayne@68
|
129 and all of the wsgi.* variables. It only supplies default values,
|
jpayne@68
|
130 and does not replace any existing settings for these variables.
|
jpayne@68
|
131
|
jpayne@68
|
132 This routine is intended to make it easier for unit tests of WSGI
|
jpayne@68
|
133 servers and applications to set up dummy environments. It should *not*
|
jpayne@68
|
134 be used by actual WSGI servers or applications, since the data is fake!
|
jpayne@68
|
135 """
|
jpayne@68
|
136
|
jpayne@68
|
137 environ.setdefault('SERVER_NAME','127.0.0.1')
|
jpayne@68
|
138 environ.setdefault('SERVER_PROTOCOL','HTTP/1.0')
|
jpayne@68
|
139
|
jpayne@68
|
140 environ.setdefault('HTTP_HOST',environ['SERVER_NAME'])
|
jpayne@68
|
141 environ.setdefault('REQUEST_METHOD','GET')
|
jpayne@68
|
142
|
jpayne@68
|
143 if 'SCRIPT_NAME' not in environ and 'PATH_INFO' not in environ:
|
jpayne@68
|
144 environ.setdefault('SCRIPT_NAME','')
|
jpayne@68
|
145 environ.setdefault('PATH_INFO','/')
|
jpayne@68
|
146
|
jpayne@68
|
147 environ.setdefault('wsgi.version', (1,0))
|
jpayne@68
|
148 environ.setdefault('wsgi.run_once', 0)
|
jpayne@68
|
149 environ.setdefault('wsgi.multithread', 0)
|
jpayne@68
|
150 environ.setdefault('wsgi.multiprocess', 0)
|
jpayne@68
|
151
|
jpayne@68
|
152 from io import StringIO, BytesIO
|
jpayne@68
|
153 environ.setdefault('wsgi.input', BytesIO())
|
jpayne@68
|
154 environ.setdefault('wsgi.errors', StringIO())
|
jpayne@68
|
155 environ.setdefault('wsgi.url_scheme',guess_scheme(environ))
|
jpayne@68
|
156
|
jpayne@68
|
157 if environ['wsgi.url_scheme']=='http':
|
jpayne@68
|
158 environ.setdefault('SERVER_PORT', '80')
|
jpayne@68
|
159 elif environ['wsgi.url_scheme']=='https':
|
jpayne@68
|
160 environ.setdefault('SERVER_PORT', '443')
|
jpayne@68
|
161
|
jpayne@68
|
162
|
jpayne@68
|
163
|
jpayne@68
|
164 _hoppish = {
|
jpayne@68
|
165 'connection', 'keep-alive', 'proxy-authenticate',
|
jpayne@68
|
166 'proxy-authorization', 'te', 'trailers', 'transfer-encoding',
|
jpayne@68
|
167 'upgrade'
|
jpayne@68
|
168 }.__contains__
|
jpayne@68
|
169
|
jpayne@68
|
170 def is_hop_by_hop(header_name):
|
jpayne@68
|
171 """Return true if 'header_name' is an HTTP/1.1 "Hop-by-Hop" header"""
|
jpayne@68
|
172 return _hoppish(header_name.lower())
|