comparison snp-cache.py @ 0:eefdd97a6749

planemo upload commit b'7f6183b769772449fbcee903686b8d5ec5b7439f\n'-dirty
author jpayne
date Wed, 24 Jan 2018 14:18:21 -0500
parents
children 66f988a9666f
comparison
equal deleted inserted replaced
-1:000000000000 0:eefdd97a6749
1 #! /usr/bin/env python3.6
2
3 import boto3
4 from botocore.exceptions import ClientError, DataNotFoundError, NoCredentialsError, BotoCoreError
5
6 import argparse
7 import subprocess
8 import contextlib
9 import logging
10 import io
11 import shutil
12 import os, sys
13 from builtins import open as _open
14 from copy import copy
15 from functools import partial
16 from itertools import tee
17 from io import BytesIO
18 from threading import Thread
19
# Name of the S3 bucket that holds cached results; main() downloads from it
# and uploads freshly computed results to it.
CACHE_NAMESPACE = "cfsan-galaxytrakr-cache"
21
class NoCacheNoCommandException(Exception):
    """Raised when an id has no cached result and no command was given to generate one."""
24
@contextlib.contextmanager
def open(filename=None, mode='r'):
    """Context manager yielding a binary stream for *filename*, or stdout.

    Basically a wrapper to make sys.stdout usable where a context manager
    is expected.

    Args:
        filename: Path of the file to open. When falsy (None or ''), the
            binary buffer of ``sys.stdout`` is yielded instead.
        mode: Mode passed to ``io.FileIO``. A mode containing 'r' yields an
            ``io.BufferedReader``; otherwise a mode containing 'w' yields an
            ``io.BufferedWriter``; any other mode yields the raw ``FileIO``.

    Yields:
        The binary stream. It is flushed when the ``with`` body finishes
        without raising. A stream opened from *filename* is always closed on
        exit; ``sys.stdout`` is never closed.

    Raises:
        OSError: If *filename* cannot be opened.
    """
    if not filename:
        stream = sys.stdout.buffer
        yield stream
        stream.flush()
        return

    # Open the file *before* entering the try/finally: if this raises there
    # is nothing of ours to close, and we must not close sys.stdout instead.
    raw = io.FileIO(filename, mode)
    if 'r' in mode:
        stream = io.BufferedReader(raw)
    elif 'w' in mode:
        stream = io.BufferedWriter(raw)
    else:
        stream = raw
    try:
        yield stream
        stream.flush()
    finally:
        stream.close()
41
42 # class stream_over(io.IOBase):
43 # "a file-like object that works as a tee, for API's that accept a file-like"
44 # def __init__(self, output_streams, input_stream=None):
45 # self.streams = output_streams
46 # self.input = input_stream
47
48 # def writable(self, *a, **k):
49 # return all([s.writeable(*a, **k) for s in self.streams])
50
51 # def write(self, *a, **k):
52 # [s.write(*a, **k) for s in self.streams]
53
54 # def writelines(self, *a, **k):
55 # [s.writelines(*a, **k) for s in self.streams]
56
57 # def flush(self, *a, **k):
58 # [s.flush(*a, **k) for s in self.streams]
59
60 # def close(self, *a, **k):
61 # if self.input:
62 # self.input.close()
63 # [s.close(*a, **k) for s in self.streams]
64
65 # def read(self, *a, **k):
66 # if self.input:
67 # bts = self.input.read(*a, **k)
68 # self.write(bts)
69 # return bts
70 # raise ValueError("Not created with a readable stream; read ops not supported.")
71
72 # def readlines(self, *a, **k):
73 # if self.input:
74 # return self.input.readlines(*a, **k)
75 # raise ValueError("Not created with a readable stream; read ops not supported.")
76
77 # def seekable(self):
78 # return False
79
80 # @contextlib.contextmanager
81 # def multiwrite(*streams):
82 # multistream = stream_over(streams)
83 # yield multistream
84 # multistream.flush()
85 # multistream.close()
86
def stream_to(input_stream, output_stream):
    """Copy *input_stream* to *output_stream* line by line.

    The first 8 lines are also logged (truncated to 70 items each) at INFO
    level on the 'strm' logger, as a preview of what is being copied.

    Args:
        input_stream: Any iterable of lines, e.g. an open file object.
        output_stream: Object with a ``write`` method accepting those lines.
    """
    preview_log = logging.getLogger('strm')
    # Iterate the stream directly rather than calling readlines(), so the
    # whole input is never held in memory at once.
    for index, line in enumerate(input_stream):
        if index < 8:
            preview_log.info(str(line[:70]))
        output_stream.write(line)
