annotate 0.6.1/bin/check_samplesheet.py @ 16:b90e5a7a3d4f

"planemo upload"
author kkonganti
date Thu, 07 Sep 2023 15:22:10 -0400
parents 749faef1caa9
children
rev   line source
kkonganti@11 1 #!/usr/bin/env python3
kkonganti@11 2
kkonganti@11 3 import os
kkonganti@11 4 import sys
kkonganti@11 5 import errno
kkonganti@11 6 import argparse
kkonganti@11 7
kkonganti@11 8
def parse_args(args=None):
    """
    Parse the command line of the samplesheet checker.

    Two positional arguments are accepted: ``FILE_IN`` (the samplesheet to
    check) and ``FILE_OUT`` (where the reformatted samplesheet is written).

    :param args: Argument list to parse; ``None`` makes argparse fall back
        to ``sys.argv[1:]``.
    :return: The ``argparse.Namespace`` holding ``FILE_IN`` and ``FILE_OUT``.
    """
    cli = argparse.ArgumentParser(
        description="Reformat samplesheet file and check its contents.",
        epilog="Example usage: python check_samplesheet.py <FILE_IN> <FILE_OUT>",
    )
    positionals = (
        ("FILE_IN", "Input samplesheet file."),
        ("FILE_OUT", "Output file."),
    )
    for dest, help_text in positionals:
        cli.add_argument(dest, help=help_text)
    return cli.parse_args(args)
kkonganti@11 17
kkonganti@11 18
def make_dir(path):
    """
    Create directory ``path`` (including missing parents) if it is non-empty.

    An already existing directory is not an error.  An empty ``path`` (e.g.
    the ``os.path.dirname`` of a bare file name) is a no-op.

    :param path: Directory path to create; may be an empty string.
    :raises OSError: If the directory cannot be created, including when
        ``path`` already exists but is not a directory.
    """
    if path:
        # exist_ok=True tolerates an existing directory but, unlike a blanket
        # EEXIST check, still fails when the path is an existing regular file.
        os.makedirs(path, exist_ok=True)
kkonganti@11 26
kkonganti@11 27
def print_error(error, context="Line", context_str=""):
    """
    Print a samplesheet error message and terminate with exit status 1.

    When both ``context`` and ``context_str`` are non-empty, a second line of
    the form ``<context>: '<context_str>'`` (both stripped of surrounding
    whitespace) is appended to the message.

    :param error: Description of the problem.
    :param context: Label for the offending item (e.g. ``"Line"``).
    :param context_str: The offending item itself; an empty string suppresses
        the context line.
    :raises SystemExit: Always, with exit code 1.
    """
    message = f"ERROR: Please check samplesheet -> {error}"
    has_context = not (context == "" or context_str == "")
    if has_context:
        message += f"\n{context.strip()}: '{context_str.strip()}'"
    print(message)
    sys.exit(1)
