annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/xml/dom/pulldom.py @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
rev   line source
jpayne@68 1 import xml.sax
jpayne@68 2 import xml.sax.handler
jpayne@68 3
# Event tokens: the first item of every (token, node) pair produced by the
# handlers and streams in this module.  Each token's value is its own name.

# Document boundaries.
START_DOCUMENT = "START_DOCUMENT"
END_DOCUMENT = "END_DOCUMENT"

# Element boundaries.
START_ELEMENT = "START_ELEMENT"
END_ELEMENT = "END_ELEMENT"

# Other node kinds.
COMMENT = "COMMENT"
PROCESSING_INSTRUCTION = "PROCESSING_INSTRUCTION"
IGNORABLE_WHITESPACE = "IGNORABLE_WHITESPACE"
CHARACTERS = "CHARACTERS"
jpayne@68 12
class PullDOM(xml.sax.ContentHandler):
    """SAX content handler that records callbacks as a linked list of events.

    Every link is a two-item list ``[event, next_link]`` in which ``event``
    is a ``(token, node)`` tuple.  ``firstEvent`` is a sentinel head link;
    ``lastEvent`` is the link onto which the next event gets chained.
    ``elementStack`` holds the document and the currently open elements.
    """

    _locator = None
    document = None

    def __init__(self, documentFactory=None):
        """Set up empty event, element and namespace bookkeeping.

        documentFactory -- object providing ``createDocument``; when left
        as None it is filled in by startDocument().
        """
        from xml.dom import XML_NAMESPACE

        self.documentFactory = documentFactory

        # Sentinel head of the event list; slot 1 points at the first link.
        head = [None, None]
        self.firstEvent = head
        self.lastEvent = head

        stack = []
        self.elementStack = stack
        self.push = stack.append
        # The bound list method shadows the class-level pop() on instances.
        self.pop = stack.pop

        base_context = {XML_NAMESPACE: 'xml'}
        self._ns_contexts = [base_context]  # contains uri -> prefix dicts
        self._current_context = base_context
        self.pending_events = []

    def pop(self):
        """Remove and return the top of elementStack."""
        top = self.elementStack[-1]
        del self.elementStack[-1]
        return top

    def setDocumentLocator(self, locator):
        """Remember the locator handed over by the parser."""
        self._locator = locator

    def startPrefixMapping(self, prefix, uri):
        """Record a namespace declaration and enter a new prefix context."""
        try:
            declared = self._xmlns_attrs
        except AttributeError:
            declared = self._xmlns_attrs = []
        declared.append((prefix or 'xmlns', uri))
        # Save a snapshot of the context as it was before this mapping;
        # endPrefixMapping() restores it.
        self._ns_contexts.append(self._current_context.copy())
        self._current_context[uri] = prefix or None

    def endPrefixMapping(self, prefix):
        """Restore the prefix context saved by startPrefixMapping()."""
        self._current_context = self._ns_contexts.pop()

    def startElementNS(self, name, tagName , attrs):
        """Create a namespace-aware element node and queue START_ELEMENT.

        name    -- ``(uri, localname)`` pair
        tagName -- qualified name, or None if the reader did not supply it
        attrs   -- attribute collection keyed by ``(uri, localname)``
        """
        # Retrieve xml namespace declaration attributes.
        xmlns_uri = 'http://www.w3.org/2000/xmlns/'
        declared = getattr(self, '_xmlns_attrs', None)
        if declared is not None:
            for decl_name, decl_uri in declared:
                attrs._attrs[(xmlns_uri, decl_name)] = decl_uri
            self._xmlns_attrs = []

        uri, localname = name
        if not uri:
            # When the tagname is not prefixed, it just appears as
            # localname
            if self.document:
                node = self.document.createElement(localname)
            else:
                node = self.buildDocument(None, localname)
        else:
            # When using namespaces, the reader may or may not
            # provide us with the original name. If not, create
            # *a* valid tagName from the current context.
            if tagName is None:
                elem_prefix = self._current_context[uri]
                tagName = (elem_prefix + ":" + localname) if elem_prefix else localname
            if self.document:
                node = self.document.createElementNS(uri, tagName)
            else:
                node = self.buildDocument(uri, tagName)

        for attr_key, attr_value in attrs.items():
            a_uri, a_localname = attr_key
            if a_uri:
                if a_uri == xmlns_uri:
                    # Namespace declarations: "xmlns" or "xmlns:<prefix>".
                    if a_localname == 'xmlns':
                        qname = a_localname
                    else:
                        qname = 'xmlns:' + a_localname
                else:
                    attr_prefix = self._current_context[a_uri]
                    if attr_prefix:
                        qname = attr_prefix + ":" + a_localname
                    else:
                        qname = a_localname
                attr = self.document.createAttributeNS(a_uri, qname)
                node.setAttributeNodeNS(attr)
            else:
                attr = self.document.createAttribute(a_localname)
                node.setAttributeNode(attr)
            attr.value = attr_value

        link = [(START_ELEMENT, node), None]
        self.lastEvent[1] = link
        self.lastEvent = link
        self.push(node)

    def endElementNS(self, name, tagName):
        """Pop the open element and queue END_ELEMENT for it."""
        link = [(END_ELEMENT, self.pop()), None]
        self.lastEvent[1] = link
        self.lastEvent = link

    def startElement(self, name, attrs):
        """Create a plain element node and queue START_ELEMENT."""
        if self.document:
            node = self.document.createElement(name)
        else:
            node = self.buildDocument(None, name)

        for attr_name, attr_value in attrs.items():
            attr = self.document.createAttribute(attr_name)
            attr.value = attr_value
            node.setAttributeNode(attr)

        link = [(START_ELEMENT, node), None]
        self.lastEvent[1] = link
        self.lastEvent = link
        self.push(node)

    def endElement(self, name):
        """Pop the open element and queue END_ELEMENT for it."""
        link = [(END_ELEMENT, self.pop()), None]
        self.lastEvent[1] = link
        self.lastEvent = link

    def comment(self, s):
        """Queue a COMMENT event, deferring it if no document exists yet."""
        if not self.document:
            # No document to create the node with; buildDocument() will
            # convert this pending entry later.
            self.pending_events.append([(COMMENT, s), None])
            return
        link = [(COMMENT, self.document.createComment(s)), None]
        self.lastEvent[1] = link
        self.lastEvent = link

    def processingInstruction(self, target, data):
        """Queue a PROCESSING_INSTRUCTION event, deferring it if needed."""
        if not self.document:
            # Kept as raw (token, target, data) until buildDocument() runs.
            self.pending_events.append(
                [(PROCESSING_INSTRUCTION, target, data), None])
            return
        pi_node = self.document.createProcessingInstruction(target, data)
        link = [(PROCESSING_INSTRUCTION, pi_node), None]
        self.lastEvent[1] = link
        self.lastEvent = link

    def ignorableWhitespace(self, chars):
        """Queue an IGNORABLE_WHITESPACE event carrying a text node."""
        text = self.document.createTextNode(chars)
        link = [(IGNORABLE_WHITESPACE, text), None]
        self.lastEvent[1] = link
        self.lastEvent = link

    def characters(self, chars):
        """Queue a CHARACTERS event carrying a text node."""
        text = self.document.createTextNode(chars)
        link = [(CHARACTERS, text), None]
        self.lastEvent[1] = link
        self.lastEvent = link

    def startDocument(self):
        """Fall back to minidom's DOM implementation if no factory was given."""
        if self.documentFactory is None:
            from xml.dom.minidom import Document
            self.documentFactory = Document.implementation

    def buildDocument(self, uri, tagname):
        """Create the document on the first element and return its root.

        Queues START_DOCUMENT, pushes the document on elementStack, then
        converts every pending comment / processing instruction into a
        real node event chained after it.
        """
        # Can't do that in startDocument, since we need the tagname
        # XXX: obtain DocumentType
        doc = self.documentFactory.createDocument(uri, tagname, None)
        self.document = doc
        start_link = [(START_DOCUMENT, doc), None]
        self.lastEvent[1] = start_link
        self.lastEvent = start_link
        self.push(doc)
        # Put everything we have seen so far into the document
        for pending in self.pending_events:
            token = pending[0][0]
            if token == PROCESSING_INSTRUCTION:
                _, target, data = pending[0]
                created = self.document.createProcessingInstruction(target, data)
                pending[0] = (PROCESSING_INSTRUCTION, created)
            elif token == COMMENT:
                created = self.document.createComment(pending[0][1])
                pending[0] = (COMMENT, created)
            else:
                raise AssertionError("Unknown pending event ",pending[0][0])
            self.lastEvent[1] = pending
            self.lastEvent = pending
        self.pending_events = None
        return doc.firstChild

    def endDocument(self):
        """Queue END_DOCUMENT and pop the top of the element stack."""
        # The tail pointer is deliberately left where it is, as in the
        # original implementation.
        self.lastEvent[1] = [(END_DOCUMENT, self.document), None]
        self.pop()

    def clear(self):
        "clear(): Explicitly release parsing structures"
        self.document = None
