comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/xml/dom/pulldom.py @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
comparison
equal deleted inserted replaced
67:0e9998148a16 68:5028fdace37b
1 import xml.sax
2 import xml.sax.handler
3
# Event tokens.  Each event handed out by this module is a (token, node)
# pair whose first item is one of the strings below.

# Document boundaries.
START_DOCUMENT = "START_DOCUMENT"
END_DOCUMENT = "END_DOCUMENT"

# Element boundaries.
START_ELEMENT = "START_ELEMENT"
END_ELEMENT = "END_ELEMENT"

# Content found before, inside or after elements.
CHARACTERS = "CHARACTERS"
IGNORABLE_WHITESPACE = "IGNORABLE_WHITESPACE"
COMMENT = "COMMENT"
PROCESSING_INSTRUCTION = "PROCESSING_INSTRUCTION"
12
class PullDOM(xml.sax.ContentHandler):
    """SAX content handler that records parser callbacks as DOM events.

    Events are stored as a singly linked list of two-item cells
    ``[(token, node), next_cell]``.  ``firstEvent`` is a placeholder head
    cell (``[None, None]``) and ``lastEvent`` is the cell that was linked
    most recently; new cells are hung off ``lastEvent[1]``.

    The document is created lazily by ``buildDocument()`` when the first
    element starts.  Comments and processing instructions reported before
    that point are parked in ``pending_events``.
    """

    _locator = None
    document = None

    def __init__(self, documentFactory=None):
        """Set up the empty event list, element stack and namespace state.

        documentFactory -- object providing ``createDocument(uri, name,
        doctype)``; when None, ``startDocument()`` falls back to minidom.
        """
        from xml.dom import XML_NAMESPACE
        self.documentFactory = documentFactory
        head = [None, None]
        self.firstEvent = head
        self.lastEvent = head
        stack = []
        self.elementStack = stack
        self.push = stack.append
        try:
            self.pop = stack.pop
        except AttributeError:
            # use class' pop instead
            pass
        # Stack of uri -> prefix dicts; the last entry is the active one.
        self._ns_contexts = [{XML_NAMESPACE: 'xml'}]
        self._current_context = self._ns_contexts[-1]
        self.pending_events = []

    def pop(self):
        """Remove and return the top of ``elementStack`` (class fallback)."""
        top = self.elementStack[-1]
        del self.elementStack[-1]
        return top

    def setDocumentLocator(self, locator):
        """Remember the locator supplied by the parser."""
        self._locator = locator

    def startPrefixMapping(self, prefix, uri):
        """Record a namespace declaration and enter a new prefix context.

        The declaration is also queued in ``_xmlns_attrs`` so that
        ``startElementNS()`` can turn it into an xmlns attribute.
        """
        try:
            declarations = self._xmlns_attrs
        except AttributeError:
            declarations = self._xmlns_attrs = []
        declarations.append((prefix or 'xmlns', uri))
        # Save a snapshot of the context as it was before this mapping.
        self._ns_contexts.append(self._current_context.copy())
        self._current_context[uri] = prefix or None

    def endPrefixMapping(self, prefix):
        """Leave the current prefix context, restoring the saved one."""
        self._current_context = self._ns_contexts.pop()

    def startElementNS(self, name, tagName, attrs):
        """Create a namespace-aware element node and queue START_ELEMENT.

        name    -- ``(uri, localname)`` pair.
        tagName -- qualified name, or None if the reader did not supply it.
        attrs   -- attribute collection keyed by ``(uri, localname)``.
        """
        # Retrieve xml namespace declaration attributes.
        xmlns_uri = 'http://www.w3.org/2000/xmlns/'
        declarations = getattr(self, '_xmlns_attrs', None)
        if declarations is not None:
            for decl_name, decl_value in declarations:
                attrs._attrs[(xmlns_uri, decl_name)] = decl_value
            self._xmlns_attrs = []

        uri, localname = name
        if uri:
            # When using namespaces, the reader may or may not
            # provide us with the original name. If not, create
            # *a* valid tagName from the current context.
            if tagName is None:
                prefix = self._current_context[uri]
                tagName = prefix + ":" + localname if prefix else localname
            if self.document:
                node = self.document.createElementNS(uri, tagName)
            else:
                node = self.buildDocument(uri, tagName)
        elif self.document:
            # When the tagname is not prefixed, it just appears as
            # localname
            node = self.document.createElement(localname)
        else:
            node = self.buildDocument(None, localname)

        for attr_key, value in attrs.items():
            attr_uri, attr_local = attr_key
            if attr_uri == xmlns_uri:
                if attr_local == 'xmlns':
                    qname = attr_local
                else:
                    qname = 'xmlns:' + attr_local
                attr = self.document.createAttributeNS(attr_uri, qname)
                node.setAttributeNodeNS(attr)
            elif attr_uri:
                prefix = self._current_context[attr_uri]
                qname = prefix + ":" + attr_local if prefix else attr_local
                attr = self.document.createAttributeNS(attr_uri, qname)
                node.setAttributeNodeNS(attr)
            else:
                attr = self.document.createAttribute(attr_local)
                node.setAttributeNode(attr)
            attr.value = value

        cell = [(START_ELEMENT, node), None]
        self.lastEvent[1] = cell
        self.lastEvent = cell
        self.push(node)

    def endElementNS(self, name, tagName):
        """Queue END_ELEMENT for the element popped off the stack."""
        cell = [(END_ELEMENT, self.pop()), None]
        self.lastEvent[1] = cell
        self.lastEvent = cell

    def startElement(self, name, attrs):
        """Create a plain (non-namespace) element node and queue START_ELEMENT."""
        if self.document:
            node = self.document.createElement(name)
        else:
            node = self.buildDocument(None, name)

        for attr_name, value in attrs.items():
            attr = self.document.createAttribute(attr_name)
            attr.value = value
            node.setAttributeNode(attr)

        cell = [(START_ELEMENT, node), None]
        self.lastEvent[1] = cell
        self.lastEvent = cell
        self.push(node)

    def endElement(self, name):
        """Queue END_ELEMENT for the element popped off the stack."""
        cell = [(END_ELEMENT, self.pop()), None]
        self.lastEvent[1] = cell
        self.lastEvent = cell

    def comment(self, s):
        """Queue a COMMENT event, or park the text until a document exists."""
        if not self.document:
            self.pending_events.append([(COMMENT, s), None])
            return
        cell = [(COMMENT, self.document.createComment(s)), None]
        self.lastEvent[1] = cell
        self.lastEvent = cell

    def processingInstruction(self, target, data):
        """Queue a PROCESSING_INSTRUCTION event, or park it until a document exists."""
        if not self.document:
            self.pending_events.append(
                [(PROCESSING_INSTRUCTION, target, data), None])
            return
        node = self.document.createProcessingInstruction(target, data)
        cell = [(PROCESSING_INSTRUCTION, node), None]
        self.lastEvent[1] = cell
        self.lastEvent = cell

    def ignorableWhitespace(self, chars):
        """Queue an IGNORABLE_WHITESPACE event carrying a new text node."""
        cell = [(IGNORABLE_WHITESPACE, self.document.createTextNode(chars)),
                None]
        self.lastEvent[1] = cell
        self.lastEvent = cell

    def characters(self, chars):
        """Queue a CHARACTERS event carrying a new text node."""
        cell = [(CHARACTERS, self.document.createTextNode(chars)), None]
        self.lastEvent[1] = cell
        self.lastEvent = cell

    def startDocument(self):
        """Pick minidom's DOM implementation if no factory was supplied."""
        if self.documentFactory is None:
            import xml.dom.minidom
            self.documentFactory = xml.dom.minidom.Document.implementation

    def buildDocument(self, uri, tagname):
        """Create the document, queue START_DOCUMENT and flush parked events.

        Returns the ``firstChild`` of the newly created document.
        Raises AssertionError for a parked event of an unknown kind.
        """
        # Can't do that in startDocument, since we need the tagname
        # XXX: obtain DocumentType
        doc = self.documentFactory.createDocument(uri, tagname, None)
        self.document = doc
        cell = [(START_DOCUMENT, doc), None]
        self.lastEvent[1] = cell
        self.lastEvent = cell
        self.push(doc)
        # Put everything we have seen so far into the document
        for parked in self.pending_events:
            kind = parked[0][0]
            if kind == PROCESSING_INSTRUCTION:
                _, target, data = parked[0]
                created = self.document.createProcessingInstruction(
                    target, data)
                parked[0] = (PROCESSING_INSTRUCTION, created)
            elif kind == COMMENT:
                created = self.document.createComment(parked[0][1])
                parked[0] = (COMMENT, created)
            else:
                raise AssertionError("Unknown pending event ", kind)
            self.lastEvent[1] = parked
            self.lastEvent = parked
        self.pending_events = None
        return doc.firstChild

    def endDocument(self):
        """Queue END_DOCUMENT and pop the document off the stack.

        ``lastEvent`` is deliberately left where it was, as in the
        original implementation.
        """
        self.lastEvent[1] = [(END_DOCUMENT, self.document), None]
        self.pop()

    def clear(self):
        "clear(): Explicitly release parsing structures"
        self.document = None
