"""
Module version for monitoring CLI pipes (`... | python -m tqdm | ...`).
"""
import logging
import re
import sys
from ast import literal_eval as numeric
from textwrap import indent

from .std import TqdmKeyError, TqdmTypeError, tqdm
from .version import __version__

__all__ = ["main"]
log = logging.getLogger(__name__)


def cast(val, typ):
    """
    Convert the string `val` to the type named by `typ`.

    Params
    ------
    val  : str. Raw option value.
    typ  : str. One of 'bool', 'chr', 'str', 'int', 'float', or several of
        these joined by ' or ' (alternatives are tried left to right and the
        first successful conversion wins).

    Returns
    -------
    bool, bytes (for 'chr'), str, int or float, depending on `typ`.

    Raises
    ------
    TqdmTypeError  : if `val` cannot be converted, or `typ` is not recognised.
    """
    log.debug((val, typ))
    if " or " in typ:
        for t in typ.split(" or "):
            try:
                return cast(val, t)
            except TqdmTypeError:
                pass  # this alternative did not fit; try the next one
        raise TqdmTypeError(f"{val} : {typ}")

    if typ == 'bool':
        # an empty value is accepted as True
        # NOTE(review): presumably a bare `--flag`; confirm against the parser
        if val in ('True', ''):
            return True
        if val == 'False':
            return False
        raise TqdmTypeError(f"{val} : {typ}")
    if typ == 'chr':
        if len(val) == 1:
            return val.encode()
        # a backslash escape such as `\t` or `\x00`
        if re.match(r"^\\\w+$", val):
            try:
                # parse as a string literal only: `literal_eval` decodes the
                # escape without executing arbitrary expressions
                return numeric(f'"{val}"').encode()
            except (SyntaxError, ValueError) as exc:
                # malformed escape (e.g. `\x`) or un-encodable code point
                raise TqdmTypeError(f"{val} : {typ}") from exc
        raise TqdmTypeError(f"{val} : {typ}")
    if typ == 'str':
        return val
    if typ == 'int':
        try:
            return int(val)
        except ValueError as exc:
            raise TqdmTypeError(f"{val} : {typ}") from exc
    if typ == 'float':
        try:
            return float(val)
        except ValueError as exc:
            raise TqdmTypeError(f"{val} : {typ}") from exc
    raise TqdmTypeError(f"{val} : {typ}")


def posix_pipe(fin, fout, delim=b'\\n', buf_size=256,
               callback=lambda float: None, callback_len=True):
    """
    Copy `fin` to `fout` in chunks, reporting progress through `callback`.

    Params
    ------
    fin  : binary file with `read(buf_size : int)` method
    fout  : binary file with `write` (and optionally `flush`) methods.
    delim  : bytes separating records. If empty, data is passed through
        unsplit and `callback` receives the length of each chunk read.
    buf_size  : int. Number of bytes requested per `fin.read` call.
    callback  : function(float), e.g.: `tqdm.update`
    callback_len  : If (default: True) do `callback(1)` per record.
        Otherwise, do `callback(record)` with the record's bytes
        (delimiter excluded).

    Notes
    -----
    Each record is written to `fout` together with its trailing delimiter.
    A delimiter that straddles two reads is detected. Any trailing data
    without a final delimiter is written and reported as one last record at
    EOF, after which `fout.flush()` is called if available.
    """
    fp_write = fout.write
    flush = getattr(fout, 'flush', lambda: None)

    if not delim:
        while True:
            tmp = fin.read(buf_size)

            # flush at EOF
            if not tmp:
                flush()
                return

            fp_write(tmp)
            callback(len(tmp))

    # pending bytes of the current record; never contains a complete `delim`
    buf = b''
    len_delim = len(delim)
    while True:
        tmp = fin.read(buf_size)

        # flush at EOF
        if not tmp:
            if buf:
                # unterminated final record
                fp_write(buf)
                callback(1 if callback_len else buf)
            flush()
            return

        data = buf + tmp
        # `buf` holds no full delimiter, so a match can start no earlier
        # than `len_delim - 1` bytes before the chunk boundary; starting
        # there catches delimiters split across two reads without
        # rescanning the whole pending record
        start = max(len(buf) - len_delim + 1, 0)
        pos = 0  # start of the first record not yet written
        while True:
            i = data.find(delim, start)
            if i < 0:
                buf = data[pos:]
                break
            end = i + len_delim
            fp_write(data[pos:end])
            callback(1 if callback_len else data[pos:i])
            pos = start = end


