Mercurial > repos > jpayne > snp_pipeline
comparison snp-cache.py @ 64:b5cf2ec0c540 tip
planemo upload
author | jpayne |
---|---|
date | Sat, 29 Jun 2024 06:56:11 -0400 |
parents | fb44b003e29b |
children |
comparison
equal
deleted
inserted
replaced
63:fb44b003e29b | 64:b5cf2ec0c540 |
---|---|
21 CACHE_NAMESPACE = 'cfsan-galaxytrakr-cache' | 21 CACHE_NAMESPACE = 'cfsan-galaxytrakr-cache' |
22 | 22 |
23 class NoCacheNoCommandException(Exception): | 23 class NoCacheNoCommandException(Exception): |
24 pass | 24 pass |
25 | 25 |
26 @contextlib.contextmanager | 26 # @contextlib.contextmanager |
27 def open(filename=None, mode='r'): | 27 # def open(filename=None, mode='r'): |
28 "basically a wrapper to make sys.stdout usable where there's a contextmanager" | 28 # "basically a wrapper to make sys.stdout usable where there's a contextmanager" |
29 writer = sys.stdout.buffer | 29 # writer = sys.stdout.buffer |
30 try: | 30 # try: |
31 if filename: | 31 # if filename: |
32 writer = io.FileIO(filename, mode) | 32 # writer = io.FileIO(filename, mode) |
33 if 'r' in mode: | 33 # if 'r' in mode: |
34 writer = io.BufferedReader(writer) | 34 # writer = io.BufferedReader(writer) |
35 elif 'w' in mode: | 35 # elif 'w' in mode: |
36 writer = io.BufferedWriter(writer) | 36 # writer = io.BufferedWriter(writer) |
37 yield writer | 37 # yield writer |
38 writer.flush() | 38 # writer.flush() |
39 finally: | 39 # finally: |
40 if filename: | 40 # if filename: |
41 writer.close() | 41 # writer.close() |
42 | 42 |
43 # class stream_over(io.IOBase): | 43 # class stream_over(io.IOBase): |
44 # "a file-like object that works as a tee, for API's that accept a file-like" | 44 # "a file-like object that works as a tee, for API's that accept a file-like" |
45 # def __init__(self, output_streams, input_stream=None): | 45 # def __init__(self, output_streams, input_stream=None): |
46 # self.streams = output_streams | 46 # self.streams = output_streams |
83 # multistream = stream_over(streams) | 83 # multistream = stream_over(streams) |
84 # yield multistream | 84 # yield multistream |
85 # multistream.flush() | 85 # multistream.flush() |
86 # multistream.close() | 86 # multistream.close() |
87 | 87 |
88 def stream_to(input_stream, output_stream): | 88 # def stream_to(input_stream, output_stream): |
89 for i, line in enumerate(input_stream.readlines()): | 89 # for i, line in enumerate(input_stream.readlines()): |
90 if i < 8: | 90 # if i < 8: |
91 logging.getLogger('strm').info(str(line[:70])) | 91 # logging.getLogger('strm').info(str(line[:70])) |
92 output_stream.write(line) | 92 # output_stream.write(line) |
93 | 93 |
94 | 94 |
95 | 95 |
96 def main(table, id, command=None, output=None, *a, **k): | 96 def main(table, id, command=None, output=None, *a, **k): |
97 id = id.strip() | 97 id = id.strip() |
98 table = table.strip() | 98 table = table.strip() |
99 name = "{table}/{id}".format(**locals()) | 99 name = "{table}/{id}".format(**locals()) |
100 with open(output, 'wb') as output_f: | 100 with open(output, 'wb') as output_f: |
101 #lookup ID in table and get a FH to the resource | 101 # #lookup ID in table and get a FH to the resource |
102 try: | 102 # try: |
103 import boto3 | 103 # import boto3 |
104 api_key = os.environ.get('AWS_API_KEY', '') | 104 # api_key = os.environ.get('AWS_API_KEY', '') |
105 s3 = boto3.resource('s3').Bucket(CACHE_NAMESPACE) | 105 # s3 = boto3.resource('s3').Bucket(CACHE_NAMESPACE) |
106 s3.download_fileobj(name, output_f) | 106 # s3.download_fileobj(name, output_f) |
107 logging.getLogger('snp-cache.cache').info("cache hit on {name}, retrieved.".format(**locals())) | 107 # logging.getLogger('snp-cache.cache').info("cache hit on {name}, retrieved.".format(**locals())) |
108 except Exception as e: | 108 # except Exception as e: |
109 if type(e) is not ImportError: | 109 # if type(e) is not ImportError: |
110 logging.getLogger('snp-cache.cache').info("cache miss on {name}".format(**locals())) | 110 # logging.getLogger('snp-cache.cache').info("cache miss on {name}".format(**locals())) |
111 else: | 111 # else: |
112 logging.getLogger('snp-cache.cache').error(e) | 112 # logging.getLogger('snp-cache.cache').error(e) |
113 #if we couldn't find the data, we need to run the command to generate it | 113 # #if we couldn't find the data, we need to run the command to generate it |
114 if not command: | 114 # if not command: |
115 raise NoCacheNoCommandException("No cached result for this id, and no command given to generate.") | 115 # raise NoCacheNoCommandException("No cached result for this id, and no command given to generate.") |
116 logging.getLogger('snp-cache.cmd').info(command) | 116 # logging.getLogger('snp-cache.cmd').info(command) |
117 # sub = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | 117 # # sub = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
118 # cached, err = sub.communicate() | 118 # # cached, err = sub.communicate() |
119 # cached, err = io.BytesIO(cached), io.BytesIO(err) | 119 # # cached, err = io.BytesIO(cached), io.BytesIO(err) |
120 try: | 120 try: |
121 cached = subprocess.check_output(command, shell=True) | 121 cached = subprocess.check_output(command, shell=True) |
122 try: | 122 try: |
123 s3.upload_fileobj(BytesIO(cached), name) | 123 pass # s3.upload_fileobj(BytesIO(cached), name) |
124 except Exception as e: | 124 except Exception as e: |
125 logging.getLogger('snp-cache.cache').error('Error writing to cache:') | 125 logging.getLogger('snp-cache.cache').error('Error writing to cache:') |
126 logging.getLogger('snp-cache.cache').error(e) | 126 logging.getLogger('snp-cache.cache').error(e) |
127 finally: | 127 finally: |
128 #stream_to(cached, output_f) #stream FROM cached TO output_f | 128 #stream_to(cached, output_f) #stream FROM cached TO output_f |