jpayne@7
|
1 import importlib
|
jpayne@7
|
2 import logging
|
jpayne@7
|
3 import unicodedata
|
jpayne@7
|
4 from codecs import IncrementalDecoder
|
jpayne@7
|
5 from encodings.aliases import aliases
|
jpayne@7
|
6 from functools import lru_cache
|
jpayne@7
|
7 from re import findall
|
jpayne@7
|
8 from typing import Generator, List, Optional, Set, Tuple, Union
|
jpayne@7
|
9
|
jpayne@7
|
10 from _multibytecodec import MultibyteIncrementalDecoder
|
jpayne@7
|
11
|
jpayne@7
|
12 from .constant import (
|
jpayne@7
|
13 ENCODING_MARKS,
|
jpayne@7
|
14 IANA_SUPPORTED_SIMILAR,
|
jpayne@7
|
15 RE_POSSIBLE_ENCODING_INDICATION,
|
jpayne@7
|
16 UNICODE_RANGES_COMBINED,
|
jpayne@7
|
17 UNICODE_SECONDARY_RANGE_KEYWORD,
|
jpayne@7
|
18 UTF8_MAXIMAL_ALLOCATION,
|
jpayne@7
|
19 )
|
jpayne@7
|
20
|
jpayne@7
|
21
|
jpayne@7
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_accentuated(character: str) -> bool:
    """True when the character's Unicode name carries a combining-accent marker."""
    try:
        description: str = unicodedata.name(character)
    except ValueError:
        # Unnamed code point (unassigned/control): cannot carry an accent marker.
        return False
    accent_markers = (
        "WITH GRAVE",
        "WITH ACUTE",
        "WITH CEDILLA",
        "WITH DIAERESIS",
        "WITH CIRCUMFLEX",
        "WITH TILDE",
        "WITH MACRON",
        "WITH RING ABOVE",
    )
    return any(marker in description for marker in accent_markers)
|
jpayne@7
|
38
|
jpayne@7
|
39
|
jpayne@7
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def remove_accent(character: str) -> str:
    """
    Return the base character obtained from the Unicode decomposition of
    *character*, or the character unchanged when no decomposition exists.
    """
    decomposed: str = unicodedata.decomposition(character)
    if not decomposed:
        return character

    codes: List[str] = decomposed.split(" ")

    # Compatibility decompositions start with a formatting tag such as
    # "<compat>" or "<isolated>" (see unicodedata.decomposition), which is not
    # a hexadecimal code point; int(..., 16) would raise ValueError. Fall back
    # to the original character instead of crashing (e.g. U+FB01 'fi', U+0149).
    try:
        return chr(int(codes[0], 16))
    except ValueError:
        return character
|
jpayne@7
|
49
|
jpayne@7
|
50
|
jpayne@7
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def unicode_range(character: str) -> Optional[str]:
    """
    Retrieve the Unicode range official name from a single character.
    """
    code_point: int = ord(character)

    # First matching range wins; None when the code point falls in no range.
    return next(
        (
            range_name
            for range_name, ord_range in UNICODE_RANGES_COMBINED.items()
            if code_point in ord_range
        ),
        None,
    )
|
jpayne@7
|
63
|
jpayne@7
|
64
|
jpayne@7
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_latin(character: str) -> bool:
    """True when the character's Unicode name contains 'LATIN'."""
    try:
        return "LATIN" in unicodedata.name(character)
    except ValueError:
        # Unnamed code point: treated as non-Latin.
        return False
|
jpayne@7
|
72
|
jpayne@7
|
73
|
jpayne@7
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_punctuation(character: str) -> bool:
    """True when the character is punctuation per its category or range name."""
    # Any 'P*' general category (Po, Pd, Ps, ...) is punctuation outright.
    if "P" in unicodedata.category(character):
        return True

    char_range: Optional[str] = unicode_range(character)
    if char_range is None:
        return False

    # Fallback: ranges whose official name mentions "Punctuation".
    return "Punctuation" in char_range
|
jpayne@7
|
87
|
jpayne@7
|
88
|
jpayne@7
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_symbol(character: str) -> bool:
    """True when the character is a symbol or numeral, by category or range name."""
    category: str = unicodedata.category(character)

    # Symbol (S*) and number (N*) categories qualify directly.
    if "S" in category or "N" in category:
        return True

    char_range: Optional[str] = unicode_range(character)
    if char_range is None:
        return False

    # "...Forms" ranges count, except ordinary letters (Lo) living inside them.
    return "Forms" in char_range and category != "Lo"