195
class ErrorHandler:
    """Error handler that prints warnings and re-raises everything else."""

    def warning(self, exception):
        """Print the warning; parsing is not interrupted."""
        print(exception)

    def error(self, exception):
        """Raise the exception describing a recoverable error."""
        raise exception

    def fatalError(self, exception):
        """Raise the exception describing a fatal error."""
        raise exception
203
class DOMEventStream:
    """Iterator over the ``(token, node)`` events of a parsed XML stream.

    The stream owns a PullDOM content handler and pulls events from its
    linked event list.  When the parser has a ``feed`` method the input is
    read ``bufsize`` at a time; otherwise the whole input is parsed on the
    first request and the stored events are handed out afterwards.
    """

    def __init__(self, stream, parser, bufsize):
        """Bind *stream* and *parser* together and install the handler.

        stream  -- object with a ``read(size)`` method (or whatever
                   ``parser.parse`` accepts when the parser has no ``feed``).
        parser  -- SAX parser; must support ``setFeature`` and
                   ``setContentHandler``.
        bufsize -- number of units requested from ``stream.read`` per chunk.
        """
        self.stream = stream
        self.parser = parser
        self.bufsize = bufsize
        if not hasattr(self.parser, 'feed'):
            # No incremental interface: parse everything on first use.
            self.getEvent = self._slurp
        self.reset()

    def reset(self):
        """Install a fresh PullDOM handler on the parser."""
        self.pulldom = PullDOM()
        # This content handler relies on namespace support
        self.parser.setFeature(xml.sax.handler.feature_namespaces, 1)
        self.parser.setContentHandler(self.pulldom)

    def __getitem__(self, pos):
        """Deprecated: return the next event, ignoring *pos*.

        Emits a DeprecationWarning and raises IndexError when no event
        is left.
        """
        import warnings
        warnings.warn(
            "DOMEventStream's __getitem__ method ignores 'pos' parameter. "
            "Use iterator protocol instead.",
            DeprecationWarning,
            stacklevel=2
        )
        rc = self.getEvent()
        if rc:
            return rc
        raise IndexError

    def __next__(self):
        """Return the next event or raise StopIteration when exhausted."""
        rc = self.getEvent()
        if rc:
            return rc
        raise StopIteration

    def __iter__(self):
        return self

    def expandNode(self, node):
        """Consume events until *node* shows up again, building its subtree.

        Every node seen on the way is appended to its parent, so on return
        *node* holds all of its descendants.  Returns None.
        """
        event = self.getEvent()
        parents = [node]
        while event:
            token, cur_node = event
            if cur_node is node:
                # The END_ELEMENT of the node being expanded: done.
                return
            if token != END_ELEMENT:
                parents[-1].appendChild(cur_node)
            if token == START_ELEMENT:
                parents.append(cur_node)
            elif token == END_ELEMENT:
                del parents[-1]
            event = self.getEvent()

    def getEvent(self):
        """Return the next ``(token, node)`` event, or None at end of input.

        Feeds the parser one chunk at a time until an event is queued.
        When the stream returns no data the parser is closed and None is
        returned, regardless of anything queued during ``close()``.
        """
        # use IncrementalParser interface, so we get the desired
        # pull effect
        if not self.pulldom.firstEvent[1]:
            # Queue drained: link new events directly after the head cell.
            self.pulldom.lastEvent = self.pulldom.firstEvent
        while not self.pulldom.firstEvent[1]:
            buf = self.stream.read(self.bufsize)
            if not buf:
                self.parser.close()
                return None
            self.parser.feed(buf)
        rc = self.pulldom.firstEvent[1][0]
        self.pulldom.firstEvent[1] = self.pulldom.firstEvent[1][1]
        return rc

    def _slurp(self):
        """ Fallback replacement for getEvent() using the
            standard SAX2 interface, which means we slurp the
            SAX events into memory (no performance gain, but
            we are compatible to all SAX parsers).
        """
        self.parser.parse(self.stream)
        self.getEvent = self._emit
        return self._emit()

    def _emit(self):
        """ Fallback replacement for getEvent() that emits
            the events that _slurp() read previously.

            Returns None once every stored event has been handed out, so
            that iteration ends with StopIteration instead of failing
            on an empty queue.
        """
        cell = self.pulldom.firstEvent[1]
        if not cell:
            return None
        self.pulldom.firstEvent[1] = cell[1]
        return cell[0]

    def clear(self):
        """clear(): Explicitly release parsing objects"""
        self.pulldom.clear()
        del self.pulldom
        self.parser = None
        self.stream = None
