jpayne@69
|
1 import codecs
|
jpayne@69
|
2 import re
|
jpayne@69
|
3 from typing import Any, Optional, Tuple
|
jpayne@69
|
4
|
jpayne@69
|
5 from .core import IDNAError, alabel, decode, encode, ulabel
|
jpayne@69
|
6
|
jpayne@69
|
7 _unicode_dots_re = re.compile("[\u002e\u3002\uff0e\uff61]")
|
jpayne@69
|
8
|
jpayne@69
|
9
|
jpayne@69
|
10 class Codec(codecs.Codec):
|
jpayne@69
|
11 def encode(self, data: str, errors: str = "strict") -> Tuple[bytes, int]:
|
jpayne@69
|
12 if errors != "strict":
|
jpayne@69
|
13 raise IDNAError('Unsupported error handling "{}"'.format(errors))
|
jpayne@69
|
14
|
jpayne@69
|
15 if not data:
|
jpayne@69
|
16 return b"", 0
|
jpayne@69
|
17
|
jpayne@69
|
18 return encode(data), len(data)
|
jpayne@69
|
19
|
jpayne@69
|
20 def decode(self, data: bytes, errors: str = "strict") -> Tuple[str, int]:
|
jpayne@69
|
21 if errors != "strict":
|
jpayne@69
|
22 raise IDNAError('Unsupported error handling "{}"'.format(errors))
|
jpayne@69
|
23
|
jpayne@69
|
24 if not data:
|
jpayne@69
|
25 return "", 0
|
jpayne@69
|
26
|
jpayne@69
|
27 return decode(data), len(data)
|
jpayne@69
|
28
|
jpayne@69
|
29
|
jpayne@69
|
30 class IncrementalEncoder(codecs.BufferedIncrementalEncoder):
|
jpayne@69
|
31 def _buffer_encode(self, data: str, errors: str, final: bool) -> Tuple[bytes, int]:
|
jpayne@69
|
32 if errors != "strict":
|
jpayne@69
|
33 raise IDNAError('Unsupported error handling "{}"'.format(errors))
|
jpayne@69
|
34
|
jpayne@69
|
35 if not data:
|
jpayne@69
|
36 return b"", 0
|
jpayne@69
|
37
|
jpayne@69
|
38 labels = _unicode_dots_re.split(data)
|
jpayne@69
|
39 trailing_dot = b""
|
jpayne@69
|
40 if labels:
|
jpayne@69
|
41 if not labels[-1]:
|
jpayne@69
|
42 trailing_dot = b"."
|
jpayne@69
|
43 del labels[-1]
|
jpayne@69
|
44 elif not final:
|
jpayne@69
|
45 # Keep potentially unfinished label until the next call
|
jpayne@69
|
46 del labels[-1]
|
jpayne@69
|
47 if labels:
|
jpayne@69
|
48 trailing_dot = b"."
|
jpayne@69
|
49
|
jpayne@69
|
50 result = []
|
jpayne@69
|
51 size = 0
|
jpayne@69
|
52 for label in labels:
|
jpayne@69
|
53 result.append(alabel(label))
|
jpayne@69
|
54 if size:
|
jpayne@69
|
55 size += 1
|
jpayne@69
|
56 size += len(label)
|
jpayne@69
|
57
|
jpayne@69
|
58 # Join with U+002E
|
jpayne@69
|
59 result_bytes = b".".join(result) + trailing_dot
|
jpayne@69
|
60 size += len(trailing_dot)
|
jpayne@69
|
61 return result_bytes, size
|
jpayne@69
|
62
|
jpayne@69
|
63
|
jpayne@69
|
64 class IncrementalDecoder(codecs.BufferedIncrementalDecoder):
|
jpayne@69
|
65 def _buffer_decode(self, data: Any, errors: str, final: bool) -> Tuple[str, int]:
|
jpayne@69
|
66 if errors != "strict":
|
jpayne@69
|
67 raise IDNAError('Unsupported error handling "{}"'.format(errors))
|
jpayne@69
|
68
|
jpayne@69
|
69 if not data:
|
jpayne@69
|
70 return ("", 0)
|
jpayne@69
|
71
|
jpayne@69
|
72 if not isinstance(data, str):
|
jpayne@69
|
73 data = str(data, "ascii")
|
jpayne@69
|
74
|
jpayne@69
|
75 labels = _unicode_dots_re.split(data)
|
jpayne@69
|
76 trailing_dot = ""
|
jpayne@69
|
77 if labels:
|
jpayne@69
|
78 if not labels[-1]:
|
jpayne@69
|
79 trailing_dot = "."
|
jpayne@69
|
80 del labels[-1]
|
jpayne@69
|
81 elif not final:
|
jpayne@69
|
82 # Keep potentially unfinished label until the next call
|
jpayne@69
|
83 del labels[-1]
|
jpayne@69
|
84 if labels:
|
jpayne@69
|
85 trailing_dot = "."
|
jpayne@69
|
86
|
jpayne@69
|
87 result = []
|
jpayne@69
|
88 size = 0
|
jpayne@69
|
89 for label in labels:
|
jpayne@69
|
90 result.append(ulabel(label))
|
jpayne@69
|
91 if size:
|
jpayne@69
|
92 size += 1
|
jpayne@69
|
93 size += len(label)
|
jpayne@69
|
94
|
jpayne@69
|
95 result_str = ".".join(result) + trailing_dot
|
jpayne@69
|
96 size += len(trailing_dot)
|
jpayne@69
|
97 return (result_str, size)
|
jpayne@69
|
98
|
jpayne@69
|
99
|
jpayne@69
|
100 class StreamWriter(Codec, codecs.StreamWriter):
|
jpayne@69
|
101 pass
|
jpayne@69
|
102
|
jpayne@69
|
103
|
jpayne@69
|
104 class StreamReader(Codec, codecs.StreamReader):
|
jpayne@69
|
105 pass
|
jpayne@69
|
106
|
jpayne@69
|
107
|
jpayne@69
|
108 def search_function(name: str) -> Optional[codecs.CodecInfo]:
|
jpayne@69
|
109 if name != "idna2008":
|
jpayne@69
|
110 return None
|
jpayne@69
|
111 return codecs.CodecInfo(
|
jpayne@69
|
112 name=name,
|
jpayne@69
|
113 encode=Codec().encode,
|
jpayne@69
|
114 decode=Codec().decode,
|
jpayne@69
|
115 incrementalencoder=IncrementalEncoder,
|
jpayne@69
|
116 incrementaldecoder=IncrementalDecoder,
|
jpayne@69
|
117 streamwriter=StreamWriter,
|
jpayne@69
|
118 streamreader=StreamReader,
|
jpayne@69
|
119 )
|
jpayne@69
|
120
|
jpayne@69
|
121
|
jpayne@69
|
122 codecs.register(search_function)
|