|
jpayne@7
|
102
|
jpayne@7
|
103
|
jpayne@7
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_emoticon(character: str) -> bool:
    """True when the character lives in an emoticon/pictograph Unicode range."""
    char_range: Optional[str] = unicode_range(character)

    if char_range is None:
        return False

    return any(token in char_range for token in ("Emoticons", "Pictographs"))
|
jpayne@7
|
112
|
jpayne@7
|
113
|
jpayne@7
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_separator(character: str) -> bool:
    """True for whitespace, a few ASCII delimiters, and separator-like categories."""
    if character.isspace():
        return True
    if character in {"|", "+", "<", ">"}:
        return True

    category: str = unicodedata.category(character)

    # Z* separators, plus punctuation commonly used as a word delimiter.
    return "Z" in category or category in {"Po", "Pd", "Pc"}
|
jpayne@7
|
122
|
jpayne@7
|
123
|
jpayne@7
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_case_variable(character: str) -> bool:
    """True when the character is cased (exactly one of islower/isupper holds)."""
    return character.islower() is not character.isupper()
|
jpayne@7
|
127
|
jpayne@7
|
128
|
jpayne@7
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_cjk(character: str) -> bool:
    """True when the character's Unicode name contains 'CJK'."""
    try:
        return "CJK" in unicodedata.name(character)
    except ValueError:
        # Unnamed code point: not CJK.
        return False
|
jpayne@7
|
137
|
jpayne@7
|
138
|
jpayne@7
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_hiragana(character: str) -> bool:
    """True when the character's Unicode name contains 'HIRAGANA'."""
    try:
        return "HIRAGANA" in unicodedata.name(character)
    except ValueError:
        # Unnamed code point: not hiragana.
        return False
|
jpayne@7
|
147
|
jpayne@7
|
148
|
jpayne@7
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_katakana(character: str) -> bool:
    """True when the character's Unicode name contains 'KATAKANA'."""
    try:
        return "KATAKANA" in unicodedata.name(character)
    except ValueError:
        # Unnamed code point: not katakana.
        return False
|
jpayne@7
|
157
|
jpayne@7
|
158
|
jpayne@7
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_hangul(character: str) -> bool:
    """True when the character's Unicode name contains 'HANGUL'."""
    try:
        return "HANGUL" in unicodedata.name(character)
    except ValueError:
        # Unnamed code point: not hangul.
        return False
|
jpayne@7
|
167
|
jpayne@7
|
168
|
jpayne@7
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_thai(character: str) -> bool:
    """True when the character's Unicode name contains 'THAI'."""
    try:
        return "THAI" in unicodedata.name(character)
    except ValueError:
        # Unnamed code point: not Thai.
        return False
|
jpayne@7
|
177
|
jpayne@7
|
178
|
jpayne@7
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_arabic(character: str) -> bool:
    """True when the character's Unicode name contains 'ARABIC'."""
    try:
        return "ARABIC" in unicodedata.name(character)
    except ValueError:
        # Unnamed code point: not Arabic.
        return False
|
jpayne@7
|
187
|
jpayne@7
|
188
|
jpayne@7
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_arabic_isolated_form(character: str) -> bool:
    """True for Arabic presentation forms whose name marks them as 'ISOLATED FORM'."""
    try:
        character_name = unicodedata.name(character)
    except ValueError:
        # Unnamed code point: cannot match.
        return False

    if "ARABIC" not in character_name:
        return False
    return "ISOLATED FORM" in character_name
|
jpayne@7
|
197
|
jpayne@7
|
198
|
jpayne@7
|
@lru_cache(maxsize=len(UNICODE_RANGES_COMBINED))
def is_unicode_range_secondary(range_name: str) -> bool:
    """True when the range name contains a keyword marking it as a secondary range."""
    for keyword in UNICODE_SECONDARY_RANGE_KEYWORD:
        if keyword in range_name:
            return True
    return False
|
jpayne@7
|
202
|
jpayne@7
|
203
|
jpayne@7
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_unprintable(character: str) -> bool:
    """True for characters that are neither whitespace nor printable, with two tolerated exceptions."""
    # Whitespace (\n \t \r \v ...) and printable characters are never "unprintable".
    if character.isspace() or character.isprintable():
        return False
    # Tolerated: \x1A (ASCII substitute character) and \ufeff (zero width
    # no-break space, not acknowledged as whitespace by str.isspace()).
    return character not in ("\x1A", "\ufeff")
