Mercurial > repos > jpayne > bioproject_to_srr_2
comparison charset_normalizer/models.py @ 7:5eb2d5e3bf22
planemo upload for repository https://toolrepo.galaxytrakr.org/view/jpayne/bioproject_to_srr_2/556cac4fb538
author | jpayne |
---|---|
date | Sun, 05 May 2024 23:32:17 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
6:b2745907b1eb | 7:5eb2d5e3bf22 |
---|---|
1 from encodings.aliases import aliases | |
2 from hashlib import sha256 | |
3 from json import dumps | |
4 from typing import Any, Dict, Iterator, List, Optional, Tuple, Union | |
5 | |
6 from .constant import TOO_BIG_SEQUENCE | |
7 from .utils import iana_name, is_multi_byte_encoding, unicode_range | |
8 | |
9 | |
class CharsetMatch:
    """
    One candidate decoding of a byte payload.

    Keeps the raw bytes, the guessed encoding, the mean mess ratio (exposed as ``chaos``),
    the coherence matches (exposed through ``languages``, ``language`` and ``coherence``)
    and a lazily decoded string. Matches registered through ``add_submatch`` are stored as
    leaves and listed by ``could_be_from_charset``.
    """

    def __init__(
        self,
        payload: bytes,
        guessed_encoding: str,
        mean_mess_ratio: float,
        has_sig_or_bom: bool,
        languages: "CoherenceMatches",
        decoded_payload: Optional[str] = None,
    ):
        """
        :param payload: Raw bytes this match was computed from.
        :param guessed_encoding: Codec name used to decode ``payload``.
        :param mean_mess_ratio: Value returned by the ``chaos`` property.
        :param has_sig_or_bom: Value returned by ``bom`` and ``byte_order_mark``.
        :param languages: Sequence of (language, ratio) pairs; the first entry feeds
            ``language`` and ``coherence``.
        :param decoded_payload: Already decoded text, if available. When None the payload
            is decoded on the first ``str()`` call.
        """
        self._payload: bytes = payload

        self._encoding: str = guessed_encoding
        self._mean_mess_ratio: float = mean_mess_ratio
        self._languages: "CoherenceMatches" = languages
        self._has_sig_or_bom: bool = has_sig_or_bom
        # Cache filled by the 'alphabets' property.
        self._unicode_ranges: Optional[List[str]] = None

        self._leaves: List["CharsetMatch"] = []
        self._mean_coherence_ratio: float = 0.0

        # Cache of the last re-encoded payload and the encoding it was produced with.
        self._output_payload: Optional[bytes] = None
        self._output_encoding: Optional[str] = None

        self._string: Optional[str] = decoded_payload

    def __eq__(self, other: object) -> bool:
        """
        Two matches are equal when they share the same encoding and the same fingerprint.
        Comparing with any other type yields NotImplemented, so ``==`` evaluates to False
        instead of raising.
        """
        if not isinstance(other, CharsetMatch):
            return NotImplemented
        return self.encoding == other.encoding and self.fingerprint == other.fingerprint

    def __lt__(self, other: object) -> bool:
        """
        Implemented to make sorted available upon CharsetMatches items.
        A match is "lower" (sorted first) when it has the lower chaos; near-ties on chaos
        are settled by coherence, then by multi-byte usage.
        Raise ValueError when compared with something that is not a CharsetMatch.
        """
        if not isinstance(other, CharsetMatch):
            raise ValueError(
                "Cannot order a CharsetMatch against {}.".format(str(other.__class__))
            )

        chaos_difference: float = abs(self.chaos - other.chaos)
        coherence_difference: float = abs(self.coherence - other.coherence)

        # Below 1% difference --> Use Coherence
        if chaos_difference < 0.01 and coherence_difference > 0.02:
            return self.coherence > other.coherence
        elif chaos_difference < 0.01 and coherence_difference <= 0.02:
            # When having a difficult decision, use the result that decoded as many multi-byte as possible.
            # preserve RAM usage! (multi_byte_usage decodes the whole payload)
            if len(self._payload) >= TOO_BIG_SEQUENCE:
                return self.chaos < other.chaos
            return self.multi_byte_usage > other.multi_byte_usage

        return self.chaos < other.chaos

    @property
    def multi_byte_usage(self) -> float:
        """
        One minus the ratio of decoded characters to raw bytes.
        Return 0.0 for an empty payload instead of dividing by zero.
        """
        raw_length: int = len(self.raw)
        if raw_length == 0:
            return 0.0
        return 1.0 - (len(str(self)) / raw_length)

    def __str__(self) -> str:
        # Lazy Str Loading: decode once, in strict mode, then reuse the cached string.
        if self._string is None:
            self._string = str(self._payload, self._encoding, "strict")
        return self._string

    def __repr__(self) -> str:
        return "<CharsetMatch '{}' bytes({})>".format(self.encoding, self.fingerprint)

    def add_submatch(self, other: "CharsetMatch") -> None:
        """
        Register another match as a leaf of this one.
        Raise ValueError if the given object is not a CharsetMatch or is equal to this match.
        """
        if not isinstance(other, CharsetMatch) or other == self:
            raise ValueError(
                "Unable to add instance <{}> as a submatch of a CharsetMatch".format(
                    other.__class__
                )
            )

        other._string = None  # Unload RAM usage; dirty trick.
        self._leaves.append(other)

    @property
    def encoding(self) -> str:
        return self._encoding

    @property
    def encoding_aliases(self) -> List[str]:
        """
        Encoding name are known by many name, using this could help when searching for IBM855 when it's listed as CP855.
        """
        also_known_as: List[str] = []
        # The alias table is scanned in both directions (alias -> name and name -> alias).
        for u, p in aliases.items():
            if self.encoding == u:
                also_known_as.append(p)
            elif self.encoding == p:
                also_known_as.append(u)
        return also_known_as

    @property
    def bom(self) -> bool:
        return self._has_sig_or_bom

    @property
    def byte_order_mark(self) -> bool:
        # Same value as 'bom'.
        return self._has_sig_or_bom

    @property
    def languages(self) -> List[str]:
        """
        Return the complete list of possible languages found in decoded sequence.
        Usually not really useful. Returned list may be empty even if 'language' property return something != 'Unknown'.
        """
        return [e[0] for e in self._languages]

    @property
    def language(self) -> str:
        """
        Most probable language found in decoded sequence. If none were detected or inferred, the property will return
        "Unknown".
        """
        if not self._languages:
            # Trying to infer the language based on the given encoding
            # Its either English or we should not pronounce ourselves in certain cases.
            if "ascii" in self.could_be_from_charset:
                return "English"

            # doing it there to avoid circular import
            from charset_normalizer.cd import encoding_languages, mb_encoding_languages

            languages = (
                mb_encoding_languages(self.encoding)
                if is_multi_byte_encoding(self.encoding)
                else encoding_languages(self.encoding)
            )

            if len(languages) == 0 or "Latin Based" in languages:
                return "Unknown"

            return languages[0]

        return self._languages[0][0]

    @property
    def chaos(self) -> float:
        return self._mean_mess_ratio

    @property
    def coherence(self) -> float:
        """
        Ratio attached to the first language entry, or 0.0 when no language was given.
        """
        if not self._languages:
            return 0.0
        return self._languages[0][1]

    @property
    def percent_chaos(self) -> float:
        return round(self.chaos * 100, ndigits=3)

    @property
    def percent_coherence(self) -> float:
        return round(self.coherence * 100, ndigits=3)

    @property
    def raw(self) -> bytes:
        """
        Original untouched bytes.
        """
        return self._payload

    @property
    def submatch(self) -> List["CharsetMatch"]:
        return self._leaves

    @property
    def has_submatch(self) -> bool:
        return len(self._leaves) > 0

    @property
    def alphabets(self) -> List[str]:
        """
        Sorted list of the distinct, non-empty values returned by unicode_range for each
        character of the decoded string. Computed once, then served from cache.
        """
        if self._unicode_ranges is not None:
            return self._unicode_ranges
        # collect distinct ranges directly, no intermediate per-character list
        detected_ranges = {unicode_range(char) for char in str(self)}
        # filter and sort
        self._unicode_ranges = sorted(r for r in detected_ranges if r)
        return self._unicode_ranges

    @property
    def could_be_from_charset(self) -> List[str]:
        """
        The complete list of encoding that output the exact SAME str result and therefore could be the originating
        encoding.
        This list does include the encoding available in property 'encoding'.
        """
        return [self._encoding] + [m.encoding for m in self._leaves]

    def output(self, encoding: str = "utf_8") -> bytes:
        """
        Method to get re-encoded bytes payload using given target encoding. Default to UTF-8.
        Characters that the target encoding cannot represent are substituted by the encoder
        (error handler "replace"); they are not dropped.
        The result is cached and reused as long as the same target encoding is requested.
        """
        if self._output_encoding is None or self._output_encoding != encoding:
            self._output_encoding = encoding
            self._output_payload = str(self).encode(encoding, "replace")

        return self._output_payload  # type: ignore

    @property
    def fingerprint(self) -> str:
        """
        Retrieve the unique SHA256 computed using the transformed (re-encoded) payload. Not the original one.
        The payload is re-encoded with the default target of output(), so this also resets
        the output cache to that encoding.
        """
        return sha256(self.output()).hexdigest()
