annotate snp-cache.py @ 51:568eb62c7524

planemo upload commit 2107f35724b3f897c9414be40a9d14663cba9f4d-dirty
author jpayne
date Tue, 17 Sep 2019 11:06:59 -0400
parents 11296a86e01b
children 0b0e3e4376a7
rev   line source
jpayne@13 1 #! /usr/bin/env python
jpayne@0 2
jpayne@42 3 from __future__ import print_function
jpayne@42 4
jpayne@0 5 import boto3
jpayne@0 6 from botocore.exceptions import ClientError, DataNotFoundError, NoCredentialsError, BotoCoreError
jpayne@0 7
jpayne@0 8 import argparse
jpayne@0 9 import subprocess
jpayne@0 10 import contextlib
jpayne@0 11 import logging
jpayne@0 12 import io
jpayne@0 13 import shutil
jpayne@0 14 import os, sys
jpayne@0 15 from builtins import open as _open
jpayne@0 16 from copy import copy
jpayne@0 17 from functools import partial
jpayne@0 18 from itertools import tee
jpayne@0 19 from io import BytesIO
jpayne@0 20 from threading import Thread
jpayne@0 21
jpayne@0 22 CACHE_NAMESPACE = 'cfsan-galaxytrakr-cache'
jpayne@0 23
class NoCacheNoCommandException(Exception):
    """Raised when no cached result exists for an id and no command was given to generate one."""
jpayne@0 26
@contextlib.contextmanager
def open(filename=None, mode='r'):
    """Context manager yielding a buffered binary stream.

    If *filename* is given, opens it via io.FileIO and wraps it in a
    BufferedReader ('r' in mode) or BufferedWriter ('w' in mode);
    otherwise yields sys.stdout's binary buffer so callers can treat
    stdout like a file.  Deliberately shadows the builtin ``open``
    (which remains importable at module level as ``_open``).

    The stream is flushed after the managed block and closed only if
    this function actually opened a file.
    """
    stream = sys.stdout.buffer
    opened = False
    try:
        if filename:
            stream = io.FileIO(filename, mode)
            # Only mark as ours once FileIO succeeded; the original code
            # keyed the close on `filename` alone, which would close
            # sys.stdout.buffer if FileIO raised.
            opened = True
            if 'r' in mode:
                stream = io.BufferedReader(stream)
            elif 'w' in mode:
                stream = io.BufferedWriter(stream)
        yield stream
        stream.flush()
    finally:
        if opened:
            stream.close()
jpayne@0 43
jpayne@0 44 # class stream_over(io.IOBase):
jpayne@0 45 # "a file-like object that works as a tee, for API's that accept a file-like"
jpayne@0 46 # def __init__(self, output_streams, input_stream=None):
jpayne@0 47 # self.streams = output_streams
jpayne@0 48 # self.input = input_stream
jpayne@0 49
jpayne@0 50 # def writable(self, *a, **k):
jpayne@0 51 # return all([s.writeable(*a, **k) for s in self.streams])
jpayne@0 52
jpayne@0 53 # def write(self, *a, **k):
jpayne@0 54 # [s.write(*a, **k) for s in self.streams]
jpayne@0 55
jpayne@0 56 # def writelines(self, *a, **k):
jpayne@0 57 # [s.writelines(*a, **k) for s in self.streams]
jpayne@0 58
jpayne@0 59 # def flush(self, *a, **k):
jpayne@0 60 # [s.flush(*a, **k) for s in self.streams]
jpayne@0 61
jpayne@0 62 # def close(self, *a, **k):
jpayne@0 63 # if self.input:
jpayne@0 64 # self.input.close()
jpayne@0 65 # [s.close(*a, **k) for s in self.streams]
jpayne@0 66
jpayne@0 67 # def read(self, *a, **k):
jpayne@0 68 # if self.input:
jpayne@0 69 # bts = self.input.read(*a, **k)
jpayne@0 70 # self.write(bts)
jpayne@0 71 # return bts
jpayne@0 72 # raise ValueError("Not created with a readable stream; read ops not supported.")
jpayne@0 73
jpayne@0 74 # def readlines(self, *a, **k):
jpayne@0 75 # if self.input:
jpayne@0 76 # return self.input.readlines(*a, **k)
jpayne@0 77 # raise ValueError("Not created with a readable stream; read ops not supported.")
jpayne@0 78
jpayne@0 79 # def seekable(self):
jpayne@0 80 # return False
jpayne@0 81
jpayne@0 82 # @contextlib.contextmanager
jpayne@0 83 # def multiwrite(*streams):
jpayne@0 84 # multistream = stream_over(streams)
jpayne@0 85 # yield multistream
jpayne@0 86 # multistream.flush()
jpayne@0 87 # multistream.close()
jpayne@0 88
def stream_to(input_stream, output_stream):
    """Copy *input_stream* to *output_stream* line by line.

    The first 8 lines are logged (truncated to 70 bytes) for debugging.
    Iterates the stream lazily instead of materializing it with
    readlines(), so arbitrarily large inputs are copied in constant
    memory.
    """
    log = logging.getLogger('strm')  # hoisted out of the loop
    for i, line in enumerate(input_stream):
        if i < 8:
            log.info(str(line[:70]))
        output_stream.write(line)