jpayne@68 195
class ErrorHandler:
    """Minimal SAX error handler: print warnings, re-raise everything else."""

    def warning(self, exception):
        """Report a warning by printing it."""
        print(exception)

    def error(self, exception):
        """Propagate a recoverable error to the caller."""
        raise exception

    def fatalError(self, exception):
        """Propagate a fatal error to the caller."""
        raise exception
jpayne@68 203
class DOMEventStream:
    """Iterator over the ``(token, node)`` events of a parsed stream.

    Feeds ``stream`` to ``parser`` in chunks of ``bufsize`` and hands out
    the events collected by a PullDOM content handler.  Parsers without a
    ``feed`` method are driven through ``parse()`` instead, in which case
    all events are gathered up front.
    """

    def __init__(self, stream, parser, bufsize):
        """Bind stream and parser and install a fresh PullDOM handler.

        stream  -- object with ``read(size)`` (or whatever ``parser.parse``
                   accepts when the parser is not incremental)
        parser  -- SAX parser; must support ``setFeature`` and
                   ``setContentHandler``
        bufsize -- number of characters/bytes requested per ``read`` call
        """
        self.stream = stream
        self.parser = parser
        self.bufsize = bufsize
        if not hasattr(self.parser, 'feed'):
            # Not an incremental parser: fall back to parsing in one go.
            self.getEvent = self._slurp
        self.reset()

    def reset(self):
        """Attach a new PullDOM handler to the parser."""
        self.pulldom = PullDOM()
        # This content handler relies on namespace support
        self.parser.setFeature(xml.sax.handler.feature_namespaces, 1)
        self.parser.setContentHandler(self.pulldom)

    def __getitem__(self, pos):
        """Deprecated sequence access; returns the next event, ignoring pos.

        Raises IndexError once no more events are available.
        """
        import warnings
        warnings.warn(
            "DOMEventStream's __getitem__ method ignores 'pos' parameter. "
            "Use iterator protocol instead.",
            DeprecationWarning,
            stacklevel=2
        )
        rc = self.getEvent()
        if rc:
            return rc
        raise IndexError

    def __next__(self):
        """Return the next event or raise StopIteration."""
        rc = self.getEvent()
        if rc:
            return rc
        raise StopIteration

    def __iter__(self):
        return self

    def expandNode(self, node):
        """Consume events until ``node`` ends, building its subtree.

        Every node seen on the way is appended to its parent, so that
        ``node`` ends up with all of its descendants attached.
        """
        event = self.getEvent()
        parents = [node]
        while event:
            token, cur_node = event
            if cur_node is node:
                # END_ELEMENT of the node being expanded: done.
                return
            if token != END_ELEMENT:
                parents[-1].appendChild(cur_node)
            if token == START_ELEMENT:
                parents.append(cur_node)
            elif token == END_ELEMENT:
                del parents[-1]
            event = self.getEvent()

    def getEvent(self):
        """Return the next ``(token, node)`` event, or None at end of input."""
        # use IncrementalParser interface, so we get the desired
        # pull effect
        if not self.pulldom.firstEvent[1]:
            # Queue drained: let the handler chain new events onto the head.
            self.pulldom.lastEvent = self.pulldom.firstEvent
        while not self.pulldom.firstEvent[1]:
            buf = self.stream.read(self.bufsize)
            if not buf:
                # NOTE(review): any event queued by close() itself (such as
                # END_DOCUMENT) is not returned by this call -- long-standing
                # behaviour, kept as is.
                self.parser.close()
                return None
            self.parser.feed(buf)
        rc = self.pulldom.firstEvent[1][0]
        self.pulldom.firstEvent[1] = self.pulldom.firstEvent[1][1]
        return rc

    def _slurp(self):
        """ Fallback replacement for getEvent() using the
            standard SAX2 interface, which means we slurp the
            SAX events into memory (no performance gain, but
            we are compatible to all SAX parsers).
        """
        self.parser.parse(self.stream)
        self.getEvent = self._emit
        return self._emit()

    def _emit(self):
        """ Fallback replacement for getEvent() that emits
            the events that _slurp() read previously.

            Returns None once every event has been handed out, so that
            iteration terminates with StopIteration.
        """
        link = self.pulldom.firstEvent[1]
        if link is None:
            # Queue exhausted.  Subscripting None here used to surface as
            # a TypeError at the end of iteration.
            return None
        self.pulldom.firstEvent[1] = link[1]
        return link[0]

    def clear(self):
        """clear(): Explicitly release parsing objects"""
        self.pulldom.clear()
        del self.pulldom
        self.parser = None
        self.stream = None
