view table-union.py @ 0:402b58f45844 draft default tip

planemo upload commit 9cc4dc1db55299bf92ec6bd359161ece4592bd16-dirty
author jpayne
date Mon, 08 Dec 2025 15:03:06 +0000
parents
children
line wrap: on
line source

#! /usr/bin/env python
import csv
import sys
from collections import defaultdict

import click

@click.command()
@click.option("--unionize/--no-unionize", default=False, help="Attempt to unionize on an autodetected key column", is_flag=True)
@click.option("--tuple/--no-tuple", "tuple_mode", default=False, help="For tables with inconsistent headers - unionize by column order instead of column label")
@click.argument("files", nargs=-1, type=click.Path(exists=True))
def cli(files, unionize=False, tuple_mode=False):
    """Combine tab-separated tables into one table written to standard output.
    \f

    Everything below the form feed is kept out of the ``--help`` output.

    Args:
        files: Paths of the tab-separated input tables, read in order as
            UTF-8 text.
        unionize: In label mode, merge rows from all tables that share the
            same value in an auto-detected key column. A column qualifies
            as a key when, in every non-empty table, it is present on every
            row and holds a distinct value per row. Ignored in tuple mode.
        tuple_mode: Treat rows positionally. The header of the first
            non-empty table is used for the output and the header line of
            every other table is discarded.

    Files with no content at all are skipped in both modes; in label mode
    a file with a header but no data rows still contributes its column
    names to the output header.
    """
    header = []
    items = []
    # Columns still eligible as the merge key; stays None until the first
    # table with data rows has been examined.
    possible_identity_headers = None

    for fi in files:
        with open(fi, "r", newline="", encoding="utf-8") as table:
            if tuple_mode:
                reader = csv.reader(table, delimiter="\t", dialect="excel")
                # A default avoids StopIteration on a completely empty file.
                first_row = next(reader, None)
                if first_row is None:
                    continue
                if not header:
                    header = first_row
                # Otherwise first_row is this file's header and is dropped.
                items.extend(reader)
                continue

            reader = csv.DictReader(table, delimiter="\t", dialect="excel")
            fieldnames = reader.fieldnames
            if fieldnames is None:
                # Completely empty file: there is no header line to read.
                continue

            # Append unseen column labels, preserving first-seen order.
            known = set(header)
            header.extend(
                field for field in fieldnames if field not in known
            )

            rows = list(reader)
            if not rows:  # header-only file: nothing to add or to test
                continue

            if unionize:
                # Columns that are filled in on every row and never repeat
                # a value within this table.
                unique_columns = {
                    field
                    for field in fieldnames
                    if all(row.get(field) is not None for row in rows)
                    and len({row[field] for row in rows}) == len(rows)
                }
                if possible_identity_headers is None:
                    possible_identity_headers = unique_columns
                else:
                    possible_identity_headers &= unique_columns
            items.extend(rows)

    if possible_identity_headers and unionize and not tuple_mode:
        # Take the first candidate in header order so the chosen key does
        # not depend on set iteration order (which varies between runs).
        key_column = next(
            column for column in header if column in possible_identity_headers
        )
        merged_rows = defaultdict(dict)
        for row in items:
            key = row.get(key_column)
            if key is not None:  # skip rows with null keys
                # Later tables overwrite earlier values for shared columns.
                merged_rows[key].update(row)
        items = list(merged_rows.values())

    if not tuple_mode:
        wr = csv.DictWriter(
            sys.stdout, delimiter="\t", dialect="excel", fieldnames=header
        )
        wr.writeheader()
    else:
        wr = csv.writer(sys.stdout, delimiter="\t", dialect="excel")
        wr.writerow(header)
    wr.writerows(items)


# Invoke the click command only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    cli()