Mercurial > repos > galaxytrakr > hfp_bettercallsal_konda
comparison 1.0.0/bin/check_samplesheet.py @ 0:0a8dda29956e draft default tip
planemo upload
| author | galaxytrakr |
|---|---|
| date | Thu, 28 May 2026 20:41:10 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:0a8dda29956e |
|---|---|
| 1 #!/usr/bin/env python3 | |
| 2 | |
| 3 import os | |
| 4 import sys | |
| 5 import errno | |
| 6 import argparse | |
| 7 | |
| 8 | |
def parse_args(args=None):
    """
    Parse the command-line arguments of the samplesheet checker.

    Two positional arguments are required: FILE_IN (the samplesheet to
    read) and FILE_OUT (the file to write).

    Args:
        args: Optional list of argument strings; when None, argparse falls
            back to the process command line.

    Returns:
        The argparse namespace carrying the FILE_IN and FILE_OUT attributes.
    """
    cli = argparse.ArgumentParser(
        description="Reformat samplesheet file and check its contents.",
        epilog="Example usage: python check_samplesheet.py <FILE_IN> <FILE_OUT>",
    )
    positionals = (
        ("FILE_IN", "Input samplesheet file."),
        ("FILE_OUT", "Output file."),
    )
    for dest, help_text in positionals:
        cli.add_argument(dest, help=help_text)
    return cli.parse_args(args)
| 17 | |
| 18 | |
def make_dir(path):
    """
    Create the directory ``path`` (and any missing parents) if needed.

    An empty ``path`` is a no-op, which is what ``os.path.dirname`` yields
    for a bare file name. An already existing directory is not an error.

    Args:
        path: Directory to create; may be an empty string.

    Raises:
        OSError: If the directory cannot be created, including when
            ``path`` already exists but is not a directory.
    """
    if len(path) > 0:
        ## exist_ok replaces the manual errno.EEXIST check
        os.makedirs(path, exist_ok=True)
| 26 | |
| 27 | |
def print_error(error, context="Line", context_str=""):
    """
    Print a samplesheet error message to stdout and exit with status 1.

    Args:
        error: Description of the problem.
        context: Label for the offending item (e.g. "Line" or "Sample").
        context_str: The offending item itself; the context line is only
            appended when both ``context`` and ``context_str`` are non-empty.
    """
    message = f"ERROR: Please check samplesheet -> {error}"
    if not (context == "" or context_str == ""):
        message += f"\n{context.strip()}: '{context_str.strip()}'"
    print(message)
    sys.exit(1)
