Mercurial > repos > jpayne > tableops
comparison test_tables_ops.py @ 0:402b58f45844 draft default tip
planemo upload commit 9cc4dc1db55299bf92ec6bd359161ece4592bd16-dirty
| author | jpayne |
|---|---|
| date | Mon, 08 Dec 2025 15:03:06 +0000 |
| parents | |
| children | |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:402b58f45844 |
|---|---|
| 1 import unittest | |
| 2 import subprocess | |
| 3 import os | |
| 4 import csv | |
| 5 | |
| 6 | |
| 7 class TestTableOps(unittest.TestCase): | |
| 8 TEST_DATA_DIR = "test-data" | |
| 9 | |
| 10 def _run_command(self, command, input_data=None): | |
| 11 process = subprocess.Popen( | |
| 12 command, | |
| 13 stdin=subprocess.PIPE if input_data else None, | |
| 14 stdout=subprocess.PIPE, | |
| 15 stderr=subprocess.PIPE, | |
| 16 text=True, # Important for handling text I/O | |
| 17 ) | |
| 18 stdout, stderr = process.communicate(input_data) | |
| 19 return process.returncode, stdout, stderr | |
| 20 | |
| 21 def _compare_tsv(self, expected_file, actual_output): | |
| 22 with open( | |
| 23 os.path.join(self.TEST_DATA_DIR, expected_file), "r", encoding="utf-8" | |
| 24 ) as f: | |
| 25 expected_lines = list(csv.reader(f, delimiter="\t")) | |
| 26 actual_lines = list(csv.reader(actual_output.splitlines(), delimiter="\t")) | |
| 27 self.assertEqual(expected_lines, actual_lines) | |
| 28 | |
| 29 def test_table_union_union(self): | |
| 30 returncode, stdout, stderr = self._run_command( | |
| 31 [ | |
| 32 "table-union", | |
| 33 os.path.join(self.TEST_DATA_DIR, "dingbat.tsv"), | |
| 34 os.path.join(self.TEST_DATA_DIR, "loki.tsv"), | |
| 35 ] | |
| 36 ) | |
| 37 self.assertEqual(returncode, 0) | |
| 38 self._compare_tsv("combined.tsv", stdout) | |
| 39 self.assertEqual(stderr, "") | |
| 40 | |
| 41 def test_table_union_join(self): | |
| 42 returncode, stdout, stderr = self._run_command( | |
| 43 [ | |
| 44 "table-union", | |
| 45 "--no-union", | |
| 46 os.path.join(self.TEST_DATA_DIR, "users.tsv"), | |
| 47 os.path.join(self.TEST_DATA_DIR, "orders.tsv"), | |
| 48 ] | |
| 49 ) | |
| 50 self.assertEqual(returncode, 0) | |
| 51 self._compare_tsv("merged_expected.tsv", stdout) | |
| 52 self.assertEqual(stderr, "") | |
| 53 | |
| 54 def test_table_summarize(self): | |
| 55 returncode, stdout, stderr = self._run_command( | |
| 56 ["table-summarize", os.path.join(self.TEST_DATA_DIR, "data_summarize.tsv")] | |
| 57 ) | |
| 58 self.assertEqual(returncode, 0) | |
| 59 | |
| 60 expected_summary = """Summary: | |
| 61 Category: | |
| 62 \t - A: 3 of 6 | |
| 63 \t - B: 2 of 6 | |
| 64 \t - C: 1 of 6 | |
| 65 Value: | |
| 66 \t - 10: 1 of 6 | |
| 67 \t - 12: 1 of 6 | |
| 68 \t - 15: 1 of 6 | |
| 69 \t - 20: 1 of 6 | |
| 70 \t - 25: 1 of 6 | |
| 71 \t - 30: 1 of 6 | |
| 72 """ | |
| 73 self.assertEqual(stdout.strip(), expected_summary.strip()) | |
| 74 self.assertEqual(stderr, "") | |
| 75 | |
| 76 def test_table_sort(self): | |
| 77 returncode, stdout, stderr = self._run_command( | |
| 78 [ | |
| 79 "table-sort", | |
| 80 "-k", | |
| 81 "Age", | |
| 82 "-k", | |
| 83 "Name", | |
| 84 os.path.join(self.TEST_DATA_DIR, "data_sort.tsv"), | |
| 85 ] | |
| 86 ) | |
| 87 self.assertEqual(returncode, 0) | |
| 88 self._compare_tsv("sorted_data_expected.tsv", stdout) | |
| 89 self.assertEqual(stderr, "") | |
| 90 | |
| 91 def test_table_sort_pipe(self): | |
| 92 with open( | |
| 93 os.path.join(self.TEST_DATA_DIR, "data_sort.tsv"), "r", encoding="utf-8" | |
| 94 ) as infile: | |
| 95 input_data = infile.read() | |
| 96 returncode, stdout, stderr = self._run_command( | |
| 97 ["table-sort", "-k", "Age", "-k", "Name"], input_data | |
| 98 ) | |
| 99 self.assertEqual(returncode, 0) | |
| 100 self._compare_tsv("sorted_data_expected.tsv", stdout) | |
| 101 self.assertEqual(stderr, "") | |
| 102 | |
| 103 | |
# Run the suite through unittest's command-line runner when this file is
# executed directly rather than imported.
if __name__ == "__main__":
    unittest.main()
