Mercurial > repos > rliterman > csp2
comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/tqdm/cli.py @ 68:5028fdace37b
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 16:23:26 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
67:0e9998148a16 | 68:5028fdace37b |
---|---|
1 """ | |
2 Module version for monitoring CLI pipes (`... | python -m tqdm | ...`). | |
3 """ | |
4 import logging | |
5 import re | |
6 import sys | |
7 from ast import literal_eval as numeric | |
8 from textwrap import indent | |
9 | |
10 from .std import TqdmKeyError, TqdmTypeError, tqdm | |
11 from .version import __version__ | |
12 | |
13 __all__ = ["main"] | |
14 log = logging.getLogger(__name__) | |
15 | |
16 | |
17 def cast(val, typ): | |
18 log.debug((val, typ)) | |
19 if " or " in typ: | |
20 for t in typ.split(" or "): | |
21 try: | |
22 return cast(val, t) | |
23 except TqdmTypeError: | |
24 pass | |
25 raise TqdmTypeError(f"{val} : {typ}") | |
26 | |
27 # sys.stderr.write('\ndebug | `val:type`: `' + val + ':' + typ + '`.\n') | |
28 if typ == 'bool': | |
29 if (val == 'True') or (val == ''): | |
30 return True | |
31 if val == 'False': | |
32 return False | |
33 raise TqdmTypeError(val + ' : ' + typ) | |
34 if typ == 'chr': | |
35 if len(val) == 1: | |
36 return val.encode() | |
37 if re.match(r"^\\\w+$", val): | |
38 return eval(f'"{val}"').encode() | |
39 raise TqdmTypeError(f"{val} : {typ}") | |
40 if typ == 'str': | |
41 return val | |
42 if typ == 'int': | |
43 try: | |
44 return int(val) | |
45 except ValueError as exc: | |
46 raise TqdmTypeError(f"{val} : {typ}") from exc | |
47 if typ == 'float': | |
48 try: | |
49 return float(val) | |
50 except ValueError as exc: | |
51 raise TqdmTypeError(f"{val} : {typ}") from exc | |
52 raise TqdmTypeError(f"{val} : {typ}") | |
53 | |
54 | |
55 def posix_pipe(fin, fout, delim=b'\\n', buf_size=256, | |
56 callback=lambda float: None, callback_len=True): | |
57 """ | |
58 Params | |
59 ------ | |
60 fin : binary file with `read(buf_size : int)` method | |
61 fout : binary file with `write` (and optionally `flush`) methods. | |
62 callback : function(float), e.g.: `tqdm.update` | |
63 callback_len : If (default: True) do `callback(len(buffer))`. | |
64 Otherwise, do `callback(data) for data in buffer.split(delim)`. | |
65 """ | |
66 fp_write = fout.write | |
67 | |
68 if not delim: | |
69 while True: | |
70 tmp = fin.read(buf_size) | |
71 | |
72 # flush at EOF | |
73 if not tmp: | |
74 getattr(fout, 'flush', lambda: None)() | |
75 return | |
76 | |
77 fp_write(tmp) | |
78 callback(len(tmp)) | |
79 # return | |
80 | |
81 buf = b'' | |
82 len_delim = len(delim) | |
83 # n = 0 | |
84 while True: | |
85 tmp = fin.read(buf_size) | |
86 | |
87 # flush at EOF | |
88 if not tmp: | |
89 if buf: | |
90 fp_write(buf) | |
91 if callback_len: | |
92 # n += 1 + buf.count(delim) | |
93 callback(1 + buf.count(delim)) | |
94 else: | |
95 for i in buf.split(delim): | |
96 callback(i) | |
97 getattr(fout, 'flush', lambda: None)() | |
98 return # n | |
99 | |
100 while True: | |
101 i = tmp.find(delim) | |
102 if i < 0: | |
103 buf += tmp | |
104 break | |
105 fp_write(buf + tmp[:i + len(delim)]) | |
106 # n += 1 | |
107 callback(1 if callback_len else (buf + tmp[:i])) | |
108 buf = b'' | |
109 tmp = tmp[i + len_delim:] | |
110 | |
111 | |
112 # ((opt, type), ... ) | |
113 RE_OPTS = re.compile(r'\n {4}(\S+)\s{2,}:\s*([^,]+)') | |
114 # better split method assuming no positional args | |
115 RE_SHLEX = re.compile(r'\s*(?<!\S)--?([^\s=]+)(\s+|=|$)') | |
116 | |
117 # TODO: add custom support for some of the following? | |
118 UNSUPPORTED_OPTS = ('iterable', 'gui', 'out', 'file') | |
119 | |
120 # The 8 leading spaces are required for consistency | |
121 CLI_EXTRA_DOC = r""" | |
122 Extra CLI Options | |
123 ----------------- | |
124 name : type, optional | |
125 TODO: find out why this is needed. | |
126 delim : chr, optional | |
127 Delimiting character [default: '\n']. Use '\0' for null. | |
128 N.B.: on Windows systems, Python converts '\n' to '\r\n'. | |
129 buf_size : int, optional | |
130 String buffer size in bytes [default: 256] | |
131 used when `delim` is specified. | |
132 bytes : bool, optional | |
133 If true, will count bytes, ignore `delim`, and default | |
134 `unit_scale` to True, `unit_divisor` to 1024, and `unit` to 'B'. | |
135 tee : bool, optional | |
136 If true, passes `stdin` to both `stderr` and `stdout`. | |
137 update : bool, optional | |
138 If true, will treat input as newly elapsed iterations, | |
139 i.e. numbers to pass to `update()`. Note that this is slow | |
140 (~2e5 it/s) since every input must be decoded as a number. | |
141 update_to : bool, optional | |
142 If true, will treat input as total elapsed iterations, | |
143 i.e. numbers to assign to `self.n`. Note that this is slow | |
144 (~2e5 it/s) since every input must be decoded as a number. | |
145 null : bool, optional | |
146 If true, will discard input (no stdout). | |
147 manpath : str, optional | |
148 Directory in which to install tqdm man pages. | |
149 comppath : str, optional | |
150 Directory in which to place tqdm completion. | |
151 log : str, optional | |
152 CRITICAL|FATAL|ERROR|WARN(ING)|[default: 'INFO']|DEBUG|NOTSET. | |
153 """ | |
154 | |
155 | |
156 def main(fp=sys.stderr, argv=None): | |
157 """ | |
158 Parameters (internal use only) | |
159 --------- | |
160 fp : file-like object for tqdm | |
161 argv : list (default: sys.argv[1:]) | |
162 """ | |
163 if argv is None: | |
164 argv = sys.argv[1:] | |
165 try: | |
166 log_idx = argv.index('--log') | |
167 except ValueError: | |
168 for i in argv: | |
169 if i.startswith('--log='): | |
170 logLevel = i[len('--log='):] | |
171 break | |
172 else: | |
173 logLevel = 'INFO' | |
174 else: | |
175 # argv.pop(log_idx) | |
176 # logLevel = argv.pop(log_idx) | |
177 logLevel = argv[log_idx + 1] | |
178 logging.basicConfig(level=getattr(logging, logLevel), | |
179 format="%(levelname)s:%(module)s:%(lineno)d:%(message)s") | |
180 | |
181 # py<3.13 doesn't dedent docstrings | |
182 d = (tqdm.__doc__ if sys.version_info < (3, 13) | |
183 else indent(tqdm.__doc__, " ")) + CLI_EXTRA_DOC | |
184 | |
185 opt_types = dict(RE_OPTS.findall(d)) | |
186 # opt_types['delim'] = 'chr' | |
187 | |
188 for o in UNSUPPORTED_OPTS: | |
189 opt_types.pop(o) | |
190 | |
191 log.debug(sorted(opt_types.items())) | |
192 | |
193 # d = RE_OPTS.sub(r' --\1=<\1> : \2', d) | |
194 split = RE_OPTS.split(d) | |
195 opt_types_desc = zip(split[1::3], split[2::3], split[3::3]) | |
196 d = ''.join(('\n --{0} : {2}{3}' if otd[1] == 'bool' else | |
197 '\n --{0}=<{1}> : {2}{3}').format( | |
198 otd[0].replace('_', '-'), otd[0], *otd[1:]) | |
199 for otd in opt_types_desc if otd[0] not in UNSUPPORTED_OPTS) | |
200 | |
201 help_short = "Usage:\n tqdm [--help | options]\n" | |
202 d = help_short + """ | |
203 Options: | |
204 -h, --help Print this help and exit. | |
205 -v, --version Print version and exit. | |
206 """ + d.strip('\n') + '\n' | |
207 | |
208 # opts = docopt(d, version=__version__) | |
209 if any(v in argv for v in ('-v', '--version')): | |
210 sys.stdout.write(__version__ + '\n') | |
211 sys.exit(0) | |
212 elif any(v in argv for v in ('-h', '--help')): | |
213 sys.stdout.write(d + '\n') | |
214 sys.exit(0) | |
215 elif argv and argv[0][:2] != '--': | |
216 sys.stderr.write(f"Error:Unknown argument:{argv[0]}\n{help_short}") | |
217 | |
218 argv = RE_SHLEX.split(' '.join(["tqdm"] + argv)) | |
219 opts = dict(zip(argv[1::3], argv[3::3])) | |
220 | |
221 log.debug(opts) | |
222 opts.pop('log', True) | |
223 | |
224 tqdm_args = {'file': fp} | |
225 try: | |
226 for (o, v) in opts.items(): | |
227 o = o.replace('-', '_') | |
228 try: | |
229 tqdm_args[o] = cast(v, opt_types[o]) | |
230 except KeyError as e: | |
231 raise TqdmKeyError(str(e)) | |
232 log.debug('args:' + str(tqdm_args)) | |
233 | |
234 delim_per_char = tqdm_args.pop('bytes', False) | |
235 update = tqdm_args.pop('update', False) | |
236 update_to = tqdm_args.pop('update_to', False) | |
237 if sum((delim_per_char, update, update_to)) > 1: | |
238 raise TqdmKeyError("Can only have one of --bytes --update --update_to") | |
239 except Exception: | |
240 fp.write("\nError:\n" + help_short) | |
241 stdin, stdout_write = sys.stdin, sys.stdout.write | |
242 for i in stdin: | |
243 stdout_write(i) | |
244 raise | |
245 else: | |
246 buf_size = tqdm_args.pop('buf_size', 256) | |
247 delim = tqdm_args.pop('delim', b'\\n') | |
248 tee = tqdm_args.pop('tee', False) | |
249 manpath = tqdm_args.pop('manpath', None) | |
250 comppath = tqdm_args.pop('comppath', None) | |
251 if tqdm_args.pop('null', False): | |
252 class stdout(object): | |
253 @staticmethod | |
254 def write(_): | |
255 pass | |
256 else: | |
257 stdout = sys.stdout | |
258 stdout = getattr(stdout, 'buffer', stdout) | |
259 stdin = getattr(sys.stdin, 'buffer', sys.stdin) | |
260 if manpath or comppath: | |
261 try: # py<3.9 | |
262 import importlib_resources as resources | |
263 except ImportError: | |
264 from importlib import resources | |
265 from pathlib import Path | |
266 | |
267 def cp(name, dst): | |
268 """copy resource `name` to `dst`""" | |
269 fi = resources.files('tqdm') / name | |
270 dst.write_bytes(fi.read_bytes()) | |
271 log.info("written:%s", dst) | |
272 if manpath is not None: | |
273 cp('tqdm.1', Path(manpath) / 'tqdm.1') | |
274 if comppath is not None: | |
275 cp('completion.sh', Path(comppath) / 'tqdm_completion.sh') | |
276 sys.exit(0) | |
277 if tee: | |
278 stdout_write = stdout.write | |
279 fp_write = getattr(fp, 'buffer', fp).write | |
280 | |
281 class stdout(object): # pylint: disable=function-redefined | |
282 @staticmethod | |
283 def write(x): | |
284 with tqdm.external_write_mode(file=fp): | |
285 fp_write(x) | |
286 stdout_write(x) | |
287 if delim_per_char: | |
288 tqdm_args.setdefault('unit', 'B') | |
289 tqdm_args.setdefault('unit_scale', True) | |
290 tqdm_args.setdefault('unit_divisor', 1024) | |
291 log.debug(tqdm_args) | |
292 with tqdm(**tqdm_args) as t: | |
293 posix_pipe(stdin, stdout, '', buf_size, t.update) | |
294 elif delim == b'\\n': | |
295 log.debug(tqdm_args) | |
296 write = stdout.write | |
297 if update or update_to: | |
298 with tqdm(**tqdm_args) as t: | |
299 if update: | |
300 def callback(i): | |
301 t.update(numeric(i.decode())) | |
302 else: # update_to | |
303 def callback(i): | |
304 t.update(numeric(i.decode()) - t.n) | |
305 for i in stdin: | |
306 write(i) | |
307 callback(i) | |
308 else: | |
309 for i in tqdm(stdin, **tqdm_args): | |
310 write(i) | |
311 else: | |
312 log.debug(tqdm_args) | |
313 with tqdm(**tqdm_args) as t: | |
314 callback_len = False | |
315 if update: | |
316 def callback(i): | |
317 t.update(numeric(i.decode())) | |
318 elif update_to: | |
319 def callback(i): | |
320 t.update(numeric(i.decode()) - t.n) | |
321 else: | |
322 callback = t.update | |
323 callback_len = True | |
324 posix_pipe(stdin, stdout, delim, buf_size, callback, callback_len) |