Mercurial > repos > jpayne > tableops
diff test_tables_ops.py @ 0:402b58f45844 draft default tip
planemo upload commit 9cc4dc1db55299bf92ec6bd359161ece4592bd16-dirty
| author | jpayne |
|---|---|
| date | Mon, 08 Dec 2025 15:03:06 +0000 |
| parents | |
| children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test_tables_ops.py Mon Dec 08 15:03:06 2025 +0000 @@ -0,0 +1,105 @@ +import unittest +import subprocess +import os +import csv + + +class TestTableOps(unittest.TestCase): + TEST_DATA_DIR = "test-data" + + def _run_command(self, command, input_data=None): + process = subprocess.Popen( + command, + stdin=subprocess.PIPE if input_data else None, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, # Important for handling text I/O + ) + stdout, stderr = process.communicate(input_data) + return process.returncode, stdout, stderr + + def _compare_tsv(self, expected_file, actual_output): + with open( + os.path.join(self.TEST_DATA_DIR, expected_file), "r", encoding="utf-8" + ) as f: + expected_lines = list(csv.reader(f, delimiter="\t")) + actual_lines = list(csv.reader(actual_output.splitlines(), delimiter="\t")) + self.assertEqual(expected_lines, actual_lines) + + def test_table_union_union(self): + returncode, stdout, stderr = self._run_command( + [ + "table-union", + os.path.join(self.TEST_DATA_DIR, "dingbat.tsv"), + os.path.join(self.TEST_DATA_DIR, "loki.tsv"), + ] + ) + self.assertEqual(returncode, 0) + self._compare_tsv("combined.tsv", stdout) + self.assertEqual(stderr, "") + + def test_table_union_join(self): + returncode, stdout, stderr = self._run_command( + [ + "table-union", + "--no-union", + os.path.join(self.TEST_DATA_DIR, "users.tsv"), + os.path.join(self.TEST_DATA_DIR, "orders.tsv"), + ] + ) + self.assertEqual(returncode, 0) + self._compare_tsv("merged_expected.tsv", stdout) + self.assertEqual(stderr, "") + + def test_table_summarize(self): + returncode, stdout, stderr = self._run_command( + ["table-summarize", os.path.join(self.TEST_DATA_DIR, "data_summarize.tsv")] + ) + self.assertEqual(returncode, 0) + + expected_summary = """Summary: +Category: +\t - A: 3 of 6 +\t - B: 2 of 6 +\t - C: 1 of 6 +Value: +\t - 10: 1 of 6 +\t - 12: 1 of 6 +\t - 15: 1 of 6 +\t - 20: 1 of 6 +\t - 25: 1 of 6 +\t - 30: 1 of 6 +""" + self.assertEqual(stdout.strip(), expected_summary.strip()) + self.assertEqual(stderr, "") + + def test_table_sort(self): + returncode, stdout, stderr = self._run_command( + [ + "table-sort", + "-k", + "Age", + "-k", + "Name", + os.path.join(self.TEST_DATA_DIR, "data_sort.tsv"), + ] + ) + self.assertEqual(returncode, 0) + self._compare_tsv("sorted_data_expected.tsv", stdout) + self.assertEqual(stderr, "") + + def test_table_sort_pipe(self): + with open( + os.path.join(self.TEST_DATA_DIR, "data_sort.tsv"), "r", encoding="utf-8" + ) as infile: + input_data = infile.read() + returncode, stdout, stderr = self._run_command( + ["table-sort", "-k", "Age", "-k", "Name"], input_data + ) + self.assertEqual(returncode, 0) + self._compare_tsv("sorted_data_expected.tsv", stdout) + self.assertEqual(stderr, "") + + +if __name__ == "__main__": + unittest.main()