92
93
94
def _is_cache_miss(error):
    """Return True if *error* means the requested object is not in the cache."""
    if isinstance(error, DataNotFoundError):
        return True
    if isinstance(error, ClientError):
        # S3 reports a missing key as a ClientError carrying one of these codes.
        code = str(error.response.get('Error', {}).get('Code', ''))
        return code in ('404', 'NoSuchKey', 'NotFound')
    return False


def main(table, id, command=None, output=None, *a, **k):
    """Write the cached result for ``table/id`` to *output*, computing it on a miss.

    The object ``"{table}/{id}"`` is looked up in the CACHE_NAMESPACE S3
    bucket. On a hit it is downloaded straight into the output. On a miss,
    or on any other boto error (which is logged), *command* is run through
    the shell; its stdout is uploaded to the cache on a best-effort basis
    and written to the output.

    Args:
        table: Cache table name; surrounding whitespace is stripped.
        id: Identifier within the table; surrounding whitespace is stripped.
        command: Shell command whose stdout is the result to cache. Optional.
        output: Path to write the result to; stdout is used when falsy.
        *a, **k: Accepted and ignored, so the full argparse namespace
            (including the ``logging`` option) can be passed through.

    Returns:
        0 on success, or the exit status of *command* if it failed.

    Raises:
        NoCacheNoCommandException: The result was not retrieved from the
            cache and no *command* was given.
    """
    id = id.strip()
    table = table.strip()
    name = f"{table}/{id}"
    cache_log = logging.getLogger('snp-cache.cache')
    with open(output, 'wb') as output_f:
        # Stays None if the S3 resource itself cannot be created, so the
        # fallback path below knows not to attempt an upload.
        bucket = None
        lookup_error = None
        # lookup ID in table and stream the resource into the output
        try:
            bucket = boto3.resource('s3').Bucket(CACHE_NAMESPACE)
            bucket.download_fileobj(name, output_f)
        except (ClientError, DataNotFoundError, NoCredentialsError, BotoCoreError) as e:
            lookup_error = e
            if _is_cache_miss(e):
                cache_log.info(f"cache miss on {name}")
            else:
                cache_log.error(e)
        else:
            cache_log.info(f"cache hit on {name}, retrieved.")
            return 0

        # if we couldn't find the data, we need to run the command to generate it
        if not command:
            raise NoCacheNoCommandException(
                "No cached result for this id, and no command given to generate."
            ) from lookup_error
        logging.getLogger('snp-cache.cmd').info(command)
        try:
            # NOTE(review): the command is run through the shell as given;
            # it must come from a trusted caller.
            cached = subprocess.check_output(command, shell=True)
        except subprocess.CalledProcessError as e:
            print(e.output, file=sys.stderr)
            return e.returncode

        try:
            if bucket is not None:
                bucket.upload_fileobj(BytesIO(cached), name)
        except (ClientError, BotoCoreError) as e:
            # A failed cache write must not lose the computed result.
            cache_log.error('Error writing to cache:')
            cache_log.error(e)
        finally:
            output_f.write(cached)
    return 0
132
133
134
135
# Command-line entry point: parse arguments, configure logging, run main()
# and use its return value as the process exit status.
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="lookup result for file in data table, or compute and install")
    parser.add_argument('table', type=str, help="cache table the id belongs to")
    parser.add_argument('id', type=str, help="id of the result to look up")
    parser.add_argument('-c', dest='command',
                        help="shell command whose stdout is the result, run when the id is not cached")
    parser.add_argument('-o', dest='output',
                        help="file to write the result to (default: stdout)")
    # os.devnull is '/dev/null' on POSIX and the platform equivalent elsewhere.
    parser.add_argument('-l', dest='logging', default=os.devnull,
                        help="file to write log messages to (default: discard them)")
    params = parser.parse_args()

    logging.basicConfig(filename=params.logging, level=logging.INFO)

    # sys.exit rather than quit(): quit is an interactive helper added by the
    # site module and is not defined when Python runs with -S.
    sys.exit(main(**vars(params)))