#!/usr/bin/env python3

import argparse
import errno
import os
import sys


def parse_args(args=None):
    """Parse the command-line arguments.

    Args:
        args: Optional list of argument strings; when None, argparse reads
            them from ``sys.argv``.

    Returns:
        The parsed namespace with ``FILE_IN`` and ``FILE_OUT`` attributes.
    """
    Description = "Reformat samplesheet file and check its contents."
    Epilog = "Example usage: python check_samplesheet.py FILE_IN FILE_OUT"

    parser = argparse.ArgumentParser(description=Description, epilog=Epilog)
    parser.add_argument("FILE_IN", help="Input samplesheet file.")
    parser.add_argument("FILE_OUT", help="Output file.")
    return parser.parse_args(args)


def make_dir(path):
    """Create directory ``path`` (and missing parents) if it is non-empty.

    An already-existing path is not treated as an error; any other OSError
    is re-raised.
    """
    if len(path) > 0:
        try:
            os.makedirs(path)
        except OSError as exception:
            if exception.errno != errno.EEXIST:
                raise


def print_error(error, context="Line", context_str=""):
    """Print a samplesheet error message to stdout and exit with status 1.

    Args:
        error: Description of the problem.
        context: Label for the offending item (e.g. "Line" or "Sample").
        context_str: The offending item itself; the context line is only
            printed when both ``context`` and ``context_str`` are non-empty.
    """
    error_str = f"ERROR: Please check samplesheet -> {error}"
    if context != "" and context_str != "":
        error_str += f"\n{context.strip()}: '{context_str.strip()}'"
    print(error_str)
    sys.exit(1)


