Mercurial > repos > rliterman > csp2
comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/xml/dom/pulldom.py @ 68:5028fdace37b
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 16:23:26 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
67:0e9998148a16 | 68:5028fdace37b |
---|---|
1 import xml.sax | |
2 import xml.sax.handler | |
3 | |
# Event-type tags.  Every event produced by this module is a tuple whose
# first item is one of these strings.

# Document boundaries.
START_DOCUMENT = "START_DOCUMENT"
END_DOCUMENT = "END_DOCUMENT"

# Element boundaries.
START_ELEMENT = "START_ELEMENT"
END_ELEMENT = "END_ELEMENT"

# Character data.
CHARACTERS = "CHARACTERS"
IGNORABLE_WHITESPACE = "IGNORABLE_WHITESPACE"

# Other markup.
COMMENT = "COMMENT"
PROCESSING_INSTRUCTION = "PROCESSING_INSTRUCTION"
12 | |
class PullDOM(xml.sax.ContentHandler):
    """SAX content handler that records parser callbacks as DOM events.

    Events are stored in a singly linked list of two-item lists
    ``[event, next]``.  ``firstEvent`` is a dummy head whose second slot
    points at the oldest queued event, and ``lastEvent`` is the current
    tail.  Each event is a tuple starting with one of the module's
    event-type constants, normally ``(event_type, node)``.
    """

    _locator = None
    document = None
    # Namespace declarations collected by startPrefixMapping() for the next
    # element; stays None until the first prefix mapping is reported.
    _xmlns_attrs = None

    def __init__(self, documentFactory=None):
        """Set up an empty event list, element stack and namespace context.

        documentFactory -- object providing ``createDocument(uri, tagname,
        doctype)``; when None, startDocument() installs the minidom
        implementation.
        """
        from xml.dom import XML_NAMESPACE
        self.documentFactory = documentFactory
        self.firstEvent = [None, None]
        self.lastEvent = self.firstEvent
        self.elementStack = []
        # Bind the list methods directly; a list always provides both.
        self.push = self.elementStack.append
        self.pop = self.elementStack.pop
        self._ns_contexts = [{XML_NAMESPACE: 'xml'}]  # contains uri -> prefix dicts
        self._current_context = self._ns_contexts[-1]
        # Comments / processing instructions seen before the document exists.
        self.pending_events = []

    def pop(self):
        """Remove and return the top of the element stack.

        Instances created through __init__ shadow this method with the
        bound ``list.pop`` of ``elementStack``; it is kept as a class-level
        fallback.
        """
        result = self.elementStack[-1]
        del self.elementStack[-1]
        return result

    def _emit_event(self, event):
        # Append *event* to the linked event list and advance the tail.
        self.lastEvent[1] = [event, None]
        self.lastEvent = self.lastEvent[1]

    def setDocumentLocator(self, locator):
        """Remember the locator handed over by the parser."""
        self._locator = locator

    def startPrefixMapping(self, prefix, uri):
        """Record a namespace declaration and open a new prefix context."""
        if self._xmlns_attrs is None:
            self._xmlns_attrs = []
        # The default namespace (empty prefix) is stored under 'xmlns'.
        self._xmlns_attrs.append((prefix or 'xmlns', uri))
        # Save a snapshot to restore in endPrefixMapping(), then update the
        # live context.
        self._ns_contexts.append(self._current_context.copy())
        self._current_context[uri] = prefix or None

    def endPrefixMapping(self, prefix):
        """Restore the namespace context saved by startPrefixMapping()."""
        self._current_context = self._ns_contexts.pop()

    def startElementNS(self, name, tagName, attrs):
        """Create a namespaced element node and queue a START_ELEMENT event.

        name    -- ``(uri, localname)`` pair
        tagName -- qualified name, or None when the reader does not supply it
        attrs   -- attributes object whose ``items()`` yields
                   ``((uri, localname), value)`` pairs

        The first element seen also creates the document via buildDocument().
        """
        xmlns_uri = 'http://www.w3.org/2000/xmlns/'

        # Merge the pending namespace declarations with the reported
        # attributes in a local dict, so the caller's attrs object is
        # neither mutated nor required to expose a private ``_attrs``.
        all_attrs = dict(attrs.items())
        xmlns_attrs = self._xmlns_attrs
        if xmlns_attrs is not None:
            for aname, value in xmlns_attrs:
                all_attrs[(xmlns_uri, aname)] = value
            self._xmlns_attrs = []

        uri, localname = name
        if uri:
            # When using namespaces, the reader may or may not
            # provide us with the original name. If not, create
            # *a* valid tagName from the current context.
            if tagName is None:
                prefix = self._current_context[uri]
                if prefix:
                    tagName = prefix + ":" + localname
                else:
                    tagName = localname
            if self.document:
                node = self.document.createElementNS(uri, tagName)
            else:
                node = self.buildDocument(uri, tagName)
        else:
            # When the tagname is not prefixed, it just appears as
            # localname
            if self.document:
                node = self.document.createElement(localname)
            else:
                node = self.buildDocument(None, localname)

        for aname, value in all_attrs.items():
            a_uri, a_localname = aname
            if a_uri == xmlns_uri:
                # Namespace declaration: 'xmlns' or 'xmlns:<prefix>'.
                if a_localname == 'xmlns':
                    qname = a_localname
                else:
                    qname = 'xmlns:' + a_localname
                attr = self.document.createAttributeNS(a_uri, qname)
                node.setAttributeNodeNS(attr)
            elif a_uri:
                prefix = self._current_context[a_uri]
                if prefix:
                    qname = prefix + ":" + a_localname
                else:
                    qname = a_localname
                attr = self.document.createAttributeNS(a_uri, qname)
                node.setAttributeNodeNS(attr)
            else:
                attr = self.document.createAttribute(a_localname)
                node.setAttributeNode(attr)
            attr.value = value

        self._emit_event((START_ELEMENT, node))
        self.push(node)

    def endElementNS(self, name, tagName):
        """Pop the current element and queue an END_ELEMENT event for it."""
        self._emit_event((END_ELEMENT, self.pop()))

    def startElement(self, name, attrs):
        """Create an element node and queue a START_ELEMENT event.

        The first element seen also creates the document via buildDocument().
        """
        if self.document:
            node = self.document.createElement(name)
        else:
            node = self.buildDocument(None, name)

        for aname, value in attrs.items():
            attr = self.document.createAttribute(aname)
            attr.value = value
            node.setAttributeNode(attr)

        self._emit_event((START_ELEMENT, node))
        self.push(node)

    def endElement(self, name):
        """Pop the current element and queue an END_ELEMENT event for it."""
        self._emit_event((END_ELEMENT, self.pop()))

    def comment(self, s):
        """Queue a COMMENT event, deferring it if no document exists yet."""
        if self.document:
            node = self.document.createComment(s)
            self._emit_event((COMMENT, node))
        else:
            # No document to create the node from; buildDocument() converts
            # and queues this later.
            event = [(COMMENT, s), None]
            self.pending_events.append(event)

    def processingInstruction(self, target, data):
        """Queue a PROCESSING_INSTRUCTION event, deferring it if needed."""
        if self.document:
            node = self.document.createProcessingInstruction(target, data)
            self._emit_event((PROCESSING_INSTRUCTION, node))
        else:
            # Converted into a real node by buildDocument().
            event = [(PROCESSING_INSTRUCTION, target, data), None]
            self.pending_events.append(event)

    def ignorableWhitespace(self, chars):
        """Queue an IGNORABLE_WHITESPACE event carrying a text node."""
        node = self.document.createTextNode(chars)
        self._emit_event((IGNORABLE_WHITESPACE, node))

    def characters(self, chars):
        """Queue a CHARACTERS event carrying a text node."""
        node = self.document.createTextNode(chars)
        self._emit_event((CHARACTERS, node))

    def startDocument(self):
        """Install the minidom implementation if no factory was supplied."""
        if self.documentFactory is None:
            import xml.dom.minidom
            self.documentFactory = xml.dom.minidom.Document.implementation

    def buildDocument(self, uri, tagname):
        """Create the document for the root element and return that element.

        Queues START_DOCUMENT, pushes the document on the element stack and
        then queues every deferred comment / processing instruction.
        """
        # Can't do that in startDocument, since we need the tagname
        # XXX: obtain DocumentType
        node = self.documentFactory.createDocument(uri, tagname, None)
        self.document = node
        self._emit_event((START_DOCUMENT, node))
        self.push(node)
        # Put everything we have seen so far into the document
        for e in self.pending_events:
            if e[0][0] == PROCESSING_INSTRUCTION:
                _, target, data = e[0]
                n = self.document.createProcessingInstruction(target, data)
                e[0] = (PROCESSING_INSTRUCTION, n)
            elif e[0][0] == COMMENT:
                n = self.document.createComment(e[0][1])
                e[0] = (COMMENT, n)
            else:
                raise AssertionError("Unknown pending event ", e[0][0])
            # The pending entry is already a [event, next] cell; link it in.
            self.lastEvent[1] = e
            self.lastEvent = e
        self.pending_events = None
        return node.firstChild

    def endDocument(self):
        """Queue END_DOCUMENT and pop the document off the element stack."""
        # The tail pointer is deliberately left where it is, as in the
        # original implementation.
        self.lastEvent[1] = [(END_DOCUMENT, self.document), None]
        self.pop()

    def clear(self):
        "clear(): Explicitly release parsing structures"
        self.document = None
