Mercurial > repos > rliterman > csp2
comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/tqdm/utils.py @ 68:5028fdace37b
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 16:23:26 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
67:0e9998148a16 | 68:5028fdace37b |
---|---|
1 """ | |
2 General helpers required for `tqdm.std`. | |
3 """ | |
4 import os | |
5 import re | |
6 import sys | |
7 from functools import partial, partialmethod, wraps | |
8 from inspect import signature | |
9 # TODO consider using wcswidth third-party package for 0-width characters | |
10 from unicodedata import east_asian_width | |
11 from warnings import warn | |
12 from weakref import proxy | |
13 | |
14 _range, _unich, _unicode, _basestring = range, chr, str, str | |
15 CUR_OS = sys.platform | |
16 IS_WIN = any(CUR_OS.startswith(i) for i in ['win32', 'cygwin']) | |
17 IS_NIX = any(CUR_OS.startswith(i) for i in ['aix', 'linux', 'darwin', 'freebsd']) | |
18 RE_ANSI = re.compile(r"\x1b\[[;\d]*[A-Za-z]") | |
19 | |
20 try: | |
21 if IS_WIN: | |
22 import colorama | |
23 else: | |
24 raise ImportError | |
25 except ImportError: | |
26 colorama = None | |
27 else: | |
28 try: | |
29 colorama.init(strip=False) | |
30 except TypeError: | |
31 colorama.init() | |
32 | |
33 | |
34 def envwrap(prefix, types=None, is_method=False): | |
35 """ | |
36 Override parameter defaults via `os.environ[prefix + param_name]`. | |
37 Maps UPPER_CASE env vars map to lower_case param names. | |
38 camelCase isn't supported (because Windows ignores case). | |
39 | |
40 Precedence (highest first): | |
41 | |
42 - call (`foo(a=3)`) | |
43 - environ (`FOO_A=2`) | |
44 - signature (`def foo(a=1)`) | |
45 | |
46 Parameters | |
47 ---------- | |
48 prefix : str | |
49 Env var prefix, e.g. "FOO_" | |
50 types : dict, optional | |
51 Fallback mappings `{'param_name': type, ...}` if types cannot be | |
52 inferred from function signature. | |
53 Consider using `types=collections.defaultdict(lambda: ast.literal_eval)`. | |
54 is_method : bool, optional | |
55 Whether to use `functools.partialmethod`. If (default: False) use `functools.partial`. | |
56 | |
57 Examples | |
58 -------- | |
59 ``` | |
60 $ cat foo.py | |
61 from tqdm.utils import envwrap | |
62 @envwrap("FOO_") | |
63 def test(a=1, b=2, c=3): | |
64 print(f"received: a={a}, b={b}, c={c}") | |
65 | |
66 $ FOO_A=42 FOO_C=1337 python -c 'import foo; foo.test(c=99)' | |
67 received: a=42, b=2, c=99 | |
68 ``` | |
69 """ | |
70 if types is None: | |
71 types = {} | |
72 i = len(prefix) | |
73 env_overrides = {k[i:].lower(): v for k, v in os.environ.items() if k.startswith(prefix)} | |
74 part = partialmethod if is_method else partial | |
75 | |
76 def wrap(func): | |
77 params = signature(func).parameters | |
78 # ignore unknown env vars | |
79 overrides = {k: v for k, v in env_overrides.items() if k in params} | |
80 # infer overrides' `type`s | |
81 for k in overrides: | |
82 param = params[k] | |
83 if param.annotation is not param.empty: # typehints | |
84 for typ in getattr(param.annotation, '__args__', (param.annotation,)): | |
85 try: | |
86 overrides[k] = typ(overrides[k]) | |
87 except Exception: | |
88 pass | |
89 else: | |
90 break | |
91 elif param.default is not None: # type of default value | |
92 overrides[k] = type(param.default)(overrides[k]) | |
93 else: | |
94 try: # `types` fallback | |
95 overrides[k] = types[k](overrides[k]) | |
96 except KeyError: # keep unconverted (`str`) | |
97 pass | |
98 return part(func, **overrides) | |
99 return wrap | |
100 | |
101 | |
102 class FormatReplace(object): | |
103 """ | |
104 >>> a = FormatReplace('something') | |
105 >>> f"{a:5d}" | |
106 'something' | |
107 """ # NOQA: P102 | |
108 def __init__(self, replace=''): | |
109 self.replace = replace | |
110 self.format_called = 0 | |
111 | |
112 def __format__(self, _): | |
113 self.format_called += 1 | |
114 return self.replace | |
115 | |
116 | |
117 class Comparable(object): | |
118 """Assumes child has self._comparable attr/@property""" | |
119 def __lt__(self, other): | |
120 return self._comparable < other._comparable | |
121 | |
122 def __le__(self, other): | |
123 return (self < other) or (self == other) | |
124 | |
125 def __eq__(self, other): | |
126 return self._comparable == other._comparable | |
127 | |
128 def __ne__(self, other): | |
129 return not self == other | |
130 | |
131 def __gt__(self, other): | |
132 return not self <= other | |
133 | |
134 def __ge__(self, other): | |
135 return not self < other | |
136 | |
137 | |
138 class ObjectWrapper(object): | |
139 def __getattr__(self, name): | |
140 return getattr(self._wrapped, name) | |
141 | |
142 def __setattr__(self, name, value): | |
143 return setattr(self._wrapped, name, value) | |
144 | |
145 def wrapper_getattr(self, name): | |
146 """Actual `self.getattr` rather than self._wrapped.getattr""" | |
147 try: | |
148 return object.__getattr__(self, name) | |
149 except AttributeError: # py2 | |
150 return getattr(self, name) | |
151 | |
152 def wrapper_setattr(self, name, value): | |
153 """Actual `self.setattr` rather than self._wrapped.setattr""" | |
154 return object.__setattr__(self, name, value) | |
155 | |
156 def __init__(self, wrapped): | |
157 """ | |
158 Thin wrapper around a given object | |
159 """ | |
160 self.wrapper_setattr('_wrapped', wrapped) | |
161 | |
162 | |
163 class SimpleTextIOWrapper(ObjectWrapper): | |
164 """ | |
165 Change only `.write()` of the wrapped object by encoding the passed | |
166 value and passing the result to the wrapped object's `.write()` method. | |
167 """ | |
168 # pylint: disable=too-few-public-methods | |
169 def __init__(self, wrapped, encoding): | |
170 super().__init__(wrapped) | |
171 self.wrapper_setattr('encoding', encoding) | |
172 | |
173 def write(self, s): | |
174 """ | |
175 Encode `s` and pass to the wrapped object's `.write()` method. | |
176 """ | |
177 return self._wrapped.write(s.encode(self.wrapper_getattr('encoding'))) | |
178 | |
179 def __eq__(self, other): | |
180 return self._wrapped == getattr(other, '_wrapped', other) | |
181 | |
182 | |
183 class DisableOnWriteError(ObjectWrapper): | |
184 """ | |
185 Disable the given `tqdm_instance` upon `write()` or `flush()` errors. | |
186 """ | |
187 @staticmethod | |
188 def disable_on_exception(tqdm_instance, func): | |
189 """ | |
190 Quietly set `tqdm_instance.miniters=inf` if `func` raises `errno=5`. | |
191 """ | |
192 tqdm_instance = proxy(tqdm_instance) | |
193 | |
194 def inner(*args, **kwargs): | |
195 try: | |
196 return func(*args, **kwargs) | |
197 except OSError as e: | |
198 if e.errno != 5: | |
199 raise | |
200 try: | |
201 tqdm_instance.miniters = float('inf') | |
202 except ReferenceError: | |
203 pass | |
204 except ValueError as e: | |
205 if 'closed' not in str(e): | |
206 raise | |
207 try: | |
208 tqdm_instance.miniters = float('inf') | |
209 except ReferenceError: | |
210 pass | |
211 return inner | |
212 | |
213 def __init__(self, wrapped, tqdm_instance): | |
214 super().__init__(wrapped) | |
215 if hasattr(wrapped, 'write'): | |
216 self.wrapper_setattr( | |
217 'write', self.disable_on_exception(tqdm_instance, wrapped.write)) | |
218 if hasattr(wrapped, 'flush'): | |
219 self.wrapper_setattr( | |
220 'flush', self.disable_on_exception(tqdm_instance, wrapped.flush)) | |
221 | |
222 def __eq__(self, other): | |
223 return self._wrapped == getattr(other, '_wrapped', other) | |
224 | |
225 | |
226 class CallbackIOWrapper(ObjectWrapper): | |
227 def __init__(self, callback, stream, method="read"): | |
228 """ | |
229 Wrap a given `file`-like object's `read()` or `write()` to report | |
230 lengths to the given `callback` | |
231 """ | |
232 super().__init__(stream) | |
233 func = getattr(stream, method) | |
234 if method == "write": | |
235 @wraps(func) | |
236 def write(data, *args, **kwargs): | |
237 res = func(data, *args, **kwargs) | |
238 callback(len(data)) | |
239 return res | |
240 self.wrapper_setattr('write', write) | |
241 elif method == "read": | |
242 @wraps(func) | |
243 def read(*args, **kwargs): | |
244 data = func(*args, **kwargs) | |
245 callback(len(data)) | |
246 return data | |
247 self.wrapper_setattr('read', read) | |
248 else: | |
249 raise KeyError("Can only wrap read/write methods") | |
250 | |
251 | |
252 def _is_utf(encoding): | |
253 try: | |
254 u'\u2588\u2589'.encode(encoding) | |
255 except UnicodeEncodeError: | |
256 return False | |
257 except Exception: | |
258 try: | |
259 return encoding.lower().startswith('utf-') or ('U8' == encoding) | |
260 except Exception: | |
261 return False | |
262 else: | |
263 return True | |
264 | |
265 | |
266 def _supports_unicode(fp): | |
267 try: | |
268 return _is_utf(fp.encoding) | |
269 except AttributeError: | |
270 return False | |
271 | |
272 | |
273 def _is_ascii(s): | |
274 if isinstance(s, str): | |
275 for c in s: | |
276 if ord(c) > 255: | |
277 return False | |
278 return True | |
279 return _supports_unicode(s) | |
280 | |
281 | |
282 def _screen_shape_wrapper(): # pragma: no cover | |
283 """ | |
284 Return a function which returns console dimensions (width, height). | |
285 Supported: linux, osx, windows, cygwin. | |
286 """ | |
287 _screen_shape = None | |
288 if IS_WIN: | |
289 _screen_shape = _screen_shape_windows | |
290 if _screen_shape is None: | |
291 _screen_shape = _screen_shape_tput | |
292 if IS_NIX: | |
293 _screen_shape = _screen_shape_linux | |
294 return _screen_shape | |
295 | |
296 | |
297 def _screen_shape_windows(fp): # pragma: no cover | |
298 try: | |
299 import struct | |
300 from ctypes import create_string_buffer, windll | |
301 from sys import stdin, stdout | |
302 | |
303 io_handle = -12 # assume stderr | |
304 if fp == stdin: | |
305 io_handle = -10 | |
306 elif fp == stdout: | |
307 io_handle = -11 | |
308 | |
309 h = windll.kernel32.GetStdHandle(io_handle) | |
310 csbi = create_string_buffer(22) | |
311 res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi) | |
312 if res: | |
313 (_bufx, _bufy, _curx, _cury, _wattr, left, top, right, bottom, | |
314 _maxx, _maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw) | |
315 return right - left, bottom - top # +1 | |
316 except Exception: # nosec | |
317 pass | |
318 return None, None | |
319 | |
320 | |
321 def _screen_shape_tput(*_): # pragma: no cover | |
322 """cygwin xterm (windows)""" | |
323 try: | |
324 import shlex | |
325 from subprocess import check_call # nosec | |
326 return [int(check_call(shlex.split('tput ' + i))) - 1 | |
327 for i in ('cols', 'lines')] | |
328 except Exception: # nosec | |
329 pass | |
330 return None, None | |
331 | |
332 | |
333 def _screen_shape_linux(fp): # pragma: no cover | |
334 | |
335 try: | |
336 from array import array | |
337 from fcntl import ioctl | |
338 from termios import TIOCGWINSZ | |
339 except ImportError: | |
340 return None, None | |
341 else: | |
342 try: | |
343 rows, cols = array('h', ioctl(fp, TIOCGWINSZ, '\0' * 8))[:2] | |
344 return cols, rows | |
345 except Exception: | |
346 try: | |
347 return [int(os.environ[i]) - 1 for i in ("COLUMNS", "LINES")] | |
348 except (KeyError, ValueError): | |
349 return None, None | |
350 | |
351 | |
352 def _environ_cols_wrapper(): # pragma: no cover | |
353 """ | |
354 Return a function which returns console width. | |
355 Supported: linux, osx, windows, cygwin. | |
356 """ | |
357 warn("Use `_screen_shape_wrapper()(file)[0]` instead of" | |
358 " `_environ_cols_wrapper()(file)`", DeprecationWarning, stacklevel=2) | |
359 shape = _screen_shape_wrapper() | |
360 if not shape: | |
361 return None | |
362 | |
363 @wraps(shape) | |
364 def inner(fp): | |
365 return shape(fp)[0] | |
366 | |
367 return inner | |
368 | |
369 | |
370 def _term_move_up(): # pragma: no cover | |
371 return '' if (os.name == 'nt') and (colorama is None) else '\x1b[A' | |
372 | |
373 | |
374 def _text_width(s): | |
375 return sum(2 if east_asian_width(ch) in 'FW' else 1 for ch in str(s)) | |
376 | |
377 | |
378 def disp_len(data): | |
379 """ | |
380 Returns the real on-screen length of a string which may contain | |
381 ANSI control codes and wide chars. | |
382 """ | |
383 return _text_width(RE_ANSI.sub('', data)) | |
384 | |
385 | |
386 def disp_trim(data, length): | |
387 """ | |
388 Trim a string which may contain ANSI control characters. | |
389 """ | |
390 if len(data) == disp_len(data): | |
391 return data[:length] | |
392 | |
393 ansi_present = bool(RE_ANSI.search(data)) | |
394 while disp_len(data) > length: # carefully delete one char at a time | |
395 data = data[:-1] | |
396 if ansi_present and bool(RE_ANSI.search(data)): | |
397 # assume ANSI reset is required | |
398 return data if data.endswith("\033[0m") else data + "\033[0m" | |
399 return data |