def check_samplesheet(file_in, file_out):
    """
    This function checks that the samplesheet follows the following structure:

    sample,fq1,fq2,strandedness
    SAMPLE_PE,SAMPLE_PE_RUN1_1.fastq.gz,SAMPLE_PE_RUN1_2.fastq.gz,forward
    SAMPLE_PE,SAMPLE_PE_RUN2_1.fastq.gz,SAMPLE_PE_RUN2_2.fastq.gz,forward
    SAMPLE_SE,SAMPLE_SE_RUN1_1.fastq,,forward
    SAMPLE_SE,SAMPLE_SE_RUN1_2.fastq.gz,,forward

    For an example see:
    https://github.com/nf-core/test-datasets/blob/rnaseq/samplesheet/v3.1/samplesheet_test.csv

    On success a validated samplesheet with the columns
    ``sample,single_end,fq1,fq2,strandedness`` is written to ``file_out``;
    each run of a sample is emitted as ``<sample>_T<n>`` and samples are
    written in sorted order. Any validation failure prints an error to
    stdout and exits with status 1 (see ``print_error``); in that case
    ``file_out`` is not created.

    Args:
        file_in: Path of the samplesheet to validate (read as UTF-8, a
            leading BOM is tolerated).
        file_out: Path of the reformatted samplesheet to write; its parent
            directory is created if needed.
    """
    MIN_COLS = 3
    HEADER = ["sample", "fq1", "fq2", "strandedness"]
    STRANDEDNESSES = ["unstranded", "forward", "reverse"]

    ## {sample: [[single_end, fq1, fq2, strandedness], ...]}
    sample_mapping_dict = {}
    with open(file_in, "r", encoding="utf-8-sig") as fin:

        ## Check header
        header = [x.strip('"') for x in fin.readline().strip().split(",")]
        if header[: len(HEADER)] != HEADER:
            print(
                f"ERROR: Please check samplesheet header -> {','.join(header)} != {','.join(HEADER)}"
            )
            sys.exit(1)

        ## Check sample entries
        for line in fin:
            if not line.strip():
                continue  ## blank lines are ignored

            lspl = [x.strip().strip('"') for x in line.strip().split(",")]

            ## Check valid number of columns per row
            if len(lspl) < len(HEADER):
                print_error(
                    f"Invalid number of columns (minimum = {len(HEADER)})!",
                    "Line",
                    line,
                )

            num_cols = len([x for x in lspl if x])
            if num_cols < MIN_COLS:
                print_error(
                    f"Invalid number of populated columns (minimum = {MIN_COLS})!",
                    "Line",
                    line,
                )

            ## Check sample name entries
            sample, fq1, fq2, strandedness = lspl[: len(HEADER)]
            if sample.find(" ") != -1:
                print(
                    f"WARNING: Spaces have been replaced by underscores for sample: {sample}"
                )
                sample = sample.replace(" ", "_")
            if not sample:
                print_error("Sample entry has not been specified!", "Line", line)

            ## Check FastQ file names
            ## NOTE: only embedded spaces are rejected; the file extension
            ## ('.fastq.gz' / '.fq.gz') is not enforced here.
            for fastq in [fq1, fq2]:
                if fastq and fastq.find(" ") != -1:
                    print_error("FastQ file contains spaces!", "Line", line)

            ## Check strandedness
            if strandedness:
                if strandedness not in STRANDEDNESSES:
                    print_error(
                        f"Strandedness must be one of '{', '.join(STRANDEDNESSES)}'!",
                        "Line",
                        line,
                    )
            else:
                print_error(
                    f"Strandedness has not been specified! Must be one of {', '.join(STRANDEDNESSES)}.",
                    "Line",
                    line,
                )

            ## Auto-detect paired-end/single-end
            sample_info = []  ## [single_end, fq1, fq2, strandedness]
            if sample and fq1 and fq2:  ## Paired-end short reads
                sample_info = ["0", fq1, fq2, strandedness]
            elif sample and fq1 and not fq2:  ## Single-end short reads
                sample_info = ["1", fq1, fq2, strandedness]
            else:
                print_error("Invalid combination of columns provided!", "Line", line)

            ## Create sample mapping dictionary = {sample: [[ single_end, fq1, fq2, strandedness ]]}
            if sample not in sample_mapping_dict:
                sample_mapping_dict[sample] = [sample_info]
            elif sample_info in sample_mapping_dict[sample]:
                print_error("Samplesheet contains duplicate rows!", "Line", line)
            else:
                sample_mapping_dict[sample].append(sample_info)

    if not sample_mapping_dict:
        ## Pass the file name as context_str so it is actually reported.
        print_error("No entries to process!", "Samplesheet", file_in)

    ## Validate every sample BEFORE touching the output so that a failure
    ## never leaves a partially written samplesheet behind.
    sorted_samples = sorted(sample_mapping_dict)
    for sample in sorted_samples:
        runs = sample_mapping_dict[sample]

        ## Check that multiple runs of the same sample are of the same datatype i.e. single-end / paired-end
        if not all(x[0] == runs[0][0] for x in runs):
            print_error(
                "Multiple runs of a sample must be of the same datatype i.e. single-end or paired-end!",
                "Sample",
                sample,
            )

        ## Check that multiple runs of the same sample are of the same strandedness
        if not all(x[-1] == runs[0][-1] for x in runs):
            print_error(
                "Multiple runs of a sample must have the same strandedness!",
                "Sample",
                sample,
            )

    ## Write validated samplesheet with appropriate columns
    make_dir(os.path.dirname(file_out))
    with open(file_out, "w", encoding="utf-8") as fout:
        fout.write(
            ",".join(["sample", "single_end", "fq1", "fq2", "strandedness"]) + "\n"
        )
        for sample in sorted_samples:
            for idx, val in enumerate(sample_mapping_dict[sample]):
                fout.write(",".join([f"{sample}_T{idx+1}"] + val) + "\n")


def main(args=None):
    """Script entry point: parse arguments and validate the samplesheet."""
    args = parse_args(args)
    check_samplesheet(args.FILE_IN, args.FILE_OUT)


if __name__ == "__main__":
    sys.exit(main())