jpayne@68: import xml.sax jpayne@68: import xml.sax.handler jpayne@68: jpayne@68: START_ELEMENT = "START_ELEMENT" jpayne@68: END_ELEMENT = "END_ELEMENT" jpayne@68: COMMENT = "COMMENT" jpayne@68: START_DOCUMENT = "START_DOCUMENT" jpayne@68: END_DOCUMENT = "END_DOCUMENT" jpayne@68: PROCESSING_INSTRUCTION = "PROCESSING_INSTRUCTION" jpayne@68: IGNORABLE_WHITESPACE = "IGNORABLE_WHITESPACE" jpayne@68: CHARACTERS = "CHARACTERS" jpayne@68: jpayne@68: class PullDOM(xml.sax.ContentHandler): jpayne@68: _locator = None jpayne@68: document = None jpayne@68: jpayne@68: def __init__(self, documentFactory=None): jpayne@68: from xml.dom import XML_NAMESPACE jpayne@68: self.documentFactory = documentFactory jpayne@68: self.firstEvent = [None, None] jpayne@68: self.lastEvent = self.firstEvent jpayne@68: self.elementStack = [] jpayne@68: self.push = self.elementStack.append jpayne@68: try: jpayne@68: self.pop = self.elementStack.pop jpayne@68: except AttributeError: jpayne@68: # use class' pop instead jpayne@68: pass jpayne@68: self._ns_contexts = [{XML_NAMESPACE:'xml'}] # contains uri -> prefix dicts jpayne@68: self._current_context = self._ns_contexts[-1] jpayne@68: self.pending_events = [] jpayne@68: jpayne@68: def pop(self): jpayne@68: result = self.elementStack[-1] jpayne@68: del self.elementStack[-1] jpayne@68: return result jpayne@68: jpayne@68: def setDocumentLocator(self, locator): jpayne@68: self._locator = locator jpayne@68: jpayne@68: def startPrefixMapping(self, prefix, uri): jpayne@68: if not hasattr(self, '_xmlns_attrs'): jpayne@68: self._xmlns_attrs = [] jpayne@68: self._xmlns_attrs.append((prefix or 'xmlns', uri)) jpayne@68: self._ns_contexts.append(self._current_context.copy()) jpayne@68: self._current_context[uri] = prefix or None jpayne@68: jpayne@68: def endPrefixMapping(self, prefix): jpayne@68: self._current_context = self._ns_contexts.pop() jpayne@68: jpayne@68: def startElementNS(self, name, tagName , attrs): jpayne@68: # Retrieve xml namespace declaration attributes. jpayne@68: xmlns_uri = 'http://www.w3.org/2000/xmlns/' jpayne@68: xmlns_attrs = getattr(self, '_xmlns_attrs', None) jpayne@68: if xmlns_attrs is not None: jpayne@68: for aname, value in xmlns_attrs: jpayne@68: attrs._attrs[(xmlns_uri, aname)] = value jpayne@68: self._xmlns_attrs = [] jpayne@68: uri, localname = name jpayne@68: if uri: jpayne@68: # When using namespaces, the reader may or may not jpayne@68: # provide us with the original name. If not, create jpayne@68: # *a* valid tagName from the current context. jpayne@68: if tagName is None: jpayne@68: prefix = self._current_context[uri] jpayne@68: if prefix: jpayne@68: tagName = prefix + ":" + localname jpayne@68: else: jpayne@68: tagName = localname jpayne@68: if self.document: jpayne@68: node = self.document.createElementNS(uri, tagName) jpayne@68: else: jpayne@68: node = self.buildDocument(uri, tagName) jpayne@68: else: jpayne@68: # When the tagname is not prefixed, it just appears as jpayne@68: # localname jpayne@68: if self.document: jpayne@68: node = self.document.createElement(localname) jpayne@68: else: jpayne@68: node = self.buildDocument(None, localname) jpayne@68: jpayne@68: for aname,value in attrs.items(): jpayne@68: a_uri, a_localname = aname jpayne@68: if a_uri == xmlns_uri: jpayne@68: if a_localname == 'xmlns': jpayne@68: qname = a_localname jpayne@68: else: jpayne@68: qname = 'xmlns:' + a_localname jpayne@68: attr = self.document.createAttributeNS(a_uri, qname) jpayne@68: node.setAttributeNodeNS(attr) jpayne@68: elif a_uri: jpayne@68: prefix = self._current_context[a_uri] jpayne@68: if prefix: jpayne@68: qname = prefix + ":" + a_localname jpayne@68: else: jpayne@68: qname = a_localname jpayne@68: attr = self.document.createAttributeNS(a_uri, qname) jpayne@68: node.setAttributeNodeNS(attr) jpayne@68: else: jpayne@68: attr = self.document.createAttribute(a_localname) jpayne@68: node.setAttributeNode(attr) jpayne@68: attr.value = value jpayne@68: jpayne@68: self.lastEvent[1] = [(START_ELEMENT, node), None] jpayne@68: self.lastEvent = self.lastEvent[1] jpayne@68: self.push(node) jpayne@68: jpayne@68: def endElementNS(self, name, tagName): jpayne@68: self.lastEvent[1] = [(END_ELEMENT, self.pop()), None] jpayne@68: self.lastEvent = self.lastEvent[1] jpayne@68: jpayne@68: def startElement(self, name, attrs): jpayne@68: if self.document: jpayne@68: node = self.document.createElement(name) jpayne@68: else: jpayne@68: node = self.buildDocument(None, name) jpayne@68: jpayne@68: for aname,value in attrs.items(): jpayne@68: attr = self.document.createAttribute(aname) jpayne@68: attr.value = value jpayne@68: node.setAttributeNode(attr) jpayne@68: jpayne@68: self.lastEvent[1] = [(START_ELEMENT, node), None] jpayne@68: self.lastEvent = self.lastEvent[1] jpayne@68: self.push(node) jpayne@68: jpayne@68: def endElement(self, name): jpayne@68: self.lastEvent[1] = [(END_ELEMENT, self.pop()), None] jpayne@68: self.lastEvent = self.lastEvent[1] jpayne@68: jpayne@68: def comment(self, s): jpayne@68: if self.document: jpayne@68: node = self.document.createComment(s) jpayne@68: self.lastEvent[1] = [(COMMENT, node), None] jpayne@68: self.lastEvent = self.lastEvent[1] jpayne@68: else: jpayne@68: event = [(COMMENT, s), None] jpayne@68: self.pending_events.append(event) jpayne@68: jpayne@68: def processingInstruction(self, target, data): jpayne@68: if self.document: jpayne@68: node = self.document.createProcessingInstruction(target, data) jpayne@68: self.lastEvent[1] = [(PROCESSING_INSTRUCTION, node), None] jpayne@68: self.lastEvent = self.lastEvent[1] jpayne@68: else: jpayne@68: event = [(PROCESSING_INSTRUCTION, target, data), None] jpayne@68: self.pending_events.append(event) jpayne@68: jpayne@68: def ignorableWhitespace(self, chars): jpayne@68: node = self.document.createTextNode(chars) jpayne@68: self.lastEvent[1] = [(IGNORABLE_WHITESPACE, node), None] jpayne@68: self.lastEvent = self.lastEvent[1] jpayne@68: jpayne@68: def characters(self, chars): jpayne@68: node = self.document.createTextNode(chars) jpayne@68: self.lastEvent[1] = [(CHARACTERS, node), None] jpayne@68: self.lastEvent = self.lastEvent[1] jpayne@68: jpayne@68: def startDocument(self): jpayne@68: if self.documentFactory is None: jpayne@68: import xml.dom.minidom jpayne@68: self.documentFactory = xml.dom.minidom.Document.implementation jpayne@68: jpayne@68: def buildDocument(self, uri, tagname): jpayne@68: # Can't do that in startDocument, since we need the tagname jpayne@68: # XXX: obtain DocumentType jpayne@68: node = self.documentFactory.createDocument(uri, tagname, None) jpayne@68: self.document = node jpayne@68: self.lastEvent[1] = [(START_DOCUMENT, node), None] jpayne@68: self.lastEvent = self.lastEvent[1] jpayne@68: self.push(node) jpayne@68: # Put everything we have seen so far into the document jpayne@68: for e in self.pending_events: jpayne@68: if e[0][0] == PROCESSING_INSTRUCTION: jpayne@68: _,target,data = e[0] jpayne@68: n = self.document.createProcessingInstruction(target, data) jpayne@68: e[0] = (PROCESSING_INSTRUCTION, n) jpayne@68: elif e[0][0] == COMMENT: jpayne@68: n = self.document.createComment(e[0][1]) jpayne@68: e[0] = (COMMENT, n) jpayne@68: else: jpayne@68: raise AssertionError("Unknown pending event ",e[0][0]) jpayne@68: self.lastEvent[1] = e jpayne@68: self.lastEvent = e jpayne@68: self.pending_events = None jpayne@68: return node.firstChild jpayne@68: jpayne@68: def endDocument(self): jpayne@68: self.lastEvent[1] = [(END_DOCUMENT, self.document), None] jpayne@68: self.pop() jpayne@68: jpayne@68: def clear(self): jpayne@68: "clear(): Explicitly release parsing structures" jpayne@68: self.document = None jpayne@68: jpayne@68: class ErrorHandler: jpayne@68: def warning(self, exception): jpayne@68: print(exception) jpayne@68: def error(self, exception): jpayne@68: raise exception jpayne@68: def fatalError(self, exception): jpayne@68: raise exception jpayne@68: jpayne@68: class DOMEventStream: jpayne@68: def __init__(self, stream, parser, bufsize): jpayne@68: self.stream = stream jpayne@68: self.parser = parser jpayne@68: self.bufsize = bufsize jpayne@68: if not hasattr(self.parser, 'feed'): jpayne@68: self.getEvent = self._slurp jpayne@68: self.reset() jpayne@68: jpayne@68: def reset(self): jpayne@68: self.pulldom = PullDOM() jpayne@68: # This content handler relies on namespace support jpayne@68: self.parser.setFeature(xml.sax.handler.feature_namespaces, 1) jpayne@68: self.parser.setContentHandler(self.pulldom) jpayne@68: jpayne@68: def __getitem__(self, pos): jpayne@68: import warnings jpayne@68: warnings.warn( jpayne@68: "DOMEventStream's __getitem__ method ignores 'pos' parameter. " jpayne@68: "Use iterator protocol instead.", jpayne@68: DeprecationWarning, jpayne@68: stacklevel=2 jpayne@68: ) jpayne@68: rc = self.getEvent() jpayne@68: if rc: jpayne@68: return rc jpayne@68: raise IndexError jpayne@68: jpayne@68: def __next__(self): jpayne@68: rc = self.getEvent() jpayne@68: if rc: jpayne@68: return rc jpayne@68: raise StopIteration jpayne@68: jpayne@68: def __iter__(self): jpayne@68: return self jpayne@68: jpayne@68: def expandNode(self, node): jpayne@68: event = self.getEvent() jpayne@68: parents = [node] jpayne@68: while event: jpayne@68: token, cur_node = event jpayne@68: if cur_node is node: jpayne@68: return jpayne@68: if token != END_ELEMENT: jpayne@68: parents[-1].appendChild(cur_node) jpayne@68: if token == START_ELEMENT: jpayne@68: parents.append(cur_node) jpayne@68: elif token == END_ELEMENT: jpayne@68: del parents[-1] jpayne@68: event = self.getEvent() jpayne@68: jpayne@68: def getEvent(self): jpayne@68: # use IncrementalParser interface, so we get the desired jpayne@68: # pull effect jpayne@68: if not self.pulldom.firstEvent[1]: jpayne@68: self.pulldom.lastEvent = self.pulldom.firstEvent jpayne@68: while not self.pulldom.firstEvent[1]: jpayne@68: buf = self.stream.read(self.bufsize) jpayne@68: if not buf: jpayne@68: self.parser.close() jpayne@68: return None jpayne@68: self.parser.feed(buf) jpayne@68: rc = self.pulldom.firstEvent[1][0] jpayne@68: self.pulldom.firstEvent[1] = self.pulldom.firstEvent[1][1] jpayne@68: return rc jpayne@68: jpayne@68: def _slurp(self): jpayne@68: """ Fallback replacement for getEvent() using the jpayne@68: standard SAX2 interface, which means we slurp the jpayne@68: SAX events into memory (no performance gain, but jpayne@68: we are compatible to all SAX parsers). jpayne@68: """ jpayne@68: self.parser.parse(self.stream) jpayne@68: self.getEvent = self._emit jpayne@68: return self._emit() jpayne@68: jpayne@68: def _emit(self): jpayne@68: """ Fallback replacement for getEvent() that emits jpayne@68: the events that _slurp() read previously. jpayne@68: """ jpayne@68: rc = self.pulldom.firstEvent[1][0] jpayne@68: self.pulldom.firstEvent[1] = self.pulldom.firstEvent[1][1] jpayne@68: return rc jpayne@68: jpayne@68: def clear(self): jpayne@68: """clear(): Explicitly release parsing objects""" jpayne@68: self.pulldom.clear() jpayne@68: del self.pulldom jpayne@68: self.parser = None jpayne@68: self.stream = None jpayne@68: jpayne@68: class SAX2DOM(PullDOM): jpayne@68: jpayne@68: def startElementNS(self, name, tagName , attrs): jpayne@68: PullDOM.startElementNS(self, name, tagName, attrs) jpayne@68: curNode = self.elementStack[-1] jpayne@68: parentNode = self.elementStack[-2] jpayne@68: parentNode.appendChild(curNode) jpayne@68: jpayne@68: def startElement(self, name, attrs): jpayne@68: PullDOM.startElement(self, name, attrs) jpayne@68: curNode = self.elementStack[-1] jpayne@68: parentNode = self.elementStack[-2] jpayne@68: parentNode.appendChild(curNode) jpayne@68: jpayne@68: def processingInstruction(self, target, data): jpayne@68: PullDOM.processingInstruction(self, target, data) jpayne@68: node = self.lastEvent[0][1] jpayne@68: parentNode = self.elementStack[-1] jpayne@68: parentNode.appendChild(node) jpayne@68: jpayne@68: def ignorableWhitespace(self, chars): jpayne@68: PullDOM.ignorableWhitespace(self, chars) jpayne@68: node = self.lastEvent[0][1] jpayne@68: parentNode = self.elementStack[-1] jpayne@68: parentNode.appendChild(node) jpayne@68: jpayne@68: def characters(self, chars): jpayne@68: PullDOM.characters(self, chars) jpayne@68: node = self.lastEvent[0][1] jpayne@68: parentNode = self.elementStack[-1] jpayne@68: parentNode.appendChild(node) jpayne@68: jpayne@68: jpayne@68: default_bufsize = (2 ** 14) - 20 jpayne@68: jpayne@68: def parse(stream_or_string, parser=None, bufsize=None): jpayne@68: if bufsize is None: jpayne@68: bufsize = default_bufsize jpayne@68: if isinstance(stream_or_string, str): jpayne@68: stream = open(stream_or_string, 'rb') jpayne@68: else: jpayne@68: stream = stream_or_string jpayne@68: if not parser: jpayne@68: parser = xml.sax.make_parser() jpayne@68: return DOMEventStream(stream, parser, bufsize) jpayne@68: jpayne@68: def parseString(string, parser=None): jpayne@68: from io import StringIO jpayne@68: jpayne@68: bufsize = len(string) jpayne@68: buf = StringIO(string) jpayne@68: if not parser: jpayne@68: parser = xml.sax.make_parser() jpayne@68: return DOMEventStream(buf, parser, bufsize)