195 | |
class ErrorHandler:
    """Minimal error handler: warnings are printed, errors are raised."""

    def warning(self, exception):
        """Print the warning and carry on."""
        print(exception)

    def error(self, exception):
        """Re-raise a recoverable error."""
        raise exception

    def fatalError(self, exception):
        """Re-raise a fatal error."""
        raise exception
203 | |
class DOMEventStream:
    """Iterator over the DOM events produced while parsing *stream*.

    A PullDOM handler is attached to *parser*.  If the parser has a
    ``feed`` method the stream is read in ``bufsize`` pieces on demand;
    otherwise the whole stream is parsed on the first request and the
    queued events are handed out afterwards.
    """

    def __init__(self, stream, parser, bufsize):
        """Store the inputs and attach a fresh PullDOM to the parser."""
        self.stream = stream
        self.parser = parser
        self.bufsize = bufsize
        if not hasattr(self.parser, 'feed'):
            # No incremental interface: fall back to parse-everything mode.
            self.getEvent = self._slurp
        self.reset()

    def reset(self):
        """Install a new PullDOM handler on the parser."""
        self.pulldom = PullDOM()
        # This content handler relies on namespace support
        self.parser.setFeature(xml.sax.handler.feature_namespaces, 1)
        self.parser.setContentHandler(self.pulldom)

    def __getitem__(self, pos):
        """Deprecated: return the next event, ignoring *pos*.

        Raises IndexError when no further event is available.
        """
        import warnings
        warnings.warn(
            "DOMEventStream's __getitem__ method ignores 'pos' parameter. "
            "Use iterator protocol instead.",
            DeprecationWarning,
            stacklevel=2
        )
        rc = self.getEvent()
        if rc:
            return rc
        raise IndexError

    def __next__(self):
        """Return the next event or raise StopIteration."""
        rc = self.getEvent()
        if rc:
            return rc
        raise StopIteration

    def __iter__(self):
        return self

    def expandNode(self, node):
        """Consume events until *node* ends, attaching them below *node*.

        Each non-END_ELEMENT node is appended to the innermost open element;
        the loop stops when an event for *node* itself arrives or the event
        supply is exhausted.
        """
        event = self.getEvent()
        parents = [node]
        while event:
            token, cur_node = event
            if cur_node is node:
                return
            if token != END_ELEMENT:
                parents[-1].appendChild(cur_node)
            if token == START_ELEMENT:
                parents.append(cur_node)
            elif token == END_ELEMENT:
                del parents[-1]
            event = self.getEvent()

    def getEvent(self):
        """Return the next queued event, feeding the parser as needed.

        Returns None once the stream yields no more data; the parser is
        closed at that point.
        """
        # use IncrementalParser interface, so we get the desired
        # pull effect
        if not self.pulldom.firstEvent[1]:
            # Queue is empty: restart the tail at the dummy head.
            self.pulldom.lastEvent = self.pulldom.firstEvent
        while not self.pulldom.firstEvent[1]:
            buf = self.stream.read(self.bufsize)
            if not buf:
                self.parser.close()
                return None
            self.parser.feed(buf)
        rc = self.pulldom.firstEvent[1][0]
        self.pulldom.firstEvent[1] = self.pulldom.firstEvent[1][1]
        return rc

    def _slurp(self):
        """ Fallback replacement for getEvent() using the
            standard SAX2 interface, which means we slurp the
            SAX events into memory (no performance gain, but
            we are compatible to all SAX parsers).
        """
        self.parser.parse(self.stream)
        self.getEvent = self._emit
        return self._emit()

    def _emit(self):
        """ Fallback replacement for getEvent() that emits
            the events that _slurp() read previously.

            Returns None once every queued event has been handed out, so
            that iteration terminates cleanly.
        """
        head = self.pulldom.firstEvent
        if not head[1]:
            # Queue exhausted; subscripting None here used to raise
            # TypeError instead of ending the iteration.
            return None
        rc = head[1][0]
        head[1] = head[1][1]
        return rc

    def clear(self):
        """clear(): Explicitly release parsing objects"""
        self.pulldom.clear()
        del self.pulldom
        self.parser = None
        self.stream = None
