Mercurial > repos > jpayne > snp_pipeline
diff snp-cache.py @ 0:eefdd97a6749
planemo upload commit 7f6183b769772449fbcee903686b8d5ec5b7439f (dirty working copy)
author | jpayne |
---|---|
date | Wed, 24 Jan 2018 14:18:21 -0500 |
parents | |
children | 66f988a9666f |
line wrap: on
line diff
#! /usr/bin/env python3.6
"""Fetch a cached SNP-pipeline result from S3, or generate and cache it.

Looks up ``<table>/<id>`` in the cache bucket. On a cache hit the object is
streamed to the output file (or stdout). On a miss, the given shell command
is run; its stdout is uploaded to the cache and also written to the output.
"""

import boto3
from botocore.exceptions import ClientError, DataNotFoundError, NoCredentialsError, BotoCoreError

import argparse
import subprocess
import contextlib
import logging
import io
import shutil
import os, sys
from builtins import open as _open
from copy import copy
from functools import partial
from itertools import tee
from io import BytesIO
from threading import Thread

# S3 bucket that backs the cache.
CACHE_NAMESPACE = 'cfsan-galaxytrakr-cache'


class NoCacheNoCommandException(Exception):
    "No cached result exists and no command was given to generate one."
    pass


@contextlib.contextmanager
def open(filename=None, mode='r'):
    """Yield a buffered binary stream for *filename*, or sys.stdout's buffer.

    Intentionally shadows builtins.open (still reachable as ``_open``) so
    callers can treat "no filename" as "write to stdout" inside a ``with``.
    """
    stream = sys.stdout.buffer
    try:
        if filename:
            stream = io.FileIO(filename, mode)
            if 'r' in mode:
                stream = io.BufferedReader(stream)
            elif 'w' in mode:
                stream = io.BufferedWriter(stream)
        yield stream
        stream.flush()
    finally:
        # Only close real files; closing sys.stdout.buffer would break any
        # later writes to stdout.
        if filename:
            stream.close()


def stream_to(input_stream, output_stream):
    "Copy *input_stream* to *output_stream*, logging the first 8 lines."
    for i, line in enumerate(input_stream):
        if i < 8:
            logging.getLogger('strm').info(str(line[:70]))
        output_stream.write(line)


def _is_cache_miss(exc):
    "True if *exc* means 'object not in cache' rather than a real S3 error."
    if isinstance(exc, DataNotFoundError):
        return True
    if isinstance(exc, ClientError):
        # download_fileobj reports a missing key as ClientError with a
        # 404 / NoSuchKey code — NOT as DataNotFoundError or BotoCoreError,
        # so the original except clause let plain misses crash the script.
        code = exc.response.get('Error', {}).get('Code', '')
        return code in ('404', 'NoSuchKey')
    return False


def main(table, id, command=None, output=None, *a, **k):
    """Resolve *table*/*id* from the cache, or run *command* to produce it.

    Returns a process exit code: 0 on success, or the generating command's
    returncode if it failed.
    Raises NoCacheNoCommandException on a cache miss with no command given.
    """
    id = id.strip()
    table = table.strip()
    name = f"{table}/{id}"
    # Build the bucket handle before the try so it is still bound for the
    # cache-write path even when the download raises immediately.
    s3 = boto3.resource('s3').Bucket(CACHE_NAMESPACE)
    with open(output, 'wb') as output_f:
        # Look up the id in the cache bucket and stream it to the output.
        try:
            s3.download_fileobj(name, output_f)
            logging.getLogger('snp-cache.cache').info(f"cache hit on {name}, retrieved.")
        except (ClientError, DataNotFoundError, NoCredentialsError, BotoCoreError) as e:
            if _is_cache_miss(e):
                logging.getLogger('snp-cache.cache').info(f"cache miss on {name}")
            else:
                logging.getLogger('snp-cache.cache').error(e)
            # If we couldn't find the data, run the command to generate it.
            if not command:
                raise NoCacheNoCommandException(
                    "No cached result for this id, and no command given to generate."
                )
            logging.getLogger('snp-cache.cmd').info(command)
            try:
                cached = subprocess.check_output(command, shell=True)
                try:
                    s3.upload_fileobj(BytesIO(cached), name)
                except (ClientError, BotoCoreError) as e:
                    # Cache-write failures are non-fatal: the result is still
                    # delivered to the caller below.
                    logging.getLogger('snp-cache.cache').error('Error writing to cache:')
                    logging.getLogger('snp-cache.cache').error(e)
                finally:
                    output_f.write(cached)
            except subprocess.CalledProcessError as e:
                print(e.output, file=sys.stderr)
                return e.returncode
    return 0


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description="lookup result for file in data table, or compute and install"
    )
    parser.add_argument('table', type=str)
    parser.add_argument('id', type=str)
    parser.add_argument('-c', dest='command')
    parser.add_argument('-o', dest='output')
    parser.add_argument('-l', dest='logging', default='/dev/null')
    params = parser.parse_args()

    logging.basicConfig(filename=params.logging, level=logging.INFO)

    # sys.exit instead of quit(): quit is a site-module convenience that is
    # absent under `python -S`.
    sys.exit(main(**vars(params)))