Mercurial > repos > rliterman > csp2
diff CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/tqdm/cli.py @ 68:5028fdace37b
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 16:23:26 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/tqdm/cli.py Tue Mar 18 16:23:26 2025 -0400 @@ -0,0 +1,324 @@ +""" +Module version for monitoring CLI pipes (`... | python -m tqdm | ...`). +""" +import logging +import re +import sys +from ast import literal_eval as numeric +from textwrap import indent + +from .std import TqdmKeyError, TqdmTypeError, tqdm +from .version import __version__ + +__all__ = ["main"] +log = logging.getLogger(__name__) + + +def cast(val, typ): + log.debug((val, typ)) + if " or " in typ: + for t in typ.split(" or "): + try: + return cast(val, t) + except TqdmTypeError: + pass + raise TqdmTypeError(f"{val} : {typ}") + + # sys.stderr.write('\ndebug | `val:type`: `' + val + ':' + typ + '`.\n') + if typ == 'bool': + if (val == 'True') or (val == ''): + return True + if val == 'False': + return False + raise TqdmTypeError(val + ' : ' + typ) + if typ == 'chr': + if len(val) == 1: + return val.encode() + if re.match(r"^\\\w+$", val): + return eval(f'"{val}"').encode() + raise TqdmTypeError(f"{val} : {typ}") + if typ == 'str': + return val + if typ == 'int': + try: + return int(val) + except ValueError as exc: + raise TqdmTypeError(f"{val} : {typ}") from exc + if typ == 'float': + try: + return float(val) + except ValueError as exc: + raise TqdmTypeError(f"{val} : {typ}") from exc + raise TqdmTypeError(f"{val} : {typ}") + + +def posix_pipe(fin, fout, delim=b'\\n', buf_size=256, + callback=lambda float: None, callback_len=True): + """ + Params + ------ + fin : binary file with `read(buf_size : int)` method + fout : binary file with `write` (and optionally `flush`) methods. + callback : function(float), e.g.: `tqdm.update` + callback_len : If (default: True) do `callback(len(buffer))`. + Otherwise, do `callback(data) for data in buffer.split(delim)`. + """ + fp_write = fout.write + + if not delim: + while True: + tmp = fin.read(buf_size) + + # flush at EOF + if not tmp: + getattr(fout, 'flush', lambda: None)() + return + + fp_write(tmp) + callback(len(tmp)) + # return + + buf = b'' + len_delim = len(delim) + # n = 0 + while True: + tmp = fin.read(buf_size) + + # flush at EOF + if not tmp: + if buf: + fp_write(buf) + if callback_len: + # n += 1 + buf.count(delim) + callback(1 + buf.count(delim)) + else: + for i in buf.split(delim): + callback(i) + getattr(fout, 'flush', lambda: None)() + return # n + + while True: + i = tmp.find(delim) + if i < 0: + buf += tmp + break + fp_write(buf + tmp[:i + len(delim)]) + # n += 1 + callback(1 if callback_len else (buf + tmp[:i])) + buf = b'' + tmp = tmp[i + len_delim:] + + +# ((opt, type), ... ) +RE_OPTS = re.compile(r'\n {4}(\S+)\s{2,}:\s*([^,]+)') +# better split method assuming no positional args +RE_SHLEX = re.compile(r'\s*(?<!\S)--?([^\s=]+)(\s+|=|$)') + +# TODO: add custom support for some of the following? +UNSUPPORTED_OPTS = ('iterable', 'gui', 'out', 'file') + +# The 8 leading spaces are required for consistency +CLI_EXTRA_DOC = r""" + Extra CLI Options + ----------------- + name : type, optional + TODO: find out why this is needed. + delim : chr, optional + Delimiting character [default: '\n']. Use '\0' for null. + N.B.: on Windows systems, Python converts '\n' to '\r\n'. + buf_size : int, optional + String buffer size in bytes [default: 256] + used when `delim` is specified. + bytes : bool, optional + If true, will count bytes, ignore `delim`, and default + `unit_scale` to True, `unit_divisor` to 1024, and `unit` to 'B'. + tee : bool, optional + If true, passes `stdin` to both `stderr` and `stdout`. + update : bool, optional + If true, will treat input as newly elapsed iterations, + i.e. numbers to pass to `update()`. Note that this is slow + (~2e5 it/s) since every input must be decoded as a number. + update_to : bool, optional + If true, will treat input as total elapsed iterations, + i.e. numbers to assign to `self.n`. Note that this is slow + (~2e5 it/s) since every input must be decoded as a number. + null : bool, optional + If true, will discard input (no stdout). + manpath : str, optional + Directory in which to install tqdm man pages. + comppath : str, optional + Directory in which to place tqdm completion. + log : str, optional + CRITICAL|FATAL|ERROR|WARN(ING)|[default: 'INFO']|DEBUG|NOTSET. +""" + + +def main(fp=sys.stderr, argv=None): + """ + Parameters (internal use only) + --------- + fp : file-like object for tqdm + argv : list (default: sys.argv[1:]) + """ + if argv is None: + argv = sys.argv[1:] + try: + log_idx = argv.index('--log') + except ValueError: + for i in argv: + if i.startswith('--log='): + logLevel = i[len('--log='):] + break + else: + logLevel = 'INFO' + else: + # argv.pop(log_idx) + # logLevel = argv.pop(log_idx) + logLevel = argv[log_idx + 1] + logging.basicConfig(level=getattr(logging, logLevel), + format="%(levelname)s:%(module)s:%(lineno)d:%(message)s") + + # py<3.13 doesn't dedent docstrings + d = (tqdm.__doc__ if sys.version_info < (3, 13) + else indent(tqdm.__doc__, " ")) + CLI_EXTRA_DOC + + opt_types = dict(RE_OPTS.findall(d)) + # opt_types['delim'] = 'chr' + + for o in UNSUPPORTED_OPTS: + opt_types.pop(o) + + log.debug(sorted(opt_types.items())) + + # d = RE_OPTS.sub(r' --\1=<\1> : \2', d) + split = RE_OPTS.split(d) + opt_types_desc = zip(split[1::3], split[2::3], split[3::3]) + d = ''.join(('\n --{0} : {2}{3}' if otd[1] == 'bool' else + '\n --{0}=<{1}> : {2}{3}').format( + otd[0].replace('_', '-'), otd[0], *otd[1:]) + for otd in opt_types_desc if otd[0] not in UNSUPPORTED_OPTS) + + help_short = "Usage:\n tqdm [--help | options]\n" + d = help_short + """ +Options: + -h, --help Print this help and exit. + -v, --version Print version and exit. +""" + d.strip('\n') + '\n' + + # opts = docopt(d, version=__version__) + if any(v in argv for v in ('-v', '--version')): + sys.stdout.write(__version__ + '\n') + sys.exit(0) + elif any(v in argv for v in ('-h', '--help')): + sys.stdout.write(d + '\n') + sys.exit(0) + elif argv and argv[0][:2] != '--': + sys.stderr.write(f"Error:Unknown argument:{argv[0]}\n{help_short}") + + argv = RE_SHLEX.split(' '.join(["tqdm"] + argv)) + opts = dict(zip(argv[1::3], argv[3::3])) + + log.debug(opts) + opts.pop('log', True) + + tqdm_args = {'file': fp} + try: + for (o, v) in opts.items(): + o = o.replace('-', '_') + try: + tqdm_args[o] = cast(v, opt_types[o]) + except KeyError as e: + raise TqdmKeyError(str(e)) + log.debug('args:' + str(tqdm_args)) + + delim_per_char = tqdm_args.pop('bytes', False) + update = tqdm_args.pop('update', False) + update_to = tqdm_args.pop('update_to', False) + if sum((delim_per_char, update, update_to)) > 1: + raise TqdmKeyError("Can only have one of --bytes --update --update_to") + except Exception: + fp.write("\nError:\n" + help_short) + stdin, stdout_write = sys.stdin, sys.stdout.write + for i in stdin: + stdout_write(i) + raise + else: + buf_size = tqdm_args.pop('buf_size', 256) + delim = tqdm_args.pop('delim', b'\\n') + tee = tqdm_args.pop('tee', False) + manpath = tqdm_args.pop('manpath', None) + comppath = tqdm_args.pop('comppath', None) + if tqdm_args.pop('null', False): + class stdout(object): + @staticmethod + def write(_): + pass + else: + stdout = sys.stdout + stdout = getattr(stdout, 'buffer', stdout) + stdin = getattr(sys.stdin, 'buffer', sys.stdin) + if manpath or comppath: + try: # py<3.9 + import importlib_resources as resources + except ImportError: + from importlib import resources + from pathlib import Path + + def cp(name, dst): + """copy resource `name` to `dst`""" + fi = resources.files('tqdm') / name + dst.write_bytes(fi.read_bytes()) + log.info("written:%s", dst) + if manpath is not None: + cp('tqdm.1', Path(manpath) / 'tqdm.1') + if comppath is not None: + cp('completion.sh', Path(comppath) / 'tqdm_completion.sh') + sys.exit(0) + if tee: + stdout_write = stdout.write + fp_write = getattr(fp, 'buffer', fp).write + + class stdout(object): # pylint: disable=function-redefined + @staticmethod + def write(x): + with tqdm.external_write_mode(file=fp): + fp_write(x) + stdout_write(x) + if delim_per_char: + tqdm_args.setdefault('unit', 'B') + tqdm_args.setdefault('unit_scale', True) + tqdm_args.setdefault('unit_divisor', 1024) + log.debug(tqdm_args) + with tqdm(**tqdm_args) as t: + posix_pipe(stdin, stdout, '', buf_size, t.update) + elif delim == b'\\n': + log.debug(tqdm_args) + write = stdout.write + if update or update_to: + with tqdm(**tqdm_args) as t: + if update: + def callback(i): + t.update(numeric(i.decode())) + else: # update_to + def callback(i): + t.update(numeric(i.decode()) - t.n) + for i in stdin: + write(i) + callback(i) + else: + for i in tqdm(stdin, **tqdm_args): + write(i) + else: + log.debug(tqdm_args) + with tqdm(**tqdm_args) as t: + callback_len = False + if update: + def callback(i): + t.update(numeric(i.decode())) + elif update_to: + def callback(i): + t.update(numeric(i.decode()) - t.n) + else: + callback = t.update + callback_len = True + posix_pipe(stdin, stdout, delim, buf_size, callback, callback_len)