annotate idna/codec.py @ 14:18e1cb6018fd

planemo upload for repository https://toolrepo.galaxytrakr.org/view/jpayne/bioproject_to_srr_2/556cac4fb538
author jpayne
date Mon, 20 May 2024 02:25:23 -0400
parents 5eb2d5e3bf22
children
rev   line source
jpayne@7 1 from .core import encode, decode, alabel, ulabel, IDNAError
jpayne@7 2 import codecs
jpayne@7 3 import re
jpayne@7 4 from typing import Any, Tuple, Optional
jpayne@7 5
jpayne@7 6 _unicode_dots_re = re.compile('[\u002e\u3002\uff0e\uff61]')
jpayne@7 7
class Codec(codecs.Codec):
    """Stateless codec that converts whole strings in one call.

    Both directions delegate to the ``encode`` / ``decode`` functions
    imported from ``.core``. Only ``'strict'`` error handling is accepted.
    """

    def encode(self, data: str, errors: str = 'strict') -> Tuple[bytes, int]:
        """Encode *data* and report how much input was consumed.

        Returns:
            ``(encoded_bytes, len(data))``. Empty input yields ``(b"", 0)``
            without calling the core encoder.

        Raises:
            IDNAError: if *errors* is anything other than ``'strict'``.
        """
        if errors != 'strict':
            raise IDNAError('Unsupported error handling "{}"'.format(errors))
        if data:
            return encode(data), len(data)
        return b"", 0

    def decode(self, data: bytes, errors: str = 'strict') -> Tuple[str, int]:
        """Decode *data* and report how much input was consumed.

        Returns:
            ``(decoded_text, len(data))``. Empty input yields ``('', 0)``
            without calling the core decoder.

        Raises:
            IDNAError: if *errors* is anything other than ``'strict'``.
        """
        if errors != 'strict':
            raise IDNAError('Unsupported error handling "{}"'.format(errors))
        if data:
            return decode(data), len(data)
        return '', 0
jpayne@7 27
class IncrementalEncoder(codecs.BufferedIncrementalEncoder):
    """Incremental encoder that converts a domain name label by label."""

    def _buffer_encode(self, data: str, errors: str, final: bool) -> Tuple[bytes, int]:
        """Encode every complete label in *data*.

        *data* is split on the separator characters matched by
        ``_unicode_dots_re`` and each resulting label is passed to
        ``alabel``. Unless *final* is true, the last label is treated as
        possibly incomplete and is neither encoded nor counted as consumed.

        Returns:
            ``(encoded_bytes, consumed)`` where *consumed* is the number of
            input characters accounted for by the encoded output.

        Raises:
            IDNAError: if *errors* is anything other than ``'strict'``.
        """
        if errors != 'strict':
            raise IDNAError('Unsupported error handling "{}"'.format(errors))

        if not data:
            return b'', 0

        pieces = _unicode_dots_re.split(data)
        suffix = b''
        if pieces:
            tail = pieces.pop()
            if not tail:
                # Input ended on a separator: emit it after the labels.
                suffix = b'.'
            elif final:
                # Last chunk of the stream, so the tail is a complete label.
                pieces.append(tail)
            elif pieces:
                # Tail is held back for the next call; the separator that
                # precedes it can already be emitted.
                suffix = b'.'

        encoded = []
        consumed = 0
        for piece in pieces:
            encoded.append(alabel(piece))
            # A separator is counted only once some input has been consumed.
            consumed += len(piece) + (1 if consumed else 0)

        # Output labels are always joined with U+002E.
        return b'.'.join(encoded) + suffix, consumed + len(suffix)
jpayne@7 60
class IncrementalDecoder(codecs.BufferedIncrementalDecoder):
    """Incremental decoder that converts a domain name label by label."""

    def _buffer_decode(self, data: Any, errors: str, final: bool) -> Tuple[str, int]:
        """Decode every complete label in *data*.

        Non-``str`` input is first converted to text as ASCII. The text is
        split on the separator characters matched by ``_unicode_dots_re``
        and each resulting label is passed to ``ulabel``. Unless *final* is
        true, the last label is treated as possibly incomplete and is
        neither decoded nor counted as consumed.

        Returns:
            ``(decoded_text, consumed)`` where *consumed* is the number of
            input units accounted for by the decoded output.

        Raises:
            IDNAError: if *errors* is anything other than ``'strict'``.
        """
        if errors != 'strict':
            raise IDNAError('Unsupported error handling "{}"'.format(errors))

        if not data:
            return ('', 0)

        text = data if isinstance(data, str) else str(data, 'ascii')

        pieces = _unicode_dots_re.split(text)
        suffix = ''
        if pieces:
            tail = pieces.pop()
            if not tail:
                # Input ended on a separator: emit it after the labels.
                suffix = '.'
            elif final:
                # Last chunk of the stream, so the tail is a complete label.
                pieces.append(tail)
            elif pieces:
                # Tail is held back for the next call; the separator that
                # precedes it can already be emitted.
                suffix = '.'

        decoded = []
        consumed = 0
        for piece in pieces:
            decoded.append(ulabel(piece))
            # A separator is counted only once some input has been consumed.
            consumed += len(piece) + (1 if consumed else 0)

        return ('.'.join(decoded) + suffix, consumed + len(suffix))
jpayne@7 95
jpayne@7 96
class StreamWriter(Codec, codecs.StreamWriter):
    """Stream writer that takes its ``encode``/``decode`` from ``Codec``."""
jpayne@7 99
jpayne@7 100
class StreamReader(Codec, codecs.StreamReader):
    """Stream reader that takes its ``encode``/``decode`` from ``Codec``."""
jpayne@7 103
jpayne@7 104
def search_function(name: str) -> Optional[codecs.CodecInfo]:
    """Codec-registry lookup hook for the ``idna2008`` encoding.

    Returns a ``codecs.CodecInfo`` built from this module's codec classes
    when *name* is exactly ``'idna2008'``, and ``None`` for any other name.
    """
    if name == 'idna2008':
        # Separate instances back the stateless encode and decode callables.
        encoder = Codec().encode
        decoder = Codec().decode
        return codecs.CodecInfo(
            name=name,
            encode=encoder,
            decode=decoder,
            incrementalencoder=IncrementalEncoder,
            incrementaldecoder=IncrementalDecoder,
            streamwriter=StreamWriter,
            streamreader=StreamReader,
        )
    return None
jpayne@7 117
# Importing this module installs the lookup hook, so the 'idna2008' encoding
# name becomes resolvable through the standard codec registry.
codecs.register(search_function)