comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/tqdm/utils.py @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
comparison
equal deleted inserted replaced
67:0e9998148a16 68:5028fdace37b
1 """
2 General helpers required for `tqdm.std`.
3 """
4 import os
5 import re
6 import sys
7 from functools import partial, partialmethod, wraps
8 from inspect import signature
9 # TODO consider using wcswidth third-party package for 0-width characters
10 from unicodedata import east_asian_width
11 from warnings import warn
12 from weakref import proxy
13
14 _range, _unich, _unicode, _basestring = range, chr, str, str
15 CUR_OS = sys.platform
16 IS_WIN = any(CUR_OS.startswith(i) for i in ['win32', 'cygwin'])
17 IS_NIX = any(CUR_OS.startswith(i) for i in ['aix', 'linux', 'darwin', 'freebsd'])
18 RE_ANSI = re.compile(r"\x1b\[[;\d]*[A-Za-z]")
19
20 try:
21 if IS_WIN:
22 import colorama
23 else:
24 raise ImportError
25 except ImportError:
26 colorama = None
27 else:
28 try:
29 colorama.init(strip=False)
30 except TypeError:
31 colorama.init()
32
33
34 def envwrap(prefix, types=None, is_method=False):
35 """
36 Override parameter defaults via `os.environ[prefix + param_name]`.
37 Maps UPPER_CASE env vars map to lower_case param names.
38 camelCase isn't supported (because Windows ignores case).
39
40 Precedence (highest first):
41
42 - call (`foo(a=3)`)
43 - environ (`FOO_A=2`)
44 - signature (`def foo(a=1)`)
45
46 Parameters
47 ----------
48 prefix : str
49 Env var prefix, e.g. "FOO_"
50 types : dict, optional
51 Fallback mappings `{'param_name': type, ...}` if types cannot be
52 inferred from function signature.
53 Consider using `types=collections.defaultdict(lambda: ast.literal_eval)`.
54 is_method : bool, optional
55 Whether to use `functools.partialmethod`. If (default: False) use `functools.partial`.
56
57 Examples
58 --------
59 ```
60 $ cat foo.py
61 from tqdm.utils import envwrap
62 @envwrap("FOO_")
63 def test(a=1, b=2, c=3):
64 print(f"received: a={a}, b={b}, c={c}")
65
66 $ FOO_A=42 FOO_C=1337 python -c 'import foo; foo.test(c=99)'
67 received: a=42, b=2, c=99
68 ```
69 """
70 if types is None:
71 types = {}
72 i = len(prefix)
73 env_overrides = {k[i:].lower(): v for k, v in os.environ.items() if k.startswith(prefix)}
74 part = partialmethod if is_method else partial
75
76 def wrap(func):
77 params = signature(func).parameters
78 # ignore unknown env vars
79 overrides = {k: v for k, v in env_overrides.items() if k in params}
80 # infer overrides' `type`s
81 for k in overrides:
82 param = params[k]
83 if param.annotation is not param.empty: # typehints
84 for typ in getattr(param.annotation, '__args__', (param.annotation,)):
85 try:
86 overrides[k] = typ(overrides[k])
87 except Exception:
88 pass
89 else:
90 break
91 elif param.default is not None: # type of default value
92 overrides[k] = type(param.default)(overrides[k])
93 else:
94 try: # `types` fallback
95 overrides[k] = types[k](overrides[k])
96 except KeyError: # keep unconverted (`str`)
97 pass
98 return part(func, **overrides)
99 return wrap
100
101
102 class FormatReplace(object):
103 """
104 >>> a = FormatReplace('something')
105 >>> f"{a:5d}"
106 'something'
107 """ # NOQA: P102
108 def __init__(self, replace=''):
109 self.replace = replace
110 self.format_called = 0
111
112 def __format__(self, _):
113 self.format_called += 1
114 return self.replace
115
116
117 class Comparable(object):
118 """Assumes child has self._comparable attr/@property"""
119 def __lt__(self, other):
120 return self._comparable < other._comparable
121
122 def __le__(self, other):
123 return (self < other) or (self == other)
124
125 def __eq__(self, other):
126 return self._comparable == other._comparable
127
128 def __ne__(self, other):
129 return not self == other
130
131 def __gt__(self, other):
132 return not self <= other
133
134 def __ge__(self, other):
135 return not self < other
136
137
138 class ObjectWrapper(object):
139 def __getattr__(self, name):
140 return getattr(self._wrapped, name)
141
142 def __setattr__(self, name, value):
143 return setattr(self._wrapped, name, value)
144
145 def wrapper_getattr(self, name):
146 """Actual `self.getattr` rather than self._wrapped.getattr"""
147 try:
148 return object.__getattr__(self, name)
149 except AttributeError: # py2
150 return getattr(self, name)
151
152 def wrapper_setattr(self, name, value):
153 """Actual `self.setattr` rather than self._wrapped.setattr"""
154 return object.__setattr__(self, name, value)
155
156 def __init__(self, wrapped):
157 """
158 Thin wrapper around a given object
159 """
160 self.wrapper_setattr('_wrapped', wrapped)
161
162
163 class SimpleTextIOWrapper(ObjectWrapper):
164 """
165 Change only `.write()` of the wrapped object by encoding the passed
166 value and passing the result to the wrapped object's `.write()` method.
167 """
168 # pylint: disable=too-few-public-methods
169 def __init__(self, wrapped, encoding):
170 super().__init__(wrapped)
171 self.wrapper_setattr('encoding', encoding)
172
173 def write(self, s):
174 """
175 Encode `s` and pass to the wrapped object's `.write()` method.
176 """
177 return self._wrapped.write(s.encode(self.wrapper_getattr('encoding')))
178
179 def __eq__(self, other):
180 return self._wrapped == getattr(other, '_wrapped', other)
181
182
183 class DisableOnWriteError(ObjectWrapper):
184 """
185 Disable the given `tqdm_instance` upon `write()` or `flush()` errors.
186 """
187 @staticmethod
188 def disable_on_exception(tqdm_instance, func):
189 """
190 Quietly set `tqdm_instance.miniters=inf` if `func` raises `errno=5`.
191 """
192 tqdm_instance = proxy(tqdm_instance)
193
194 def inner(*args, **kwargs):
195 try:
196 return func(*args, **kwargs)
197 except OSError as e:
198 if e.errno != 5:
199 raise
200 try:
201 tqdm_instance.miniters = float('inf')
202 except ReferenceError:
203 pass
204 except ValueError as e:
205 if 'closed' not in str(e):
206 raise
207 try:
208 tqdm_instance.miniters = float('inf')
209 except ReferenceError:
210 pass
211 return inner
212
213 def __init__(self, wrapped, tqdm_instance):
214 super().__init__(wrapped)
215 if hasattr(wrapped, 'write'):
216 self.wrapper_setattr(
217 'write', self.disable_on_exception(tqdm_instance, wrapped.write))
218 if hasattr(wrapped, 'flush'):
219 self.wrapper_setattr(
220 'flush', self.disable_on_exception(tqdm_instance, wrapped.flush))
221
222 def __eq__(self, other):
223 return self._wrapped == getattr(other, '_wrapped', other)
224
225
226 class CallbackIOWrapper(ObjectWrapper):
227 def __init__(self, callback, stream, method="read"):
228 """
229 Wrap a given `file`-like object's `read()` or `write()` to report
230 lengths to the given `callback`
231 """
232 super().__init__(stream)
233 func = getattr(stream, method)
234 if method == "write":
235 @wraps(func)
236 def write(data, *args, **kwargs):
237 res = func(data, *args, **kwargs)
238 callback(len(data))
239 return res
240 self.wrapper_setattr('write', write)
241 elif method == "read":
242 @wraps(func)
243 def read(*args, **kwargs):
244 data = func(*args, **kwargs)
245 callback(len(data))
246 return data
247 self.wrapper_setattr('read', read)
248 else:
249 raise KeyError("Can only wrap read/write methods")
250
251
252 def _is_utf(encoding):
253 try:
254 u'\u2588\u2589'.encode(encoding)
255 except UnicodeEncodeError:
256 return False
257 except Exception:
258 try:
259 return encoding.lower().startswith('utf-') or ('U8' == encoding)
260 except Exception:
261 return False
262 else:
263 return True
264
265
266 def _supports_unicode(fp):
267 try:
268 return _is_utf(fp.encoding)
269 except AttributeError:
270 return False
271
272
273 def _is_ascii(s):
274 if isinstance(s, str):
275 for c in s:
276 if ord(c) > 255:
277 return False
278 return True
279 return _supports_unicode(s)
280
281
282 def _screen_shape_wrapper(): # pragma: no cover
283 """
284 Return a function which returns console dimensions (width, height).
285 Supported: linux, osx, windows, cygwin.
286 """
287 _screen_shape = None
288 if IS_WIN:
289 _screen_shape = _screen_shape_windows
290 if _screen_shape is None:
291 _screen_shape = _screen_shape_tput
292 if IS_NIX:
293 _screen_shape = _screen_shape_linux
294 return _screen_shape
295
296
297 def _screen_shape_windows(fp): # pragma: no cover
298 try:
299 import struct
300 from ctypes import create_string_buffer, windll
301 from sys import stdin, stdout
302
303 io_handle = -12 # assume stderr
304 if fp == stdin:
305 io_handle = -10
306 elif fp == stdout:
307 io_handle = -11
308
309 h = windll.kernel32.GetStdHandle(io_handle)
310 csbi = create_string_buffer(22)
311 res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi)
312 if res:
313 (_bufx, _bufy, _curx, _cury, _wattr, left, top, right, bottom,
314 _maxx, _maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw)
315 return right - left, bottom - top # +1
316 except Exception: # nosec
317 pass
318 return None, None
319
320
321 def _screen_shape_tput(*_): # pragma: no cover
322 """cygwin xterm (windows)"""
323 try:
324 import shlex
325 from subprocess import check_call # nosec
326 return [int(check_call(shlex.split('tput ' + i))) - 1
327 for i in ('cols', 'lines')]
328 except Exception: # nosec
329 pass
330 return None, None
331
332
333 def _screen_shape_linux(fp): # pragma: no cover
334
335 try:
336 from array import array
337 from fcntl import ioctl
338 from termios import TIOCGWINSZ
339 except ImportError:
340 return None, None
341 else:
342 try:
343 rows, cols = array('h', ioctl(fp, TIOCGWINSZ, '\0' * 8))[:2]
344 return cols, rows
345 except Exception:
346 try:
347 return [int(os.environ[i]) - 1 for i in ("COLUMNS", "LINES")]
348 except (KeyError, ValueError):
349 return None, None
350
351
352 def _environ_cols_wrapper(): # pragma: no cover
353 """
354 Return a function which returns console width.
355 Supported: linux, osx, windows, cygwin.
356 """
357 warn("Use `_screen_shape_wrapper()(file)[0]` instead of"
358 " `_environ_cols_wrapper()(file)`", DeprecationWarning, stacklevel=2)
359 shape = _screen_shape_wrapper()
360 if not shape:
361 return None
362
363 @wraps(shape)
364 def inner(fp):
365 return shape(fp)[0]
366
367 return inner
368
369
370 def _term_move_up(): # pragma: no cover
371 return '' if (os.name == 'nt') and (colorama is None) else '\x1b[A'
372
373
374 def _text_width(s):
375 return sum(2 if east_asian_width(ch) in 'FW' else 1 for ch in str(s))
376
377
378 def disp_len(data):
379 """
380 Returns the real on-screen length of a string which may contain
381 ANSI control codes and wide chars.
382 """
383 return _text_width(RE_ANSI.sub('', data))
384
385
386 def disp_trim(data, length):
387 """
388 Trim a string which may contain ANSI control characters.
389 """
390 if len(data) == disp_len(data):
391 return data[:length]
392
393 ansi_present = bool(RE_ANSI.search(data))
394 while disp_len(data) > length: # carefully delete one char at a time
395 data = data[:-1]
396 if ansi_present and bool(RE_ANSI.search(data)):
397 # assume ANSI reset is required
398 return data if data.endswith("\033[0m") else data + "\033[0m"
399 return data