295
class SAX2DOM(PullDOM):
    """PullDOM variant that also links each new node into the DOM tree.

    After the base class has created a node and queued its event, the
    node is appended to the node on top of ``elementStack``, so the
    document is assembled while the events are being produced.
    """

    def buildDocument(self, uri, tagname):
        """Create the document and attach parked processing instructions.

        Processing instructions reported before the first element are
        parked by the base class.  Once the document exists they are
        inserted in front of the node returned by the base implementation
        (the document's first child), so they are not lost from the tree.
        """
        parked = self.pending_events
        root = PullDOM.buildDocument(self, uri, tagname)
        # The base call replaced each parked entry with (token, node).
        for event in parked:
            token, node = event[0]
            if token == PROCESSING_INSTRUCTION:
                self.document.insertBefore(node, root)
        return root

    def startElementNS(self, name, tagName, attrs):
        """Create the element and append it to its parent on the stack."""
        PullDOM.startElementNS(self, name, tagName, attrs)
        curNode = self.elementStack[-1]
        parentNode = self.elementStack[-2]
        parentNode.appendChild(curNode)

    def startElement(self, name, attrs):
        """Create the element and append it to its parent on the stack."""
        PullDOM.startElement(self, name, attrs)
        curNode = self.elementStack[-1]
        parentNode = self.elementStack[-2]
        parentNode.appendChild(curNode)

    def processingInstruction(self, target, data):
        """Create the processing instruction and append it to the open node.

        When no document exists yet the base class only parks the
        instruction and queues no event, so there is no node to append;
        buildDocument() attaches it later.
        """
        PullDOM.processingInstruction(self, target, data)
        if not self.document:
            return
        node = self.lastEvent[0][1]
        parentNode = self.elementStack[-1]
        parentNode.appendChild(node)

    def ignorableWhitespace(self, chars):
        """Create the whitespace text node and append it to the open node."""
        PullDOM.ignorableWhitespace(self, chars)
        node = self.lastEvent[0][1]
        parentNode = self.elementStack[-1]
        parentNode.appendChild(node)

    def characters(self, chars):
        """Create the text node and append it to the open node."""
        PullDOM.characters(self, chars)
        node = self.lastEvent[0][1]
        parentNode = self.elementStack[-1]
        parentNode.appendChild(node)