|
jpayne@7
|
213
|
jpayne@7
|
214
|
jpayne@7
|
def any_specified_encoding(sequence: bytes, search_zone: int = 8192) -> Optional[str]:
    """
    Extract using ASCII-only decoder any specified encoding in the first n-bytes.
    """
    if not isinstance(sequence, bytes):
        raise TypeError

    # Only inspect an ASCII projection of the leading search zone.
    scoped_text: str = sequence[: min(len(sequence), search_zone)].decode(
        "ascii", errors="ignore"
    )
    candidates: List[str] = findall(RE_POSSIBLE_ENCODING_INDICATION, scoped_text)

    if not candidates:
        return None

    for raw_candidate in candidates:
        candidate = raw_candidate.lower().replace("-", "_")

        # Resolve the declared label against Python's codec alias table,
        # returning the canonical codec name.
        for encoding_alias, encoding_iana in aliases.items():
            if candidate in (encoding_alias, encoding_iana):
                return encoding_iana

    return None
|
jpayne@7
|
245
|
jpayne@7
|
246
|
jpayne@7
|
@lru_cache(maxsize=128)
def is_multi_byte_encoding(name: str) -> bool:
    """
    Verify is a specific encoding is a multi byte one based on it IANA name
    """
    # Unicode transformation formats are multi-byte by definition.
    unicode_multi_byte = {
        "utf_8",
        "utf_8_sig",
        "utf_16",
        "utf_16_be",
        "utf_16_le",
        "utf_32",
        "utf_32_le",
        "utf_32_be",
        "utf_7",
    }
    if name in unicode_multi_byte:
        return True

    # Otherwise, CJK-style codecs expose a decoder built on the
    # multibytecodec machinery.
    decoder = importlib.import_module("encodings.{}".format(name)).IncrementalDecoder
    return issubclass(decoder, MultibyteIncrementalDecoder)
|
jpayne@7
|
266
|
jpayne@7
|
267
|
jpayne@7
|
def identify_sig_or_bom(sequence: bytes) -> Tuple[Optional[str], bytes]:
    """
    Identify and extract SIG/BOM in given sequence.
    """
    for iana_encoding, marks in ENCODING_MARKS.items():
        # An entry is either a single mark or a list of candidate marks.
        candidate_marks: List[bytes] = [marks] if isinstance(marks, bytes) else marks

        for mark in candidate_marks:
            if sequence.startswith(mark):
                return iana_encoding, mark

    # No known signature at the start of the payload.
    return None, b""
|
jpayne@7
|
284
|
jpayne@7
|
285
|
jpayne@7
|
def should_strip_sig_or_bom(iana_encoding: str) -> bool:
    """Whether the BOM/SIG should be removed before decoding with this codec."""
    # utf_16 / utf_32 are the exceptions; every other codec gets it stripped.
    return iana_encoding != "utf_16" and iana_encoding != "utf_32"
|
jpayne@7
|
288
|
jpayne@7
|
289
|
jpayne@7
|
def iana_name(cp_name: str, strict: bool = True) -> str:
    """
    Resolve a codec label to its canonical name via Python's alias table.

    Raises ValueError when strict and the label is unknown; otherwise returns
    the normalized (lowercased, underscored) input unchanged.
    """
    cp_name = cp_name.lower().replace("-", "_")

    for encoding_alias, encoding_iana in aliases.items():
        if cp_name == encoding_alias or cp_name == encoding_iana:
            return encoding_iana

    if strict:
        raise ValueError("Unable to retrieve IANA for '{}'".format(cp_name))

    return cp_name
|
jpayne@7
|
304
|
jpayne@7
|
305
|
jpayne@7
|
def range_scan(decoded_sequence: str) -> List[str]:
    """Return the distinct Unicode range names present in the given text."""
    discovered: Set[str] = {
        found
        for found in (unicode_range(character) for character in decoded_sequence)
        if found is not None
    }
    return list(discovered)
|
jpayne@7
|
318
|
jpayne@7
|
319
|
jpayne@7
|
def cp_similarity(iana_name_a: str, iana_name_b: str) -> float:
    """Ratio of single-byte inputs that both single-byte codecs decode identically."""
    # Similarity is only defined for single-byte code pages.
    if is_multi_byte_encoding(iana_name_a) or is_multi_byte_encoding(iana_name_b):
        return 0.0

    decoder_a = importlib.import_module(
        "encodings.{}".format(iana_name_a)
    ).IncrementalDecoder
    decoder_b = importlib.import_module(
        "encodings.{}".format(iana_name_b)
    ).IncrementalDecoder

    id_a: IncrementalDecoder = decoder_a(errors="ignore")
    id_b: IncrementalDecoder = decoder_b(errors="ignore")

    # NOTE(review): bytes 0..254 are compared (byte 255 is skipped) while the
    # divisor is 254 — looks like an off-by-one, but it is kept as-is because
    # IANA_SUPPORTED_SIMILAR was generated with this exact formula.
    character_match_count: int = sum(
        1
        for i in range(255)
        if id_a.decode(bytes([i])) == id_b.decode(bytes([i]))
    )

    return character_match_count / 254
