annotate charset_normalizer/api.py @ 16:dc2c003078e9 tip

planemo upload for repository https://toolrepo.galaxytrakr.org/view/jpayne/bioproject_to_srr_2/556cac4fb538
author jpayne
date Tue, 21 May 2024 01:09:25 -0400
parents 5eb2d5e3bf22
children
rev   line source
jpayne@7 1 import logging
jpayne@7 2 from os import PathLike
jpayne@7 3 from typing import BinaryIO, List, Optional, Set, Union
jpayne@7 4
jpayne@7 5 from .cd import (
jpayne@7 6 coherence_ratio,
jpayne@7 7 encoding_languages,
jpayne@7 8 mb_encoding_languages,
jpayne@7 9 merge_coherence_ratios,
jpayne@7 10 )
jpayne@7 11 from .constant import IANA_SUPPORTED, TOO_BIG_SEQUENCE, TOO_SMALL_SEQUENCE, TRACE
jpayne@7 12 from .md import mess_ratio
jpayne@7 13 from .models import CharsetMatch, CharsetMatches
jpayne@7 14 from .utils import (
jpayne@7 15 any_specified_encoding,
jpayne@7 16 cut_sequence_chunks,
jpayne@7 17 iana_name,
jpayne@7 18 identify_sig_or_bom,
jpayne@7 19 is_cp_similar,
jpayne@7 20 is_multi_byte_encoding,
jpayne@7 21 should_strip_sig_or_bom,
jpayne@7 22 )
jpayne@7 23
# Registering a human-readable name for the custom TRACE level is deliberately
# left disabled; it will most likely be controversial.
# logging.addLevelName(TRACE, "TRACE")

# Logger shared by the whole package.
logger = logging.getLogger("charset_normalizer")