kkonganti@11 34
kkonganti@11 35
def check_samplesheet(file_in, file_out):
    """
    Validate a samplesheet and write a reformatted copy of it.

    The input samplesheet must follow this structure:

    sample,fq1,fq2,strandedness
    SAMPLE_PE,SAMPLE_PE_RUN1_1.fastq.gz,SAMPLE_PE_RUN1_2.fastq.gz,forward
    SAMPLE_PE,SAMPLE_PE_RUN2_1.fastq.gz,SAMPLE_PE_RUN2_2.fastq.gz,forward
    SAMPLE_SE,SAMPLE_SE_RUN1_1.fastq,,forward
    SAMPLE_SE,SAMPLE_SE_RUN1_2.fastq.gz,,forward

    For an example see:
    https://github.com/nf-core/test-datasets/blob/rnaseq/samplesheet/v3.1/samplesheet_test.csv

    The output has the columns ``sample,single_end,fq1,fq2,strandedness``.
    Rows are grouped by sample (sorted by name) and every run of a sample is
    written as ``<sample>_T<n>`` with ``n`` counting from 1 in input order.
    ``single_end`` is ``"1"`` when ``fq2`` is empty and ``"0"`` otherwise.

    All validation is completed before the output file is opened, so a
    failing samplesheet never leaves a partially written output behind.

    NOTE(review): rows are split on every comma; quoted fields that contain
    commas are not supported by this parser.

    :param file_in: Path of the samplesheet to validate (read as UTF-8, a
        leading byte-order mark is ignored).
    :param file_out: Path of the reformatted samplesheet; its parent
        directory is created when missing.
    :raises SystemExit: With exit code 1 on any validation failure, after an
        explanatory message has been printed.
    """
    MIN_COLS = 3
    HEADER = ["sample", "fq1", "fq2", "strandedness"]
    STRANDEDNESSES = ["unstranded", "forward", "reverse"]

    ## sample_mapping_dict = {sample: [[ single_end, fq1, fq2, strandedness ]]}
    sample_mapping_dict = {}
    with open(file_in, "r", encoding="utf-8-sig") as fin:

        ## Check header
        header = [x.strip('"') for x in fin.readline().strip().split(",")]
        if header[: len(HEADER)] != HEADER:
            print(
                f"ERROR: Please check samplesheet header -> {','.join(header)} != {','.join(HEADER)}"
            )
            sys.exit(1)

        ## Check sample entries
        for line in fin:
            ## Blank lines are ignored
            if not line.strip():
                continue

            lspl = [x.strip().strip('"') for x in line.strip().split(",")]

            ## Check valid number of columns per row
            if len(lspl) < len(HEADER):
                print_error(
                    f"Invalid number of columns (minimum = {len(HEADER)})!",
                    "Line",
                    line,
                )

            num_cols = len([x for x in lspl if x])
            if num_cols < MIN_COLS:
                print_error(
                    f"Invalid number of populated columns (minimum = {MIN_COLS})!",
                    "Line",
                    line,
                )

            ## Check sample name entries
            sample, fq1, fq2, strandedness = lspl[: len(HEADER)]
            if " " in sample:
                print(
                    f"WARNING: Spaces have been replaced by underscores for sample: {sample}"
                )
                sample = sample.replace(" ", "_")
            if not sample:
                print_error("Sample entry has not been specified!", "Line", line)

            ## Check FastQ file names. Only embedded spaces are rejected; no
            ## file extension is enforced (the example above includes an
            ## uncompressed '.fastq' file).
            for fastq in (fq1, fq2):
                if fastq and " " in fastq:
                    print_error("FastQ file contains spaces!", "Line", line)

            ## Check strandedness
            if not strandedness:
                print_error(
                    f"Strandedness has not been specified! Must be one of {', '.join(STRANDEDNESSES)}.",
                    "Line",
                    line,
                )
            elif strandedness not in STRANDEDNESSES:
                print_error(
                    f"Strandedness must be one of '{', '.join(STRANDEDNESSES)}'!",
                    "Line",
                    line,
                )

            ## Auto-detect paired-end/single-end
            sample_info = []  ## [single_end, fq1, fq2, strandedness]
            if sample and fq1 and fq2:  ## Paired-end short reads
                sample_info = ["0", fq1, fq2, strandedness]
            elif sample and fq1 and not fq2:  ## Single-end short reads
                sample_info = ["1", fq1, fq2, strandedness]
            else:
                print_error("Invalid combination of columns provided!", "Line", line)

            ## Record the run, rejecting exact duplicates of an earlier row
            runs = sample_mapping_dict.setdefault(sample, [])
            if sample_info in runs:
                print_error("Samplesheet contains duplicate rows!", "Line", line)
            else:
                runs.append(sample_info)

    if not sample_mapping_dict:
        ## Name the offending file in the message
        print_error("No entries to process!", "Samplesheet", file_in)

    ## Validate every sample BEFORE touching the output file
    sample_names = sorted(sample_mapping_dict)
    for sample in sample_names:
        runs = sample_mapping_dict[sample]

        ## Check that multiple runs of the same sample are of the same datatype i.e. single-end / paired-end
        if any(run[0] != runs[0][0] for run in runs):
            print_error(
                "Multiple runs of a sample must be of the same datatype i.e. single-end or paired-end!",
                "Sample",
                sample,
            )

        ## Check that multiple runs of the same sample are of the same strandedness
        if any(run[-1] != runs[0][-1] for run in runs):
            print_error(
                "Multiple runs of a sample must have the same strandedness!",
                "Sample",
                sample,
            )

    ## Write validated samplesheet with appropriate columns
    make_dir(os.path.dirname(file_out))
    with open(file_out, "w", encoding="utf-8") as fout:
        fout.write(
            ",".join(["sample", "single_end", "fq1", "fq2", "strandedness"])
            + "\n"
        )
        for sample in sample_names:
            for idx, val in enumerate(sample_mapping_dict[sample], start=1):
                fout.write(",".join([f"{sample}_T{idx}"] + val) + "\n")
kkonganti@11 177
kkonganti@11 178
def main(args=None):
    """
    Script entry point: parse the command line and check the samplesheet.

    :param args: Optional argument list forwarded to ``parse_args``;
        ``None`` means the process command line is used.
    """
    parsed = parse_args(args)
    check_samplesheet(parsed.FILE_IN, parsed.FILE_OUT)
kkonganti@11 182
kkonganti@11 183
# Run the checker only when executed as a script, not when imported.
if __name__ == "__main__":
    exit_status = main()
    sys.exit(exit_status)