jpayne@68: """ jpayne@68: General helpers required for `tqdm.std`. jpayne@68: """ jpayne@68: import os jpayne@68: import re jpayne@68: import sys jpayne@68: from functools import partial, partialmethod, wraps jpayne@68: from inspect import signature jpayne@68: # TODO consider using wcswidth third-party package for 0-width characters jpayne@68: from unicodedata import east_asian_width jpayne@68: from warnings import warn jpayne@68: from weakref import proxy jpayne@68: jpayne@68: _range, _unich, _unicode, _basestring = range, chr, str, str jpayne@68: CUR_OS = sys.platform jpayne@68: IS_WIN = any(CUR_OS.startswith(i) for i in ['win32', 'cygwin']) jpayne@68: IS_NIX = any(CUR_OS.startswith(i) for i in ['aix', 'linux', 'darwin', 'freebsd']) jpayne@68: RE_ANSI = re.compile(r"\x1b\[[;\d]*[A-Za-z]") jpayne@68: jpayne@68: try: jpayne@68: if IS_WIN: jpayne@68: import colorama jpayne@68: else: jpayne@68: raise ImportError jpayne@68: except ImportError: jpayne@68: colorama = None jpayne@68: else: jpayne@68: try: jpayne@68: colorama.init(strip=False) jpayne@68: except TypeError: jpayne@68: colorama.init() jpayne@68: jpayne@68: jpayne@68: def envwrap(prefix, types=None, is_method=False): jpayne@68: """ jpayne@68: Override parameter defaults via `os.environ[prefix + param_name]`. jpayne@68: Maps UPPER_CASE env vars map to lower_case param names. jpayne@68: camelCase isn't supported (because Windows ignores case). jpayne@68: jpayne@68: Precedence (highest first): jpayne@68: jpayne@68: - call (`foo(a=3)`) jpayne@68: - environ (`FOO_A=2`) jpayne@68: - signature (`def foo(a=1)`) jpayne@68: jpayne@68: Parameters jpayne@68: ---------- jpayne@68: prefix : str jpayne@68: Env var prefix, e.g. "FOO_" jpayne@68: types : dict, optional jpayne@68: Fallback mappings `{'param_name': type, ...}` if types cannot be jpayne@68: inferred from function signature. jpayne@68: Consider using `types=collections.defaultdict(lambda: ast.literal_eval)`. jpayne@68: is_method : bool, optional jpayne@68: Whether to use `functools.partialmethod`. If (default: False) use `functools.partial`. jpayne@68: jpayne@68: Examples jpayne@68: -------- jpayne@68: ``` jpayne@68: $ cat foo.py jpayne@68: from tqdm.utils import envwrap jpayne@68: @envwrap("FOO_") jpayne@68: def test(a=1, b=2, c=3): jpayne@68: print(f"received: a={a}, b={b}, c={c}") jpayne@68: jpayne@68: $ FOO_A=42 FOO_C=1337 python -c 'import foo; foo.test(c=99)' jpayne@68: received: a=42, b=2, c=99 jpayne@68: ``` jpayne@68: """ jpayne@68: if types is None: jpayne@68: types = {} jpayne@68: i = len(prefix) jpayne@68: env_overrides = {k[i:].lower(): v for k, v in os.environ.items() if k.startswith(prefix)} jpayne@68: part = partialmethod if is_method else partial jpayne@68: jpayne@68: def wrap(func): jpayne@68: params = signature(func).parameters jpayne@68: # ignore unknown env vars jpayne@68: overrides = {k: v for k, v in env_overrides.items() if k in params} jpayne@68: # infer overrides' `type`s jpayne@68: for k in overrides: jpayne@68: param = params[k] jpayne@68: if param.annotation is not param.empty: # typehints jpayne@68: for typ in getattr(param.annotation, '__args__', (param.annotation,)): jpayne@68: try: jpayne@68: overrides[k] = typ(overrides[k]) jpayne@68: except Exception: jpayne@68: pass jpayne@68: else: jpayne@68: break jpayne@68: elif param.default is not None: # type of default value jpayne@68: overrides[k] = type(param.default)(overrides[k]) jpayne@68: else: jpayne@68: try: # `types` fallback jpayne@68: overrides[k] = types[k](overrides[k]) jpayne@68: except KeyError: # keep unconverted (`str`) jpayne@68: pass jpayne@68: return part(func, **overrides) jpayne@68: return wrap jpayne@68: jpayne@68: jpayne@68: class FormatReplace(object): jpayne@68: """ jpayne@68: >>> a = FormatReplace('something') jpayne@68: >>> f"{a:5d}" jpayne@68: 'something' jpayne@68: """ # NOQA: P102 jpayne@68: def __init__(self, replace=''): jpayne@68: self.replace = replace jpayne@68: self.format_called = 0 jpayne@68: jpayne@68: def __format__(self, _): jpayne@68: self.format_called += 1 jpayne@68: return self.replace jpayne@68: jpayne@68: jpayne@68: class Comparable(object): jpayne@68: """Assumes child has self._comparable attr/@property""" jpayne@68: def __lt__(self, other): jpayne@68: return self._comparable < other._comparable jpayne@68: jpayne@68: def __le__(self, other): jpayne@68: return (self < other) or (self == other) jpayne@68: jpayne@68: def __eq__(self, other): jpayne@68: return self._comparable == other._comparable jpayne@68: jpayne@68: def __ne__(self, other): jpayne@68: return not self == other jpayne@68: jpayne@68: def __gt__(self, other): jpayne@68: return not self <= other jpayne@68: jpayne@68: def __ge__(self, other): jpayne@68: return not self < other jpayne@68: jpayne@68: jpayne@68: class ObjectWrapper(object): jpayne@68: def __getattr__(self, name): jpayne@68: return getattr(self._wrapped, name) jpayne@68: jpayne@68: def __setattr__(self, name, value): jpayne@68: return setattr(self._wrapped, name, value) jpayne@68: jpayne@68: def wrapper_getattr(self, name): jpayne@68: """Actual `self.getattr` rather than self._wrapped.getattr""" jpayne@68: try: jpayne@68: return object.__getattr__(self, name) jpayne@68: except AttributeError: # py2 jpayne@68: return getattr(self, name) jpayne@68: jpayne@68: def wrapper_setattr(self, name, value): jpayne@68: """Actual `self.setattr` rather than self._wrapped.setattr""" jpayne@68: return object.__setattr__(self, name, value) jpayne@68: jpayne@68: def __init__(self, wrapped): jpayne@68: """ jpayne@68: Thin wrapper around a given object jpayne@68: """ jpayne@68: self.wrapper_setattr('_wrapped', wrapped) jpayne@68: jpayne@68: jpayne@68: class SimpleTextIOWrapper(ObjectWrapper): jpayne@68: """ jpayne@68: Change only `.write()` of the wrapped object by encoding the passed jpayne@68: value and passing the result to the wrapped object's `.write()` method. jpayne@68: """ jpayne@68: # pylint: disable=too-few-public-methods jpayne@68: def __init__(self, wrapped, encoding): jpayne@68: super().__init__(wrapped) jpayne@68: self.wrapper_setattr('encoding', encoding) jpayne@68: jpayne@68: def write(self, s): jpayne@68: """ jpayne@68: Encode `s` and pass to the wrapped object's `.write()` method. jpayne@68: """ jpayne@68: return self._wrapped.write(s.encode(self.wrapper_getattr('encoding'))) jpayne@68: jpayne@68: def __eq__(self, other): jpayne@68: return self._wrapped == getattr(other, '_wrapped', other) jpayne@68: jpayne@68: jpayne@68: class DisableOnWriteError(ObjectWrapper): jpayne@68: """ jpayne@68: Disable the given `tqdm_instance` upon `write()` or `flush()` errors. jpayne@68: """ jpayne@68: @staticmethod jpayne@68: def disable_on_exception(tqdm_instance, func): jpayne@68: """ jpayne@68: Quietly set `tqdm_instance.miniters=inf` if `func` raises `errno=5`. jpayne@68: """ jpayne@68: tqdm_instance = proxy(tqdm_instance) jpayne@68: jpayne@68: def inner(*args, **kwargs): jpayne@68: try: jpayne@68: return func(*args, **kwargs) jpayne@68: except OSError as e: jpayne@68: if e.errno != 5: jpayne@68: raise jpayne@68: try: jpayne@68: tqdm_instance.miniters = float('inf') jpayne@68: except ReferenceError: jpayne@68: pass jpayne@68: except ValueError as e: jpayne@68: if 'closed' not in str(e): jpayne@68: raise jpayne@68: try: jpayne@68: tqdm_instance.miniters = float('inf') jpayne@68: except ReferenceError: jpayne@68: pass jpayne@68: return inner jpayne@68: jpayne@68: def __init__(self, wrapped, tqdm_instance): jpayne@68: super().__init__(wrapped) jpayne@68: if hasattr(wrapped, 'write'): jpayne@68: self.wrapper_setattr( jpayne@68: 'write', self.disable_on_exception(tqdm_instance, wrapped.write)) jpayne@68: if hasattr(wrapped, 'flush'): jpayne@68: self.wrapper_setattr( jpayne@68: 'flush', self.disable_on_exception(tqdm_instance, wrapped.flush)) jpayne@68: jpayne@68: def __eq__(self, other): jpayne@68: return self._wrapped == getattr(other, '_wrapped', other) jpayne@68: jpayne@68: jpayne@68: class CallbackIOWrapper(ObjectWrapper): jpayne@68: def __init__(self, callback, stream, method="read"): jpayne@68: """ jpayne@68: Wrap a given `file`-like object's `read()` or `write()` to report jpayne@68: lengths to the given `callback` jpayne@68: """ jpayne@68: super().__init__(stream) jpayne@68: func = getattr(stream, method) jpayne@68: if method == "write": jpayne@68: @wraps(func) jpayne@68: def write(data, *args, **kwargs): jpayne@68: res = func(data, *args, **kwargs) jpayne@68: callback(len(data)) jpayne@68: return res jpayne@68: self.wrapper_setattr('write', write) jpayne@68: elif method == "read": jpayne@68: @wraps(func) jpayne@68: def read(*args, **kwargs): jpayne@68: data = func(*args, **kwargs) jpayne@68: callback(len(data)) jpayne@68: return data jpayne@68: self.wrapper_setattr('read', read) jpayne@68: else: jpayne@68: raise KeyError("Can only wrap read/write methods") jpayne@68: jpayne@68: jpayne@68: def _is_utf(encoding): jpayne@68: try: jpayne@68: u'\u2588\u2589'.encode(encoding) jpayne@68: except UnicodeEncodeError: jpayne@68: return False jpayne@68: except Exception: jpayne@68: try: jpayne@68: return encoding.lower().startswith('utf-') or ('U8' == encoding) jpayne@68: except Exception: jpayne@68: return False jpayne@68: else: jpayne@68: return True jpayne@68: jpayne@68: jpayne@68: def _supports_unicode(fp): jpayne@68: try: jpayne@68: return _is_utf(fp.encoding) jpayne@68: except AttributeError: jpayne@68: return False jpayne@68: jpayne@68: jpayne@68: def _is_ascii(s): jpayne@68: if isinstance(s, str): jpayne@68: for c in s: jpayne@68: if ord(c) > 255: jpayne@68: return False jpayne@68: return True jpayne@68: return _supports_unicode(s) jpayne@68: jpayne@68: jpayne@68: def _screen_shape_wrapper(): # pragma: no cover jpayne@68: """ jpayne@68: Return a function which returns console dimensions (width, height). jpayne@68: Supported: linux, osx, windows, cygwin. jpayne@68: """ jpayne@68: _screen_shape = None jpayne@68: if IS_WIN: jpayne@68: _screen_shape = _screen_shape_windows jpayne@68: if _screen_shape is None: jpayne@68: _screen_shape = _screen_shape_tput jpayne@68: if IS_NIX: jpayne@68: _screen_shape = _screen_shape_linux jpayne@68: return _screen_shape jpayne@68: jpayne@68: jpayne@68: def _screen_shape_windows(fp): # pragma: no cover jpayne@68: try: jpayne@68: import struct jpayne@68: from ctypes import create_string_buffer, windll jpayne@68: from sys import stdin, stdout jpayne@68: jpayne@68: io_handle = -12 # assume stderr jpayne@68: if fp == stdin: jpayne@68: io_handle = -10 jpayne@68: elif fp == stdout: jpayne@68: io_handle = -11 jpayne@68: jpayne@68: h = windll.kernel32.GetStdHandle(io_handle) jpayne@68: csbi = create_string_buffer(22) jpayne@68: res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi) jpayne@68: if res: jpayne@68: (_bufx, _bufy, _curx, _cury, _wattr, left, top, right, bottom, jpayne@68: _maxx, _maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw) jpayne@68: return right - left, bottom - top # +1 jpayne@68: except Exception: # nosec jpayne@68: pass jpayne@68: return None, None jpayne@68: jpayne@68: jpayne@68: def _screen_shape_tput(*_): # pragma: no cover jpayne@68: """cygwin xterm (windows)""" jpayne@68: try: jpayne@68: import shlex jpayne@68: from subprocess import check_call # nosec jpayne@68: return [int(check_call(shlex.split('tput ' + i))) - 1 jpayne@68: for i in ('cols', 'lines')] jpayne@68: except Exception: # nosec jpayne@68: pass jpayne@68: return None, None jpayne@68: jpayne@68: jpayne@68: def _screen_shape_linux(fp): # pragma: no cover jpayne@68: jpayne@68: try: jpayne@68: from array import array jpayne@68: from fcntl import ioctl jpayne@68: from termios import TIOCGWINSZ jpayne@68: except ImportError: jpayne@68: return None, None jpayne@68: else: jpayne@68: try: jpayne@68: rows, cols = array('h', ioctl(fp, TIOCGWINSZ, '\0' * 8))[:2] jpayne@68: return cols, rows jpayne@68: except Exception: jpayne@68: try: jpayne@68: return [int(os.environ[i]) - 1 for i in ("COLUMNS", "LINES")] jpayne@68: except (KeyError, ValueError): jpayne@68: return None, None jpayne@68: jpayne@68: jpayne@68: def _environ_cols_wrapper(): # pragma: no cover jpayne@68: """ jpayne@68: Return a function which returns console width. jpayne@68: Supported: linux, osx, windows, cygwin. jpayne@68: """ jpayne@68: warn("Use `_screen_shape_wrapper()(file)[0]` instead of" jpayne@68: " `_environ_cols_wrapper()(file)`", DeprecationWarning, stacklevel=2) jpayne@68: shape = _screen_shape_wrapper() jpayne@68: if not shape: jpayne@68: return None jpayne@68: jpayne@68: @wraps(shape) jpayne@68: def inner(fp): jpayne@68: return shape(fp)[0] jpayne@68: jpayne@68: return inner jpayne@68: jpayne@68: jpayne@68: def _term_move_up(): # pragma: no cover jpayne@68: return '' if (os.name == 'nt') and (colorama is None) else '\x1b[A' jpayne@68: jpayne@68: jpayne@68: def _text_width(s): jpayne@68: return sum(2 if east_asian_width(ch) in 'FW' else 1 for ch in str(s)) jpayne@68: jpayne@68: jpayne@68: def disp_len(data): jpayne@68: """ jpayne@68: Returns the real on-screen length of a string which may contain jpayne@68: ANSI control codes and wide chars. jpayne@68: """ jpayne@68: return _text_width(RE_ANSI.sub('', data)) jpayne@68: jpayne@68: jpayne@68: def disp_trim(data, length): jpayne@68: """ jpayne@68: Trim a string which may contain ANSI control characters. jpayne@68: """ jpayne@68: if len(data) == disp_len(data): jpayne@68: return data[:length] jpayne@68: jpayne@68: ansi_present = bool(RE_ANSI.search(data)) jpayne@68: while disp_len(data) > length: # carefully delete one char at a time jpayne@68: data = data[:-1] jpayne@68: if ansi_present and bool(RE_ANSI.search(data)): jpayne@68: # assume ANSI reset is required jpayne@68: return data if data.endswith("\033[0m") else data + "\033[0m" jpayne@68: return data