jpayne@7
|
1 from .core import encode, decode, alabel, ulabel, IDNAError
|
jpayne@7
|
2 import codecs
|
jpayne@7
|
3 import re
|
jpayne@7
|
4 from typing import Any, Tuple, Optional
|
jpayne@7
|
5
|
jpayne@7
|
# Matches any single label-separator dot: U+002E FULL STOP, U+3002 IDEOGRAPHIC
# FULL STOP, U+FF0E FULLWIDTH FULL STOP or U+FF61 HALFWIDTH IDEOGRAPHIC FULL
# STOP. The incremental encoder and decoder split their input on this pattern.
_unicode_dots_re = re.compile('[\u002e\u3002\uff0e\uff61]')
|
jpayne@7
|
7
|
jpayne@7
|
class Codec(codecs.Codec):
    """Stateless codec that delegates to the module-level ``encode``/``decode``.

    Only the ``'strict'`` error handler is accepted by either direction.
    """

    def encode(self, data: str, errors: str = 'strict') -> Tuple[bytes, int]:
        """Encode *data* and return ``(encoded_bytes, input_length)``.

        Empty input yields ``(b"", 0)`` without calling the core encoder.

        Raises:
            IDNAError: if *errors* is anything other than ``'strict'``.
        """
        if errors != 'strict':
            raise IDNAError('Unsupported error handling \"{}\"'.format(errors))

        # Inside a method body the bare name ``encode`` resolves to the
        # module-level function, not to this method.
        return (encode(data), len(data)) if data else (b"", 0)

    def decode(self, data: bytes, errors: str = 'strict') -> Tuple[str, int]:
        """Decode *data* and return ``(decoded_text, input_length)``.

        Empty input yields ``('', 0)`` without calling the core decoder.

        Raises:
            IDNAError: if *errors* is anything other than ``'strict'``.
        """
        if errors != 'strict':
            raise IDNAError('Unsupported error handling \"{}\"'.format(errors))

        return (decode(data), len(data)) if data else ('', 0)
|
jpayne@7
|
27
|
jpayne@7
|
class IncrementalEncoder(codecs.BufferedIncrementalEncoder):
    """Incremental encoder that converts text one dot-separated label at a time."""

    def _buffer_encode(self, data: str, errors: str, final: bool) -> Tuple[bytes, int]:
        """Encode every complete label in *data*.

        Args:
            data: Text to encode, split on ``_unicode_dots_re``.
            errors: Error handler name; only ``'strict'`` is accepted.
            final: When false, a non-empty last label is treated as possibly
                incomplete and is left out of both the output and the
                consumed count.

        Returns:
            ``(encoded_bytes, consumed)`` where *consumed* is the number of
            input characters accounted for by the returned bytes.

        Raises:
            IDNAError: if *errors* is anything other than ``'strict'``.
        """
        if errors != 'strict':
            raise IDNAError('Unsupported error handling \"{}\"'.format(errors))

        if not data:
            return b'', 0

        pieces = _unicode_dots_re.split(data)
        suffix = b''
        if pieces and not pieces[-1]:
            # Input ended on a separator: drop the empty tail, emit the dot.
            pieces.pop()
            suffix = b'.'
        elif pieces and not final:
            # Keep potentially unfinished label until the next call
            pieces.pop()
            if pieces:
                suffix = b'.'

        encoded = [alabel(piece) for piece in pieces]

        consumed = 0
        for piece in pieces:
            # A separator is counted only once something has been consumed.
            consumed += len(piece) + (1 if consumed else 0)

        # Join with U+002E
        return b'.'.join(encoded) + suffix, consumed + len(suffix)
|
jpayne@7
|
60
|
jpayne@7
|
class IncrementalDecoder(codecs.BufferedIncrementalDecoder):
    """Incremental decoder that converts input one dot-separated label at a time."""

    def _buffer_decode(self, data: Any, errors: str, final: bool) -> Tuple[str, int]:
        """Decode every complete label in *data*.

        Args:
            data: Input to decode; anything that is not already ``str`` is
                first converted with ``str(data, 'ascii')``.
            errors: Error handler name; only ``'strict'`` is accepted.
            final: When false, a non-empty last label is treated as possibly
                incomplete and is left out of both the output and the
                consumed count.

        Returns:
            ``(decoded_text, consumed)`` where *consumed* is the number of
            input units accounted for by the returned text.

        Raises:
            IDNAError: if *errors* is anything other than ``'strict'``.
        """
        if errors != 'strict':
            raise IDNAError('Unsupported error handling \"{}\"'.format(errors))

        if not data:
            return ('', 0)

        text = data if isinstance(data, str) else str(data, 'ascii')

        pieces = _unicode_dots_re.split(text)
        suffix = ''
        if pieces and not pieces[-1]:
            # Input ended on a separator: drop the empty tail, emit the dot.
            pieces.pop()
            suffix = '.'
        elif pieces and not final:
            # Keep potentially unfinished label until the next call
            pieces.pop()
            if pieces:
                suffix = '.'

        decoded = [ulabel(piece) for piece in pieces]

        consumed = 0
        for piece in pieces:
            # A separator is counted only once something has been consumed.
            consumed += len(piece) + (1 if consumed else 0)

        return ('.'.join(decoded) + suffix, consumed + len(suffix))
|
jpayne@7
|
95
|
jpayne@7
|
96
|
jpayne@7
|
class StreamWriter(Codec, codecs.StreamWriter):
    """Stream writer using this module's ``Codec``.

    ``Codec`` is listed first so its ``encode`` takes precedence over the one
    inherited through ``codecs.StreamWriter``.
    """
|
jpayne@7
|
99
|
jpayne@7
|
100
|
jpayne@7
|
class StreamReader(Codec, codecs.StreamReader):
    """Stream reader using this module's ``Codec``.

    ``Codec`` is listed first so its ``decode`` takes precedence over the one
    inherited through ``codecs.StreamReader``.
    """
|
jpayne@7
|
103
|
jpayne@7
|
104
|
jpayne@7
|
def search_function(name: str) -> Optional[codecs.CodecInfo]:
    """Codec lookup hook: resolve the name ``'idna2008'`` to this module's codec.

    Args:
        name: Codec name being looked up.

    Returns:
        A ``codecs.CodecInfo`` wired to the classes in this module when
        *name* is exactly ``'idna2008'``; ``None`` for any other name.
    """
    if name == 'idna2008':
        return codecs.CodecInfo(
            name=name,
            encode=Codec().encode,
            decode=Codec().decode,
            incrementalencoder=IncrementalEncoder,
            incrementaldecoder=IncrementalDecoder,
            streamwriter=StreamWriter,
            streamreader=StreamReader,
        )
    return None
|
jpayne@7
|
117
|
jpayne@7
|
# Import-time side effect: install the lookup hook so that the name
# 'idna2008' can be resolved through the standard codecs machinery.
codecs.register(search_function)
|