Mercurial > repos > rliterman > csp2
comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/idna/codec.py @ 69:33d812a61356
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 17:55:14 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
67:0e9998148a16 | 69:33d812a61356 |
---|---|
1 import codecs | |
2 import re | |
3 from typing import Any, Optional, Tuple | |
4 | |
5 from .core import IDNAError, alabel, decode, encode, ulabel | |
6 | |
7 _unicode_dots_re = re.compile("[\u002e\u3002\uff0e\uff61]") | |
8 | |
9 | |
10 class Codec(codecs.Codec): | |
11 def encode(self, data: str, errors: str = "strict") -> Tuple[bytes, int]: | |
12 if errors != "strict": | |
13 raise IDNAError('Unsupported error handling "{}"'.format(errors)) | |
14 | |
15 if not data: | |
16 return b"", 0 | |
17 | |
18 return encode(data), len(data) | |
19 | |
20 def decode(self, data: bytes, errors: str = "strict") -> Tuple[str, int]: | |
21 if errors != "strict": | |
22 raise IDNAError('Unsupported error handling "{}"'.format(errors)) | |
23 | |
24 if not data: | |
25 return "", 0 | |
26 | |
27 return decode(data), len(data) | |
28 | |
29 | |
30 class IncrementalEncoder(codecs.BufferedIncrementalEncoder): | |
31 def _buffer_encode(self, data: str, errors: str, final: bool) -> Tuple[bytes, int]: | |
32 if errors != "strict": | |
33 raise IDNAError('Unsupported error handling "{}"'.format(errors)) | |
34 | |
35 if not data: | |
36 return b"", 0 | |
37 | |
38 labels = _unicode_dots_re.split(data) | |
39 trailing_dot = b"" | |
40 if labels: | |
41 if not labels[-1]: | |
42 trailing_dot = b"." | |
43 del labels[-1] | |
44 elif not final: | |
45 # Keep potentially unfinished label until the next call | |
46 del labels[-1] | |
47 if labels: | |
48 trailing_dot = b"." | |
49 | |
50 result = [] | |
51 size = 0 | |
52 for label in labels: | |
53 result.append(alabel(label)) | |
54 if size: | |
55 size += 1 | |
56 size += len(label) | |
57 | |
58 # Join with U+002E | |
59 result_bytes = b".".join(result) + trailing_dot | |
60 size += len(trailing_dot) | |
61 return result_bytes, size | |
62 | |
63 | |
64 class IncrementalDecoder(codecs.BufferedIncrementalDecoder): | |
65 def _buffer_decode(self, data: Any, errors: str, final: bool) -> Tuple[str, int]: | |
66 if errors != "strict": | |
67 raise IDNAError('Unsupported error handling "{}"'.format(errors)) | |
68 | |
69 if not data: | |
70 return ("", 0) | |
71 | |
72 if not isinstance(data, str): | |
73 data = str(data, "ascii") | |
74 | |
75 labels = _unicode_dots_re.split(data) | |
76 trailing_dot = "" | |
77 if labels: | |
78 if not labels[-1]: | |
79 trailing_dot = "." | |
80 del labels[-1] | |
81 elif not final: | |
82 # Keep potentially unfinished label until the next call | |
83 del labels[-1] | |
84 if labels: | |
85 trailing_dot = "." | |
86 | |
87 result = [] | |
88 size = 0 | |
89 for label in labels: | |
90 result.append(ulabel(label)) | |
91 if size: | |
92 size += 1 | |
93 size += len(label) | |
94 | |
95 result_str = ".".join(result) + trailing_dot | |
96 size += len(trailing_dot) | |
97 return (result_str, size) | |
98 | |
99 | |
100 class StreamWriter(Codec, codecs.StreamWriter): | |
101 pass | |
102 | |
103 | |
104 class StreamReader(Codec, codecs.StreamReader): | |
105 pass | |
106 | |
107 | |
108 def search_function(name: str) -> Optional[codecs.CodecInfo]: | |
109 if name != "idna2008": | |
110 return None | |
111 return codecs.CodecInfo( | |
112 name=name, | |
113 encode=Codec().encode, | |
114 decode=Codec().decode, | |
115 incrementalencoder=IncrementalEncoder, | |
116 incrementaldecoder=IncrementalDecoder, | |
117 streamwriter=StreamWriter, | |
118 streamreader=StreamReader, | |
119 ) | |
120 | |
121 | |
122 codecs.register(search_function) |