295 | |
class SAX2DOM(PullDOM):
    """PullDOM variant that also links each new node into the DOM tree.

    After the base class has created a node and queued its event, the node
    is appended to the element currently on top of the element stack.
    """

    def startElementNS(self, name, tagName, attrs):
        """Create the element and append it to its parent on the stack."""
        PullDOM.startElementNS(self, name, tagName, attrs)
        curNode = self.elementStack[-1]
        parentNode = self.elementStack[-2]
        parentNode.appendChild(curNode)

    def startElement(self, name, attrs):
        """Create the element and append it to its parent on the stack."""
        PullDOM.startElement(self, name, attrs)
        curNode = self.elementStack[-1]
        parentNode = self.elementStack[-2]
        parentNode.appendChild(curNode)

    def processingInstruction(self, target, data):
        """Create the processing instruction and append it to the tree.

        When no document exists yet the base class only defers the event:
        there is no node and no parent on the stack, so nothing is attached
        here.
        """
        deferred = not self.document
        PullDOM.processingInstruction(self, target, data)
        if deferred:
            # self.lastEvent still refers to an earlier cell (or the dummy
            # head) and the element stack is empty; touching either would
            # raise.
            return
        node = self.lastEvent[0][1]
        parentNode = self.elementStack[-1]
        parentNode.appendChild(node)

    def ignorableWhitespace(self, chars):
        """Create the whitespace text node and append it to the tree."""
        PullDOM.ignorableWhitespace(self, chars)
        node = self.lastEvent[0][1]
        parentNode = self.elementStack[-1]
        parentNode.appendChild(node)

    def characters(self, chars):
        """Create the text node and append it to the tree."""
        PullDOM.characters(self, chars)
        node = self.lastEvent[0][1]
        parentNode = self.elementStack[-1]
        parentNode.appendChild(node)
