diff table-union.py @ 0:402b58f45844 draft default tip
planemo upload commit 9cc4dc1db55299bf92ec6bd359161ece4592bd16-dirty
| author | jpayne |
|---|---|
| date | Mon, 08 Dec 2025 15:03:06 +0000 |
| parents | |
| children | |
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/table-union.py	Mon Dec 08 15:03:06 2025 +0000
@@ -0,0 +1,79 @@
+#! /usr/bin/env python
+import csv
+import sys
+from collections import defaultdict
+
+import click
+
+@click.command()
+@click.option("--unionize/--no-unionize", default=False, help="Attempt to unionize on an autodetected key column", is_flag=True)
+@click.option("--tuple/--no-tuple", "tuple_mode", default=False, help="For tables with inconsistent headers - unionize by column order instead of column label")
+@click.argument("files", nargs=-1, type=click.Path(exists=True))
+def cli(files, unionize=False, tuple_mode=False):
+    header = []
+    items = []
+    possible_identity_headers = None
+
+    for fi in files:
+        with open(
+            fi, "r", newline="", encoding="utf-8"
+        ) as table:  # Improved file opening
+            if not tuple_mode:
+                reader = csv.DictReader(table, delimiter="\t", dialect="excel")
+
+                # Efficient header update using set operations
+                header_set = set(header)
+                new_headers = [
+                    field for field in reader.fieldnames if field not in header_set
+                ]
+                header.extend(new_headers)
+
+                rows = list(reader)  # Keep this for now, but see optimization below
+                if not rows:  # skip empty files
+                    continue
+
+                if unionize:
+                    # More efficient identity header detection
+                    if possible_identity_headers is None:
+                        possible_identity_headers = set(reader.fieldnames)
+
+                    # Optimized identity header filtering
+                    possible_identity_headers.intersection_update(
+                        f
+                        for f in reader.fieldnames
+                        if len({row[f] for row in rows if f in row}) == len(rows)
+                        and all(row.get(f) is not None for row in rows)
+                    )
+                items.extend(rows)
+            else:
+                reader = csv.reader(table, delimiter="\t", dialect="excel")
+                if not header:
+                    header = next(reader)
+                else:
+                    next(reader)  # skip header in subsequent files
+                items.extend(reader)
+
+
+    if possible_identity_headers and unionize and not tuple_mode:
+        key_column = possible_identity_headers.pop()
+        # More efficient merging using defaultdict
+        merged_rows = defaultdict(dict)
+        for row in items:
+            key = row.get(key_column)
+            if key is not None:  # skip rows with null keys
+                merged_rows[key].update(row)
+        items = list(merged_rows.values())
+
+    if not tuple_mode:
+        wr = csv.DictWriter(
+            sys.stdout, delimiter="\t", dialect="excel", fieldnames=header
+        )
+        wr.writeheader()
+    else:
+        wr = csv.writer(sys.stdout, delimiter="\t", dialect="excel")
+        wr.writerow(header)
+    wr.writerows(items)
+
+
+if __name__ == "__main__":
+    cli()
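
For reference, a minimal usage sketch (not part of the commit): it runs the script over two tab-separated files and prints the unioned table the tool writes to stdout. The file names sample_a.tsv and sample_b.tsv are placeholders; the --unionize flag and the positional FILES arguments come from the click declarations in the diff above.

```python
# Hypothetical invocation of table-union.py; sample_a.tsv and sample_b.tsv
# are placeholder inputs, not files from this repository.
import subprocess

result = subprocess.run(
    ["python", "table-union.py", "--unionize", "sample_a.tsv", "sample_b.tsv"],
    capture_output=True,
    text=True,
    check=True,
)
print(result.stdout)  # merged table, tab-separated, with the union of all columns
```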
