Mercurial > repos > jpayne > snp_pipeline
view snp-cache.py @ 16:e3a4351809ce
planemo upload commit 7f6183b769772449fbcee903686b8d5ec5b7439f-dirty
author | jpayne |
---|---|
date | Wed, 31 Jan 2018 12:28:26 -0500 |
parents | 6adaecff5f2b |
children | 9022b00a9198 |
line wrap: on
line source
#! /usr/bin/env python
"""Look up a result in an S3-backed cache, or compute it and store it.

Usage: ``snp-cache.py TABLE ID [-c COMMAND] [-o OUTPUT] [-l LOGFILE]``

The object ``TABLE/ID`` is fetched from the ``CACHE_NAMESPACE`` bucket and
written to OUTPUT (stdout when ``-o`` is omitted).  When the object cannot be
retrieved, COMMAND is run through the shell, its stdout is uploaded to the
cache under the same key, and then written to OUTPUT.
"""

import argparse
import contextlib
import io
import logging
import os
import shutil
import subprocess
import sys
from builtins import open as _open
from copy import copy
from functools import partial
from io import BytesIO
from itertools import tee
from threading import Thread

import boto3
from botocore.exceptions import (
    BotoCoreError,
    ClientError,
    DataNotFoundError,
    NoCredentialsError,
)

CACHE_NAMESPACE = 'cfsan-galaxytrakr-cache'


class NoCacheNoCommandException(Exception):
    """Raised when nothing was retrieved from the cache and no command was given."""


@contextlib.contextmanager
def open(filename=None, mode='r'):
    """Yield a binary stream for *filename*, or ``sys.stdout.buffer`` if none.

    This is a wrapper that makes stdout usable where a context manager is
    expected.  Note that it intentionally shadows the builtin ``open`` within
    this module (the builtin is still importable as ``_open``).

    Args:
        filename: path to open; when falsy, stdout's binary buffer is yielded.
        mode: mode passed to ``io.FileIO``.  A mode containing ``'r'`` gets a
            ``BufferedReader``, one containing ``'w'`` a ``BufferedWriter``;
            any other mode yields the raw ``FileIO``.

    The stream is flushed after the ``with`` body completes normally.  A
    stream opened from *filename* is always closed on exit; stdout never is.
    """
    if not filename:
        stream = sys.stdout.buffer
        yield stream
        stream.flush()
        return

    # Open the file *before* entering the try/finally: if opening fails there
    # is nothing to clean up, and we must never end up closing stdout instead.
    stream = io.FileIO(filename, mode)
    if 'r' in mode:
        stream = io.BufferedReader(stream)
    elif 'w' in mode:
        stream = io.BufferedWriter(stream)
    try:
        yield stream
        stream.flush()
    finally:
        stream.close()


# NOTE: the tee-style stream helpers below are commented out and unused.
#
# class stream_over(io.IOBase):
#     "a file-like object that works as a tee, for API's that accept a file-like"
#     def __init__(self, output_streams, input_stream=None):
#         self.streams = output_streams
#         self.input = input_stream
#
#     def writable(self, *a, **k):
#         return all([s.writeable(*a, **k) for s in self.streams])
#
#     def write(self, *a, **k):
#         [s.write(*a, **k) for s in self.streams]
#
#     def writelines(self, *a, **k):
#         [s.writelines(*a, **k) for s in self.streams]
#
#     def flush(self, *a, **k):
#         [s.flush(*a, **k) for s in self.streams]
#
#     def close(self, *a, **k):
#         if self.input:
#             self.input.close()
#         [s.close(*a, **k) for s in self.streams]
#
#     def read(self, *a, **k):
#         if self.input:
#             bts = self.input.read(*a, **k)
#             self.write(bts)
#             return bts
#         raise ValueError("Not created with a readable stream; read ops not supported.")
#
#     def readlines(self, *a, **k):
#         if self.input:
#             return self.input.readlines(*a, **k)
#         raise ValueError("Not created with a readable stream; read ops not supported.")
#
#     def seekable(self):
#         return False
#
# @contextlib.contextmanager
# def multiwrite(*streams):
#     multistream = stream_over(streams)
#     yield multistream
#     multistream.flush()
#     multistream.close()


