Mercurial > repos > jpayne > snp_pipeline
view snp-cache.py @ 64:b5cf2ec0c540 tip
planemo upload
author   | jpayne
---------|------------------------------
date     | Sat, 29 Jun 2024 06:56:11 -0400
parents  | fb44b003e29b
children | (none)
line wrap: on
line source
#! /usr/bin/env python from __future__ import print_function import argparse import subprocess import contextlib import logging import io import shutil import os, sys # from builtins import open as _open from copy import copy from functools import partial from itertools import tee from io import BytesIO from threading import Thread CACHE_NAMESPACE = 'cfsan-galaxytrakr-cache' class NoCacheNoCommandException(Exception): pass # @contextlib.contextmanager # def open(filename=None, mode='r'): # "basically a wrapper to make sys.stdout usable where there's a contextmanager" # writer = sys.stdout.buffer # try: # if filename: # writer = io.FileIO(filename, mode) # if 'r' in mode: # writer = io.BufferedReader(writer) # elif 'w' in mode: # writer = io.BufferedWriter(writer) # yield writer # writer.flush() # finally: # if filename: # writer.close() # class stream_over(io.IOBase): # "a file-like object that works as a tee, for API's that accept a file-like" # def __init__(self, output_streams, input_stream=None): # self.streams = output_streams # self.input = input_stream # def writable(self, *a, **k): # return all([s.writeable(*a, **k) for s in self.streams]) # def write(self, *a, **k): # [s.write(*a, **k) for s in self.streams] # def writelines(self, *a, **k): # [s.writelines(*a, **k) for s in self.streams] # def flush(self, *a, **k): # [s.flush(*a, **k) for s in self.streams] # def close(self, *a, **k): # if self.input: # self.input.close() # [s.close(*a, **k) for s in self.streams] # def read(self, *a, **k): # if self.input: # bts = self.input.read(*a, **k) # self.write(bts) # return bts # raise ValueError("Not created with a readable stream; read ops not supported.") # def readlines(self, *a, **k): # if self.input: # return self.input.readlines(*a, **k) # raise ValueError("Not created with a readable stream; read ops not supported.") # def seekable(self): # return False # @contextlib.contextmanager # def multiwrite(*streams): # multistream = stream_over(streams) # yield 
# multistream
#     multistream.flush()
#     multistream.close()

# def stream_to(input_stream, output_stream):
#     for i, line in enumerate(input_stream.readlines()):
#         if i < 8:
#             logging.getLogger('strm').info(str(line[:70]))
#         output_stream.write(line)


@contextlib.contextmanager
def _open_output(filename=None):
    """Yield a writable binary stream: the named file, or stdout when no name given.

    Restores the behavior of the disabled ``open`` wrapper above: without
    ``-o`` the result streams to stdout instead of crashing on ``open(None)``.
    """
    if filename:
        with open(filename, 'wb') as handle:
            yield handle
    else:
        yield sys.stdout.buffer


def main(table, id, command=None, output=None, *a, **k):
    """Fetch the cached result for <table>/<id>, or run `command` to produce it.

    Parameters:
        table   -- data-table name (whitespace-stripped).
        id      -- record id within the table (whitespace-stripped).
        command -- shell command that generates the result when not cached.
        output  -- path to write the result to; stdout when None.

    Returns 0 on success, or the command's exit status on failure.
    Raises NoCacheNoCommandException when there is no cached result and no
    command to generate one.
    """
    id = id.strip()
    table = table.strip()
    # Cache key under CACHE_NAMESPACE; only consumed by the disabled S3
    # lookup/upload code (boto3 download_fileobj / upload_fileobj), which
    # previously lived here and short-circuited the command on a cache hit.
    name = "{table}/{id}".format(**locals())
    with _open_output(output) as output_f:
        # No cache hit is possible while the S3 code is disabled, so a
        # missing command can never be satisfied.
        if not command:
            raise NoCacheNoCommandException(
                "No cached result for this id, and no command given to generate.")
        logging.getLogger('snp-cache.cmd').info(command)
        try:
            cached = subprocess.check_output(command, shell=True)
        except subprocess.CalledProcessError as e:
            # Surface the failed command's captured output, then propagate
            # its exit status to the caller.
            print(e.output, file=sys.stderr)
            return e.returncode
        # Re-enable the S3 upload here to repopulate the cache:
        #   s3.upload_fileobj(BytesIO(cached), name)
        output_f.write(cached)
    return 0


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description="lookup result for file in data table, or compute and install")
    parser.add_argument('table', type=str)
    parser.add_argument('id', type=str)
    parser.add_argument('-c', dest='command')
    parser.add_argument('-o', dest='output')
    parser.add_argument('-l', dest='logging', default='/dev/null')
    params = parser.parse_args()
    logging.basicConfig(filename=params.logging, level=logging.INFO)
    # sys.exit, not quit(): quit() is a site-module convenience that is
    # absent when Python runs with -S.
    sys.exit(main(**vars(params)))