Mercurial > repos > kkonganti > cfsan_bettercallsal
comparison 0.5.0/bin/check_samplesheet.py @ 1:365849f031fd
"planemo upload"
author | kkonganti |
---|---|
date | Mon, 05 Jun 2023 18:48:51 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
0:a4b1ee4b68b1 | 1:365849f031fd |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 import os | |
4 import sys | |
5 import errno | |
6 import argparse | |
7 | |
8 | |
def parse_args(args=None):
    """Parse the command-line arguments of the samplesheet checker.

    Args:
        args: Optional list of argument strings. It is handed straight to
            ``ArgumentParser.parse_args``.

    Returns:
        The parsed namespace, exposing ``FILE_IN`` and ``FILE_OUT``.
    """
    cli = argparse.ArgumentParser(
        description="Reformat samplesheet file and check its contents.",
        epilog="Example usage: python check_samplesheet.py <FILE_IN> <FILE_OUT>",
    )
    # Both arguments are required positionals, registered in CLI order.
    positionals = (
        ("FILE_IN", "Input samplesheet file."),
        ("FILE_OUT", "Output file."),
    )
    for arg_name, arg_help in positionals:
        cli.add_argument(arg_name, help=arg_help)
    return cli.parse_args(args)
17 | |
18 | |
def make_dir(path):
    """Create directory *path*, including missing parents.

    An empty *path* is a no-op, and a path that already exists is
    tolerated. Any other ``OSError`` is re-raised to the caller.
    """
    if len(path) == 0:
        return
    try:
        os.makedirs(path)
    except OSError as err:
        # Only "already exists" is acceptable; everything else propagates.
        if err.errno == errno.EEXIST:
            return
        raise err
26 | |
27 | |
def print_error(error, context="Line", context_str=""):
    """Print a samplesheet error to stdout and exit with status 1.

    Args:
        error: Description of the problem.
        context: Label for the offending item (e.g. ``"Line"``).
        context_str: The offending item itself. The ``context`` line is
            only appended when both ``context`` and ``context_str`` are
            non-empty.
    """
    if context == "" or context_str == "":
        message = f"ERROR: Please check samplesheet -> {error}"
    else:
        message = f"ERROR: Please check samplesheet -> {error}\n{context.strip()}: '{context_str.strip()}'"
    print(message)
    sys.exit(1)
