jpayne@7
|
1 import logging
|
jpayne@7
|
2 from os import PathLike
|
jpayne@7
|
3 from typing import BinaryIO, List, Optional, Set, Union
|
jpayne@7
|
4
|
jpayne@7
|
5 from .cd import (
|
jpayne@7
|
6 coherence_ratio,
|
jpayne@7
|
7 encoding_languages,
|
jpayne@7
|
8 mb_encoding_languages,
|
jpayne@7
|
9 merge_coherence_ratios,
|
jpayne@7
|
10 )
|
jpayne@7
|
11 from .constant import IANA_SUPPORTED, TOO_BIG_SEQUENCE, TOO_SMALL_SEQUENCE, TRACE
|
jpayne@7
|
12 from .md import mess_ratio
|
jpayne@7
|
13 from .models import CharsetMatch, CharsetMatches
|
jpayne@7
|
14 from .utils import (
|
jpayne@7
|
15 any_specified_encoding,
|
jpayne@7
|
16 cut_sequence_chunks,
|
jpayne@7
|
17 iana_name,
|
jpayne@7
|
18 identify_sig_or_bom,
|
jpayne@7
|
19 is_cp_similar,
|
jpayne@7
|
20 is_multi_byte_encoding,
|
jpayne@7
|
21 should_strip_sig_or_bom,
|
jpayne@7
|
22 )
|
jpayne@7
|
23
|
jpayne@7
|
# Registering a display name for the custom TRACE level is deliberately left
# disabled (the original author flagged it as likely to be controversial):
# logging.addLevelName(TRACE, "TRACE")

# Library-wide logger, looked up by its fixed name.
logger = logging.getLogger("charset_normalizer")

