Mercurial > repos > rliterman > csp2
comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/xml/sax/expatreader.py @ 69:33d812a61356
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 17:55:14 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
67:0e9998148a16 | 69:33d812a61356 |
---|---|
1 """ | |
2 SAX driver for the pyexpat C module. This driver works with | |
3 pyexpat.__version__ == '2.22'. | |
4 """ | |
5 | |
6 version = "0.20" | |
7 | |
8 from xml.sax._exceptions import * | |
9 from xml.sax.handler import feature_validation, feature_namespaces | |
10 from xml.sax.handler import feature_namespace_prefixes | |
11 from xml.sax.handler import feature_external_ges, feature_external_pes | |
12 from xml.sax.handler import feature_string_interning | |
13 from xml.sax.handler import property_xml_string, property_interning_dict | |
14 | |
# Importing xml.parsers.expat does not raise ImportError in Jython, so
# that platform is rejected explicitly before the import is attempted.
import sys
if sys.platform.startswith("java"):
    raise SAXReaderNotAvailable("expat not available in Java", None)
del sys

try:
    from xml.parsers import expat
except ImportError:
    raise SAXReaderNotAvailable("expat not supported", None)
# An importable module that lacks ParserCreate is treated the same way as
# a missing module.
if not hasattr(expat, "ParserCreate"):
    raise SAXReaderNotAvailable("expat not supported", None)
from xml.sax import xmlreader, saxutils, handler

# Module-level shortcuts for the two attribute containers handed to the
# content handler.
from xml.sax.xmlreader import AttributesImpl, AttributesNSImpl
32 | |
# When the interpreter provides weak references, _mkproxy wraps an object
# in a weak proxy so that the locator does not keep the parser alive
# (avoiding a parser <-> content handler cycle).  Without weak reference
# support it simply hands the object back unchanged.
try:
    import _weakref
except ImportError:
    def _mkproxy(o):
        return o
else:
    from weakref import proxy as _mkproxy
    del _weakref
45 | |
class _ClosedParser:
    """Attribute holder that stands in for an expat parser after closing.

    ExpatParser.close() stores the last ErrorColumnNumber and
    ErrorLineNumber on an instance of this class.
    """
48 | |
49 # --- ExpatLocator | |
50 | |
class ExpatLocator(xmlreader.Locator):
    """Locator for use with the ExpatParser class.

    This uses a weak reference to the parser object to avoid creating
    a circular reference between the parser and the content handler.

    If the parser has already been garbage collected, every accessor
    returns the same value it returns when no position or source
    information is available (None, or 1 for the line number) instead of
    letting the ReferenceError raised by the dead proxy escape.
    """

    def __init__(self, parser):
        # _mkproxy yields a weak proxy when weak references are available.
        self._ref = _mkproxy(parser)

    def _expat_parser(self):
        """Return the owner's current expat parser, or None if it is gone."""
        try:
            return self._ref._parser
        except ReferenceError:
            # The owning ExpatParser has been collected.
            return None

    def _input_source(self):
        """Return the owner's current input source, or None if it is gone."""
        owner = self._ref
        if owner is None:
            return None
        try:
            return owner._source
        except ReferenceError:
            # The owning ExpatParser has been collected.
            return None

    def getColumnNumber(self):
        """Return the parser's ErrorColumnNumber, or None without a parser."""
        expat_parser = self._expat_parser()
        if expat_parser is None:
            return None
        return expat_parser.ErrorColumnNumber

    def getLineNumber(self):
        """Return the parser's ErrorLineNumber, or 1 without a parser."""
        expat_parser = self._expat_parser()
        if expat_parser is None:
            return 1
        return expat_parser.ErrorLineNumber

    def getPublicId(self):
        """Return the public identifier of the current input source."""
        source = self._input_source()
        if source is None:
            return None
        return source.getPublicId()

    def getSystemId(self):
        """Return the system identifier of the current input source."""
        source = self._input_source()
        if source is None:
            return None
        return source.getSystemId()
