Mercurial > repos > jpayne > snp_pipeline
comparison snp-cache.py @ 0:eefdd97a6749
planemo upload commit b'7f6183b769772449fbcee903686b8d5ec5b7439f\n'-dirty
author | jpayne |
---|---|
date | Wed, 24 Jan 2018 14:18:21 -0500 |
parents | |
children | 66f988a9666f |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:eefdd97a6749 |
---|---|
1 #! /usr/bin/env python3.6 | |
2 | |
3 import boto3 | |
4 from botocore.exceptions import ClientError, DataNotFoundError, NoCredentialsError, BotoCoreError | |
5 | |
6 import argparse | |
7 import subprocess | |
8 import contextlib | |
9 import logging | |
10 import io | |
11 import shutil | |
12 import os, sys | |
13 from builtins import open as _open | |
14 from copy import copy | |
15 from functools import partial | |
16 from itertools import tee | |
17 from io import BytesIO | |
18 from threading import Thread | |
19 | |
20 CACHE_NAMESPACE = 'cfsan-galaxytrakr-cache' | |
21 | |
class NoCacheNoCommandException(Exception):
    """Raised on a cache miss when no command was supplied to generate the result."""
24 | |
@contextlib.contextmanager
def open(filename=None, mode='r'):
    """Context manager yielding a buffered *binary* stream for ``filename``,
    falling back to the process's standard streams when filename is falsy.

    Shadows the builtin ``open`` on purpose so callers can pass ``None``
    (or '') and transparently get stdout/stdin instead of a file.

    Fix: the original fell back to ``sys.stdout.buffer`` even for read
    modes, yielding a write-only stream; read modes now fall back to
    ``sys.stdin.buffer``.

    :param filename: path to open, or falsy for a standard stream
    :param mode: io.FileIO-style mode string ('r', 'w', 'wb', ...)
    """
    if filename:
        stream = io.FileIO(filename, mode)
        # Wrap the raw FileIO in the matching buffered layer.
        if 'r' in mode:
            stream = io.BufferedReader(stream)
        elif 'w' in mode:
            stream = io.BufferedWriter(stream)
        try:
            yield stream
            # Only flushed on the success path, as in the original.
            stream.flush()
        finally:
            stream.close()
    else:
        # Standard streams: yield without closing them afterwards.
        stream = sys.stdin.buffer if 'r' in mode else sys.stdout.buffer
        yield stream
        stream.flush()
41 | |
42 # class stream_over(io.IOBase): | |
43 # "a file-like object that works as a tee, for API's that accept a file-like" | |
44 # def __init__(self, output_streams, input_stream=None): | |
45 # self.streams = output_streams | |
46 # self.input = input_stream | |
47 | |
48 # def writable(self, *a, **k): | |
49 # return all([s.writable(*a, **k) for s in self.streams]) | |
50 | |
51 # def write(self, *a, **k): | |
52 # [s.write(*a, **k) for s in self.streams] | |
53 | |
54 # def writelines(self, *a, **k): | |
55 # [s.writelines(*a, **k) for s in self.streams] | |
56 | |
57 # def flush(self, *a, **k): | |
58 # [s.flush(*a, **k) for s in self.streams] | |
59 | |
60 # def close(self, *a, **k): | |
61 # if self.input: | |
62 # self.input.close() | |
63 # [s.close(*a, **k) for s in self.streams] | |
64 | |
65 # def read(self, *a, **k): | |
66 # if self.input: | |
67 # bts = self.input.read(*a, **k) | |
68 # self.write(bts) | |
69 # return bts | |
70 # raise ValueError("Not created with a readable stream; read ops not supported.") | |
71 | |
72 # def readlines(self, *a, **k): | |
73 # if self.input: | |
74 # return self.input.readlines(*a, **k) | |
75 # raise ValueError("Not created with a readable stream; read ops not supported.") | |
76 | |
77 # def seekable(self): | |
78 # return False | |
79 | |
80 # @contextlib.contextmanager | |
81 # def multiwrite(*streams): | |
82 # multistream = stream_over(streams) | |
83 # yield multistream | |
84 # multistream.flush() | |
85 # multistream.close() | |
86 | |
def stream_to(input_stream, output_stream):
    """Copy ``input_stream`` to ``output_stream`` line by line, logging a
    70-byte preview of each of the first 8 lines to the 'strm' logger."""
    preview_log = logging.getLogger('strm')
    for line_no, chunk in enumerate(input_stream.readlines()):
        if line_no < 8:
            preview_log.info(str(chunk[:70]))
        output_stream.write(chunk)
