diff 0.5.0/bin/check_samplesheet.py @ 1:365849f031fd

"planemo upload"
author kkonganti
date Mon, 05 Jun 2023 18:48:51 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/0.5.0/bin/check_samplesheet.py	Mon Jun 05 18:48:51 2023 -0400
@@ -0,0 +1,185 @@
+#!/usr/bin/env python3
+
+import os
+import sys
+import errno
+import argparse
+
+
+def parse_args(args=None):
+    Description = "Reformat samplesheet file and check its contents."
+    Epilog = "Example usage: python check_samplesheet.py <FILE_IN> <FILE_OUT>"
+
+    parser = argparse.ArgumentParser(description=Description, epilog=Epilog)
+    parser.add_argument("FILE_IN", help="Input samplesheet file.")
+    parser.add_argument("FILE_OUT", help="Output file.")
+    return parser.parse_args(args)
+
+
+def make_dir(path):
+    if len(path) > 0:
+        try:
+            os.makedirs(path)
+        except OSError as exception:
+            if exception.errno != errno.EEXIST:
+                raise exception
+
+
+def print_error(error, context="Line", context_str=""):
+    error_str = f"ERROR: Please check samplesheet -> {error}"
+    if context != "" and context_str != "":
+        error_str = f"ERROR: Please check samplesheet -> {error}\n{context.strip()}: '{context_str.strip()}'"
+    print(error_str)
+    sys.exit(1)
+
+
+def check_samplesheet(file_in, file_out):
+    """
+    This function checks that the samplesheet follows the following structure:
+
+    sample,fq1,fq2,strandedness
+    SAMPLE_PE,SAMPLE_PE_RUN1_1.fastq.gz,SAMPLE_PE_RUN1_2.fastq.gz,forward
+    SAMPLE_PE,SAMPLE_PE_RUN2_1.fastq.gz,SAMPLE_PE_RUN2_2.fastq.gz,forward
+    SAMPLE_SE,SAMPLE_SE_RUN1_1.fastq,,forward
+    SAMPLE_SE,SAMPLE_SE_RUN1_2.fastq.gz,,forward
+
+    For an example see:
+    https://github.com/nf-core/test-datasets/blob/rnaseq/samplesheet/v3.1/samplesheet_test.csv
+    """
+
+    sample_mapping_dict = {}
+    with open(file_in, "r", encoding='utf-8-sig') as fin:
+
+        ## Check header
+        MIN_COLS = 3
+        HEADER = ["sample", "fq1", "fq2", "strandedness"]
+        header = [x.strip('"') for x in fin.readline().strip().split(",")]
+        if header[: len(HEADER)] != HEADER:
+            print(
+                f"ERROR: Please check samplesheet header -> {','.join(header)} != {','.join(HEADER)}"
+            )
+            sys.exit(1)
+
+        ## Check sample entries
+        for line in fin:
+            if line.strip():
+                lspl = [x.strip().strip('"') for x in line.strip().split(",")]
+
+                ## Check valid number of columns per row
+                if len(lspl) < len(HEADER):
+                    print_error(
+                        f"Invalid number of columns (minimum = {len(HEADER)})!",
+                        "Line",
+                        line,
+                    )
+
+                num_cols = len([x for x in lspl if x])
+                if num_cols < MIN_COLS:
+                    print_error(
+                        f"Invalid number of populated columns (minimum = {MIN_COLS})!",
+                        "Line",
+                        line,
+                    )
+
+                ## Check sample name entries
+                sample, fq1, fq2, strandedness = lspl[: len(HEADER)]
+                if sample.find(" ") != -1:
+                    print(
+                        f"WARNING: Spaces have been replaced by underscores for sample: {sample}"
+                    )
+                    sample = sample.replace(" ", "_")
+                if not sample:
+                    print_error("Sample entry has not been specified!", "Line", line)
+
+                ## Check FastQ file extension
+                for fastq in [fq1, fq2]:
+                    if fastq:
+                        if fastq.find(" ") != -1:
+                            print_error("FastQ file contains spaces!", "Line", line)
+                        # if not fastq.endswith(".fastq.gz") and not fastq.endswith(".fq.gz"):
+                        #     print_error(
+                        #         "FastQ file does not have extension '.fastq.gz' or '.fq.gz'!",
+                        #         "Line",
+                        #         line,
+                        #     )
+
+                ## Check strandedness
+                strandednesses = ["unstranded", "forward", "reverse"]
+                if strandedness:
+                    if strandedness not in strandednesses:
+                        print_error(
+                            f"Strandedness must be one of '{', '.join(strandednesses)}'!",
+                            "Line",
+                            line,
+                        )
+                else:
+                    print_error(
+                        f"Strandedness has not been specified! Must be one of {', '.join(strandednesses)}.",
+                        "Line",
+                        line,
+                    )
+
+                ## Auto-detect paired-end/single-end
+                sample_info = []  ## [single_end, fq1, fq2, strandedness]
+                if sample and fq1 and fq2:  ## Paired-end short reads
+                    sample_info = ["0", fq1, fq2, strandedness]
+                elif sample and fq1 and not fq2:  ## Single-end short reads
+                    sample_info = ["1", fq1, fq2, strandedness]
+                else:
+                    print_error("Invalid combination of columns provided!", "Line", line)
+
+                ## Create sample mapping dictionary = {sample: [[ single_end, fq1, fq2, strandedness ]]}
+                if sample not in sample_mapping_dict:
+                    sample_mapping_dict[sample] = [sample_info]
+                else:
+                    if sample_info in sample_mapping_dict[sample]:
+                        print_error("Samplesheet contains duplicate rows!", "Line", line)
+                    else:
+                        sample_mapping_dict[sample].append(sample_info)
+
+    ## Write validated samplesheet with appropriate columns
+    if len(sample_mapping_dict) > 0:
+        out_dir = os.path.dirname(file_out)
+        make_dir(out_dir)
+        with open(file_out, "w") as fout:
+            fout.write(
+                ",".join(["sample", "single_end", "fq1", "fq2", "strandedness"])
+                + "\n"
+            )
+            for sample in sorted(sample_mapping_dict.keys()):
+
+                ## Check that multiple runs of the same sample are of the same datatype i.e. single-end / paired-end
+                if not all(
+                    x[0] == sample_mapping_dict[sample][0][0]
+                    for x in sample_mapping_dict[sample]
+                ):
+                    print_error(
+                        f"Multiple runs of a sample must be of the same datatype i.e. single-end or paired-end!",
+                        "Sample",
+                        sample,
+                    )
+
+                ## Check that multiple runs of the same sample are of the same strandedness
+                if not all(
+                    x[-1] == sample_mapping_dict[sample][0][-1]
+                    for x in sample_mapping_dict[sample]
+                ):
+                    print_error(
+                        f"Multiple runs of a sample must have the same strandedness!",
+                        "Sample",
+                        sample,
+                    )
+
+                for idx, val in enumerate(sample_mapping_dict[sample]):
+                    fout.write(",".join([f"{sample}_T{idx+1}"] + val) + "\n")
+    else:
+        print_error(f"No entries to process!", "Samplesheet: {file_in}")
+
+
+def main(args=None):
+    args = parse_args(args)
+    check_samplesheet(args.FILE_IN, args.FILE_OUT)
+
+
+if __name__ == "__main__":
+    sys.exit(main())