327 | |
328 | |
default_bufsize = (2 ** 14) - 20


def parse(stream_or_string, parser=None, bufsize=None):
    """Return a DOMEventStream for a file name or an open stream.

    stream_or_string -- a file name (``str`` or ``os.PathLike``), which is
                        opened in binary mode, or an object with ``read()``
    parser           -- SAX parser to use; ``xml.sax.make_parser()`` if
                        omitted
    bufsize          -- read size per request; ``default_bufsize`` if None

    A file opened here is handed to the returned stream and is not closed
    by this function.
    """
    import os

    if bufsize is None:
        bufsize = default_bufsize
    if isinstance(stream_or_string, (str, os.PathLike)):
        stream = open(stream_or_string, 'rb')
    else:
        stream = stream_or_string
    if not parser:
        parser = xml.sax.make_parser()
    return DOMEventStream(stream, parser, bufsize)
341 | |
def parseString(string, parser=None):
    """Return a DOMEventStream over an in-memory XML document.

    string -- the document as ``str``, ``bytes`` or ``bytearray``
    parser -- SAX parser to use; ``xml.sax.make_parser()`` if omitted

    The buffer size equals the input length, so the whole document is
    offered to the parser in a single read.
    """
    from io import BytesIO, StringIO

    # StringIO rejects bytes, so pick the buffer type matching the input.
    if isinstance(string, (bytes, bytearray)):
        buf = BytesIO(string)
    else:
        buf = StringIO(string)
    bufsize = len(string)
    if not parser:
        parser = xml.sax.make_parser()
    return DOMEventStream(buf, parser, bufsize)