223 | |
224 | |
class CharsetMatches:
    """
    Container with every CharsetMatch items ordered by default from most probable to the less one.
    Act like a list(iterable) but does not implements all related methods.
    """

    def __init__(self, results: Optional[List["CharsetMatch"]] = None):
        """
        :param results: Initial matches; they are sorted on construction. None or an empty
            list gives an empty container.
        """
        self._results: List["CharsetMatch"] = sorted(results) if results else []

    def __iter__(self) -> Iterator["CharsetMatch"]:
        yield from self._results

    def __getitem__(self, item: Union[int, str]) -> "CharsetMatch":
        """
        Retrieve a single item either by its position or encoding name (alias may be used here).
        Raise IndexError upon an out-of-range position.
        Raise KeyError upon an encoding not present in results, or a key that is neither int nor str.
        """
        if isinstance(item, int):
            return self._results[item]

        requested = item
        if isinstance(item, str):
            # Normalise the given name before looking it up among the candidates.
            item = iana_name(item, False)
            for result in self._results:
                if item in result.could_be_from_charset:
                    return result
        raise KeyError(requested)

    def __len__(self) -> int:
        return len(self._results)

    def __bool__(self) -> bool:
        return len(self._results) > 0

    def append(self, item: "CharsetMatch") -> None:
        """
        Insert a single match. Will be inserted accordingly to preserve sort.
        Can be inserted as a submatch: when an existing match has the same fingerprint and
        the same chaos, the item is attached to it instead of being stored on its own.
        Raise ValueError if the item is not a CharsetMatch.
        """
        if not isinstance(item, CharsetMatch):
            raise ValueError(
                "Cannot append instance '{}' to CharsetMatches".format(
                    str(item.__class__)
                )
            )
        # We should disable the submatch factoring when the input file is too heavy (conserve RAM usage)
        # Strictly below the limit: CharsetMatch.__lt__ already treats ">= TOO_BIG_SEQUENCE" as too big.
        if len(item.raw) < TOO_BIG_SEQUENCE:
            for match in self._results:
                if match.fingerprint == item.fingerprint and match.chaos == item.chaos:
                    match.add_submatch(item)
                    return
        self._results.append(item)
        self._results = sorted(self._results)

    def best(self) -> Optional["CharsetMatch"]:
        """
        Simply return the first match, or None when the container is empty.
        """
        if not self._results:
            return None
        return self._results[0]

    def first(self) -> Optional["CharsetMatch"]:
        """
        Redundant method, call the method best(). Kept for BC reasons.
        """
        return self.best()