|
jpayne@7
|
342
|
jpayne@7
|
343
|
jpayne@7
|
def is_cp_similar(iana_name_a: str, iana_name_b: str) -> bool:
    """
    Determine if two code page are at least 80% similar. IANA_SUPPORTED_SIMILAR dict was generated using
    the function cp_similarity.
    """
    try:
        return iana_name_b in IANA_SUPPORTED_SIMILAR[iana_name_a]
    except KeyError:
        # Unknown code page: no similarity information recorded.
        return False
|
jpayne@7
|
353
|
jpayne@7
|
354
|
jpayne@7
|
def set_logging_handler(
    name: str = "charset_normalizer",
    level: int = logging.INFO,
    format_string: str = "%(asctime)s | %(levelname)s | %(message)s",
) -> None:
    """Attach a StreamHandler with the given format to the named logger and set its level."""
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(logging.Formatter(format_string))

    target_logger = logging.getLogger(name)
    target_logger.setLevel(level)
    target_logger.addHandler(stream_handler)
|
jpayne@7
|
366
|
jpayne@7
|
367
|
jpayne@7
|
def cut_sequence_chunks(
    sequences: bytes,
    encoding_iana: str,
    offsets: range,
    chunk_size: int,
    bom_or_sig_available: bool,
    strip_sig_or_bom: bool,
    sig_payload: bytes,
    is_multi_byte_decoder: bool,
    decoded_payload: Optional[str] = None,
) -> Generator[str, None, None]:
    """
    Yield decoded text chunks of *sequences* at the byte positions in *offsets*.

    When *decoded_payload* (the full pre-decoded text) is supplied and the codec
    is single-byte, chunks are sliced directly from it; otherwise each chunk is
    decoded independently from the raw bytes. For multi-byte codecs the cut
    point may fall inside a character, so the start offset is nudged backwards
    (up to 3 bytes) until the chunk's prefix re-appears in *decoded_payload*.

    :param sequences: Raw byte payload to chunk.
    :param encoding_iana: Codec name used for decoding raw chunks.
    :param offsets: Byte (or character) start positions of the chunks.
    :param chunk_size: Size of each chunk, in bytes (characters in the fast path).
    :param bom_or_sig_available: True when a BOM/SIG was detected on the payload.
    :param strip_sig_or_bom: True when that BOM/SIG should NOT be re-prepended.
    :param sig_payload: The BOM/SIG bytes to prepend when kept.
    :param is_multi_byte_decoder: True for multi-byte codecs (UTF-*, CJK, ...).
    :param decoded_payload: Full decoded text, when already available.
    :raises UnicodeDecodeError: in the single-byte raw path (errors="strict").
    """
    # Fast path: the whole payload is already decoded and the codec is
    # single-byte, so byte offsets equal character offsets — slice text directly.
    if decoded_payload and is_multi_byte_decoder is False:
        for i in offsets:
            chunk = decoded_payload[i : i + chunk_size]
            if not chunk:
                # Past the end of the text: nothing further to yield.
                break
            yield chunk
    else:
        for i in offsets:
            chunk_end = i + chunk_size
            # Skip windows that start too close to (or past) the end of the
            # payload; the +8 slack tolerates a slightly short final chunk.
            if chunk_end > len(sequences) + 8:
                continue

            cut_sequence = sequences[i : i + chunk_size]

            # Keep the BOM/SIG in front of every chunk when it is not stripped,
            # so the decoder sees the payload the same way the caller does.
            if bom_or_sig_available and strip_sig_or_bom is False:
                cut_sequence = sig_payload + cut_sequence

            chunk = cut_sequence.decode(
                encoding_iana,
                errors="ignore" if is_multi_byte_decoder else "strict",
            )

            # multi-byte bad cutting detector and adjustment
            # not the cleanest way to perform that fix but clever enough for now.
            if is_multi_byte_decoder and i > 0:
                # Only the first few characters are checked against the
                # reference text (cap at 16 to keep the membership test cheap).
                chunk_partial_size_chk: int = min(chunk_size, 16)

                if (
                    decoded_payload
                    and chunk[:chunk_partial_size_chk] not in decoded_payload
                ):
                    # The cut likely split a multi-byte character: move the
                    # start back one byte at a time (up to 3) until the chunk's
                    # prefix matches the reference decoded text.
                    for j in range(i, i - 4, -1):
                        cut_sequence = sequences[j:chunk_end]

                        if bom_or_sig_available and strip_sig_or_bom is False:
                            cut_sequence = sig_payload + cut_sequence

                        chunk = cut_sequence.decode(encoding_iana, errors="ignore")

                        if chunk[:chunk_partial_size_chk] in decoded_payload:
                            break

            yield chunk
|