# Stream handler that from_bytes() attaches to ``logger`` only for the
# duration of a call made with ``explain=True``.
explain_handler = logging.StreamHandler()
explain_handler.setFormatter(
    logging.Formatter(fmt="%(asctime)s | %(levelname)s | %(message)s")
)
jpayne@7 31
jpayne@7 32
def from_bytes(
    sequences: Union[bytes, bytearray],
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.2,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Given a raw bytes sequence, return the best possible charsets usable to render str objects.
    If there are no results, it is a strong indicator that the source is binary/not text.
    By default, the process will extract 5 blocks of 512 bytes each to assess the mess and coherence of a given
    sequence, and will give up a particular code page after 20% of measured mess. Those criteria are customizable
    at will.

    The preemptive behavior DOES NOT replace the traditional detection workflow, it prioritizes a particular code page
    but never takes it for granted. Can improve the performance.

    You may want to focus your attention on some code pages and/or not others, use cp_isolation and cp_exclusion for
    that purpose.

    This function will strip the SIG in the payload/sequence every time except on UTF-16, UTF-32.
    By default the library does not set up any handler other than the NullHandler, if you choose to set the 'explain'
    toggle to True it will alter the logger configuration to add a StreamHandler that is suitable for debugging.
    Custom logging format and handler can be set manually.

    :param sequences: Raw payload to analyse.
    :param steps: Number of chunks sampled from the payload.
    :param chunk_size: Size, in bytes, of each sampled chunk.
    :param threshold: Mess ratio at or above which a code page is given up.
    :param cp_isolation: When non-empty, only these encodings are tested.
    :param cp_exclusion: Encodings that must not be tested.
    :param preemptive_behaviour: When True, an encoding declared inside the payload is tested first.
    :param explain: When True, attach ``explain_handler`` and log at TRACE level for the duration of the call.
        The handler is removed and the previous logger level restored on every exit path, including exceptions.
    :param language_threshold: Threshold forwarded to the coherence (language) detection.
    :param enable_fallback: When True, a soft-failed ascii/utf_8/declared encoding may be returned if nothing
        else matched.
    :return: The plausible matches; empty when no charset fits.
    :raises TypeError: If ``sequences`` is neither bytes nor bytearray.
    """

    if not isinstance(sequences, (bytearray, bytes)):
        raise TypeError(
            "Expected object of type bytes or bytearray, got: {0}".format(
                type(sequences)
            )
        )

    if not explain:
        return _detect_from_bytes(
            sequences,
            steps,
            chunk_size,
            threshold,
            cp_isolation,
            cp_exclusion,
            preemptive_behaviour,
            explain,
            language_threshold,
            enable_fallback,
        )

    previous_logger_level: int = logger.level
    logger.addHandler(explain_handler)
    logger.setLevel(TRACE)

    try:
        return _detect_from_bytes(
            sequences,
            steps,
            chunk_size,
            threshold,
            cp_isolation,
            cp_exclusion,
            preemptive_behaviour,
            explain,
            language_threshold,
            enable_fallback,
        )
    finally:
        # Single clean-up point: runs on every return AND when detection raises,
        # so the debug handler never leaks and the exact previous level is restored.
        logger.removeHandler(explain_handler)
        logger.setLevel(previous_logger_level)


def _detect_from_bytes(
    sequences: Union[bytes, bytearray],
    steps: int,
    chunk_size: int,
    threshold: float,
    cp_isolation: Optional[List[str]],
    cp_exclusion: Optional[List[str]],
    preemptive_behaviour: bool,
    explain: bool,
    language_threshold: float,
    enable_fallback: bool,
) -> CharsetMatches:
    """
    Run the detection workflow of from_bytes() on an already type-checked payload.
    Logger set-up and tear-down for 'explain' is the caller's responsibility.
    """
    length: int = len(sequences)

    if length == 0:
        logger.debug("Encoding detection on empty bytes, assuming utf_8 intention.")
        return CharsetMatches([CharsetMatch(sequences, "utf_8", 0.0, False, [], "")])

    if cp_isolation is not None:
        logger.log(
            TRACE,
            "cp_isolation is set. use this flag for debugging purpose. "
            "limited list of encoding allowed : %s.",
            ", ".join(cp_isolation),
        )
        cp_isolation = [iana_name(cp, False) for cp in cp_isolation]
    else:
        cp_isolation = []

    if cp_exclusion is not None:
        logger.log(
            TRACE,
            "cp_exclusion is set. use this flag for debugging purpose. "
            "limited list of encoding excluded : %s.",
            ", ".join(cp_exclusion),
        )
        cp_exclusion = [iana_name(cp, False) for cp in cp_exclusion]
    else:
        cp_exclusion = []

    # Payload smaller than the requested sampling window: analyse it as one chunk.
    if length <= (chunk_size * steps):
        logger.log(
            TRACE,
            "override steps (%i) and chunk_size (%i) as content does not fit (%i byte(s) given) parameters.",
            steps,
            chunk_size,
            length,
        )
        steps = 1
        chunk_size = length

    if steps > 1 and length / steps < chunk_size:
        chunk_size = int(length / steps)

    is_too_small_sequence: bool = length < TOO_SMALL_SEQUENCE
    is_too_large_sequence: bool = length >= TOO_BIG_SEQUENCE

    if is_too_small_sequence:
        logger.log(
            TRACE,
            "Trying to detect encoding from a tiny portion of ({}) byte(s).".format(
                length
            ),
        )
    elif is_too_large_sequence:
        logger.log(
            TRACE,
            "Using lazy str decoding because the payload is quite large, ({}) byte(s).".format(
                length
            ),
        )

    prioritized_encodings: List[str] = []

    specified_encoding: Optional[str] = (
        any_specified_encoding(sequences) if preemptive_behaviour else None
    )

    if specified_encoding is not None:
        prioritized_encodings.append(specified_encoding)
        logger.log(
            TRACE,
            "Detected declarative mark in sequence. Priority +1 given for %s.",
            specified_encoding,
        )

    tested: Set[str] = set()
    tested_but_hard_failure: List[str] = []
    tested_but_soft_failure: List[str] = []

    fallback_ascii: Optional[CharsetMatch] = None
    fallback_u8: Optional[CharsetMatch] = None
    fallback_specified: Optional[CharsetMatch] = None

    results: CharsetMatches = CharsetMatches()

    sig_encoding, sig_payload = identify_sig_or_bom(sequences)

    if sig_encoding is not None:
        prioritized_encodings.append(sig_encoding)
        logger.log(
            TRACE,
            "Detected a SIG or BOM mark on first %i byte(s). Priority +1 given for %s.",
            len(sig_payload),
            sig_encoding,
        )

    prioritized_encodings.append("ascii")

    if "utf_8" not in prioritized_encodings:
        prioritized_encodings.append("utf_8")

    for encoding_iana in prioritized_encodings + IANA_SUPPORTED:
        if cp_isolation and encoding_iana not in cp_isolation:
            continue

        if cp_exclusion and encoding_iana in cp_exclusion:
            continue

        # Prioritized encodings also appear in IANA_SUPPORTED; test each only once.
        if encoding_iana in tested:
            continue

        tested.add(encoding_iana)

        decoded_payload: Optional[str] = None
        bom_or_sig_available: bool = sig_encoding == encoding_iana
        strip_sig_or_bom: bool = bom_or_sig_available and should_strip_sig_or_bom(
            encoding_iana
        )

        if encoding_iana in {"utf_16", "utf_32"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because it require a BOM. Will try some sub-encoder LE/BE.",
                encoding_iana,
            )
            continue
        if encoding_iana in {"utf_7"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because detection is unreliable without BOM/SIG.",
                encoding_iana,
            )
            continue

        try:
            is_multi_byte_decoder: bool = is_multi_byte_encoding(encoding_iana)
        except (ModuleNotFoundError, ImportError):
            logger.log(
                TRACE,
                "Encoding %s does not provide an IncrementalDecoder",
                encoding_iana,
            )
            continue

        # First gate: the payload must decode strictly. For large single-byte
        # payloads only a leading slice is decoded here and the result is discarded
        # (decoded_payload stays None); the remainder is verified further below.
        try:
            if is_too_large_sequence and is_multi_byte_decoder is False:
                str(
                    sequences[: int(50e4)]
                    if strip_sig_or_bom is False
                    else sequences[len(sig_payload) : int(50e4)],
                    encoding=encoding_iana,
                )
            else:
                decoded_payload = str(
                    sequences
                    if strip_sig_or_bom is False
                    else sequences[len(sig_payload) :],
                    encoding=encoding_iana,
                )
        except (UnicodeDecodeError, LookupError) as e:
            if not isinstance(e, LookupError):
                logger.log(
                    TRACE,
                    "Code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
            tested_but_hard_failure.append(encoding_iana)
            continue

        # Skip code pages deemed similar to one that already soft-failed.
        similar_soft_failure_test: bool = False

        for encoding_soft_failed in tested_but_soft_failure:
            if is_cp_similar(encoding_iana, encoding_soft_failed):
                similar_soft_failure_test = True
                break

        if similar_soft_failure_test:
            logger.log(
                TRACE,
                "%s is deemed too similar to code page %s and was consider unsuited already. Continuing!",
                encoding_iana,
                encoding_soft_failed,
            )
            continue

        # Start offsets of the sampled chunks (past the SIG/BOM when one is present).
        r_ = range(
            0 if not bom_or_sig_available else len(sig_payload),
            length,
            int(length / steps),
        )

        multi_byte_bonus: bool = (
            is_multi_byte_decoder
            and decoded_payload is not None
            and len(decoded_payload) < length
        )

        if multi_byte_bonus:
            logger.log(
                TRACE,
                "Code page %s is a multi byte encoding table and it appear that at least one character "
                "was encoded using n-bytes.",
                encoding_iana,
            )

        # Number of too-messy chunks tolerated before giving up: a quarter of the
        # chunks, but never fewer than 2.
        max_chunk_gave_up: int = int(len(r_) / 4)

        max_chunk_gave_up = max(max_chunk_gave_up, 2)
        early_stop_count: int = 0
        lazy_str_hard_failure = False

        md_chunks: List[str] = []
        md_ratios = []

        try:
            for chunk in cut_sequence_chunks(
                sequences,
                encoding_iana,
                r_,
                chunk_size,
                bom_or_sig_available,
                strip_sig_or_bom,
                sig_payload,
                is_multi_byte_decoder,
                decoded_payload,
            ):
                md_chunks.append(chunk)

                md_ratios.append(
                    mess_ratio(
                        chunk,
                        threshold,
                        explain is True and 1 <= len(cp_isolation) <= 2,
                    )
                )

                if md_ratios[-1] >= threshold:
                    early_stop_count += 1

                if (early_stop_count >= max_chunk_gave_up) or (
                    bom_or_sig_available and strip_sig_or_bom is False
                ):
                    break
        except (
            UnicodeDecodeError
        ) as e:  # Lazy str loading may have missed something there
            logger.log(
                TRACE,
                "LazyStr Loading: After MD chunk decode, code page %s does not fit given bytes sequence at ALL. %s",
                encoding_iana,
                str(e),
            )
            early_stop_count = max_chunk_gave_up
            lazy_str_hard_failure = True

        # We might want to check the sequence again with the whole content
        # Only if initial MD tests passes
        if (
            not lazy_str_hard_failure
            and is_too_large_sequence
            and not is_multi_byte_decoder
        ):
            try:
                sequences[int(50e3) :].decode(encoding_iana, errors="strict")
            except UnicodeDecodeError as e:
                logger.log(
                    TRACE,
                    "LazyStr Loading: After final lookup, code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
                tested_but_hard_failure.append(encoding_iana)
                continue

        mean_mess_ratio: float = sum(md_ratios) / len(md_ratios) if md_ratios else 0.0
        if mean_mess_ratio >= threshold or early_stop_count >= max_chunk_gave_up:
            tested_but_soft_failure.append(encoding_iana)
            logger.log(
                TRACE,
                "%s was excluded because of initial chaos probing. Gave up %i time(s). "
                "Computed mean chaos is %f %%.",
                encoding_iana,
                early_stop_count,
                round(mean_mess_ratio * 100, ndigits=3),
            )
            # Preparing those fallbacks in case we got nothing.
            if (
                enable_fallback
                and encoding_iana in ["ascii", "utf_8", specified_encoding]
                and not lazy_str_hard_failure
            ):
                fallback_entry = CharsetMatch(
                    sequences, encoding_iana, threshold, False, [], decoded_payload
                )
                if encoding_iana == specified_encoding:
                    fallback_specified = fallback_entry
                elif encoding_iana == "ascii":
                    fallback_ascii = fallback_entry
                else:
                    fallback_u8 = fallback_entry
            continue

        logger.log(
            TRACE,
            "%s passed initial chaos probing. Mean measured chaos is %f %%",
            encoding_iana,
            round(mean_mess_ratio * 100, ndigits=3),
        )

        if not is_multi_byte_decoder:
            target_languages: List[str] = encoding_languages(encoding_iana)
        else:
            target_languages = mb_encoding_languages(encoding_iana)

        if target_languages:
            logger.log(
                TRACE,
                "{} should target any language(s) of {}".format(
                    encoding_iana, str(target_languages)
                ),
            )

        cd_ratios = []

        # We shall skip the CD when its about ASCII
        # Most of the time its not relevant to run "language-detection" on it.
        if encoding_iana != "ascii":
            for chunk in md_chunks:
                chunk_languages = coherence_ratio(
                    chunk,
                    language_threshold,
                    ",".join(target_languages) if target_languages else None,
                )

                cd_ratios.append(chunk_languages)

        cd_ratios_merged = merge_coherence_ratios(cd_ratios)

        if cd_ratios_merged:
            logger.log(
                TRACE,
                "We detected language {} using {}".format(
                    cd_ratios_merged, encoding_iana
                ),
            )

        results.append(
            CharsetMatch(
                sequences,
                encoding_iana,
                mean_mess_ratio,
                bom_or_sig_available,
                cd_ratios_merged,
                decoded_payload,
            )
        )

        # Short-circuit: a clean match on a prioritized encoding ends the search.
        if (
            encoding_iana in [specified_encoding, "ascii", "utf_8"]
            and mean_mess_ratio < 0.1
        ):
            logger.debug(
                "Encoding detection: %s is most likely the one.", encoding_iana
            )
            return CharsetMatches([results[encoding_iana]])

        # Short-circuit: the encoding announced by the SIG/BOM passed probing.
        if encoding_iana == sig_encoding:
            logger.debug(
                "Encoding detection: %s is most likely the one as we detected a BOM or SIG within "
                "the beginning of the sequence.",
                encoding_iana,
            )
            return CharsetMatches([results[encoding_iana]])

    if len(results) == 0:
        if fallback_u8 or fallback_ascii or fallback_specified:
            logger.log(
                TRACE,
                "Nothing got out of the detection process. Using ASCII/UTF-8/Specified fallback.",
            )

        # Fallback precedence: declared encoding, then utf_8, then ascii.
        if fallback_specified:
            logger.debug(
                "Encoding detection: %s will be used as a fallback match",
                fallback_specified.encoding,
            )
            results.append(fallback_specified)
        elif fallback_u8 is not None:
            # The former three-clause condition was logically equivalent to this test.
            logger.debug("Encoding detection: utf_8 will be used as a fallback match")
            results.append(fallback_u8)
        elif fallback_ascii:
            logger.debug("Encoding detection: ascii will be used as a fallback match")
            results.append(fallback_ascii)

    if results:
        logger.debug(
            "Encoding detection: Found %s as plausible (best-candidate) for content. With %i alternatives.",
            results.best().encoding,  # type: ignore
            len(results) - 1,
        )
    else:
        logger.debug("Encoding detection: Unable to determine any suitable charset.")

    return results
jpayne@7 500
jpayne@7 501
def from_fp(
    fp: BinaryIO,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Run from_bytes on everything that can be read from an already opened binary file pointer.
    The file pointer is read with a single read() call and is left open for the caller.
    Every other argument is forwarded unchanged to from_bytes.
    """
    payload = fp.read()

    return from_bytes(
        payload,
        steps=steps,
        chunk_size=chunk_size,
        threshold=threshold,
        cp_isolation=cp_isolation,
        cp_exclusion=cp_exclusion,
        preemptive_behaviour=preemptive_behaviour,
        explain=explain,
        language_threshold=language_threshold,
        enable_fallback=enable_fallback,
    )
jpayne@7 530
jpayne@7 531
def from_path(
    path: Union[str, bytes, PathLike],  # type: ignore[type-arg]
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Open the file located at the given path in binary mode and hand it to from_fp.
    The file is closed before returning; every other argument is forwarded unchanged.
    Can raise IOError.
    """
    with open(path, "rb") as binary_file:
        return from_fp(
            binary_file,
            steps=steps,
            chunk_size=chunk_size,
            threshold=threshold,
            cp_isolation=cp_isolation,
            cp_exclusion=cp_exclusion,
            preemptive_behaviour=preemptive_behaviour,
            explain=explain,
            language_threshold=language_threshold,
            enable_fallback=enable_fallback,
        )
jpayne@7 561
jpayne@7 562
def is_binary(
    fp_or_path_or_payload: Union[PathLike, str, BinaryIO, bytes],  # type: ignore[type-arg]
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = False,
) -> bool:
    """
    Tell whether the given input (a path, a raw payload, or an opened binary file) looks binary, aka. not a string.
    Relies on the same heuristics and default kwargs as the detection functions, with one exception: fallback
    matches are disabled by default, to be stricter about content that is ASCII-compatible but unlikely to be text.
    Returns True when the detection yields no match at all.
    """
    # A str or PathLike designates a file on disk.
    if isinstance(fp_or_path_or_payload, (str, PathLike)):
        return not from_path(
            fp_or_path_or_payload,
            steps=steps,
            chunk_size=chunk_size,
            threshold=threshold,
            cp_isolation=cp_isolation,
            cp_exclusion=cp_exclusion,
            preemptive_behaviour=preemptive_behaviour,
            explain=explain,
            language_threshold=language_threshold,
            enable_fallback=enable_fallback,
        )

    # bytes / bytearray are treated as the payload itself.
    if isinstance(fp_or_path_or_payload, (bytes, bytearray)):
        return not from_bytes(
            fp_or_path_or_payload,
            steps=steps,
            chunk_size=chunk_size,
            threshold=threshold,
            cp_isolation=cp_isolation,
            cp_exclusion=cp_exclusion,
            preemptive_behaviour=preemptive_behaviour,
            explain=explain,
            language_threshold=language_threshold,
            enable_fallback=enable_fallback,
        )

    # Anything else is handed to from_fp as a readable binary file object.
    return not from_fp(
        fp_or_path_or_payload,
        steps=steps,
        chunk_size=chunk_size,
        threshold=threshold,
        cp_isolation=cp_isolation,
        cp_exclusion=cp_exclusion,
        preemptive_behaviour=preemptive_behaviour,
        explain=explain,
        language_threshold=language_threshold,
        enable_fallback=enable_fallback,
    )