# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
# Also licenced under the Apache License, 2.0: http://opensource.org/licenses/apache2.0.php
# Licensed to PSF under a Contributor Agreement
"""
Middleware to check for obedience to the WSGI specification.

Some of the things this checks:

* Signature of the application and start_response (including that
  keyword arguments are not used).

* Environment checks:

  - Environment is a dictionary (and not a subclass).

  - That all the required keys are in the environment: REQUEST_METHOD,
    SERVER_NAME, SERVER_PORT, wsgi.version, wsgi.input, wsgi.errors,
    wsgi.multithread, wsgi.multiprocess, wsgi.run_once

  - That HTTP_CONTENT_TYPE and HTTP_CONTENT_LENGTH are not in the
    environment (these headers should appear as CONTENT_LENGTH and
    CONTENT_TYPE).

  - Warns if QUERY_STRING is missing, as the cgi module acts
    unpredictably in that case.

  - That CGI-style variables (that don't contain a .) have native
    string (str) values.

  - That wsgi.version is a tuple.

  - That wsgi.url_scheme is 'http' or 'https' (@@: is this too
    restrictive?)

  - Warns if the REQUEST_METHOD is not known (@@: probably too
    restrictive).

  - That SCRIPT_NAME and PATH_INFO are empty or start with /

  - That at least one of SCRIPT_NAME or PATH_INFO is set.

  - That CONTENT_LENGTH, if present and non-empty, is a non-negative
    integer.

  - That SCRIPT_NAME is not '/' (it should be '', and PATH_INFO should
    be '/').

  - That wsgi.input has the methods read, readline, readlines, and
    __iter__

  - That wsgi.errors has the methods flush, write, writelines

* That the status is a string that starts with a three-digit integer
  code of at least 100, followed (with a warning otherwise) by a
  single space and a reason phrase.

* That the headers are a list (not a subclass, not another kind of
  sequence).

* That the items of the headers are 2-tuples of strings.

* That there is no 'status' header (that is used in CGI, but not in
  WSGI).

* That header names start with a letter, contain only letters, digits,
  - and _, contain no newlines or colons, and don't end in _ or -; and
  that header values contain no control characters (codes 000 through
  037 octal).

* That Content-Type is given if there is content (CGI often has a
  default content type, but WSGI does not).

* That no Content-Type is given when there is no content, i.e. for 204
  and 304 responses (@@: is this too restrictive?)

* That the exc_info argument to start_response is a tuple or None.

* That all calls to the writer are with bytestrings, and no other
  methods on the writer are accessed.

* That wsgi.input is used properly:

  - .read() is called with exactly one argument

  - That it returns bytes

  - That readline, readlines, and __iter__ return bytes (readlines
    returns a list of bytes)

  - That .close() is not called

  - No other methods are provided

* That wsgi.errors is used properly:

  - .write() is called with a str, and .writelines() with a sequence
    of str

  - That .close() is not called, and no other methods are provided.

* The response iterator:

  - That it is not a str or bytes object (it should be a list
    containing a single bytestring; iterating directly over a str or
    bytes object yields characters or integers, not bytestrings).

  - That .__next__() returns a bytestring

  - That the iterator is not iterated over until start_response has
    been called (that can signal either a server or application
    error).

  - That .close() is called (doesn't raise an exception, only prints to
    sys.stderr, because we only know it isn't called when the object
    is garbage collected).
"""
__all__ = ['validator']


import re
import sys
import warnings

header_re = re.compile(r'^[a-zA-Z][a-zA-Z0-9\-_]*$')
bad_header_value_re = re.compile(r'[\000-\037]')

class WSGIWarning(Warning):
    """
    Warning category issued for WSGI-spec-related problems that are
    not fatal enough to raise an AssertionError.
    """

def assert_(cond, *args):
    if not cond:
        raise AssertionError(*args)

def check_string_type(value, title):
    if type(value) is str:
        return value
    raise AssertionError(
        "{0} must be of type str (got {1})".format(title, repr(value)))

def validator(application):

    """
    When applied between a WSGI server and a WSGI application, this
    middleware will check for WSGI compliance on a number of levels.
    This middleware does not modify the request or response in any
    way, but will raise an AssertionError if anything seems off
    (except for a failure to close the application iterator, which
    will be printed to stderr -- there's no way to raise an exception
    at that point).
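    A minimal usage sketch, assuming only the standard library: it drives
    a validated application by hand instead of through a real server.
    wsgiref.util.setup_testing_defaults fills in the required environ
    keys; hello_app, bad_app, fake_start_response, make_environ and calls
    are hypothetical names used only for illustration.

```python
from wsgiref.util import setup_testing_defaults
from wsgiref.validate import validator

calls = []  # (status, headers) pairs seen by the fake server


def hello_app(environ, start_response):
    # Hypothetical compliant app: str status and headers, bytes body.
    start_response('200 OK', [('Content-Type', 'text/plain; charset=utf-8')])
    return [b'Hello, WSGI!']


def bad_app(environ, start_response):
    # Hypothetical non-compliant app: its body yields str, not bytes.
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return ['not bytes']


def fake_start_response(status, headers, exc_info=None):
    # Stand-in for a server's start_response; returns a no-op write().
    calls.append((status, headers))
    return lambda data: None


def make_environ():
    # Set QUERY_STRING explicitly so check_environ() issues no WSGIWarning.
    environ = {'QUERY_STRING': ''}
    setup_testing_defaults(environ)
    return environ


# A compliant app passes through unchanged.
result = validator(hello_app)(make_environ(), fake_start_response)
try:
    body = b''.join(result)
finally:
    result.close()  # skipping close() gets reported when collected

# A non-compliant app trips an AssertionError once its body is read.
error = None
bad_result = validator(bad_app)(make_environ(), fake_start_response)
try:
    next(iter(bad_result))
except AssertionError as exc:
    error = str(exc)
finally:
    bad_result.close()

print(body)   # b'Hello, WSGI!'
print(error)  # Iterator yielded non-bytestring ('not bytes')
```

    The compliant body passes through untouched, while bad_app is
    rejected on the first read of its body rather than when it is
    called, because the body type check lives in IteratorWrapper.__next__.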
    """

    def lint_app(*args, **kw):
        assert_(len(args) == 2, "Two arguments required")
        assert_(not kw, "No keyword arguments allowed")
        environ, start_response = args

        check_environ(environ)

        # We use this to check if the application returns without
        # calling start_response:
        start_response_started = []

        def start_response_wrapper(*args, **kw):
            assert_(len(args) == 2 or len(args) == 3, (
                "Invalid number of arguments: %s" % (args,)))
            assert_(not kw, "No keyword arguments allowed")
            status = args[0]
            headers = args[1]
            if len(args) == 3:
                exc_info = args[2]
            else:
                exc_info = None

            check_status(status)
            check_headers(headers)
            check_content_type(status, headers)
            check_exc_info(exc_info)

            start_response_started.append(None)
            return WriteWrapper(start_response(*args))

        environ['wsgi.input'] = InputWrapper(environ['wsgi.input'])
        environ['wsgi.errors'] = ErrorWrapper(environ['wsgi.errors'])

        iterator = application(environ, start_response_wrapper)
        assert_(iterator is not None and iterator != False,
            "The application must return an iterator, if only an empty list")

        check_iterator(iterator)

        return IteratorWrapper(iterator, start_response_started)

    return lint_app

class InputWrapper:

    def __init__(self, wsgi_input):
        self.input = wsgi_input

    def read(self, *args):
        assert_(len(args) == 1)
        v = self.input.read(*args)
        assert_(type(v) is bytes)
        return v

    def readline(self, *args):
        assert_(len(args) <= 1)
        v = self.input.readline(*args)
        assert_(type(v) is bytes)
        return v

    def readlines(self, *args):
        assert_(len(args) <= 1)
        lines = self.input.readlines(*args)
        assert_(type(lines) is list)
        for line in lines:
            assert_(type(line) is bytes)
        return lines

    def __iter__(self):
        while True:
            line = self.readline()
            if not line:
                return
            yield line

    def close(self):
        assert_(0, "input.close() must not be called")

class ErrorWrapper:

    def __init__(self, wsgi_errors):
        self.errors = wsgi_errors

    def write(self, s):
        assert_(type(s) is str)
        self.errors.write(s)

    def flush(self):
        self.errors.flush()

    def writelines(self, seq):
        for line in seq:
            self.write(line)

    def close(self):
        assert_(0, "errors.close() must not be called")

class WriteWrapper:

    def __init__(self, wsgi_writer):
        self.writer = wsgi_writer

    def __call__(self, s):
        assert_(type(s) is bytes)
        self.writer(s)

class PartialIteratorWrapper:

    def __init__(self, wsgi_iterator):
        self.iterator = wsgi_iterator

    def __iter__(self):
        # We want to make sure __iter__ is called
        return IteratorWrapper(self.iterator, None)

class IteratorWrapper:

    def __init__(self, wsgi_iterator, check_start_response):
        self.original_iterator = wsgi_iterator
        self.iterator = iter(wsgi_iterator)
        self.closed = False
        self.check_start_response = check_start_response

    def __iter__(self):
        return self

    def __next__(self):
        assert_(not self.closed,
            "Iterator read after closed")
        v = next(self.iterator)
        if type(v) is not bytes:
            assert_(False, "Iterator yielded non-bytestring (%r)" % (v,))
        if self.check_start_response is not None:
            assert_(self.check_start_response,
                "The application returned and iteration over its body has "
                "started, but start_response has not yet been called")
            self.check_start_response = None
        return v

    def close(self):
        self.closed = True
        if hasattr(self.original_iterator, 'close'):
            self.original_iterator.close()

    def __del__(self):
        if not self.closed:
            sys.stderr.write(
                "Iterator garbage collected without being closed\n")
        assert_(self.closed,
            "Iterator garbage collected without being closed")

def check_environ(environ):
    assert_(type(environ) is dict,
        "Environment is not of the right type: %r (environment: %r)"
        % (type(environ), environ))

    for key in ['REQUEST_METHOD', 'SERVER_NAME', 'SERVER_PORT',
                'wsgi.version', 'wsgi.input', 'wsgi.errors',
                'wsgi.multithread', 'wsgi.multiprocess',
                'wsgi.run_once']:
        assert_(key in environ,
            "Environment missing required key: %r" % (key,))

    for key in ['HTTP_CONTENT_TYPE', 'HTTP_CONTENT_LENGTH']:
        assert_(key not in environ,
            "Environment should not have the key: %s "
            "(use %s instead)" % (key, key[5:]))

    if 'QUERY_STRING' not in environ:
        warnings.warn(
            'QUERY_STRING is not in the WSGI environment; the cgi '
            'module will use sys.argv when this variable is missing, '
            'so application errors are more likely',
            WSGIWarning)

    for key in environ.keys():
        if '.' in key:
            # Extension, we don't care about its type
            continue
        assert_(type(environ[key]) is str,
            "Environment variable %s is not a string: %r (value: %r)"
            % (key, type(environ[key]), environ[key]))

    assert_(type(environ['wsgi.version']) is tuple,
        "wsgi.version should be a tuple (%r)" % (environ['wsgi.version'],))
    assert_(environ['wsgi.url_scheme'] in ('http', 'https'),
        "wsgi.url_scheme unknown: %r" % environ['wsgi.url_scheme'])

    check_input(environ['wsgi.input'])
    check_errors(environ['wsgi.errors'])

    # @@: these need filling out:
    if environ['REQUEST_METHOD'] not in (
        'GET', 'HEAD', 'POST', 'OPTIONS', 'PATCH', 'PUT', 'DELETE', 'TRACE'):
        warnings.warn(
            "Unknown REQUEST_METHOD: %r" % environ['REQUEST_METHOD'],
            WSGIWarning)

    assert_(not environ.get('SCRIPT_NAME')
            or environ['SCRIPT_NAME'].startswith('/'),
        "SCRIPT_NAME doesn't start with /: %r" % environ['SCRIPT_NAME'])
    assert_(not environ.get('PATH_INFO')
            or environ['PATH_INFO'].startswith('/'),
        "PATH_INFO doesn't start with /: %r" % environ['PATH_INFO'])
    if environ.get('CONTENT_LENGTH'):
        assert_(int(environ['CONTENT_LENGTH']) >= 0,
            "Invalid CONTENT_LENGTH: %r" % environ['CONTENT_LENGTH'])

    if not environ.get('SCRIPT_NAME'):
        assert_('PATH_INFO' in environ,
            "One of SCRIPT_NAME or PATH_INFO is required (PATH_INFO "
            "should at least be '/' if SCRIPT_NAME is empty)")
    assert_(environ.get('SCRIPT_NAME') != '/',
        "SCRIPT_NAME cannot be '/'; it should instead be '', and "
        "PATH_INFO should be '/'")

def check_input(wsgi_input):
    for attr in ['read', 'readline', 'readlines', '__iter__']:
        assert_(hasattr(wsgi_input, attr),
            "wsgi.input (%r) doesn't have the attribute %s"
            % (wsgi_input, attr))

def check_errors(wsgi_errors):
    for attr in ['flush', 'write', 'writelines']:
        assert_(hasattr(wsgi_errors, attr),
            "wsgi.errors (%r) doesn't have the attribute %s"
            % (wsgi_errors, attr))

def check_status(status):
    status = check_string_type(status, "Status")
    # Implicitly check that we can turn it into an integer:
    status_code = status.split(None, 1)[0]
    assert_(len(status_code) == 3,
        "Status codes must be three characters: %r" % status_code)
    status_int = int(status_code)
    assert_(status_int >= 100, "Status code is invalid: %r" % status_int)
    if len(status) < 4 or status[3] != ' ':
        warnings.warn(
            "The status string (%r) should be a three-digit integer "
            "followed by a single space and a status explanation"
            % status, WSGIWarning)

def check_headers(headers):
    assert_(type(headers) is list,
        "Headers (%r) must be of type list: %r"
        % (headers, type(headers)))
    for item in headers:
        assert_(type(item) is tuple,
            "Individual headers (%r) must be of type tuple: %r"
            % (item, type(item)))
        assert_(len(item) == 2)
        name, value = item
        name = check_string_type(name, "Header name")
        value = check_string_type(value, "Header value")
        assert_(name.lower() != 'status',
            "The Status header cannot be used; it conflicts with CGI "
            "scripts, and HTTP status is not given through headers "
            "(value: %r)." % value)
        assert_('\n' not in name and ':' not in name,
            "Header names may not contain ':' or '\\n': %r" % name)
        assert_(header_re.search(name), "Bad header name: %r" % name)
        assert_(not name.endswith('-') and not name.endswith('_'),
            "Names may not end in '-' or '_': %r" % name)
        if bad_header_value_re.search(value):
            assert_(0, "Bad header value: %r (bad char: %r)"
            % (value, bad_header_value_re.search(value).group(0)))

def check_content_type(status, headers):
    status = check_string_type(status, "Status")
    code = int(status.split(None, 1)[0])
    # @@: need one more person to verify this interpretation of RFC 2616
    #     http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
    NO_MESSAGE_BODY = (204, 304)
    for name, value in headers:
        name = check_string_type(name, "Header name")
        if name.lower() == 'content-type':
            if code not in NO_MESSAGE_BODY:
                return
            assert_(0, ("Content-Type header found in a %s response, "
                        "which must not return content.") % code)
    if code not in NO_MESSAGE_BODY:
        assert_(0, "No Content-Type header found in headers (%s)" % headers)

def check_exc_info(exc_info):
    assert_(exc_info is None or type(exc_info) is tuple,
        "exc_info (%r) is not a tuple: %r" % (exc_info, type(exc_info)))
    # More exc_info checks?

def check_iterator(iterator):
    # Returning a str or bytes object as the response iterable is a bad
    # idea: iterating over it yields single characters (str) or integers
    # (bytes) rather than bytestrings.
    assert_(not isinstance(iterator, (str, bytes)),
        "You should not return a string as your application iterator, "
        "instead return a single-item list containing a bytestring.")