83 | |
84 | |
85 # --- ExpatParser | |
86 | |
class ExpatParser(xmlreader.IncrementalParser, xmlreader.Locator):
    """SAX driver for the pyexpat C module.

    The instance is both the incremental parser and a Locator: the
    position methods report the ErrorLineNumber / ErrorColumnNumber of
    the current expat parser object.
    """

    def __init__(self, namespaceHandling=0, bufsize=2**16-20):
        """Create a driver.

        namespaceHandling -- initial value of the namespaces feature.
        bufsize -- passed unchanged to IncrementalParser.__init__.
        """
        xmlreader.IncrementalParser.__init__(self, bufsize)
        self._source = xmlreader.InputSource()
        self._parser = None
        self._namespaces = namespaceHandling
        self._lex_handler_prop = None
        self._parsing = 0
        self._entity_stack = []
        self._external_ges = 0
        self._interning = None

    # XMLReader methods

    def parse(self, source):
        "Parse an XML document from a URL or an InputSource."
        source = saxutils.prepare_input_source(source)

        self._source = source
        try:
            self.reset()
            self._cont_handler.setDocumentLocator(ExpatLocator(self))
            xmlreader.IncrementalParser.parse(self, source)
        except BaseException:
            # bpo-30264: Close the source on error to not leak resources:
            # xml.sax.parse() doesn't give access to the underlying parser
            # to the caller
            self._close_source()
            raise

    def prepareParser(self, source):
        """Set the expat base to the source's system id, when it has one."""
        system_id = source.getSystemId()
        if system_id is not None:
            self._parser.SetBase(system_id)

    # Redefined setContentHandler to allow changing handlers during parsing

    def setContentHandler(self, handler):
        """Install a content handler; rebind expat callbacks if parsing."""
        xmlreader.IncrementalParser.setContentHandler(self, handler)
        if self._parsing:
            self._reset_cont_handler()

    def getFeature(self, name):
        """Return the state of feature *name*.

        Raises SAXNotRecognizedException for an unknown feature name.
        """
        if name == feature_namespaces:
            return self._namespaces
        if name == feature_string_interning:
            return self._interning is not None
        if name in (feature_validation, feature_external_pes,
                    feature_namespace_prefixes):
            # These can never be switched on (see setFeature).
            return 0
        if name == feature_external_ges:
            return self._external_ges
        raise SAXNotRecognizedException("Feature '%s' not recognized" % name)

    def setFeature(self, name, state):
        """Set feature *name* to *state*.

        Raises SAXNotSupportedException while parsing, or when asked to
        enable validation, external parameter entities or namespace
        prefixes; raises SAXNotRecognizedException for an unknown name.
        """
        if self._parsing:
            raise SAXNotSupportedException("Cannot set features while parsing")

        if name == feature_namespaces:
            self._namespaces = state
        elif name == feature_external_ges:
            self._external_ges = state
        elif name == feature_string_interning:
            if state:
                # Keep an existing interning dict; only create one if needed.
                if self._interning is None:
                    self._interning = {}
            else:
                self._interning = None
        elif name == feature_validation:
            if state:
                raise SAXNotSupportedException(
                    "expat does not support validation")
        elif name == feature_external_pes:
            if state:
                raise SAXNotSupportedException(
                    "expat does not read external parameter entities")
        elif name == feature_namespace_prefixes:
            if state:
                raise SAXNotSupportedException(
                    "expat does not report namespace prefixes")
        else:
            raise SAXNotRecognizedException(
                "Feature '%s' not recognized" % name)

    def getProperty(self, name):
        """Return the value of property *name*.

        The XML string property is only available while a live expat
        parser exists; otherwise SAXNotSupportedException is raised.
        Raises SAXNotRecognizedException for an unknown property, or when
        the expat parser has no GetInputContext method.
        """
        if name == handler.property_lexical_handler:
            return self._lex_handler_prop
        if name == property_interning_dict:
            return self._interning
        if name == property_xml_string:
            parser = self._parser
            # After a failed close() the parser is replaced by a
            # _ClosedParser placeholder, which only carries the final
            # position; that is "not parsing", not "unsupported expat".
            if parser is None or isinstance(parser, _ClosedParser):
                raise SAXNotSupportedException(
                    "XML string cannot be returned when not parsing")
            if hasattr(parser, "GetInputContext"):
                return parser.GetInputContext()
            raise SAXNotRecognizedException(
                "This version of expat does not support getting"
                " the XML string")
        raise SAXNotRecognizedException("Property '%s' not recognized" % name)

    def setProperty(self, name, value):
        """Set property *name* to *value*.

        The XML string property is read-only (SAXNotSupportedException);
        unknown names raise SAXNotRecognizedException.
        """
        if name == handler.property_lexical_handler:
            self._lex_handler_prop = value
            if self._parsing:
                self._reset_lex_handler_prop()
        elif name == property_interning_dict:
            self._interning = value
        elif name == property_xml_string:
            raise SAXNotSupportedException("Property '%s' cannot be set" %
                                           name)
        else:
            raise SAXNotRecognizedException("Property '%s' not recognized" %
                                            name)

    # IncrementalParser methods

    def feed(self, data, isFinal = 0):
        """Feed a chunk of the document to expat.

        The first call of a parse resets the driver and reports
        startDocument().  expat errors are wrapped in a SAXParseException
        and handed to the error handler's fatalError().
        """
        if not self._parsing:
            self.reset()
            self._parsing = 1
            self._cont_handler.startDocument()

        try:
            # The isFinal parameter is internal to the expat reader.
            # If it is set to true, expat will check validity of the entire
            # document. When feeding chunks, they are not normally final -
            # except when invoked from close.
            self._parser.Parse(data, isFinal)
        except expat.error as e:
            exc = SAXParseException(expat.ErrorString(e.code), e, self)
            # FIXME: when to invoke error()?
            self._err_handler.fatalError(exc)

    def _close_source(self):
        """Close the character and byte streams of the current source."""
        source = self._source
        try:
            stream = source.getCharacterStream()
            if stream is not None:
                stream.close()
        finally:
            # Runs even if closing the character stream raised.
            stream = source.getByteStream()
            if stream is not None:
                stream.close()

    def close(self):
        """Finish the document: final feed, endDocument(), release parser."""
        if (self._entity_stack or self._parser is None or
                isinstance(self._parser, _ClosedParser)):
            # If we are completing an external entity, do nothing here
            return
        try:
            self.feed("", isFinal = 1)
            self._cont_handler.endDocument()
            # break cycle created by expat handlers pointing to our methods
            self._parser = None
        finally:
            self._parsing = 0
            if self._parser is not None:
                # Only reached when the try block raised.
                # Keep ErrorColumnNumber and ErrorLineNumber after closing.
                closed = _ClosedParser()
                closed.ErrorColumnNumber = self._parser.ErrorColumnNumber
                closed.ErrorLineNumber = self._parser.ErrorLineNumber
                self._parser = closed
            self._close_source()

    def _reset_cont_handler(self):
        """Point expat's PI and character-data callbacks at the handler."""
        # These two go straight to the content handler, bypassing the
        # processing_instruction / character_data methods below.
        content = self._cont_handler
        self._parser.ProcessingInstructionHandler = \
                                    content.processingInstruction
        self._parser.CharacterDataHandler = content.characters

    def _reset_lex_handler_prop(self):
        """Install, or clear, the lexical-handler callbacks on expat."""
        lex = self._lex_handler_prop
        parser = self._parser
        if lex is None:
            parser.CommentHandler = None
            parser.StartCdataSectionHandler = None
            parser.EndCdataSectionHandler = None
            parser.StartDoctypeDeclHandler = None
            parser.EndDoctypeDeclHandler = None
        else:
            parser.CommentHandler = lex.comment
            parser.StartCdataSectionHandler = lex.startCDATA
            parser.EndCdataSectionHandler = lex.endCDATA
            # startDTD takes its arguments in a different order than
            # expat supplies them, so it goes through start_doctype_decl.
            parser.StartDoctypeDeclHandler = self.start_doctype_decl
            parser.EndDoctypeDeclHandler = lex.endDTD

    def reset(self):
        """Create a fresh expat parser and wire up all callbacks."""
        encoding = self._source.getEncoding()
        if self._namespaces:
            # " " is the separator expat puts between the parts of a
            # namespace-qualified name (see _split_ns_name).
            self._parser = expat.ParserCreate(encoding, " ",
                                              intern=self._interning)
            self._parser.namespace_prefixes = 1
            self._parser.StartElementHandler = self.start_element_ns
            self._parser.EndElementHandler = self.end_element_ns
        else:
            self._parser = expat.ParserCreate(encoding,
                                              intern = self._interning)
            self._parser.StartElementHandler = self.start_element
            self._parser.EndElementHandler = self.end_element

        self._reset_cont_handler()
        self._parser.UnparsedEntityDeclHandler = self.unparsed_entity_decl
        self._parser.NotationDeclHandler = self.notation_decl
        self._parser.StartNamespaceDeclHandler = self.start_namespace_decl
        self._parser.EndNamespaceDeclHandler = self.end_namespace_decl

        self._decl_handler_prop = None
        if self._lex_handler_prop:
            self._reset_lex_handler_prop()
        # DefaultHandler, DefaultHandlerExpand and NotStandaloneHandler
        # are not installed.
        self._parser.ExternalEntityRefHandler = self.external_entity_ref
        try:
            self._parser.SkippedEntityHandler = self.skipped_entity_handler
        except AttributeError:
            # This pyexpat does not support SkippedEntity
            pass
        self._parser.SetParamEntityParsing(
            expat.XML_PARAM_ENTITY_PARSING_UNLESS_STANDALONE)

        self._parsing = 0
        self._entity_stack = []

    # Locator methods

    def getColumnNumber(self):
        """Return the parser's ErrorColumnNumber, or None without a parser."""
        if self._parser is None:
            return None
        return self._parser.ErrorColumnNumber

    def getLineNumber(self):
        """Return the parser's ErrorLineNumber, or 1 without a parser."""
        if self._parser is None:
            return 1
        return self._parser.ErrorLineNumber

    def getPublicId(self):
        """Return the public identifier of the current input source."""
        return self._source.getPublicId()

    def getSystemId(self):
        """Return the system identifier of the current input source."""
        return self._source.getSystemId()

    # event handlers

    @staticmethod
    def _split_ns_name(name):
        """Turn an expat namespace-mode name into a (uri, localname) pair.

        One part means no namespace; three parts carry a trailing prefix
        that is dropped; anything else is returned as a tuple of parts.
        """
        parts = name.split()
        if len(parts) == 1:
            # no namespace
            return (None, name)
        if len(parts) == 3:
            return parts[0], parts[1]
        # default namespace
        return tuple(parts)

    def start_element(self, name, attrs):
        self._cont_handler.startElement(name, AttributesImpl(attrs))

    def end_element(self, name):
        self._cont_handler.endElement(name)

    def start_element_ns(self, name, attrs):
        """Report startElementNS() with namespace-aware attribute maps."""
        pair = self._split_ns_name(name)

        newattrs = {}
        qnames = {}
        for (aname, value) in attrs.items():
            parts = aname.split()
            length = len(parts)
            if length == 1:
                # no namespace
                qname = aname
                apair = (None, aname)
            elif length == 3:
                # uri, local name, prefix
                qname = "%s:%s" % (parts[2], parts[1])
                apair = parts[0], parts[1]
            else:
                # default namespace
                qname = parts[1]
                apair = tuple(parts)

            newattrs[apair] = value
            qnames[apair] = qname

        # The element's own qname is always reported as None.
        self._cont_handler.startElementNS(pair, None,
                                          AttributesNSImpl(newattrs, qnames))

    def end_element_ns(self, name):
        """Report endElementNS() for an expat namespace-mode name."""
        self._cont_handler.endElementNS(self._split_ns_name(name), None)

    # this is not used (call directly to ContentHandler)
    def processing_instruction(self, target, data):
        self._cont_handler.processingInstruction(target, data)

    # this is not used (call directly to ContentHandler)
    def character_data(self, data):
        self._cont_handler.characters(data)

    def start_namespace_decl(self, prefix, uri):
        self._cont_handler.startPrefixMapping(prefix, uri)

    def end_namespace_decl(self, prefix):
        self._cont_handler.endPrefixMapping(prefix)

    def start_doctype_decl(self, name, sysid, pubid, has_internal_subset):
        # startDTD takes (name, pubid, sysid); has_internal_subset is unused.
        self._lex_handler_prop.startDTD(name, pubid, sysid)

    def unparsed_entity_decl(self, name, base, sysid, pubid, notation_name):
        self._dtd_handler.unparsedEntityDecl(name, pubid, sysid, notation_name)

    def notation_decl(self, name, base, sysid, pubid):
        self._dtd_handler.notationDecl(name, pubid, sysid)

    def external_entity_ref(self, context, base, sysid, pubid):
        """expat callback for an external entity reference.

        Returns 1 when the entity was skipped (external general entities
        disabled) or parsed successfully, and 0 when parsing it raised.
        """
        if not self._external_ges:
            return 1

        source = self._ent_handler.resolveEntity(pubid, sysid)
        source = saxutils.prepare_input_source(source,
                                               self._source.getSystemId() or
                                               "")

        # Remember the enclosing parser/source while the entity is parsed
        # with its own sub-parser.
        self._entity_stack.append((self._parser, self._source))
        self._parser = self._parser.ExternalEntityParserCreate(context)
        self._source = source

        try:
            xmlreader.IncrementalParser.parse(self, source)
        except BaseException:
            # NOTE(review): the exception is discarded and the entity
            # stack is left un-popped on this path.
            return 0  # FIXME: save error info here?

        (self._parser, self._source) = self._entity_stack[-1]
        del self._entity_stack[-1]
        return 1

    def skipped_entity_handler(self, name, is_pe):
        if is_pe:
            # The SAX spec requires to report skipped PEs with a '%'
            name = '%'+name
        self._cont_handler.skippedEntity(name)
433 | |
434 # --- | |
435 | |
def create_parser(*args, **kwargs):
    """Return a new ExpatParser built from the given arguments."""
    parser = ExpatParser(*args, **kwargs)
    return parser
438 | |
439 # --- | |
440 | |
if __name__ == "__main__":
    # Demo run: parse a remote document and re-serialise it through
    # XMLGenerator, with a default ErrorHandler installed.
    import xml.sax.saxutils
    demo_url = "http://www.ibiblio.org/xml/examples/shakespeare/hamlet.xml"
    reader = create_parser()
    reader.setContentHandler(xml.sax.saxutils.XMLGenerator())
    reader.setErrorHandler(xml.sax.ErrorHandler())
    reader.parse(demo_url)