# Stream handler that from_bytes() attaches to `logger` only while it runs
# with explain=True. Each record is rendered as "time | level | message".
explain_handler = logging.StreamHandler()
explain_handler.setFormatter(
    logging.Formatter(fmt="%(asctime)s | %(levelname)s | %(message)s")
)
|
jpayne@7
|
31
|
jpayne@7
|
32
|
jpayne@7
|
def from_bytes(
    sequences: Union[bytes, bytearray],
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.2,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Given a raw bytes sequence, return the best possible charsets usable to render str objects.
    If there are no results, it is a strong indicator that the source is binary/not text.
    By default, the process extracts 5 blocks of 512 bytes each to assess the mess and coherence of
    a given sequence, and gives up on a particular code page after 20% of measured mess.
    Those criteria are customizable at will.

    The preemptive behaviour DOES NOT replace the traditional detection workflow, it prioritizes a
    particular code page but never takes it for granted. It can improve the performance.

    You may want to focus your attention on some code pages and/or not others; use cp_isolation and
    cp_exclusion for that purpose.

    This function will strip the SIG in the payload/sequence every time except on UTF-16, UTF-32.
    By default the library does not set up any handler other than the NullHandler. If you set the
    'explain' toggle to True, the logger configuration is altered for the duration of the call: a
    StreamHandler suitable for debugging is added and the level is lowered to TRACE. Both changes
    are undone when the call ends, whether it returns or raises.
    Custom logging format and handler can be set manually.

    :param sequences: The payload to analyse.
    :param steps: Number of chunks sampled from the payload.
    :param chunk_size: Size, in bytes, of each sampled chunk.
    :param threshold: Mess ratio at or above which a code page is rejected.
    :param cp_isolation: If given and non-empty, only these encodings are tried.
    :param cp_exclusion: If given and non-empty, these encodings are never tried.
    :param preemptive_behaviour: Look for an encoding declared inside the payload and try it first.
    :param explain: Temporarily emit the detection trace through ``explain_handler``.
    :param language_threshold: Threshold forwarded to the coherence (language) detection.
    :param enable_fallback: Allow ascii / utf_8 / declared-encoding fallback matches when nothing
        else passed the probing.
    :return: The plausible matches; empty when no suitable charset was found.
    :raises TypeError: If ``sequences`` is neither ``bytes`` nor ``bytearray``.
    """

    if not isinstance(sequences, (bytearray, bytes)):
        raise TypeError(
            "Expected object of type bytes or bytearray, got: {0}".format(
                type(sequences)
            )
        )

    previous_logger_level: int = logger.level

    if explain:
        logger.addHandler(explain_handler)
        logger.setLevel(TRACE)

    try:
        return _detect_from_bytes(
            sequences,
            steps,
            chunk_size,
            threshold,
            cp_isolation,
            cp_exclusion,
            preemptive_behaviour,
            explain,
            language_threshold,
            enable_fallback,
        )
    finally:
        # Single clean-up point: runs on every return path and on any exception,
        # and always restores the exact level the logger had before the call.
        if explain:
            logger.removeHandler(explain_handler)
            logger.setLevel(previous_logger_level)


def _detect_from_bytes(
    sequences: Union[bytes, bytearray],
    steps: int,
    chunk_size: int,
    threshold: float,
    cp_isolation: Optional[List[str]],
    cp_exclusion: Optional[List[str]],
    preemptive_behaviour: bool,
    explain: bool,
    language_threshold: float,
    enable_fallback: bool,
) -> CharsetMatches:
    """Run the detection proper for from_bytes(); logger set-up/tear-down is the caller's job."""
    length: int = len(sequences)

    if length == 0:
        logger.debug("Encoding detection on empty bytes, assuming utf_8 intention.")
        return CharsetMatches([CharsetMatch(sequences, "utf_8", 0.0, False, [], "")])

    if cp_isolation is not None:
        logger.log(
            TRACE,
            "cp_isolation is set. use this flag for debugging purpose. "
            "limited list of encoding allowed : %s.",
            ", ".join(cp_isolation),
        )
        # presumably normalises each user-supplied name to the identifiers used
        # in the candidate list - confirm against utils.iana_name
        cp_isolation = [iana_name(cp, False) for cp in cp_isolation]
    else:
        cp_isolation = []

    if cp_exclusion is not None:
        logger.log(
            TRACE,
            "cp_exclusion is set. use this flag for debugging purpose. "
            "limited list of encoding excluded : %s.",
            ", ".join(cp_exclusion),
        )
        cp_exclusion = [iana_name(cp, False) for cp in cp_exclusion]
    else:
        cp_exclusion = []

    # The payload is not larger than the requested sampling: analyse it as one chunk.
    if length <= (chunk_size * steps):
        logger.log(
            TRACE,
            "override steps (%i) and chunk_size (%i) as content does not fit (%i byte(s) given) parameters.",
            steps,
            chunk_size,
            length,
        )
        steps = 1
        chunk_size = length

    # Shrink the chunk size so that consecutive chunks do not overlap.
    if steps > 1 and length / steps < chunk_size:
        chunk_size = int(length / steps)

    is_too_small_sequence: bool = length < TOO_SMALL_SEQUENCE
    is_too_large_sequence: bool = length >= TOO_BIG_SEQUENCE

    if is_too_small_sequence:
        logger.log(
            TRACE,
            "Trying to detect encoding from a tiny portion of ({}) byte(s).".format(
                length
            ),
        )
    elif is_too_large_sequence:
        logger.log(
            TRACE,
            "Using lazy str decoding because the payload is quite large, ({}) byte(s).".format(
                length
            ),
        )

    # Encodings tried before the generic IANA_SUPPORTED list, in insertion order.
    prioritized_encodings: List[str] = []

    specified_encoding: Optional[str] = (
        any_specified_encoding(sequences) if preemptive_behaviour else None
    )

    if specified_encoding is not None:
        prioritized_encodings.append(specified_encoding)
        logger.log(
            TRACE,
            "Detected declarative mark in sequence. Priority +1 given for %s.",
            specified_encoding,
        )

    tested: Set[str] = set()
    tested_but_hard_failure: List[str] = []
    tested_but_soft_failure: List[str] = []

    fallback_ascii: Optional[CharsetMatch] = None
    fallback_u8: Optional[CharsetMatch] = None
    fallback_specified: Optional[CharsetMatch] = None

    results: CharsetMatches = CharsetMatches()

    sig_encoding, sig_payload = identify_sig_or_bom(sequences)

    if sig_encoding is not None:
        prioritized_encodings.append(sig_encoding)
        logger.log(
            TRACE,
            "Detected a SIG or BOM mark on first %i byte(s). Priority +1 given for %s.",
            len(sig_payload),
            sig_encoding,
        )

    prioritized_encodings.append("ascii")

    if "utf_8" not in prioritized_encodings:
        prioritized_encodings.append("utf_8")

    for encoding_iana in prioritized_encodings + IANA_SUPPORTED:
        if cp_isolation and encoding_iana not in cp_isolation:
            continue

        if cp_exclusion and encoding_iana in cp_exclusion:
            continue

        # An encoding can appear both in the prioritized list and in IANA_SUPPORTED.
        if encoding_iana in tested:
            continue

        tested.add(encoding_iana)

        decoded_payload: Optional[str] = None
        bom_or_sig_available: bool = sig_encoding == encoding_iana
        strip_sig_or_bom: bool = bom_or_sig_available and should_strip_sig_or_bom(
            encoding_iana
        )

        if encoding_iana in {"utf_16", "utf_32"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because it require a BOM. Will try some sub-encoder LE/BE.",
                encoding_iana,
            )
            continue
        if encoding_iana in {"utf_7"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because detection is unreliable without BOM/SIG.",
                encoding_iana,
            )
            continue

        try:
            is_multi_byte_decoder: bool = is_multi_byte_encoding(encoding_iana)
        except (ModuleNotFoundError, ImportError):
            logger.log(
                TRACE,
                "Encoding %s does not provide an IncrementalDecoder",
                encoding_iana,
            )
            continue

        # First gate: the payload must decode without error. For a large payload
        # and a single-byte decoder only the bytes up to index int(50e4) are
        # decoded here and the result is discarded (decoded_payload stays None).
        try:
            if is_too_large_sequence and is_multi_byte_decoder is False:
                str(
                    sequences[: int(50e4)]
                    if strip_sig_or_bom is False
                    else sequences[len(sig_payload) : int(50e4)],
                    encoding=encoding_iana,
                )
            else:
                decoded_payload = str(
                    sequences
                    if strip_sig_or_bom is False
                    else sequences[len(sig_payload) :],
                    encoding=encoding_iana,
                )
        except (UnicodeDecodeError, LookupError) as e:
            if not isinstance(e, LookupError):
                logger.log(
                    TRACE,
                    "Code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
            tested_but_hard_failure.append(encoding_iana)
            continue

        # Skip a code page deemed similar to one already rejected by the mess probing.
        similar_soft_failure_test: bool = False

        for encoding_soft_failed in tested_but_soft_failure:
            if is_cp_similar(encoding_iana, encoding_soft_failed):
                similar_soft_failure_test = True
                break

        if similar_soft_failure_test:
            logger.log(
                TRACE,
                "%s is deemed too similar to code page %s and was consider unsuited already. Continuing!",
                encoding_iana,
                encoding_soft_failed,
            )
            continue

        # Start offsets of the sampled chunks (starting after the SIG/BOM when present).
        r_ = range(
            0 if not bom_or_sig_available else len(sig_payload),
            length,
            int(length / steps),
        )

        # Fewer characters than bytes: at least one character used several bytes.
        multi_byte_bonus: bool = (
            is_multi_byte_decoder
            and decoded_payload is not None
            and len(decoded_payload) < length
        )

        if multi_byte_bonus:
            logger.log(
                TRACE,
                "Code page %s is a multi byte encoding table and it appear that at least one character "
                "was encoded using n-bytes.",
                encoding_iana,
            )

        # Number of too-messy chunks tolerated before giving up: a quarter of
        # the chunk count, but never fewer than 2.
        max_chunk_gave_up: int = int(len(r_) / 4)

        max_chunk_gave_up = max(max_chunk_gave_up, 2)
        early_stop_count: int = 0
        lazy_str_hard_failure = False

        md_chunks: List[str] = []
        md_ratios = []

        try:
            for chunk in cut_sequence_chunks(
                sequences,
                encoding_iana,
                r_,
                chunk_size,
                bom_or_sig_available,
                strip_sig_or_bom,
                sig_payload,
                is_multi_byte_decoder,
                decoded_payload,
            ):
                md_chunks.append(chunk)

                md_ratios.append(
                    mess_ratio(
                        chunk,
                        threshold,
                        # third argument is only True under explain with one or two isolated code pages
                        explain is True and 1 <= len(cp_isolation) <= 2,
                    )
                )

                if md_ratios[-1] >= threshold:
                    early_stop_count += 1

                # Stop at the give-up limit, or after the first chunk when a
                # SIG/BOM matched this encoding and must not be stripped.
                if (early_stop_count >= max_chunk_gave_up) or (
                    bom_or_sig_available and strip_sig_or_bom is False
                ):
                    break
        except (
            UnicodeDecodeError
        ) as e:  # Lazy str loading may have missed something there
            logger.log(
                TRACE,
                "LazyStr Loading: After MD chunk decode, code page %s does not fit given bytes sequence at ALL. %s",
                encoding_iana,
                str(e),
            )
            early_stop_count = max_chunk_gave_up
            lazy_str_hard_failure = True

        # We might want to check the sequence again with the whole content
        # Only if initial MD tests passes
        if (
            not lazy_str_hard_failure
            and is_too_large_sequence
            and not is_multi_byte_decoder
        ):
            try:
                # NOTE(review): this re-check starts at int(50e3) while the first
                # gate stopped at int(50e4), so the two ranges overlap. Harmless
                # for correctness; confirm whether 50e4 was intended here.
                sequences[int(50e3) :].decode(encoding_iana, errors="strict")
            except UnicodeDecodeError as e:
                logger.log(
                    TRACE,
                    "LazyStr Loading: After final lookup, code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
                tested_but_hard_failure.append(encoding_iana)
                continue

        mean_mess_ratio: float = sum(md_ratios) / len(md_ratios) if md_ratios else 0.0
        if mean_mess_ratio >= threshold or early_stop_count >= max_chunk_gave_up:
            tested_but_soft_failure.append(encoding_iana)
            logger.log(
                TRACE,
                "%s was excluded because of initial chaos probing. Gave up %i time(s). "
                "Computed mean chaos is %f %%.",
                encoding_iana,
                early_stop_count,
                round(mean_mess_ratio * 100, ndigits=3),
            )
            # Preparing those fallbacks in case we got nothing.
            if (
                enable_fallback
                and encoding_iana in ["ascii", "utf_8", specified_encoding]
                and not lazy_str_hard_failure
            ):
                fallback_entry = CharsetMatch(
                    sequences, encoding_iana, threshold, False, [], decoded_payload
                )
                if encoding_iana == specified_encoding:
                    fallback_specified = fallback_entry
                elif encoding_iana == "ascii":
                    fallback_ascii = fallback_entry
                else:
                    fallback_u8 = fallback_entry
            continue

        logger.log(
            TRACE,
            "%s passed initial chaos probing. Mean measured chaos is %f %%",
            encoding_iana,
            round(mean_mess_ratio * 100, ndigits=3),
        )

        if not is_multi_byte_decoder:
            target_languages: List[str] = encoding_languages(encoding_iana)
        else:
            target_languages = mb_encoding_languages(encoding_iana)

        if target_languages:
            logger.log(
                TRACE,
                "{} should target any language(s) of {}".format(
                    encoding_iana, str(target_languages)
                ),
            )

        cd_ratios = []

        # We shall skip the CD when its about ASCII
        # Most of the time its not relevant to run "language-detection" on it.
        if encoding_iana != "ascii":
            for chunk in md_chunks:
                chunk_languages = coherence_ratio(
                    chunk,
                    language_threshold,
                    ",".join(target_languages) if target_languages else None,
                )

                cd_ratios.append(chunk_languages)

        cd_ratios_merged = merge_coherence_ratios(cd_ratios)

        if cd_ratios_merged:
            logger.log(
                TRACE,
                "We detected language {} using {}".format(
                    cd_ratios_merged, encoding_iana
                ),
            )

        results.append(
            CharsetMatch(
                sequences,
                encoding_iana,
                mean_mess_ratio,
                bom_or_sig_available,
                cd_ratios_merged,
                decoded_payload,
            )
        )

        # Short-circuit: a declared / ascii / utf_8 candidate with very little
        # mess is returned immediately as the only match.
        if (
            encoding_iana in [specified_encoding, "ascii", "utf_8"]
            and mean_mess_ratio < 0.1
        ):
            logger.debug(
                "Encoding detection: %s is most likely the one.", encoding_iana
            )
            return CharsetMatches([results[encoding_iana]])

        # Short-circuit: the candidate announced by the SIG/BOM passed the probing.
        if encoding_iana == sig_encoding:
            logger.debug(
                "Encoding detection: %s is most likely the one as we detected a BOM or SIG within "
                "the beginning of the sequence.",
                encoding_iana,
            )
            return CharsetMatches([results[encoding_iana]])

    if len(results) == 0:
        if fallback_u8 or fallback_ascii or fallback_specified:
            logger.log(
                TRACE,
                "Nothing got out of the detection process. Using ASCII/UTF-8/Specified fallback.",
            )

        if fallback_specified:
            logger.debug(
                "Encoding detection: %s will be used as a fallback match",
                fallback_specified.encoding,
            )
            results.append(fallback_specified)
        elif fallback_u8 is not None:
            # The former three-clause condition reduced to exactly this test:
            # utf_8 is preferred over ascii whenever a utf_8 fallback exists.
            logger.debug("Encoding detection: utf_8 will be used as a fallback match")
            results.append(fallback_u8)
        elif fallback_ascii:
            logger.debug("Encoding detection: ascii will be used as a fallback match")
            results.append(fallback_ascii)

    if results:
        logger.debug(
            "Encoding detection: Found %s as plausible (best-candidate) for content. With %i alternatives.",
            results.best().encoding,  # type: ignore
            len(results) - 1,
        )
    else:
        logger.debug("Encoding detection: Unable to determine any suitable charset.")

    return results
|
jpayne@7
|
500
|
jpayne@7
|
501
|
jpayne@7
|
def from_fp(
    fp: BinaryIO,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Run from_bytes() on everything that can be read from an already opened binary file object.

    The file object is read from its current position until the end and is left open.
    All other parameters are forwarded unchanged to from_bytes().
    """
    payload = fp.read()

    return from_bytes(
        payload,
        steps=steps,
        chunk_size=chunk_size,
        threshold=threshold,
        cp_isolation=cp_isolation,
        cp_exclusion=cp_exclusion,
        preemptive_behaviour=preemptive_behaviour,
        explain=explain,
        language_threshold=language_threshold,
        enable_fallback=enable_fallback,
    )
|
jpayne@7
|
530
|
jpayne@7
|
531
|
jpayne@7
|
def from_path(
    path: Union[str, bytes, PathLike],  # type: ignore[type-arg]
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Open the file at the given path in binary mode and run from_fp() on it.

    The file is closed before returning. All other parameters are forwarded unchanged.
    Can raise IOError.
    """
    with open(path, "rb") as handle:
        matches = from_fp(
            handle,
            steps=steps,
            chunk_size=chunk_size,
            threshold=threshold,
            cp_isolation=cp_isolation,
            cp_exclusion=cp_exclusion,
            preemptive_behaviour=preemptive_behaviour,
            explain=explain,
            language_threshold=language_threshold,
            enable_fallback=enable_fallback,
        )

    return matches
|
jpayne@7
|
561
|
jpayne@7
|
562
|
jpayne@7
|
def is_binary(
    fp_or_path_or_payload: Union[PathLike, str, BinaryIO, bytes],  # type: ignore[type-arg]
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = False,
) -> bool:
    """
    Tell whether the given input (path, raw payload, or binary file object) looks binary, i.e. not text.

    The input is routed to from_path() (str / PathLike), from_bytes() (bytes / bytearray) or
    from_fp() (anything else) with the same heuristics and defaults as those functions, except
    that fallback matches are disabled by default here, to be stricter about content that is
    ASCII-compatible but unlikely to be a string.

    Returns True when the detection yields no match at all.
    """
    # Pick the entry point that fits the kind of input we were handed.
    if isinstance(fp_or_path_or_payload, (str, PathLike)):
        detector = from_path
    elif isinstance(fp_or_path_or_payload, (bytes, bytearray)):
        detector = from_bytes
    else:
        detector = from_fp

    guesses = detector(
        fp_or_path_or_payload,  # type: ignore[arg-type]
        steps=steps,
        chunk_size=chunk_size,
        threshold=threshold,
        cp_isolation=cp_isolation,
        cp_exclusion=cp_exclusion,
        preemptive_behaviour=preemptive_behaviour,
        explain=explain,
        language_threshold=language_threshold,
        enable_fallback=enable_fallback,
    )

    return not guesses
|