# ((opt, type), ... )
RE_OPTS = re.compile(r'\n {4}(\S+)\s{2,}:\s*([^,]+)')
# better split method assuming no positional args
# NOTE(review): the RE_SHLEX definition is truncated in this view of the file
# (everything after `r'\s*(?` is missing), so the visible fragment is kept
# verbatim below rather than reconstructed.
# RE_SHLEX = re.compile(r'\s*(?
: \2', d) jpayne@68: split = RE_OPTS.split(d) jpayne@68: opt_types_desc = zip(split[1::3], split[2::3], split[3::3]) jpayne@68: d = ''.join(('\n --{0} : {2}{3}' if otd[1] == 'bool' else jpayne@68: '\n --{0}=<{1}> : {2}{3}').format( jpayne@68: otd[0].replace('_', '-'), otd[0], *otd[1:]) jpayne@68: for otd in opt_types_desc if otd[0] not in UNSUPPORTED_OPTS) jpayne@68: jpayne@68: help_short = "Usage:\n tqdm [--help | options]\n" jpayne@68: d = help_short + """ jpayne@68: Options: jpayne@68: -h, --help Print this help and exit. jpayne@68: -v, --version Print version and exit. jpayne@68: """ + d.strip('\n') + '\n' jpayne@68: jpayne@68: # opts = docopt(d, version=__version__) jpayne@68: if any(v in argv for v in ('-v', '--version')): jpayne@68: sys.stdout.write(__version__ + '\n') jpayne@68: sys.exit(0) jpayne@68: elif any(v in argv for v in ('-h', '--help')): jpayne@68: sys.stdout.write(d + '\n') jpayne@68: sys.exit(0) jpayne@68: elif argv and argv[0][:2] != '--': jpayne@68: sys.stderr.write(f"Error:Unknown argument:{argv[0]}\n{help_short}") jpayne@68: jpayne@68: argv = RE_SHLEX.split(' '.join(["tqdm"] + argv)) jpayne@68: opts = dict(zip(argv[1::3], argv[3::3])) jpayne@68: jpayne@68: log.debug(opts) jpayne@68: opts.pop('log', True) jpayne@68: jpayne@68: tqdm_args = {'file': fp} jpayne@68: try: jpayne@68: for (o, v) in opts.items(): jpayne@68: o = o.replace('-', '_') jpayne@68: try: jpayne@68: tqdm_args[o] = cast(v, opt_types[o]) jpayne@68: except KeyError as e: jpayne@68: raise TqdmKeyError(str(e)) jpayne@68: log.debug('args:' + str(tqdm_args)) jpayne@68: jpayne@68: delim_per_char = tqdm_args.pop('bytes', False) jpayne@68: update = tqdm_args.pop('update', False) jpayne@68: update_to = tqdm_args.pop('update_to', False) jpayne@68: if sum((delim_per_char, update, update_to)) > 1: jpayne@68: raise TqdmKeyError("Can only have one of --bytes --update --update_to") jpayne@68: except Exception: jpayne@68: fp.write("\nError:\n" + help_short) jpayne@68: stdin, stdout_write = 
sys.stdin, sys.stdout.write jpayne@68: for i in stdin: jpayne@68: stdout_write(i) jpayne@68: raise jpayne@68: else: jpayne@68: buf_size = tqdm_args.pop('buf_size', 256) jpayne@68: delim = tqdm_args.pop('delim', b'\\n') jpayne@68: tee = tqdm_args.pop('tee', False) jpayne@68: manpath = tqdm_args.pop('manpath', None) jpayne@68: comppath = tqdm_args.pop('comppath', None) jpayne@68: if tqdm_args.pop('null', False): jpayne@68: class stdout(object): jpayne@68: @staticmethod jpayne@68: def write(_): jpayne@68: pass jpayne@68: else: jpayne@68: stdout = sys.stdout jpayne@68: stdout = getattr(stdout, 'buffer', stdout) jpayne@68: stdin = getattr(sys.stdin, 'buffer', sys.stdin) jpayne@68: if manpath or comppath: jpayne@68: try: # py<3.9 jpayne@68: import importlib_resources as resources jpayne@68: except ImportError: jpayne@68: from importlib import resources jpayne@68: from pathlib import Path jpayne@68: jpayne@68: def cp(name, dst): jpayne@68: """copy resource `name` to `dst`""" jpayne@68: fi = resources.files('tqdm') / name jpayne@68: dst.write_bytes(fi.read_bytes()) jpayne@68: log.info("written:%s", dst) jpayne@68: if manpath is not None: jpayne@68: cp('tqdm.1', Path(manpath) / 'tqdm.1') jpayne@68: if comppath is not None: jpayne@68: cp('completion.sh', Path(comppath) / 'tqdm_completion.sh') jpayne@68: sys.exit(0) jpayne@68: if tee: jpayne@68: stdout_write = stdout.write jpayne@68: fp_write = getattr(fp, 'buffer', fp).write jpayne@68: jpayne@68: class stdout(object): # pylint: disable=function-redefined jpayne@68: @staticmethod jpayne@68: def write(x): jpayne@68: with tqdm.external_write_mode(file=fp): jpayne@68: fp_write(x) jpayne@68: stdout_write(x) jpayne@68: if delim_per_char: jpayne@68: tqdm_args.setdefault('unit', 'B') jpayne@68: tqdm_args.setdefault('unit_scale', True) jpayne@68: tqdm_args.setdefault('unit_divisor', 1024) jpayne@68: log.debug(tqdm_args) jpayne@68: with tqdm(**tqdm_args) as t: jpayne@68: posix_pipe(stdin, stdout, '', buf_size, t.update) jpayne@68: 
elif delim == b'\\n': jpayne@68: log.debug(tqdm_args) jpayne@68: write = stdout.write jpayne@68: if update or update_to: jpayne@68: with tqdm(**tqdm_args) as t: jpayne@68: if update: jpayne@68: def callback(i): jpayne@68: t.update(numeric(i.decode())) jpayne@68: else: # update_to jpayne@68: def callback(i): jpayne@68: t.update(numeric(i.decode()) - t.n) jpayne@68: for i in stdin: jpayne@68: write(i) jpayne@68: callback(i) jpayne@68: else: jpayne@68: for i in tqdm(stdin, **tqdm_args): jpayne@68: write(i) jpayne@68: else: jpayne@68: log.debug(tqdm_args) jpayne@68: with tqdm(**tqdm_args) as t: jpayne@68: callback_len = False jpayne@68: if update: jpayne@68: def callback(i): jpayne@68: t.update(numeric(i.decode())) jpayne@68: elif update_to: jpayne@68: def callback(i): jpayne@68: t.update(numeric(i.decode()) - t.n) jpayne@68: else: jpayne@68: callback = t.update jpayne@68: callback_len = True jpayne@68: posix_pipe(stdin, stdout, delim, buf_size, callback, callback_len)