jpayne@68 295
class SAX2DOM(PullDOM):
    """PullDOM variant that also attaches each new node to its parent.

    Besides queueing events like PullDOM, every element, processing
    instruction and text node is appended to the node on top of
    ``elementStack``, so a DOM tree is built while parsing.
    """

    def startElementNS(self, name, tagName , attrs):
        """Create the element as PullDOM does, then link it to its parent."""
        PullDOM.startElementNS(self, name, tagName, attrs)
        # The base class has just pushed the new element; its parent is
        # the entry below it on the stack.
        curNode = self.elementStack[-1]
        parentNode = self.elementStack[-2]
        parentNode.appendChild(curNode)

    def startElement(self, name, attrs):
        """Create the element as PullDOM does, then link it to its parent."""
        PullDOM.startElement(self, name, attrs)
        curNode = self.elementStack[-1]
        parentNode = self.elementStack[-2]
        parentNode.appendChild(curNode)

    def processingInstruction(self, target, data):
        """Queue the processing instruction and attach it to the open node.

        A processing instruction seen before the document exists is only
        queued as a pending event by the base class: no node has been
        created and there is no parent yet, so nothing is attached.
        Previously this case raised TypeError.
        """
        has_document = bool(self.document)
        PullDOM.processingInstruction(self, target, data)
        if not has_document:
            # Pending event only; buildDocument() creates the node later.
            return
        node = self.lastEvent[0][1]
        parentNode = self.elementStack[-1]
        parentNode.appendChild(node)

    def ignorableWhitespace(self, chars):
        """Queue the whitespace text node and attach it to the open node."""
        PullDOM.ignorableWhitespace(self, chars)
        node = self.lastEvent[0][1]
        parentNode = self.elementStack[-1]
        parentNode.appendChild(node)

    def characters(self, chars):
        """Queue the text node and attach it to the open node."""
        PullDOM.characters(self, chars)
        node = self.lastEvent[0][1]
        parentNode = self.elementStack[-1]
        parentNode.appendChild(node)