| 34 | |
| 35 | |
def check_samplesheet(file_in, file_out):
    """
    Validate a samplesheet and write a reformatted copy of it.

    The input samplesheet must follow this structure:

    sample,fq1,fq2,strandedness
    SAMPLE_PE,SAMPLE_PE_RUN1_1.fastq.gz,SAMPLE_PE_RUN1_2.fastq.gz,forward
    SAMPLE_PE,SAMPLE_PE_RUN2_1.fastq.gz,SAMPLE_PE_RUN2_2.fastq.gz,forward
    SAMPLE_SE,SAMPLE_SE_RUN1_1.fastq,,forward
    SAMPLE_SE,SAMPLE_SE_RUN1_2.fastq.gz,,forward

    For an example see:
    https://github.com/nf-core/test-datasets/blob/rnaseq/samplesheet/v3.1/samplesheet_test.csv

    Blank lines are skipped. Spaces in sample names are replaced by
    underscores (with a warning). A row with both fq1 and fq2 is recorded as
    paired-end (single_end = "0"), a row with only fq1 as single-end
    (single_end = "1"). The output has the columns
    sample,single_end,fq1,fq2,strandedness, with rows grouped by sample name
    in sorted order and each sample name suffixed with "_T<run number>".

    Args:
        file_in: Path of the samplesheet to check (decoded as UTF-8, a
            leading byte-order mark is tolerated).
        file_out: Path of the validated samplesheet to write; its parent
            directory is created when missing.

    Any validation failure prints an error message and terminates the
    process with exit status 1.
    """

    ## Properties of the expected format, defined once instead of per row
    MIN_COLS = 3
    HEADER = ["sample", "fq1", "fq2", "strandedness"]
    STRANDEDNESSES = ["unstranded", "forward", "reverse"]

    sample_mapping_dict = {}
    with open(file_in, "r", encoding="utf-8-sig") as fin:

        ## Check header
        header = [x.strip('"') for x in fin.readline().strip().split(",")]
        if header[: len(HEADER)] != HEADER:
            print(
                f"ERROR: Please check samplesheet header -> {','.join(header)} != {','.join(HEADER)}"
            )
            sys.exit(1)

        ## Check sample entries
        for line in fin:
            ## Blank lines are tolerated and skipped
            if not line.strip():
                continue

            lspl = [x.strip().strip('"') for x in line.strip().split(",")]

            ## Check valid number of columns per row
            if len(lspl) < len(HEADER):
                print_error(
                    f"Invalid number of columns (minimum = {len(HEADER)})!",
                    "Line",
                    line,
                )

            num_cols = len([x for x in lspl if x])
            if num_cols < MIN_COLS:
                print_error(
                    f"Invalid number of populated columns (minimum = {MIN_COLS})!",
                    "Line",
                    line,
                )

            ## Check sample name entries
            sample, fq1, fq2, strandedness = lspl[: len(HEADER)]
            if " " in sample:
                print(
                    f"WARNING: Spaces have been replaced by underscores for sample: {sample}"
                )
                sample = sample.replace(" ", "_")
            if not sample:
                print_error("Sample entry has not been specified!", "Line", line)

            ## Check FastQ file names (the extension itself is not enforced)
            for fastq in [fq1, fq2]:
                if fastq and " " in fastq:
                    print_error("FastQ file contains spaces!", "Line", line)

            ## Check strandedness
            if strandedness:
                if strandedness not in STRANDEDNESSES:
                    print_error(
                        f"Strandedness must be one of '{', '.join(STRANDEDNESSES)}'!",
                        "Line",
                        line,
                    )
            else:
                print_error(
                    f"Strandedness has not been specified! Must be one of {', '.join(STRANDEDNESSES)}.",
                    "Line",
                    line,
                )

            ## Auto-detect paired-end/single-end
            sample_info = []  ## [single_end, fq1, fq2, strandedness]
            if sample and fq1 and fq2:  ## Paired-end short reads
                sample_info = ["0", fq1, fq2, strandedness]
            elif sample and fq1 and not fq2:  ## Single-end short reads
                sample_info = ["1", fq1, fq2, strandedness]
            else:
                print_error("Invalid combination of columns provided!", "Line", line)

            ## Create sample mapping dictionary = {sample: [[ single_end, fq1, fq2, strandedness ]]}
            if sample not in sample_mapping_dict:
                sample_mapping_dict[sample] = [sample_info]
            elif sample_info in sample_mapping_dict[sample]:
                print_error("Samplesheet contains duplicate rows!", "Line", line)
            else:
                sample_mapping_dict[sample].append(sample_info)

    ## Write validated samplesheet with appropriate columns
    if len(sample_mapping_dict) > 0:
        out_dir = os.path.dirname(file_out)
        make_dir(out_dir)
        ## The input is decoded as UTF-8, so write UTF-8 back out instead of
        ## depending on the locale's default encoding
        with open(file_out, "w", encoding="utf-8") as fout:
            fout.write(
                ",".join(["sample", "single_end", "fq1", "fq2", "strandedness"])
                + "\n"
            )
            for sample in sorted(sample_mapping_dict.keys()):
                runs = sample_mapping_dict[sample]

                ## Check that multiple runs of the same sample are of the same datatype i.e. single-end / paired-end
                if not all(x[0] == runs[0][0] for x in runs):
                    print_error(
                        "Multiple runs of a sample must be of the same datatype i.e. single-end or paired-end!",
                        "Sample",
                        sample,
                    )

                ## Check that multiple runs of the same sample are of the same strandedness
                if not all(x[-1] == runs[0][-1] for x in runs):
                    print_error(
                        "Multiple runs of a sample must have the same strandedness!",
                        "Sample",
                        sample,
                    )

                for idx, val in enumerate(runs):
                    fout.write(",".join([f"{sample}_T{idx+1}"] + val) + "\n")
    else:
        ## Pass the file name as context_str so it is actually reported
        print_error("No entries to process!", "Samplesheet", str(file_in))
| 177 | |
| 178 | |
def main(args=None):
    """
    Command-line entry point: parse the arguments, then check the samplesheet.

    Args:
        args: Optional list of argument strings handed to ``parse_args``;
            None is passed through unchanged.
    """
    parsed = parse_args(args)
    check_samplesheet(parsed.FILE_IN, parsed.FILE_OUT)
| 182 | |
| 183 | |
## Script entry point: run main() and hand its return value to sys.exit()
if __name__ == "__main__":
    exit_status = main()
    sys.exit(exit_status)