92 | |
93 | |
94 | |
def main(table, id, command=None, output=None, *a, **k):
    """Look up ``table/id`` in the S3 cache bucket and stream it to *output*
    (stdout when output is falsy).  On a cache miss, run *command* through
    the shell, write its stdout to *output*, and best-effort install the
    result into the cache.

    :param table: cache table name (S3 key prefix)
    :param id: item id within the table (S3 key suffix)
    :param command: shell command that regenerates the result on a miss
    :param output: output file path, or falsy for stdout
    :returns: 0 on success, the command's exit status on command failure
    :raises NoCacheNoCommandException: cache miss and no command given
    """
    id = id.strip()
    table = table.strip()
    name = f"{table}/{id}"
    cache_log = logging.getLogger('snp-cache.cache')
    with open(output, 'wb') as output_f:
        s3 = None  # stays None if building the bucket handle itself fails
        try:
            s3 = boto3.resource('s3').Bucket(CACHE_NAMESPACE)
            s3.download_fileobj(name, output_f)
            cache_log.info(f"cache hit on {name}, retrieved.")
        except (ClientError, DataNotFoundError, NoCredentialsError, BotoCoreError) as e:
            # Fix: a missing key surfaces as ClientError (404/NoSuchKey) from
            # download_fileobj.  The original except tuple omitted ClientError,
            # so every ordinary cache miss crashed the script instead of
            # falling through to the regeneration path.
            is_miss = isinstance(e, DataNotFoundError) or (
                isinstance(e, ClientError)
                and e.response.get('Error', {}).get('Code') in ('404', 'NoSuchKey')
            )
            if is_miss:
                cache_log.info(f"cache miss on {name}")
            else:
                cache_log.error(e)
            # If we couldn't find the data, the command is the only way to
            # produce a result.
            if not command:
                raise NoCacheNoCommandException("No cached result for this id, and no command given to generate.")
            logging.getLogger('snp-cache.cmd').info(command)
            try:
                cached = subprocess.check_output(command, shell=True)
                try:
                    # Fix: guard against s3 never having been bound (e.g. the
                    # resource/bucket call itself raised above).
                    if s3 is not None:
                        s3.upload_fileobj(BytesIO(cached), name)
                except (ClientError, BotoCoreError) as e:
                    cache_log.error('Error writing to cache:')
                    cache_log.error(e)
                finally:
                    # Deliver the freshly computed result even if caching failed.
                    output_f.write(cached)
            except subprocess.CalledProcessError as e:
                print(e.output, file=sys.stderr)
                return e.returncode
    return 0
132 | |
133 | |
134 | |
135 | |
if __name__ == '__main__':
    # CLI: positional table and id select the cache entry; -c supplies the
    # fallback generator command, -o the output path, -l the log file.
    parser = argparse.ArgumentParser(description="lookup result for file in data table, or compute and install")
    parser.add_argument('table', type=str)
    parser.add_argument('id', type=str)
    parser.add_argument('-c', dest='command')
    parser.add_argument('-o', dest='output')
    parser.add_argument('-l', dest='logging', default='/dev/null')
    params = parser.parse_args()

    logging.basicConfig(filename=params.logging, level=logging.INFO)

    # Fix: sys.exit is the canonical way to set the process exit status;
    # quit() is an interactive-session helper injected by the site module
    # and is not guaranteed to exist when site is disabled.
    # main() absorbs the extra 'logging' key via **k.
    sys.exit(main(**vars(params)))