# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
# Also licenced under the Apache License, 2.0: http://opensource.org/licenses/apache2.0.php
# Licensed to PSF under a Contributor Agreement
"""
Middleware to check for obedience to the WSGI specification.

Some of the things this checks:

* Signature of the application and start_response (including that
  keyword arguments are not used).

* Environment checks:

  - Environment is a dictionary (and not a subclass).

  - That all the required keys are in the environment: REQUEST_METHOD,
    SERVER_NAME, SERVER_PORT, wsgi.version, wsgi.input, wsgi.errors,
    wsgi.multithread, wsgi.multiprocess, wsgi.run_once.

  - That HTTP_CONTENT_TYPE and HTTP_CONTENT_LENGTH are not in the
    environment (these headers should appear as CONTENT_TYPE and
    CONTENT_LENGTH).

  - Warns if QUERY_STRING is missing, as the cgi module acts
    unpredictably in that case.

  - That CGI-style variables (keys that don't contain a '.') have
    str values.

  - That wsgi.version is a tuple.

  - That wsgi.url_scheme is 'http' or 'https' (@@: is this too
    restrictive?)

  - Warns if the REQUEST_METHOD is not known (@@: probably too
    restrictive).

  - That SCRIPT_NAME and PATH_INFO are empty or start with /.

  - That at least one of SCRIPT_NAME or PATH_INFO is set.

  - That CONTENT_LENGTH, if given, is a non-negative integer.

  - That SCRIPT_NAME is not '/' (it should be '', and PATH_INFO should
    be '/').

  - That wsgi.input has the methods read, readline, readlines, and
    __iter__.

  - That wsgi.errors has the methods flush, write, and writelines.

* That the status is a str starting with a three-digit integer of at
  least 100 (with a warning if the code is not followed by a single
  space and a reason phrase).

* That the headers are a list (not a subclass, and not another kind of
  sequence).

* That each header is a two-item tuple of str (name, value).

* That there is no 'status' header (CGI uses one, but WSGI does not).

* That header names don't contain newlines or colons and don't end in
  _ or -, and that header values contain no control characters
  (character codes 000 to 037 octal).

* That Content-Type is given if there is content (CGI often has a
  default content type, but WSGI does not).

* That no Content-Type is given when there is no content, i.e. for 204
  and 304 responses (@@: is this too restrictive?)

* That the exc_info argument to start_response is a tuple or None.

* That all calls to the writer are with bytestrings, and no other
  methods on the writer are accessed.

* That wsgi.input is used properly:

  - .read() is called with exactly one argument.

  - That it returns bytes.

  - That readline, readlines, and __iter__ return bytes (readlines
    returns a list of bytes).

  - That .close() is not called.

  - No other methods are provided.

* That wsgi.errors is used properly:

  - .write() is called with a str, and .writelines() with an iterable
    of str.

  - That .close() is not called, and no other methods are provided.

* The response iterator:

  - That it is not a str or bytes object (it should be a list
    containing a single bytestring).

  - That .__next__() returns a bytestring.

  - That the iterator is not iterated over until start_response has
    been called (iterating earlier signals either a server or an
    application error).

  - That .close() is called (a missing close() is only reported on
    sys.stderr, not raised, because it can only be detected when the
    object is garbage collected).
"""
__all__ = ['validator']


import re
import sys
import warnings

header_re = re.compile(r'^[a-zA-Z][a-zA-Z0-9\-_]*$')
bad_header_value_re = re.compile(r'[\000-\037]')

class WSGIWarning(Warning):
    """
    Issued in response to WSGI-spec-related warnings
    """

def assert_(cond, *args):
    if not cond:
        raise AssertionError(*args)

def check_string_type(value, title):
    if type(value) is str:
        return value
    raise AssertionError(
        "{0} must be of type str (got {1})".format(title, repr(value)))

def validator(application):
    """
    When applied between a WSGI server and a WSGI application, this
    middleware checks for WSGI compliance on a number of levels.
    This middleware does not modify the request or response in any
    way, but will raise an AssertionError if anything seems off
    (some softer problems only issue a WSGIWarning, and a failure to
    close the application iterator is printed to stderr -- there's no
    way to raise an exception at that point).
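
A minimal usage sketch, assuming this module is importable as
wsgiref.validate (its location in the CPython standard library).
hello_app, bad_app and call are hypothetical names for illustration,
and the environ is built with wsgiref.util.setup_testing_defaults.
The first app passes every check; the second omits Content-Type on a
200 response, so the wrapped start_response raises AssertionError:

```python
from wsgiref.util import setup_testing_defaults
from wsgiref.validate import validator


def hello_app(environ, start_response):
    # Well-behaved app: str status and headers, body as a list of bytes.
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return [b'Hello, WSGI']


def bad_app(environ, start_response):
    # Spec violation: a 200 response without a Content-Type header.
    start_response('200 OK', [])
    return [b'oops']


def call(app):
    # Hypothetical helper acting as a tiny WSGI server.
    # QUERY_STRING is set explicitly so check_environ does not warn;
    # setup_testing_defaults fills in the remaining required keys.
    environ = {'QUERY_STRING': ''}
    setup_testing_defaults(environ)
    captured = {}

    def start_response(status, headers, exc_info=None):
        captured['status'] = status
        return lambda data: None  # write() callable; unused here

    result = validator(app)(environ, start_response)
    try:
        body = b''.join(result)
    finally:
        result.close()  # an unclosed result is reported on stderr
    return captured['status'], body


status, body = call(hello_app)
print(status, body)  # prints: 200 OK b'Hello, WSGI'

try:
    call(bad_app)
except AssertionError as exc:
    print('validator rejected bad_app:', exc)
```

Closing the returned iterator in a finally clause mirrors what a real
server must do; skipping it makes the validator complain when the
wrapper is garbage collected.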
    """

    def lint_app(*args, **kw):
        assert_(len(args) == 2, "Two arguments required")
        assert_(not kw, "No keyword arguments allowed")
        environ, start_response = args

        check_environ(environ)

        # Records whether start_response has been called, so that
        # IteratorWrapper can detect the body being iterated first:
        start_response_started = []

        def start_response_wrapper(*args, **kw):
            assert_(len(args) == 2 or len(args) == 3, (
                "Invalid number of arguments: %s" % (args,)))
            assert_(not kw, "No keyword arguments allowed")
            status = args[0]
            headers = args[1]
            if len(args) == 3:
                exc_info = args[2]
            else:
                exc_info = None

            check_status(status)
            check_headers(headers)
            check_content_type(status, headers)
            check_exc_info(exc_info)

            start_response_started.append(None)
            return WriteWrapper(start_response(*args))

        environ['wsgi.input'] = InputWrapper(environ['wsgi.input'])
        environ['wsgi.errors'] = ErrorWrapper(environ['wsgi.errors'])

        iterator = application(environ, start_response_wrapper)
        assert_(iterator is not None and iterator != False,
            "The application must return an iterator, even if only an empty list")

        check_iterator(iterator)

        return IteratorWrapper(iterator, start_response_started)

    return lint_app

class InputWrapper:

    def __init__(self, wsgi_input):
        self.input = wsgi_input

    def read(self, *args):
        assert_(len(args) == 1)
        v = self.input.read(*args)
        assert_(type(v) is bytes)
        return v

    def readline(self, *args):
        assert_(len(args) <= 1)
        v = self.input.readline(*args)
        assert_(type(v) is bytes)
        return v

    def readlines(self, *args):
        assert_(len(args) <= 1)
        lines = self.input.readlines(*args)
        assert_(type(lines) is list)
        for line in lines:
            assert_(type(line) is bytes)
        return lines

    def __iter__(self):
        while True:
            line = self.readline()
            if not line:
                return
            yield line

    def close(self):
        assert_(0, "input.close() must not be called")

class ErrorWrapper:

    def __init__(self, wsgi_errors):
        self.errors = wsgi_errors

    def write(self, s):
        assert_(type(s) is str)
        self.errors.write(s)

    def flush(self):
        self.errors.flush()

    def writelines(self, seq):
        for line in seq:
            self.write(line)

    def close(self):
        assert_(0, "errors.close() must not be called")

class WriteWrapper:

    def __init__(self, wsgi_writer):
        self.writer = wsgi_writer

    def __call__(self, s):
        assert_(type(s) is bytes)
        self.writer(s)

class PartialIteratorWrapper:

    def __init__(self, wsgi_iterator):
        self.iterator = wsgi_iterator

    def __iter__(self):
        # We want to make sure __iter__ is called
        return IteratorWrapper(self.iterator, None)

class IteratorWrapper:

    def __init__(self, wsgi_iterator, check_start_response):
        self.original_iterator = wsgi_iterator
        self.iterator = iter(wsgi_iterator)
        self.closed = False
        self.check_start_response = check_start_response

    def __iter__(self):
        return self

    def __next__(self):
        assert_(not self.closed,
            "Iterator read after being closed")
        v = next(self.iterator)
        if type(v) is not bytes:
            assert_(False, "Iterator yielded non-bytestring (%r)" % (v,))
        if self.check_start_response is not None:
            assert_(self.check_start_response,
                "The application returned and its body is being iterated, "
                "but start_response has not yet been called")
            self.check_start_response = None
        return v

    def close(self):
        self.closed = True
        if hasattr(self.original_iterator, 'close'):
            self.original_iterator.close()

    def __del__(self):
        if not self.closed:
            sys.stderr.write(
                "Iterator garbage collected without being closed\n")
        assert_(self.closed,
            "Iterator garbage collected without being closed")

def check_environ(environ):
    assert_(type(environ) is dict,
        "Environment is not of the right type: %r (environment: %r)"
        % (type(environ), environ))

    for key in ['REQUEST_METHOD', 'SERVER_NAME', 'SERVER_PORT',
                'wsgi.version', 'wsgi.input', 'wsgi.errors',
                'wsgi.multithread', 'wsgi.multiprocess',
                'wsgi.run_once']:
        assert_(key in environ,
            "Environment missing required key: %r" % (key,))

    for key in ['HTTP_CONTENT_TYPE', 'HTTP_CONTENT_LENGTH']:
        assert_(key not in environ,
            "Environment should not have the key: %s "
            "(use %s instead)" % (key, key[5:]))

    if 'QUERY_STRING' not in environ:
        warnings.warn(
            'QUERY_STRING is not in the WSGI environment; the cgi '
            'module will use sys.argv when this variable is missing, '
            'so application errors are more likely',
            WSGIWarning)

    for key in environ.keys():
        if '.' in key:
            # Extension keys (containing a '.') may hold any type
            continue
        assert_(type(environ[key]) is str,
            "Environment variable %s is not a str: %r (value: %r)"
            % (key, type(environ[key]), environ[key]))

    assert_(type(environ['wsgi.version']) is tuple,
        "wsgi.version should be a tuple (%r)" % (environ['wsgi.version'],))
    assert_(environ['wsgi.url_scheme'] in ('http', 'https'),
        "wsgi.url_scheme unknown: %r" % environ['wsgi.url_scheme'])

    check_input(environ['wsgi.input'])
    check_errors(environ['wsgi.errors'])

    # @@: these need filling out:
    if environ['REQUEST_METHOD'] not in (
        'GET', 'HEAD', 'POST', 'OPTIONS', 'PATCH', 'PUT', 'DELETE', 'TRACE'):
        warnings.warn(
            "Unknown REQUEST_METHOD: %r" % environ['REQUEST_METHOD'],
            WSGIWarning)

    # The messages are built even when the check passes, so use .get()
    # there too; indexing would raise KeyError if the key is absent.
    assert_(not environ.get('SCRIPT_NAME')
            or environ['SCRIPT_NAME'].startswith('/'),
        "SCRIPT_NAME doesn't start with /: %r" % environ.get('SCRIPT_NAME'))
    assert_(not environ.get('PATH_INFO')
            or environ['PATH_INFO'].startswith('/'),
        "PATH_INFO doesn't start with /: %r" % environ.get('PATH_INFO'))
    if environ.get('CONTENT_LENGTH'):
        assert_(int(environ['CONTENT_LENGTH']) >= 0,
            "Invalid CONTENT_LENGTH: %r" % environ['CONTENT_LENGTH'])

    if not environ.get('SCRIPT_NAME'):
        assert_('PATH_INFO' in environ,
            "One of SCRIPT_NAME or PATH_INFO is required (PATH_INFO "
            "should at least be '/' if SCRIPT_NAME is empty)")
    assert_(environ.get('SCRIPT_NAME') != '/',
        "SCRIPT_NAME cannot be '/'; it should instead be '', and "
        "PATH_INFO should be '/'")

def check_input(wsgi_input):
    for attr in ['read', 'readline', 'readlines', '__iter__']:
        assert_(hasattr(wsgi_input, attr),
            "wsgi.input (%r) doesn't have the attribute %s"
            % (wsgi_input, attr))

def check_errors(wsgi_errors):
    for attr in ['flush', 'write', 'writelines']:
        assert_(hasattr(wsgi_errors, attr),
            "wsgi.errors (%r) doesn't have the attribute %s"
            % (wsgi_errors, attr))

def check_status(status):
    status = check_string_type(status, "Status")
    # The int() call below implicitly checks that the code is numeric:
    status_code = status.split(None, 1)[0]
    assert_(len(status_code) == 3,
        "Status codes must be three characters: %r" % status_code)
    status_int = int(status_code)
    assert_(status_int >= 100, "Status code is invalid: %r" % status_int)
    if len(status) < 4 or status[3] != ' ':
        warnings.warn(
            "The status string (%r) should be a three-digit integer "
            "followed by a single space and a status explanation"
            % status, WSGIWarning)

def check_headers(headers):
    assert_(type(headers) is list,
        "Headers (%r) must be of type list: %r"
        % (headers, type(headers)))
    for item in headers:
        assert_(type(item) is tuple,
            "Individual headers (%r) must be of type tuple: %r"
            % (item, type(item)))
        assert_(len(item) == 2)
        name, value = item
        name = check_string_type(name, "Header name")
        value = check_string_type(value, "Header value")
        assert_(name.lower() != 'status',
            "The Status header cannot be used; it conflicts with CGI "
            "scripts, and HTTP status is not given through headers "
            "(value: %r)." % value)
        assert_('\n' not in name and ':' not in name,
            "Header names may not contain ':' or '\\n': %r" % name)
        assert_(header_re.search(name), "Bad header name: %r" % name)
        assert_(not name.endswith('-') and not name.endswith('_'),
            "Names may not end in '-' or '_': %r" % name)
        if bad_header_value_re.search(value):
            assert_(0, "Bad header value: %r (bad char: %r)"
                % (value, bad_header_value_re.search(value).group(0)))

def check_content_type(status, headers):
    status = check_string_type(status, "Status")
    code = int(status.split(None, 1)[0])
    # @@: need one more person to verify this interpretation of RFC 2616
    #     http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
    NO_MESSAGE_BODY = (204, 304)
    for name, value in headers:
        name = check_string_type(name, "Header name")
        if name.lower() == 'content-type':
            if code not in NO_MESSAGE_BODY:
                return
            assert_(0, ("Content-Type header found in a %s response, "
                        "which must not return content.") % code)
    if code not in NO_MESSAGE_BODY:
        assert_(0, "No Content-Type header found in headers (%s)" % headers)

def check_exc_info(exc_info):
    assert_(exc_info is None or type(exc_info) is tuple,
        "exc_info (%r) is not a tuple: %r" % (exc_info, type(exc_info)))
    # More exc_info checks?

def check_iterator(iterator):
    # A str or bytes object is technically iterable, but returning one
    # is a bad idea: iterating it yields one character (or, for bytes,
    # one integer) at a time instead of a single body chunk.
    assert_(not isinstance(iterator, (str, bytes)),
        "You should not return a string as your application iterator; "
        "instead return a single-item list containing a bytestring.")