jpayne@68 327
jpayne@68 328
# Chunk size used by parse() when the caller does not pass bufsize:
# 16 KiB minus 20.
default_bufsize = 2 ** 14 - 20
jpayne@68 330
def parse(stream_or_string, parser=None, bufsize=None):
    """Return a DOMEventStream over a file name or an open stream.

    stream_or_string -- a path (``str`` or ``os.PathLike``), opened here in
                        binary mode, or an already open object with a
                        ``read`` method, used as is
    parser           -- SAX parser to use; a default one is created when
                        omitted
    bufsize          -- chunk size per read; ``default_bufsize`` when None

    Note: a file opened here is not closed by this function; it stays
    reachable as the ``stream`` attribute of the returned object.
    """
    import os

    if bufsize is None:
        bufsize = default_bufsize
    # Path objects used to fall through to the "stream" branch and fail
    # later on .read(); treat them like string paths.
    if isinstance(stream_or_string, (str, os.PathLike)):
        stream = open(stream_or_string, 'rb')
    else:
        stream = stream_or_string
    if not parser:
        parser = xml.sax.make_parser()
    return DOMEventStream(stream, parser, bufsize)
jpayne@68 341
def parseString(string, parser=None):
    """Return a DOMEventStream over an in-memory XML document.

    string -- the document as ``str``, or as ``bytes``/``bytearray``
              (previously only ``str`` worked; bytes raised TypeError)
    parser -- SAX parser to use; a default one is created when omitted

    The whole document is handed to the parser in a single read, since the
    buffer size is set to the length of the input.
    """
    from io import BytesIO, StringIO

    bufsize = len(string)
    if isinstance(string, (bytes, bytearray)):
        buf = BytesIO(string)
    else:
        buf = StringIO(string)
    if not parser:
        parser = xml.sax.make_parser()
    return DOMEventStream(buf, parser, bufsize)