def stream_to(input_stream, output_stream):
    """Copy *input_stream* to *output_stream* line by line.

    The first 8 lines (truncated to 70 items each) are logged at INFO level
    on the ``strm`` logger.
    """
    log = logging.getLogger('strm')
    # Iterate the stream directly so the whole input is not held in memory.
    for i, line in enumerate(input_stream):
        if i < 8:
            log.info(str(line[:70]))
        output_stream.write(line)


def _is_cache_miss(error):
    """Return True when *error* means the cached object does not exist."""
    if isinstance(error, DataNotFoundError):
        return True
    if isinstance(error, ClientError):
        # S3 reports a missing object as a ClientError carrying a 404 /
        # NoSuchKey error code rather than as DataNotFoundError.
        code = str(error.response.get('Error', {}).get('Code', ''))
        return code in ('404', 'NoSuchKey')
    return False


def main(table, id, command=None, output=None, *a, **k):
    """Write the cached result for ``table/id`` to *output*, computing it if needed.

    Args:
        table: cache table name; surrounding whitespace is stripped.
        id: identifier within the table; surrounding whitespace is stripped.
        command: shell command whose stdout is the result, run when the
            object cannot be retrieved from the cache.
        output: path to write the result to; stdout when falsy.
        *a, **k: ignored; they let ``main(**vars(params))`` pass extra
            parsed options (such as ``logging``) without error.

    Returns:
        0 on success, or the command's exit status if the command failed.

    Raises:
        NoCacheNoCommandException: nothing was retrieved and *command* is empty.
    """
    id = id.strip()
    table = table.strip()
    name = f"{table}/{id}"
    cache_log = logging.getLogger('snp-cache.cache')

    with open(output, 'wb') as output_f:
        # Look up the key in the cache bucket and stream it to the output.
        bucket = None
        try:
            bucket = boto3.resource('s3').Bucket(CACHE_NAMESPACE)
            bucket.download_fileobj(name, output_f)
        except (DataNotFoundError, NoCredentialsError, BotoCoreError, ClientError) as e:
            if _is_cache_miss(e):
                cache_log.info(f"cache miss on {name}")
            else:
                cache_log.error(e)
        else:
            cache_log.info(f"cache hit on {name}, retrieved.")
            return 0

        # If we couldn't find the data, we need to run the command to generate it.
        if not command:
            raise NoCacheNoCommandException("No cached result for this id, and no command given to generate.")
        logging.getLogger('snp-cache.cmd').info(command)
        # sub = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # cached, err = sub.communicate()
        # cached, err = io.BytesIO(cached), io.BytesIO(err)
        try:
            # NOTE(security): the command string is handed to the shell as-is;
            # it must only ever come from a trusted caller.
            cached = subprocess.check_output(command, shell=True)
        except subprocess.CalledProcessError as e:
            # e.output is bytes; decode it so stderr shows text, not a repr.
            print((e.output or b'').decode(errors='replace'), file=sys.stderr)
            return e.returncode

        try:
            if bucket is None:
                # Creating the bucket handle failed earlier; there is nothing
                # to upload to, but the computed result is still delivered.
                cache_log.error('Error writing to cache:')
                cache_log.error('cache bucket is unavailable')
            else:
                bucket.upload_fileobj(BytesIO(cached), name)
        except (ClientError, BotoCoreError) as e:
            cache_log.error('Error writing to cache:')
            cache_log.error(e)
        finally:
            # Always hand the result to the caller, even if caching it failed.
            output_f.write(cached)
    return 0


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="lookup result for file in data table, or compute and install")
    parser.add_argument('table', type=str)
    parser.add_argument('id', type=str)
    parser.add_argument('-c', dest='command')
    parser.add_argument('-o', dest='output')
    parser.add_argument('-l', dest='logging', default='/dev/null')
    params = parser.parse_args()
    logging.basicConfig(filename=params.logging, level=logging.INFO)
    sys.exit(main(**vars(params)))