Mercurial > repos > jpayne > bioproject_to_srr_2
comparison charset_normalizer/api.py @ 7:5eb2d5e3bf22
planemo upload for repository https://toolrepo.galaxytrakr.org/view/jpayne/bioproject_to_srr_2/556cac4fb538
author | jpayne |
---|---|
date | Sun, 05 May 2024 23:32:17 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
6:b2745907b1eb | 7:5eb2d5e3bf22 |
---|---|
1 import logging | |
2 from os import PathLike | |
3 from typing import BinaryIO, List, Optional, Set, Union | |
4 | |
5 from .cd import ( | |
6 coherence_ratio, | |
7 encoding_languages, | |
8 mb_encoding_languages, | |
9 merge_coherence_ratios, | |
10 ) | |
11 from .constant import IANA_SUPPORTED, TOO_BIG_SEQUENCE, TOO_SMALL_SEQUENCE, TRACE | |
12 from .md import mess_ratio | |
13 from .models import CharsetMatch, CharsetMatches | |
14 from .utils import ( | |
15 any_specified_encoding, | |
16 cut_sequence_chunks, | |
17 iana_name, | |
18 identify_sig_or_bom, | |
19 is_cp_similar, | |
20 is_multi_byte_encoding, | |
21 should_strip_sig_or_bom, | |
22 ) | |
23 | |
# Will most likely be controversial
# logging.addLevelName(TRACE, "TRACE")
# Package logger. Per the from_bytes docstring, the library itself installs no
# handler beyond a NullHandler; `explain_handler` is only attached temporarily
# when a caller passes explain=True.
logger = logging.getLogger("charset_normalizer")
explain_handler = logging.StreamHandler()
explain_handler.setFormatter(
    logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")
)
31 | |
32 | |
def from_bytes(
    sequences: Union[bytes, bytearray],
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.2,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Given a raw bytes sequence, return the best possibles charset usable to render str objects.
    If there is no results, it is a strong indicator that the source is binary/not text.
    By default, the process will extract 5 blocks of 512o each to assess the mess and coherence of a given sequence.
    And will give up a particular code page after 20% of measured mess. Those criteria are customizable at will.

    The preemptive behavior DOES NOT replace the traditional detection workflow, it prioritize a particular code page
    but never take it for granted. Can improve the performance.

    You may want to focus your attention to some code page or/and not others, use cp_isolation and cp_exclusion for that
    purpose.

    This function will strip the SIG in the payload/sequence every time except on UTF-16, UTF-32.
    By default the library does not setup any handler other than the NullHandler, if you choose to set the 'explain'
    toggle to True it will alter the logger configuration to add a StreamHandler that is suitable for debugging.
    Custom logging format and handler can be set manually.
    """

    if not isinstance(sequences, (bytearray, bytes)):
        raise TypeError(
            "Expected object of type bytes or bytearray, got: {0}".format(
                type(sequences)
            )
        )

    # explain=True temporarily attaches the module-level StreamHandler and
    # drops to TRACE; every return path below must undo both (handler + level).
    if explain:
        previous_logger_level: int = logger.level
        logger.addHandler(explain_handler)
        logger.setLevel(TRACE)

    length: int = len(sequences)

    # Empty input: conventionally reported as utf_8 with zero chaos.
    if length == 0:
        logger.debug("Encoding detection on empty bytes, assuming utf_8 intention.")
        if explain:
            logger.removeHandler(explain_handler)
            logger.setLevel(previous_logger_level or logging.WARNING)
        return CharsetMatches([CharsetMatch(sequences, "utf_8", 0.0, False, [], "")])

    # Normalize user-provided code-page filters to IANA names (empty list = no filter).
    if cp_isolation is not None:
        logger.log(
            TRACE,
            "cp_isolation is set. use this flag for debugging purpose. "
            "limited list of encoding allowed : %s.",
            ", ".join(cp_isolation),
        )
        cp_isolation = [iana_name(cp, False) for cp in cp_isolation]
    else:
        cp_isolation = []

    if cp_exclusion is not None:
        logger.log(
            TRACE,
            "cp_exclusion is set. use this flag for debugging purpose. "
            "limited list of encoding excluded : %s.",
            ", ".join(cp_exclusion),
        )
        cp_exclusion = [iana_name(cp, False) for cp in cp_exclusion]
    else:
        cp_exclusion = []

    # Small payloads cannot honour steps*chunk_size sampling: fall back to one
    # chunk covering the whole content.
    if length <= (chunk_size * steps):
        logger.log(
            TRACE,
            "override steps (%i) and chunk_size (%i) as content does not fit (%i byte(s) given) parameters.",
            steps,
            chunk_size,
            length,
        )
        steps = 1
        chunk_size = length

    # Shrink chunk_size so `steps` non-overlapping chunks fit in the payload.
    if steps > 1 and length / steps < chunk_size:
        chunk_size = int(length / steps)

    is_too_small_sequence: bool = len(sequences) < TOO_SMALL_SEQUENCE
    is_too_large_sequence: bool = len(sequences) >= TOO_BIG_SEQUENCE

    if is_too_small_sequence:
        logger.log(
            TRACE,
            "Trying to detect encoding from a tiny portion of ({}) byte(s).".format(
                length
            ),
        )
    elif is_too_large_sequence:
        logger.log(
            TRACE,
            "Using lazy str decoding because the payload is quite large, ({}) byte(s).".format(
                length
            ),
        )

    # Encodings tried before the generic IANA_SUPPORTED sweep: declarative
    # mark (if preemptive), then BOM/SIG match, then ascii and utf_8.
    prioritized_encodings: List[str] = []

    specified_encoding: Optional[str] = (
        any_specified_encoding(sequences) if preemptive_behaviour else None
    )

    if specified_encoding is not None:
        prioritized_encodings.append(specified_encoding)
        logger.log(
            TRACE,
            "Detected declarative mark in sequence. Priority +1 given for %s.",
            specified_encoding,
        )

    tested: Set[str] = set()
    tested_but_hard_failure: List[str] = []  # decode raised outright
    tested_but_soft_failure: List[str] = []  # decoded, but too much "mess"

    # Kept aside in case nothing passes the chaos probing (see enable_fallback).
    fallback_ascii: Optional[CharsetMatch] = None
    fallback_u8: Optional[CharsetMatch] = None
    fallback_specified: Optional[CharsetMatch] = None

    results: CharsetMatches = CharsetMatches()

    sig_encoding, sig_payload = identify_sig_or_bom(sequences)

    if sig_encoding is not None:
        prioritized_encodings.append(sig_encoding)
        logger.log(
            TRACE,
            "Detected a SIG or BOM mark on first %i byte(s). Priority +1 given for %s.",
            len(sig_payload),
            sig_encoding,
        )

    prioritized_encodings.append("ascii")

    if "utf_8" not in prioritized_encodings:
        prioritized_encodings.append("utf_8")

    for encoding_iana in prioritized_encodings + IANA_SUPPORTED:
        if cp_isolation and encoding_iana not in cp_isolation:
            continue

        if cp_exclusion and encoding_iana in cp_exclusion:
            continue

        # prioritized_encodings may overlap IANA_SUPPORTED; test each once.
        if encoding_iana in tested:
            continue

        tested.add(encoding_iana)

        decoded_payload: Optional[str] = None
        bom_or_sig_available: bool = sig_encoding == encoding_iana
        strip_sig_or_bom: bool = bom_or_sig_available and should_strip_sig_or_bom(
            encoding_iana
        )

        # BOM-dependent encodings are skipped without a BOM; their LE/BE
        # sub-encoders are covered separately by IANA_SUPPORTED.
        if encoding_iana in {"utf_16", "utf_32"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because it require a BOM. Will try some sub-encoder LE/BE.",
                encoding_iana,
            )
            continue
        if encoding_iana in {"utf_7"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because detection is unreliable without BOM/SIG.",
                encoding_iana,
            )
            continue

        try:
            is_multi_byte_decoder: bool = is_multi_byte_encoding(encoding_iana)
        except (ModuleNotFoundError, ImportError):
            logger.log(
                TRACE,
                "Encoding %s does not provide an IncrementalDecoder",
                encoding_iana,
            )
            continue

        # First full-decode sanity check. For very large single-byte payloads
        # only a 500 kB prefix is decoded here (lazy str loading); the tail is
        # re-checked after the MD pass below.
        try:
            if is_too_large_sequence and is_multi_byte_decoder is False:
                str(
                    sequences[: int(50e4)]
                    if strip_sig_or_bom is False
                    else sequences[len(sig_payload) : int(50e4)],
                    encoding=encoding_iana,
                )
            else:
                decoded_payload = str(
                    sequences
                    if strip_sig_or_bom is False
                    else sequences[len(sig_payload) :],
                    encoding=encoding_iana,
                )
        except (UnicodeDecodeError, LookupError) as e:
            if not isinstance(e, LookupError):
                logger.log(
                    TRACE,
                    "Code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
            tested_but_hard_failure.append(encoding_iana)
            continue

        # Skip code pages deemed similar to one that already soft-failed.
        similar_soft_failure_test: bool = False

        for encoding_soft_failed in tested_but_soft_failure:
            if is_cp_similar(encoding_iana, encoding_soft_failed):
                similar_soft_failure_test = True
                break

        if similar_soft_failure_test:
            logger.log(
                TRACE,
                "%s is deemed too similar to code page %s and was consider unsuited already. Continuing!",
                encoding_iana,
                encoding_soft_failed,
            )
            continue

        # Chunk start offsets, skipping the BOM/SIG bytes when present.
        r_ = range(
            0 if not bom_or_sig_available else len(sig_payload),
            length,
            int(length / steps),
        )

        # A multi-byte decode producing fewer chars than bytes is evidence the
        # encoding is actually in use.
        multi_byte_bonus: bool = (
            is_multi_byte_decoder
            and decoded_payload is not None
            and len(decoded_payload) < length
        )

        if multi_byte_bonus:
            logger.log(
                TRACE,
                "Code page %s is a multi byte encoding table and it appear that at least one character "
                "was encoded using n-bytes.",
                encoding_iana,
            )

        # Give up after a quarter of the chunks (min. 2) exceed the mess threshold.
        max_chunk_gave_up: int = int(len(r_) / 4)

        max_chunk_gave_up = max(max_chunk_gave_up, 2)
        early_stop_count: int = 0
        lazy_str_hard_failure = False

        md_chunks: List[str] = []
        md_ratios = []

        # Mess-detection (MD) pass: measure chaos chunk by chunk.
        try:
            for chunk in cut_sequence_chunks(
                sequences,
                encoding_iana,
                r_,
                chunk_size,
                bom_or_sig_available,
                strip_sig_or_bom,
                sig_payload,
                is_multi_byte_decoder,
                decoded_payload,
            ):
                md_chunks.append(chunk)

                md_ratios.append(
                    mess_ratio(
                        chunk,
                        threshold,
                        explain is True and 1 <= len(cp_isolation) <= 2,
                    )
                )

                if md_ratios[-1] >= threshold:
                    early_stop_count += 1

                if (early_stop_count >= max_chunk_gave_up) or (
                    bom_or_sig_available and strip_sig_or_bom is False
                ):
                    break
        except (
            UnicodeDecodeError
        ) as e:  # Lazy str loading may have missed something there
            logger.log(
                TRACE,
                "LazyStr Loading: After MD chunk decode, code page %s does not fit given bytes sequence at ALL. %s",
                encoding_iana,
                str(e),
            )
            early_stop_count = max_chunk_gave_up
            lazy_str_hard_failure = True

        # We might want to check the sequence again with the whole content
        # Only if initial MD tests passes
        if (
            not lazy_str_hard_failure
            and is_too_large_sequence
            and not is_multi_byte_decoder
        ):
            try:
                sequences[int(50e3) :].decode(encoding_iana, errors="strict")
            except UnicodeDecodeError as e:
                logger.log(
                    TRACE,
                    "LazyStr Loading: After final lookup, code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
                tested_but_hard_failure.append(encoding_iana)
                continue

        mean_mess_ratio: float = sum(md_ratios) / len(md_ratios) if md_ratios else 0.0
        if mean_mess_ratio >= threshold or early_stop_count >= max_chunk_gave_up:
            tested_but_soft_failure.append(encoding_iana)
            logger.log(
                TRACE,
                "%s was excluded because of initial chaos probing. Gave up %i time(s). "
                "Computed mean chaos is %f %%.",
                encoding_iana,
                early_stop_count,
                round(mean_mess_ratio * 100, ndigits=3),
            )
            # Preparing those fallbacks in case we got nothing.
            if (
                enable_fallback
                and encoding_iana in ["ascii", "utf_8", specified_encoding]
                and not lazy_str_hard_failure
            ):
                fallback_entry = CharsetMatch(
                    sequences, encoding_iana, threshold, False, [], decoded_payload
                )
                if encoding_iana == specified_encoding:
                    fallback_specified = fallback_entry
                elif encoding_iana == "ascii":
                    fallback_ascii = fallback_entry
                else:
                    fallback_u8 = fallback_entry
            continue

        logger.log(
            TRACE,
            "%s passed initial chaos probing. Mean measured chaos is %f %%",
            encoding_iana,
            round(mean_mess_ratio * 100, ndigits=3),
        )

        # Coherence-detection (CD) pass: guess plausible languages.
        if not is_multi_byte_decoder:
            target_languages: List[str] = encoding_languages(encoding_iana)
        else:
            target_languages = mb_encoding_languages(encoding_iana)

        if target_languages:
            logger.log(
                TRACE,
                "{} should target any language(s) of {}".format(
                    encoding_iana, str(target_languages)
                ),
            )

        cd_ratios = []

        # We shall skip the CD when its about ASCII
        # Most of the time its not relevant to run "language-detection" on it.
        if encoding_iana != "ascii":
            for chunk in md_chunks:
                chunk_languages = coherence_ratio(
                    chunk,
                    language_threshold,
                    ",".join(target_languages) if target_languages else None,
                )

                cd_ratios.append(chunk_languages)

            cd_ratios_merged = merge_coherence_ratios(cd_ratios)

            if cd_ratios_merged:
                logger.log(
                    TRACE,
                    "We detected language {} using {}".format(
                        cd_ratios_merged, encoding_iana
                    ),
                )

        results.append(
            CharsetMatch(
                sequences,
                encoding_iana,
                mean_mess_ratio,
                bom_or_sig_available,
                cd_ratios_merged,
                decoded_payload,
            )
        )

        # Early exit: a clean match on a high-priority encoding is trusted.
        if (
            encoding_iana in [specified_encoding, "ascii", "utf_8"]
            and mean_mess_ratio < 0.1
        ):
            logger.debug(
                "Encoding detection: %s is most likely the one.", encoding_iana
            )
            if explain:
                logger.removeHandler(explain_handler)
                logger.setLevel(previous_logger_level)
            return CharsetMatches([results[encoding_iana]])

        # Early exit: a passing match for the encoding named by the BOM/SIG.
        if encoding_iana == sig_encoding:
            logger.debug(
                "Encoding detection: %s is most likely the one as we detected a BOM or SIG within "
                "the beginning of the sequence.",
                encoding_iana,
            )
            if explain:
                logger.removeHandler(explain_handler)
                logger.setLevel(previous_logger_level)
            return CharsetMatches([results[encoding_iana]])

    # Nothing passed: fall back to the matches set aside earlier, preferring
    # specified encoding, then utf_8, then ascii.
    if len(results) == 0:
        if fallback_u8 or fallback_ascii or fallback_specified:
            logger.log(
                TRACE,
                "Nothing got out of the detection process. Using ASCII/UTF-8/Specified fallback.",
            )

        if fallback_specified:
            logger.debug(
                "Encoding detection: %s will be used as a fallback match",
                fallback_specified.encoding,
            )
            results.append(fallback_specified)
        elif (
            # NOTE(review): the final `fallback_u8 is not None` clause subsumes
            # the two clauses before it — the whole condition reduces to
            # `fallback_u8 is not None` (utf_8 preferred over ascii).
            (fallback_u8 and fallback_ascii is None)
            or (
                fallback_u8
                and fallback_ascii
                and fallback_u8.fingerprint != fallback_ascii.fingerprint
            )
            or (fallback_u8 is not None)
        ):
            logger.debug("Encoding detection: utf_8 will be used as a fallback match")
            results.append(fallback_u8)
        elif fallback_ascii:
            logger.debug("Encoding detection: ascii will be used as a fallback match")
            results.append(fallback_ascii)

    if results:
        logger.debug(
            "Encoding detection: Found %s as plausible (best-candidate) for content. With %i alternatives.",
            results.best().encoding,  # type: ignore
            len(results) - 1,
        )
    else:
        logger.debug("Encoding detection: Unable to determine any suitable charset.")

    if explain:
        logger.removeHandler(explain_handler)
        logger.setLevel(previous_logger_level)

    return results
500 | |
501 | |
def from_fp(
    fp: BinaryIO,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Identical to from_bytes, except the payload is read from an already-opened
    binary file pointer. The file pointer is left open.
    """
    payload = fp.read()
    return from_bytes(
        payload,
        steps=steps,
        chunk_size=chunk_size,
        threshold=threshold,
        cp_isolation=cp_isolation,
        cp_exclusion=cp_exclusion,
        preemptive_behaviour=preemptive_behaviour,
        explain=explain,
        language_threshold=language_threshold,
        enable_fallback=enable_fallback,
    )
530 | |
531 | |
def from_path(
    path: Union[str, bytes, PathLike],  # type: ignore[type-arg]
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Identical to from_bytes, with one extra step: the given file path is opened
    and read in binary mode. May raise IOError.
    """
    with open(path, "rb") as fp:
        return from_fp(
            fp,
            steps=steps,
            chunk_size=chunk_size,
            threshold=threshold,
            cp_isolation=cp_isolation,
            cp_exclusion=cp_exclusion,
            preemptive_behaviour=preemptive_behaviour,
            explain=explain,
            language_threshold=language_threshold,
            enable_fallback=enable_fallback,
        )
561 | |
562 | |
def is_binary(
    fp_or_path_or_payload: Union[PathLike, str, BinaryIO, bytes],  # type: ignore[type-arg]
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = False,
) -> bool:
    """
    Detect if the given input (file, bytes, or path) points to a binary file. aka. not a string.
    Based on the same main heuristic algorithms and default kwargs at the sole exception that fallbacks match
    are disabled to be stricter around ASCII-compatible but unlikely to be a string.
    """
    # All three entry points take the same tuning knobs; forward them uniformly.
    detector_kwargs = dict(
        steps=steps,
        chunk_size=chunk_size,
        threshold=threshold,
        cp_isolation=cp_isolation,
        cp_exclusion=cp_exclusion,
        preemptive_behaviour=preemptive_behaviour,
        explain=explain,
        language_threshold=language_threshold,
        enable_fallback=enable_fallback,
    )

    # Dispatch on the input flavour: filesystem path, raw payload, or file pointer.
    if isinstance(fp_or_path_or_payload, (str, PathLike)):
        guesses = from_path(fp_or_path_or_payload, **detector_kwargs)
    elif isinstance(fp_or_path_or_payload, (bytes, bytearray)):
        guesses = from_bytes(fp_or_path_or_payload, **detector_kwargs)
    else:
        guesses = from_fp(fp_or_path_or_payload, **detector_kwargs)

    # No plausible charset at all is the binary signal.
    return not guesses