327
328
# Chunk size used by parse() when the caller does not pass one:
# 2 ** 14 minus 20.
default_bufsize = 2 ** 14 - 20
330
def parse(stream_or_string, parser=None, bufsize=None):
    """Return a DOMEventStream for a file name or an open stream.

    stream_or_string -- a ``str`` is treated as a path and opened in
                        binary mode (this function does not close it);
                        anything else is used as the stream itself.
    parser           -- SAX parser to use; one is created with
                        ``xml.sax.make_parser()`` when falsy.
    bufsize          -- chunk size; ``default_bufsize`` when None.
    """
    chunk = default_bufsize if bufsize is None else bufsize
    if isinstance(stream_or_string, str):
        source = open(stream_or_string, 'rb')
    else:
        source = stream_or_string
    reader = parser if parser else xml.sax.make_parser()
    return DOMEventStream(source, reader, chunk)
341
def parseString(string, parser=None):
    """Return a DOMEventStream reading from the text *string*.

    The chunk size is the length of *string*, so a single read returns
    all of it.  A parser is created with ``xml.sax.make_parser()`` when
    *parser* is falsy.
    """
    from io import StringIO

    length = len(string)
    source = StringIO(string)
    reader = parser if parser else xml.sax.make_parser()
    return DOMEventStream(source, reader, length)