Mercurial > repos > jpayne > snp_pipeline
view snp-cache.py @ 2:66ca2fd8592a
planemo upload commit b'7f6183b769772449fbcee903686b8d5ec5b7439f\n'-dirty
author | jpayne |
---|---|
date | Wed, 24 Jan 2018 15:07:00 -0500 |
parents | eefdd97a6749 |
children | 66f988a9666f |
line wrap: on
line source
#! /usr/bin/env python3.6
"""Look up a result in the S3 cache, or compute it with a shell command and cache it."""
import argparse
import contextlib
import io
import logging
import os
import shutil
import subprocess
import sys
from builtins import open as _open
from copy import copy
from functools import partial
from io import BytesIO
from itertools import tee
from threading import Thread
from typing import Optional

import boto3
from botocore.exceptions import ClientError, DataNotFoundError, NoCredentialsError, BotoCoreError

CACHE_NAMESPACE = 'cfsan-galaxytrakr-cache'

# ClientError codes that mean "no such key": HeadObject (used by
# download_fileobj) reports a bare '404'; GetObject reports 'NoSuchKey'.
_MISSING_KEY_CODES = ('404', 'NoSuchKey', 'NotFound')


class NoCacheNoCommandException(Exception):
    """Raised when the cache has no entry and no command was given to build one."""
    pass


@contextlib.contextmanager
def open(filename=None, mode='r'):
    """Open `filename` as a buffered binary stream, or fall back to stdio.

    Basically a wrapper that makes sys.stdout usable where a context manager
    is expected. When `filename` is falsy, yields ``sys.stdout.buffer`` (or
    ``sys.stdin.buffer`` for modes containing 'r'); the stdio stream is
    flushed on normal exit but never closed. Otherwise the file is opened with
    ``io.FileIO``, wrapped in a BufferedReader ('r' modes) or BufferedWriter
    ('w' modes), flushed on normal exit and always closed.

    Intentionally shadows the builtin ``open`` inside this module.
    """
    if not filename:
        # stdio belongs to the interpreter: flush it, but never close it.
        stream = sys.stdin.buffer if 'r' in mode else sys.stdout.buffer
        yield stream
        stream.flush()
        return
    # Opened before the try so that a failed open never reaches close().
    stream = io.FileIO(filename, mode)
    try:
        if 'r' in mode:
            stream = io.BufferedReader(stream)
        elif 'w' in mode:
            stream = io.BufferedWriter(stream)
        yield stream
        stream.flush()
    finally:
        stream.close()


def stream_to(input_stream, output_stream):
    """Copy `input_stream` to `output_stream` line by line.

    The first 8 lines, each truncated to its first 70 elements (characters
    or bytes), are logged at INFO on the 'strm' logger as a preview. Lines are
    streamed rather than read into memory all at once.
    """
    preview_log = logging.getLogger('strm')
    for i, line in enumerate(input_stream):
        if i < 8:
            preview_log.info(str(line[:70]))
        output_stream.write(line)


def _is_missing_key(error):
    """Return True if a botocore ClientError reports that the S3 key does not exist."""
    code = error.response.get('Error', {}).get('Code', '')
    return str(code) in _MISSING_KEY_CODES


def _fetch_cached(name, output_f):
    """Try to download cache object `name` into `output_f`.

    Returns a ``(bucket, hit)`` pair: `bucket` is the S3 Bucket resource, or
    None if it could not be created; `hit` is True only when the download
    succeeded. S3 failures are logged, never raised, because the cache is
    best-effort and a miss falls back to running the command.
    """
    log = logging.getLogger('snp-cache.cache')
    bucket = None
    try:
        bucket = boto3.resource('s3').Bucket(CACHE_NAMESPACE)
        bucket.download_fileobj(name, output_f)
    except ClientError as e:
        # A missing key surfaces as ClientError (HTTP 404), not BotoCoreError.
        if _is_missing_key(e):
            logging.getLogger('snp-cache.cache').info(f"cache miss on {name}")
        else:
            log.error(e)
        return bucket, False
    except BotoCoreError as e:
        # DataNotFoundError and NoCredentialsError are BotoCoreError subclasses.
        if isinstance(e, DataNotFoundError):
            log.info(f"cache miss on {name}")
        else:
            log.error(e)
        return bucket, False
    # NOTE(review): if a download fails part-way, bytes already written to
    # output_f are not rolled back before the command output is appended;
    # confirm whether that can occur in practice.
    log.info(f"cache hit on {name}, retrieved.")
    return bucket, True


def _store_cached(bucket, name, data):
    """Best-effort upload of `data` to the cache under `name`; failures are only logged."""
    try:
        bucket.upload_fileobj(BytesIO(data), name)
    except (ClientError, BotoCoreError) as e:
        log = logging.getLogger('snp-cache.cache')
        log.error('Error writing to cache:')
        log.error(e)


def main(table: str, id: str, command: Optional[str] = None,
         output: Optional[str] = None, *a, **k) -> int:
    """Fetch ``table/id`` from the S3 cache into `output`, or compute it.

    On a cache hit the object is written to `output`. On a miss, or when the
    S3 lookup fails, `command` is run through the shell. Its stdout is written
    to `output` and then uploaded to the cache under the same key, on a
    best-effort basis. The command's stderr is not captured; it passes through.

    Args:
        table: cache table; becomes the key prefix (surrounding whitespace stripped).
        id: item id within the table (whitespace stripped). Shadows the
            builtin, but the name is fixed by the argparse dest.
        command: shell command that prints the result on stdout; required on a miss.
        output: output file path; None writes to stdout.
        *a, **k: ignored; they absorb extra CLI options (e.g. ``logging``)
            passed via ``main(**vars(params))``.

    Returns:
        0 on success, or the command's exit status if it failed.

    Raises:
        NoCacheNoCommandException: no cached result and no `command` given.
    """
    id = id.strip()
    table = table.strip()
    name = f"{table}/{id}"
    with open(output, 'wb') as output_f:
        bucket, hit = _fetch_cached(name, output_f)
        if hit:
            return 0
        # We couldn't find the data, so run the command to generate it.
        if not command:
            raise NoCacheNoCommandException("No cached result for this id, and no command given to generate.")
        logging.getLogger('snp-cache.cmd').info(command)
        try:
            # shell=True is deliberate: `command` is a full shell command line
            # supplied by the caller (-c).
            cached = subprocess.check_output(command, shell=True)
        except subprocess.CalledProcessError as e:
            # e.output is bytes; decode it so stderr shows text, not a b'...' repr.
            print(e.output.decode(errors='replace'), file=sys.stderr)
            return e.returncode
        # Deliver the result before attempting the (best-effort) cache write.
        output_f.write(cached)
        if bucket is not None:
            _store_cached(bucket, name, cached)
    return 0


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="lookup result for file in data table, or compute and install")
    parser.add_argument('table', type=str, help="cache table (key prefix)")
    parser.add_argument('id', type=str, help="item id within the table")
    parser.add_argument('-c', dest='command', help="shell command that generates the result on a cache miss")
    parser.add_argument('-o', dest='output', help="output file (default: stdout)")
    parser.add_argument('-l', dest='logging', default='/dev/null', help="log file")
    params = parser.parse_args()
    logging.basicConfig(filename=params.logging, level=logging.INFO)
    sys.exit(main(**vars(params)))