Mercurial > repos > jpayne > snp_pipeline
view snp-cache.py @ 63:fb44b003e29b
planemo upload
author | jpayne |
---|---|
date | Fri, 28 Jun 2024 23:03:53 -0400 |
parents | 0b0e3e4376a7 |
children | b5cf2ec0c540 |
line wrap: on
line source
#! /usr/bin/env python from __future__ import print_function import argparse import subprocess import contextlib import logging import io import shutil import os, sys # from builtins import open as _open from copy import copy from functools import partial from itertools import tee from io import BytesIO from threading import Thread CACHE_NAMESPACE = 'cfsan-galaxytrakr-cache' class NoCacheNoCommandException(Exception): pass @contextlib.contextmanager def open(filename=None, mode='r'): "basically a wrapper to make sys.stdout usable where there's a contextmanager" writer = sys.stdout.buffer try: if filename: writer = io.FileIO(filename, mode) if 'r' in mode: writer = io.BufferedReader(writer) elif 'w' in mode: writer = io.BufferedWriter(writer) yield writer writer.flush() finally: if filename: writer.close() # class stream_over(io.IOBase): # "a file-like object that works as a tee, for API's that accept a file-like" # def __init__(self, output_streams, input_stream=None): # self.streams = output_streams # self.input = input_stream # def writable(self, *a, **k): # return all([s.writeable(*a, **k) for s in self.streams]) # def write(self, *a, **k): # [s.write(*a, **k) for s in self.streams] # def writelines(self, *a, **k): # [s.writelines(*a, **k) for s in self.streams] # def flush(self, *a, **k): # [s.flush(*a, **k) for s in self.streams] # def close(self, *a, **k): # if self.input: # self.input.close() # [s.close(*a, **k) for s in self.streams] # def read(self, *a, **k): # if self.input: # bts = self.input.read(*a, **k) # self.write(bts) # return bts # raise ValueError("Not created with a readable stream; read ops not supported.") # def readlines(self, *a, **k): # if self.input: # return self.input.readlines(*a, **k) # raise ValueError("Not created with a readable stream; read ops not supported.") # def seekable(self): # return False # @contextlib.contextmanager # def multiwrite(*streams): # multistream = stream_over(streams) # yield multistream # multistream.flush() # multistream.close() def stream_to(input_stream, output_stream): for i, line in enumerate(input_stream.readlines()): if i < 8: logging.getLogger('strm').info(str(line[:70])) output_stream.write(line) def main(table, id, command=None, output=None, *a, **k): id = id.strip() table = table.strip() name = "{table}/{id}".format(**locals()) with open(output, 'wb') as output_f: #lookup ID in table and get a FH to the resource try: import boto3 api_key = os.environ.get('AWS_API_KEY', '') s3 = boto3.resource('s3').Bucket(CACHE_NAMESPACE) s3.download_fileobj(name, output_f) logging.getLogger('snp-cache.cache').info("cache hit on {name}, retrieved.".format(**locals())) except Exception as e: if type(e) is not ImportError: logging.getLogger('snp-cache.cache').info("cache miss on {name}".format(**locals())) else: logging.getLogger('snp-cache.cache').error(e) #if we couldn't find the data, we need to run the command to generate it if not command: raise NoCacheNoCommandException("No cached result for this id, and no command given to generate.") logging.getLogger('snp-cache.cmd').info(command) # sub = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # cached, err = sub.communicate() # cached, err = io.BytesIO(cached), io.BytesIO(err) try: cached = subprocess.check_output(command, shell=True) try: s3.upload_fileobj(BytesIO(cached), name) except Exception as e: logging.getLogger('snp-cache.cache').error('Error writing to cache:') logging.getLogger('snp-cache.cache').error(e) finally: #stream_to(cached, output_f) #stream FROM cached TO output_f output_f.write(cached) except subprocess.CalledProcessError as e: print(e.output, file=sys.stderr) return e.returncode return 0 if __name__ == '__main__': parser = argparse.ArgumentParser(description="lookup result for file in data table, or compute and install") parser.add_argument('table', type=str) parser.add_argument('id', type=str) parser.add_argument('-c', dest='command') parser.add_argument('-o', dest='output') parser.add_argument('-l', dest='logging', default='/dev/null') params = parser.parse_args() logging.basicConfig(filename=params.logging,level=logging.INFO) quit(main(**vars(params)))