#!/usr/bin/env python

from __future__ import print_function

import argparse
import subprocess
import contextlib
import logging
import io
import sys
from io import BytesIO

CACHE_NAMESPACE = 'cfsan-galaxytrakr-cache'


class NoCacheNoCommandException(Exception):
    pass


@contextlib.contextmanager
def open(filename=None, mode='r'):
    """Yield a buffered handle to `filename`, or sys.stdout's binary buffer
    when no filename is given. Intentionally shadows the builtin open() so
    that output can default to stdout under a context manager."""
    writer = sys.stdout.buffer
    try:
        if filename:
            writer = io.FileIO(filename, mode)
            if 'r' in mode:
                writer = io.BufferedReader(writer)
            elif 'w' in mode:
                writer = io.BufferedWriter(writer)
        yield writer
        writer.flush()
    finally:
        if filename:
            writer.close()

# class stream_over(io.IOBase):
#     "a file-like object that works as a tee, for APIs that accept a file-like"
#     def __init__(self, output_streams, input_stream=None):
#         self.streams = output_streams
#         self.input = input_stream

#     def writable(self, *a, **k):
#         return all([s.writable(*a, **k) for s in self.streams])

#     def write(self, *a, **k):
#         [s.write(*a, **k) for s in self.streams]

#     def writelines(self, *a, **k):
#         [s.writelines(*a, **k) for s in self.streams]

#     def flush(self, *a, **k):
#         [s.flush(*a, **k) for s in self.streams]

#     def close(self, *a, **k):
#         if self.input:
#             self.input.close()
#         [s.close(*a, **k) for s in self.streams]

#     def read(self, *a, **k):
#         if self.input:
#             bts = self.input.read(*a, **k)
#             self.write(bts)
#             return bts
#         raise ValueError("Not created with a readable stream; read ops not supported.")

#     def readlines(self, *a, **k):
#         if self.input:
#             return self.input.readlines(*a, **k)
#         raise ValueError("Not created with a readable stream; read ops not supported.")

#     def seekable(self):
#         return False

# @contextlib.contextmanager
# def multiwrite(*streams):
#     multistream = stream_over(streams)
#     yield multistream
#     multistream.flush()
#     multistream.close()


def stream_to(input_stream, output_stream):
    """Copy input_stream to output_stream, logging the first few lines."""
    for i, line in enumerate(input_stream.readlines()):
        if i < 8:
            logging.getLogger('strm').info(str(line[:70]))
        output_stream.write(line)
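
# The commented-out stream_over class above sketches a tee over multiple
# sinks. A minimal working sketch of the same idea (illustrative only;
# nothing in this script calls it) might look like:
def tee_stream(input_stream, *sinks, chunk_size=8192):
    """Copy input_stream to every sink in fixed-size chunks."""
    for chunk in iter(lambda: input_stream.read(chunk_size), b''):
        for sink in sinks:
            sink.write(chunk)
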
def main(table, id, command=None, output=None, *a, **k):
    # **k absorbs extra argparse attributes (e.g. the log file path).
    id = id.strip()
    table = table.strip()
    name = "{table}/{id}".format(**locals())
    with open(output, 'wb') as output_f:
        # Look the ID up in the cache bucket and stream the object if present.
        s3 = None
        try:
            import boto3
            # boto3 reads credentials from the standard AWS environment
            # variables or config files; no explicit key handling is needed.
            s3 = boto3.resource('s3').Bucket(CACHE_NAMESPACE)
            s3.download_fileobj(name, output_f)
            logging.getLogger('snp-cache.cache').info(
                "cache hit on {name}, retrieved.".format(**locals()))
        except Exception as e:
            if isinstance(e, ImportError):
                logging.getLogger('snp-cache.cache').error(e)
            else:
                logging.getLogger('snp-cache.cache').info(
                    "cache miss on {name}".format(**locals()))
            # No cached data, so run the command to generate it.
            if not command:
                raise NoCacheNoCommandException(
                    "No cached result for this id, and no command given to generate it.")
            logging.getLogger('snp-cache.cmd').info(command)
            try:
                cached = subprocess.check_output(command, shell=True)
                if s3 is not None:
                    try:
                        s3.upload_fileobj(BytesIO(cached), name)
                    except Exception as e:
                        logging.getLogger('snp-cache.cache').error('Error writing to cache:')
                        logging.getLogger('snp-cache.cache').error(e)
                # Always deliver the freshly computed result, whether or not
                # the cache write succeeded.
                output_f.write(cached)
            except subprocess.CalledProcessError as e:
                # e.output is bytes; write it raw rather than printing its repr.
                sys.stderr.buffer.write(e.output)
                return e.returncode
    return 0


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description="look up the result for an id in a data table, or compute and cache it")
    parser.add_argument('table', type=str)
    parser.add_argument('id', type=str)
    parser.add_argument('-c', dest='command',
                        help="command to generate the result on a cache miss")
    parser.add_argument('-o', dest='output',
                        help="output file (default: stdout)")
    parser.add_argument('-l', dest='logging', default='/dev/null',
                        help="log file")
    params = parser.parse_args()

    logging.basicConfig(filename=params.logging, level=logging.INFO)

    sys.exit(main(**vars(params)))
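
# Example invocation (hypothetical script name, table, id, and command;
# substitute real values), e.g. from a Galaxy tool wrapper:
#
#   python snp_cache.py snp-matrices SRR1234567 \
#       -c "run_pipeline.sh SRR1234567" -o matrix.tsv -l cache.log
#
# On a cache hit, the S3 object snp-matrices/SRR1234567 is streamed into
# matrix.tsv; on a miss, the -c command runs, and its stdout is both written
# to matrix.tsv and uploaded back to the cache bucket.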