290 | |
291 | |
# One coherence result: a (language, ratio) pair.
CoherenceMatch = Tuple[str, float]
# A list of coherence results, as accepted by CharsetMatch(languages=...).
CoherenceMatches = List[CoherenceMatch]
294 | |
295 | |
class CliDetectionResult:
    """
    Plain record of one detection result, serialisable to JSON through to_json().
    """

    def __init__(
        self,
        path: str,
        encoding: Optional[str],
        encoding_aliases: List[str],
        alternative_encodings: List[str],
        language: str,
        alphabets: List[str],
        has_sig_or_bom: bool,
        chaos: float,
        coherence: float,
        unicode_path: Optional[str],
        is_preferred: bool,
    ):
        """
        Store every argument on the instance under the attribute of the same name.
        """
        self.path: str = path
        self.unicode_path: Optional[str] = unicode_path

        self.encoding: Optional[str] = encoding
        self.encoding_aliases: List[str] = encoding_aliases
        self.alternative_encodings: List[str] = alternative_encodings

        self.language: str = language
        self.alphabets: List[str] = alphabets
        self.has_sig_or_bom: bool = has_sig_or_bom

        self.chaos: float = chaos
        self.coherence: float = coherence
        self.is_preferred: bool = is_preferred

    @property
    def __dict__(self) -> Dict[str, Any]:  # type: ignore
        """
        Mapping of the stored fields, in the key order used for the JSON output.
        """
        ordered_fields = (
            "path",
            "encoding",
            "encoding_aliases",
            "alternative_encodings",
            "language",
            "alphabets",
            "has_sig_or_bom",
            "chaos",
            "coherence",
            "unicode_path",
            "is_preferred",
        )
        return {field: getattr(self, field) for field in ordered_fields}

    def to_json(self) -> str:
        """
        Serialise the field mapping as ASCII-only JSON indented by four spaces.
        """
        fields_mapping = self.__dict__
        return dumps(fields_mapping, indent=4, ensure_ascii=True)