jpayne@13
|
1 #! /usr/bin/env python
|
jpayne@0
|
2
|
jpayne@42
|
3 from __future__ import print_function
|
jpayne@42
|
4
|
jpayne@0
|
5 import boto3
|
jpayne@0
|
6 from botocore.exceptions import ClientError, DataNotFoundError, NoCredentialsError, BotoCoreError
|
jpayne@0
|
7
|
jpayne@0
|
8 import argparse
|
jpayne@0
|
9 import subprocess
|
jpayne@0
|
10 import contextlib
|
jpayne@0
|
11 import logging
|
jpayne@0
|
12 import io
|
jpayne@0
|
13 import shutil
|
jpayne@0
|
14 import os, sys
|
jpayne@0
|
15 from builtins import open as _open
|
jpayne@0
|
16 from copy import copy
|
jpayne@0
|
17 from functools import partial
|
jpayne@0
|
18 from itertools import tee
|
jpayne@0
|
19 from io import BytesIO
|
jpayne@0
|
20 from threading import Thread
|
jpayne@0
|
21
|
jpayne@0
|
22 CACHE_NAMESPACE = 'cfsan-galaxytrakr-cache'
|
jpayne@0
|
23
|
jpayne@0
|
class NoCacheNoCommandException(Exception):
    """Raised when a cache lookup fails and no command was given to regenerate the result."""
|
jpayne@0
|
26
|
jpayne@0
|
@contextlib.contextmanager
def open(filename=None, mode='r'):
    """Context manager yielding a binary stream for *filename*, or stdout.

    When *filename* is falsy the process's ``sys.stdout.buffer`` is yielded
    and only flushed on exit (never closed).  Otherwise the file is opened
    with ``io.FileIO(filename, mode)`` and wrapped in a ``BufferedReader``
    when ``'r'`` is in *mode*, a ``BufferedWriter`` when ``'w'`` is in
    *mode*, or yielded raw for any other mode; it is flushed on a clean exit
    and always closed.

    Note: this intentionally shadows the builtin ``open`` within this module.
    """
    if not filename:
        stream = sys.stdout.buffer
        yield stream
        stream.flush()
        return

    # Open the file *before* entering the try/finally: if this raises there
    # is nothing of ours to close, and stdout must be left untouched.
    raw = io.FileIO(filename, mode)
    if 'r' in mode:
        stream = io.BufferedReader(raw)
    elif 'w' in mode:
        stream = io.BufferedWriter(raw)
    else:
        stream = raw
    try:
        yield stream
        stream.flush()
    finally:
        stream.close()
|
jpayne@0
|
43
|
jpayne@0
|
44 # class stream_over(io.IOBase):
|
jpayne@0
|
45 # "a file-like object that works as a tee, for API's that accept a file-like"
|
jpayne@0
|
46 # def __init__(self, output_streams, input_stream=None):
|
jpayne@0
|
47 # self.streams = output_streams
|
jpayne@0
|
48 # self.input = input_stream
|
jpayne@0
|
49
|
jpayne@0
|
50 # def writable(self, *a, **k):
|
jpayne@0
|
51 # return all([s.writeable(*a, **k) for s in self.streams])
|
jpayne@0
|
52
|
jpayne@0
|
53 # def write(self, *a, **k):
|
jpayne@0
|
54 # [s.write(*a, **k) for s in self.streams]
|
jpayne@0
|
55
|
jpayne@0
|
56 # def writelines(self, *a, **k):
|
jpayne@0
|
57 # [s.writelines(*a, **k) for s in self.streams]
|
jpayne@0
|
58
|
jpayne@0
|
59 # def flush(self, *a, **k):
|
jpayne@0
|
60 # [s.flush(*a, **k) for s in self.streams]
|
jpayne@0
|
61
|
jpayne@0
|
62 # def close(self, *a, **k):
|
jpayne@0
|
63 # if self.input:
|
jpayne@0
|
64 # self.input.close()
|
jpayne@0
|
65 # [s.close(*a, **k) for s in self.streams]
|
jpayne@0
|
66
|
jpayne@0
|
67 # def read(self, *a, **k):
|
jpayne@0
|
68 # if self.input:
|
jpayne@0
|
69 # bts = self.input.read(*a, **k)
|
jpayne@0
|
70 # self.write(bts)
|
jpayne@0
|
71 # return bts
|
jpayne@0
|
72 # raise ValueError("Not created with a readable stream; read ops not supported.")
|
jpayne@0
|
73
|
jpayne@0
|
74 # def readlines(self, *a, **k):
|
jpayne@0
|
75 # if self.input:
|
jpayne@0
|
76 # return self.input.readlines(*a, **k)
|
jpayne@0
|
77 # raise ValueError("Not created with a readable stream; read ops not supported.")
|
jpayne@0
|
78
|
jpayne@0
|
79 # def seekable(self):
|
jpayne@0
|
80 # return False
|
jpayne@0
|
81
|
jpayne@0
|
82 # @contextlib.contextmanager
|
jpayne@0
|
83 # def multiwrite(*streams):
|
jpayne@0
|
84 # multistream = stream_over(streams)
|
jpayne@0
|
85 # yield multistream
|
jpayne@0
|
86 # multistream.flush()
|
jpayne@0
|
87 # multistream.close()
|
jpayne@0
|
88
|
jpayne@0
|
def stream_to(input_stream, output_stream):
    """Copy every line of *input_stream* to *output_stream*.

    The first eight lines are also logged (truncated to 70 characters) on the
    ``'strm'`` logger at INFO level.  The input is iterated lazily, line by
    line, so it is never loaded into memory all at once; any iterable of
    lines is accepted.
    """
    log = logging.getLogger('strm')
    for index, line in enumerate(input_stream):
        if index < 8:
            log.info(str(line[:70]))
        output_stream.write(line)