34 | |
35 | |
def check_samplesheet(file_in, file_out):
    """
    Validate a samplesheet and write a reformatted copy of it.

    The input samplesheet must follow this structure:

    sample,fq1,fq2,strandedness
    SAMPLE_PE,SAMPLE_PE_RUN1_1.fastq.gz,SAMPLE_PE_RUN1_2.fastq.gz,forward
    SAMPLE_PE,SAMPLE_PE_RUN2_1.fastq.gz,SAMPLE_PE_RUN2_2.fastq.gz,forward
    SAMPLE_SE,SAMPLE_SE_RUN1_1.fastq,,forward
    SAMPLE_SE,SAMPLE_SE_RUN1_2.fastq.gz,,forward

    For an example see:
    https://github.com/nf-core/test-datasets/blob/rnaseq/samplesheet/v3.1/samplesheet_test.csv

    Args:
        file_in: Path of the samplesheet to check. It is decoded as UTF-8
            and a leading byte-order mark is tolerated.
        file_out: Path of the validated samplesheet to write. Its parent
            directory is created when missing.

    The output has the columns ``sample,single_end,fq1,fq2,strandedness``.
    Each run of a sample is written as ``<sample>_T<n>`` (n counted from 1
    in input order), and ``single_end`` is ``"1"`` when only ``fq1`` is
    given and ``"0"`` when both FastQ files are given.

    Every validation failure prints an error message and terminates the
    process with exit status 1. Nothing is written unless the whole
    samplesheet is valid.
    """
    MIN_COLS = 3
    HEADER = ["sample", "fq1", "fq2", "strandedness"]
    strandednesses = ["unstranded", "forward", "reverse"]

    ## {sample: [[single_end, fq1, fq2, strandedness], ...]}
    sample_mapping_dict = {}
    with open(file_in, "r", encoding="utf-8-sig") as fin:

        ## Check header
        header = [x.strip('"') for x in fin.readline().strip().split(",")]
        if header[: len(HEADER)] != HEADER:
            print(
                f"ERROR: Please check samplesheet header -> {','.join(header)} != {','.join(HEADER)}"
            )
            sys.exit(1)

        ## Check sample entries
        for line in fin:
            ## Blank lines are ignored
            if not line.strip():
                continue
            lspl = [x.strip().strip('"') for x in line.strip().split(",")]

            ## Check valid number of columns per row
            if len(lspl) < len(HEADER):
                print_error(
                    f"Invalid number of columns (minimum = {len(HEADER)})!",
                    "Line",
                    line,
                )

            num_cols = len([x for x in lspl if x])
            if num_cols < MIN_COLS:
                print_error(
                    f"Invalid number of populated columns (minimum = {MIN_COLS})!",
                    "Line",
                    line,
                )

            ## Check sample name entries
            sample, fq1, fq2, strandedness = lspl[: len(HEADER)]
            if " " in sample:
                print(
                    f"WARNING: Spaces have been replaced by underscores for sample: {sample}"
                )
                sample = sample.replace(" ", "_")
            if not sample:
                print_error("Sample entry has not been specified!", "Line", line)

            ## Check FastQ file names.
            ## NOTE: only embedded spaces are rejected; the file extension
            ## ('.fastq.gz' / '.fq.gz') is not enforced here.
            for fastq in [fq1, fq2]:
                if fastq and " " in fastq:
                    print_error("FastQ file contains spaces!", "Line", line)

            ## Check strandedness
            if not strandedness:
                print_error(
                    f"Strandedness has not been specified! Must be one of {', '.join(strandednesses)}.",
                    "Line",
                    line,
                )
            elif strandedness not in strandednesses:
                print_error(
                    f"Strandedness must be one of '{', '.join(strandednesses)}'!",
                    "Line",
                    line,
                )

            ## Auto-detect paired-end/single-end
            sample_info = []  ## [single_end, fq1, fq2, strandedness]
            if sample and fq1 and fq2:  ## Paired-end short reads
                sample_info = ["0", fq1, fq2, strandedness]
            elif sample and fq1 and not fq2:  ## Single-end short reads
                sample_info = ["1", fq1, fq2, strandedness]
            else:
                print_error("Invalid combination of columns provided!", "Line", line)

            ## Record the run, rejecting exact duplicate rows of a sample
            runs = sample_mapping_dict.setdefault(sample, [])
            if sample_info in runs:
                print_error("Samplesheet contains duplicate rows!", "Line", line)
            else:
                runs.append(sample_info)

    if not sample_mapping_dict:
        ## Pass the file name as the context string so it is actually printed
        print_error("No entries to process!", "Samplesheet", file_in)

    ## Validate every sample BEFORE touching the output so that a failing
    ## samplesheet never leaves a partially written file behind.
    sample_names = sorted(sample_mapping_dict)
    for sample in sample_names:
        runs = sample_mapping_dict[sample]

        ## Check that multiple runs of the same sample are of the same datatype i.e. single-end / paired-end
        if not all(x[0] == runs[0][0] for x in runs):
            print_error(
                "Multiple runs of a sample must be of the same datatype i.e. single-end or paired-end!",
                "Sample",
                sample,
            )

        ## Check that multiple runs of the same sample are of the same strandedness
        if not all(x[-1] == runs[0][-1] for x in runs):
            print_error(
                "Multiple runs of a sample must have the same strandedness!",
                "Sample",
                sample,
            )

    ## Write validated samplesheet with appropriate columns
    make_dir(os.path.dirname(file_out))
    ## Written as UTF-8 to match the decoding used for the input
    with open(file_out, "w", encoding="utf-8") as fout:
        fout.write(
            ",".join(["sample", "single_end", "fq1", "fq2", "strandedness"]) + "\n"
        )
        for sample in sample_names:
            for idx, val in enumerate(sample_mapping_dict[sample], start=1):
                fout.write(",".join([f"{sample}_T{idx}"] + val) + "\n")
177 | |
178 | |
def main(args=None):
    """Parse the command line and run the samplesheet check.

    Args:
        args: Optional list of argument strings forwarded to ``parse_args``.
    """
    parsed = parse_args(args)
    check_samplesheet(parsed.FILE_IN, parsed.FILE_OUT)
182 | |
183 | |
# Run the checker only when executed as a script, not when imported.
if __name__ == "__main__":
    exit_status = main()
    sys.exit(exit_status)