jpayne@0 94
jpayne@0 95
jpayne@0 96
def main(table, id, command=None, output=None, *a, **k):
    """Fetch ``{table}/{id}`` from the S3 cache bucket, or generate it.

    On a cache hit the object is streamed straight into *output* (or
    stdout when *output* is None, via the module-level ``open``).  On a
    miss — or any S3 error — *command* is run through the shell and its
    stdout is written both back to the cache (best effort) and to
    *output*.

    Returns 0 on success, or the subprocess's return code if the
    generating command failed.  Raises NoCacheNoCommandException when
    the object is not cached and no *command* was supplied.  Extra
    positional/keyword arguments (e.g. the 'logging' entry from
    argparse) are accepted and ignored.
    """
    id = id.strip()
    table = table.strip()
    name = "{table}/{id}".format(**locals())
    with open(output, 'wb') as output_f:
        # Look the id up in the cache bucket and get a stream to the resource.
        try:
            s3 = boto3.resource('s3').Bucket(CACHE_NAMESPACE)
            s3.download_fileobj(name, output_f)
            logging.getLogger('snp-cache.cache').info("cache hit on {name}, retrieved.".format(**locals()))
        except (DataNotFoundError, NoCredentialsError, BotoCoreError, ClientError) as e:
            if isinstance(e, DataNotFoundError):
                logging.getLogger('snp-cache.cache').info("cache miss on {name}".format(**locals()))
            else:
                logging.getLogger('snp-cache.cache').error(e)
            # If we couldn't find the data, run the command to generate it.
            if not command:
                raise NoCacheNoCommandException("No cached result for this id, and no command given to generate.")
            logging.getLogger('snp-cache.cmd').info(command)
            try:
                cached = subprocess.check_output(command, shell=True)
                try:
                    s3.upload_fileobj(BytesIO(cached), name)
                except (ClientError, BotoCoreError) as e:
                    # A cache-write failure is logged but never fatal.
                    logging.getLogger('snp-cache.cache').error('Error writing to cache:')
                    logging.getLogger('snp-cache.cache').error(e)
                finally:
                    # The caller gets the result whether or not caching worked.
                    output_f.write(cached)
            except subprocess.CalledProcessError as e:
                print(e.output, file=sys.stderr)
                return e.returncode
    return 0
jpayne@0 134
jpayne@0 135
jpayne@0 136
jpayne@0 137
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="lookup result for file in data table, or compute and install")
    parser.add_argument('table', type=str)
    parser.add_argument('id', type=str)
    parser.add_argument('-c', dest='command')   # shell command to generate the result on a cache miss
    parser.add_argument('-o', dest='output')    # output file; stdout when omitted
    parser.add_argument('-l', dest='logging', default='/dev/null')  # log destination
    params = parser.parse_args()

    logging.basicConfig(filename=params.logging, level=logging.INFO)

    # sys.exit is the supported way to set a script's exit status; quit() is
    # an interactive helper injected by the site module and may be absent
    # (e.g. when Python runs with -S).
    sys.exit(main(**vars(params)))