Mercurial > repos > galaxytrakr > hfp_centriflaken_awsbatch
comparison 0.4.2/bin/prepare_nanopore_fastq_dir.py @ 0:082e0091e813 draft default tip
planemo upload
| author | galaxytrakr |
|---|---|
| date | Fri, 29 May 2026 13:27:47 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:082e0091e813 |
|---|---|
| 1 #!/usr/bin/env python3 | |
| 2 | |
| 3 import os | |
| 4 import re | |
| 5 import glob | |
| 6 import argparse | |
| 7 import logging | |
| 8 | |
def _find_fastq_pass(search_dir, flowcell):
    """Return the sorted 'fastq_pass' directories for *flowcell* under *search_dir*.

    Two fixed-depth layouts are tried, in order:

    1. ``<search_dir>/<flowcell>/*/<name>/fastq_pass``
    2. ``<search_dir>/*/<name>/<flowcell>/fastq_pass`` (the flowcell being
       queried may itself be the parent directory of fastq_pass)

    The first layout that yields any match wins; an empty list means the
    flowcell was not found under this search directory.
    """
    # NOTE: glob.glob() is called without recursive=True, so "**" behaves
    # exactly like "*" (one directory level). "*[!fast5]*" matches any name
    # containing at least one character other than f, a, s, t or 5.
    layouts = (
        (search_dir, flowcell, "**", "*[!fast5]*", "fastq_pass"),
        (search_dir, "**", "*[!fast5]*", flowcell, "fastq_pass"),
    )
    for parts in layouts:
        matches = sorted(glob.glob(os.path.join(*parts)))
        if matches:
            return matches
    return []


def main():
    """Create per-flowcell soft links to Nanopore 'fastq_pass' directories.

    Command line arguments (parsed from ``sys.argv``):

    -f  Text file with one flowcell ID per line (blank lines are ignored).
    -i  Optional search directories. May be given several times and/or with
        several paths per occurrence. When omitted, the default per-machine
        directories under /projects/nanopore/raw are searched.
    -o  Output directory; ``<output>/<flowcell>/fastq_pass`` links are
        created inside it.

    Returns None. Problems (missing flowcell file, flowcells without a
    fastq_pass directory) are reported through logging rather than raised.
    OSError from directory or symlink creation is not caught.
    """
    # READ IN ARGUMENTS
    desc = """
    Takes in a file with flowcell ID, one per line and creates soft links
    to 'fastq_pass' directory at target location.

    Ex:

    prepare_nanopore_fastq_dir.py \
    -o /hpc/scratch/Kranti.Konganti/np_test \
    -f flowcells.txt

    where flowcells.txt contains the following lines:

    FAL11127
    FAL11151

    """
    parser = argparse.ArgumentParser(prog='prepare_nanopore_fastq_dir.py',
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter,
                                     description=desc)
    required = parser.add_argument_group('required arguments')

    required.add_argument("-f", dest='flowcells', required=True,
                          help="Path to a text file containing Nanopore flowcell IDs, one per line")
    required.add_argument("-i", dest='inputdir',
                          required=False, action='append', nargs='*',
                          help="Path to search directory. This directory location is where" +
                          " the presence of 'fastq_pass' will be searched for each flowcell.")
    required.add_argument("-o", dest='outputdir',
                          required=True,
                          help="Path to output directory. This directory is created by the script" +
                          " and new soft links (symlinks) are created in this directory.")

    args = parser.parse_args()
    flowcells = args.flowcells
    output = args.outputdir

    logging.basicConfig(format='%(asctime)s - %(levelname)s => %(message)s', level=logging.DEBUG)

    # action='append' combined with nargs='*' produces a list of lists
    # (one inner list per -i occurrence); flatten it so every path is
    # searched on its own instead of being glued to its neighbours.
    search_dirs = [path for group in (args.inputdir or []) for path in group]

    if not search_dirs:
        nanopore_machines = ['RazorCrest', 'Revolution', 'ObiWan', 'MinIT',
                             'Mayhem', 'CaptainMarvel', 'MinION', 'MinION_Padmini', 'RogueOne']
        logging.info("Searching default path(s). Use -i option if custom path should be searched.")
        search_dirs = [os.path.join('/projects/nanopore/raw', machine)
                       for machine in nanopore_machines]

    # Validate and read the flowcell list once, up front, instead of once
    # per search directory.
    flowcell_ids = []
    if os.path.exists(flowcells) and os.path.getsize(flowcells) > 0:
        with open(flowcells, 'r') as fcells:
            # dict.fromkeys() drops duplicate IDs while keeping file order.
            flowcell_ids = list(dict.fromkeys(
                line.strip() for line in fcells if line.strip()))

    if not flowcell_ids:
        logging.error(f"File {flowcells} is empty or does not exist!\n")
        return

    # flowcell ID -> True once a fastq_pass directory was found anywhere.
    fastq_pass_found = dict.fromkeys(flowcell_ids, False)

    for search_dir in search_dirs:
        logging.info(f"Searching path: {search_dir}")

        for flowcell in flowcell_ids:
            matches = _find_fastq_pass(search_dir, flowcell)
            if not matches:
                continue

            fastq_pass_found[flowcell] = True
            if len(matches) > 1:
                logging.warning(f"Found {len(matches)} fastq_pass directories for flowcell "
                                f"{flowcell} in {search_dir}. Using: {matches[0]}")

            sym_link_dir = os.path.join(output, flowcell)
            sym_link_dir_dest = os.path.join(sym_link_dir, 'fastq_pass')

            # lexists() also reports dangling links, which os.symlink()
            # would otherwise trip over with FileExistsError.
            if os.path.lexists(sym_link_dir_dest):
                logging.info(f"Soft link {sym_link_dir_dest} already exists! Skipped!!")
                continue

            os.makedirs(sym_link_dir, exist_ok=True)
            # A relative target would be resolved relative to the link's
            # own directory, so always store an absolute path.
            os.symlink(
                os.path.abspath(matches[0]),
                sym_link_dir_dest, target_is_directory=True
            )
            logging.info(f"New soft link created: {sym_link_dir_dest}")

    missing = [flowcell for flowcell, found in fastq_pass_found.items() if not found]

    if missing:
        logging.warning("Did not find fastq_pass folder for the supplied flowcells: " +
                        ', '.join(missing))

    if missing and len(missing) == len(fastq_pass_found):
        logging.error(f"None of the supplied flowcells were found! The output directory, {output} may not have been created!")
    else:
        logging.info(f"NOTE: Now you can use {output} directory as --input to cpipes.\n")


if __name__ == "__main__":
    main()
