Mercurial > repos > jpayne > tableops
view table-union.py @ 0:402b58f45844 draft default tip
planemo upload commit 9cc4dc1db55299bf92ec6bd359161ece4592bd16-dirty
| author | jpayne |
|---|---|
| date | Mon, 08 Dec 2025 15:03:06 +0000 |
| parents | |
| children |
line wrap: on
line source
#! /usr/bin/env python
"""Concatenate (union) multiple TSV tables and write the result to stdout.

Two modes:

* label mode (default): columns are matched by header label; the output
  header is the union of all input headers, in first-seen order.
* tuple mode (``--tuple``): columns are matched by position; the first
  file's header is used and subsequent files' header rows are skipped.

With ``--unionize`` (label mode only), the script autodetects a key column
whose values are unique and non-null in every input file, then merges rows
that share the same key value.
"""
import csv
import sys
from collections import defaultdict

import click


@click.command()
@click.option(
    "--unionize/--no-unionize",
    default=False,
    help="Attempt to unionize on an autodetected key column",
    is_flag=True,
)
@click.option(
    "--tuple/--no-tuple",
    "tuple_mode",
    default=False,
    help="For tables with inconsistent headers - unionize by column order instead of column label",
)
@click.argument("files", nargs=-1, type=click.Path(exists=True))
def cli(files, unionize=False, tuple_mode=False):
    """Union the given TSV FILES and write the merged table to stdout."""
    header = []                         # output header, in first-seen order
    items = []                          # accumulated rows (dicts or lists)
    possible_identity_headers = None    # candidate key columns (unionize mode)

    for fi in files:
        with open(fi, "r", newline="", encoding="utf-8") as table:
            if not tuple_mode:
                reader = csv.DictReader(table, delimiter="\t", dialect="excel")
                # BUG FIX: fieldnames is None for a zero-byte file; the
                # original iterated it unconditionally and raised TypeError.
                if reader.fieldnames is None:
                    continue
                # Append any headers not seen before, preserving order.
                seen = set(header)
                header.extend(f for f in reader.fieldnames if f not in seen)
                rows = list(reader)
                if not rows:
                    # header-only file contributes columns but no rows
                    continue
                if unionize:
                    if possible_identity_headers is None:
                        possible_identity_headers = set(reader.fieldnames)
                    # Keep only columns whose values are unique and non-null
                    # within this file (intersected across all files).
                    possible_identity_headers.intersection_update(
                        f
                        for f in reader.fieldnames
                        if len({row[f] for row in rows if f in row}) == len(rows)
                        and all(row.get(f) is not None for row in rows)
                    )
                items.extend(rows)
            else:
                reader = csv.reader(table, delimiter="\t", dialect="excel")
                # BUG FIX: next() on an empty file raised an uncaught
                # StopIteration; use a default and skip the file instead.
                first_row = next(reader, None)
                if first_row is None:
                    continue
                if not header:
                    # first non-empty file's header row becomes the output
                    # header; later files' header rows are discarded
                    header = first_row
                items.extend(reader)

    # Merge rows sharing the same value in the detected key column.
    # NOTE(review): set.pop() picks an arbitrary column when several
    # qualify, so the merge key is nondeterministic for such inputs.
    if possible_identity_headers and unionize and not tuple_mode:
        key_column = possible_identity_headers.pop()
        merged_rows = defaultdict(dict)
        for row in items:
            key = row.get(key_column)
            # NOTE(review): rows lacking a key value are silently dropped
            # here (original behaviour, preserved).
            if key is not None:
                merged_rows[key].update(row)
        items = list(merged_rows.values())

    if not tuple_mode:
        # Missing cells are emitted as "" (DictWriter's restval default).
        wr = csv.DictWriter(
            sys.stdout, delimiter="\t", dialect="excel", fieldnames=header
        )
        wr.writeheader()
    else:
        wr = csv.writer(sys.stdout, delimiter="\t", dialect="excel")
        wr.writerow(header)
    wr.writerows(items)


if __name__ == "__main__":
    cli()