|
jpayne@0
|
94
|
jpayne@0
|
95
|
jpayne@0
|
96
|
jpayne@0
|
def _is_cache_miss(error):
    "True when *error* means the object is simply absent from the cache."
    if isinstance(error, DataNotFoundError):
        return True
    if isinstance(error, ClientError):
        # A missing S3 object is reported as a ClientError carrying a
        # 404-style error code rather than as a distinct exception class.
        code = str(error.response.get('Error', {}).get('Code', ''))
        return code in ('404', 'NoSuchKey')
    return False


def main(table, id, command=None, output=None, *a, **k):
    """Write the cached result for ``table/id`` to *output*, computing it if needed.

    The object ``"<table>/<id>"`` is looked up in the ``CACHE_NAMESPACE`` S3
    bucket and downloaded into *output* (stdout when *output* is falsy).  If
    the download fails, *command* is run through the shell; its stdout is
    uploaded to the cache (best effort) and written to *output*.

    Extra positional and keyword arguments are accepted and ignored so the
    parsed CLI namespace can be splatted straight in.

    Returns 0 on success, or the command's exit status if it fails.
    Raises NoCacheNoCommandException when the lookup fails and *command* is
    empty.
    """
    id = id.strip()
    table = table.strip()
    name = "{}/{}".format(table, id)
    cache_log = logging.getLogger('snp-cache.cache')

    with open(output, 'wb') as output_f:
        # Stays None if the S3 resource itself cannot be created, so the
        # upload step below can be skipped instead of hitting an unbound name.
        bucket = None
        try:
            bucket = boto3.resource('s3').Bucket(CACHE_NAMESPACE)
            bucket.download_fileobj(name, output_f)
            cache_log.info("cache hit on {name}, retrieved.".format(name=name))
            return 0
        except (DataNotFoundError, NoCredentialsError, BotoCoreError, ClientError) as e:
            if _is_cache_miss(e):
                cache_log.info("cache miss on {name}".format(name=name))
            else:
                cache_log.error(e)

        # We couldn't get the data from the cache, so generate it.
        if not command:
            raise NoCacheNoCommandException("No cached result for this id, and no command given to generate.")
        logging.getLogger('snp-cache.cmd').info(command)

        # NOTE(review): the command string is executed by the shell as given;
        # it must only ever come from a trusted caller.
        try:
            cached = subprocess.check_output(command, shell=True)
        except subprocess.CalledProcessError as e:
            print(e.output, file=sys.stderr)
            return e.returncode

        try:
            if bucket is not None:
                bucket.upload_fileobj(BytesIO(cached), name)
        except (ClientError, BotoCoreError) as e:
            cache_log.error('Error writing to cache:')
            cache_log.error(e)
        finally:
            # The caller gets the result whether or not caching succeeded.
            output_f.write(cached)
    return 0
|
jpayne@0
|
134
|
jpayne@0
|
135
|
jpayne@0
|
136
|
jpayne@0
|
137
|
jpayne@0
|
if __name__ == '__main__':
    # Command-line entry point: parse arguments, configure logging to the
    # requested file, and exit with main()'s return code.
    parser = argparse.ArgumentParser(description="lookup result for file in data table, or compute and install")
    parser.add_argument('table', type=str)
    parser.add_argument('id', type=str)
    parser.add_argument('-c', dest='command')
    parser.add_argument('-o', dest='output')
    # os.devnull is the portable spelling of the null device.
    parser.add_argument('-l', dest='logging', default=os.devnull)
    params = parser.parse_args()

    logging.basicConfig(filename=params.logging, level=logging.INFO)

    # sys.exit rather than quit(): quit is injected by the site module and is
    # not guaranteed to exist in every interpreter configuration.
    sys.exit(main(**vars(params)))