view snp-cache.py @ 48:fc7a28de6aa0

planemo upload commit 2107f35724b3f897c9414be40a9d14663cba9f4d-dirty
author jpayne
date Wed, 08 May 2019 11:10:14 -0400
parents 11296a86e01b
children 0b0e3e4376a7
line wrap: on
line source
#! /usr/bin/env python

from __future__ import print_function

import boto3
from botocore.exceptions import ClientError, DataNotFoundError, NoCredentialsError, BotoCoreError

import argparse
import subprocess
import contextlib
import logging
import io
import shutil
import os, sys
from builtins import open as _open
from copy import copy
from functools import partial
from itertools import tee
from io import BytesIO
from threading import Thread

CACHE_NAMESPACE = 'cfsan-galaxytrakr-cache'

class NoCacheNoCommandException(Exception):
	"Raised when no cached result could be retrieved and no command was given to generate one."

@contextlib.contextmanager
def open(filename=None, mode='r'):
	"""Context manager yielding a binary stream for `filename`, or standard output.

	filename -- path to open; when falsy, the binary standard-output stream is
	            yielded instead and is left open on exit.
	mode     -- mode passed to io.FileIO; a mode containing 'r' is wrapped in a
	            BufferedReader, one containing 'w' in a BufferedWriter, and any
	            other mode yields the raw FileIO object.

	The stream is flushed after the managed block completes normally. A stream
	opened from `filename` is always closed on exit. Errors raised while
	opening `filename` propagate to the caller.
	"""
	writer = None
	try:
		if filename:
			writer = io.FileIO(filename, mode)
			if 'r' in mode:
				writer = io.BufferedReader(writer)
			elif 'w' in mode:
				writer = io.BufferedWriter(writer)
		else:
			# Resolve stdout only when it is actually needed; fall back to the
			# stream itself when it exposes no separate binary `buffer`.
			writer = getattr(sys.stdout, 'buffer', sys.stdout)
		yield writer
		writer.flush()
	finally:
		# Close only what was opened here. `writer` is still None when opening
		# `filename` failed, and standard output must never be closed.
		if filename and writer is not None:
			writer.close()

# class stream_over(io.IOBase):
# 	"a file-like object that works as a tee, for API's that accept a file-like"
# 	def __init__(self, output_streams, input_stream=None):
# 		self.streams = output_streams
# 		self.input = input_stream

# 	def writable(self, *a, **k):
# 		return all([s.writeable(*a, **k) for s in self.streams])
	
# 	def write(self, *a, **k):
# 		[s.write(*a, **k) for s in self.streams]

# 	def writelines(self, *a, **k):
# 		[s.writelines(*a, **k) for s in self.streams]

# 	def flush(self, *a, **k):
# 		[s.flush(*a, **k) for s in self.streams]

# 	def close(self, *a, **k):
# 		if self.input:
# 			self.input.close()
# 		[s.close(*a, **k) for s in self.streams]

# 	def read(self, *a, **k):
# 		if self.input:
# 			bts = self.input.read(*a, **k)
# 			self.write(bts)
# 			return bts
# 		raise ValueError("Not created with a readable stream; read ops not supported.")

# 	def readlines(self, *a, **k):
# 		if self.input:
# 			return self.input.readlines(*a, **k)
# 		raise ValueError("Not created with a readable stream; read ops not supported.")

# 	def seekable(self):
# 		return False

# @contextlib.contextmanager
# def multiwrite(*streams):
# 	multistream = stream_over(streams)
# 	yield multistream
# 	multistream.flush()
# 	multistream.close()

def stream_to(input_stream, output_stream):
	"""Copy every line of input_stream to output_stream.

	All lines are read up front with readlines(). The first eight lines are
	also logged (truncated to 70 items each) to the 'strm' logger at INFO level,
	each one just before it is written.
	"""
	preview_count = 8
	preview_width = 70
	position = 0
	for chunk in input_stream.readlines():
		if position < preview_count:
			logging.getLogger('strm').info(str(chunk[:preview_width]))
		output_stream.write(chunk)
		position += 1
		


def _is_cache_miss(error):
	"Return True when `error` says the requested cache object does not exist."
	if isinstance(error, DataNotFoundError):
		return True
	# A botocore ClientError carries the service's error payload in `response`;
	# a missing S3 key is reported there rather than through a dedicated type.
	response = getattr(error, 'response', None) or {}
	code = str(response.get('Error', {}).get('Code', ''))
	return code in ('404', 'NoSuchKey', 'NotFound')


def main(table, id, command=None, output=None, *a, **k):
	"""Write the result stored under "<table>/<id>" to `output`, generating it if needed.

	The object named "<table>/<id>" is looked up in the CACHE_NAMESPACE S3
	bucket. On a hit it is downloaded straight into the output stream. If the
	download fails for any boto reason, `command` is run through the shell; its
	standard output is uploaded to the cache (best effort) and written to the
	output stream.

	table   -- first component of the cache key; surrounding whitespace is stripped.
	id      -- second component of the cache key; surrounding whitespace is stripped.
	command -- shell command whose stdout is the result, used when the cache
	           lookup fails.
	output  -- path of the file to write; passed to this module's open(), which
	           uses standard output when it is falsy.
	*a, **k -- accepted and ignored, so the parsed CLI namespace can be passed
	           with `**vars(params)`.

	Returns 0 on success, or the command's exit status if the command fails
	(its captured output is then printed to stderr).
	Raises NoCacheNoCommandException when the lookup fails and no command was given.
	"""
	id = id.strip()
	table = table.strip()
	name = "{table}/{id}".format(table=table, id=id)
	cache_log = logging.getLogger('snp-cache.cache')
	with open(output, 'wb') as output_f:
		# Stays None if the bucket handle itself cannot be created, so the
		# fallback path below knows there is nowhere to upload to.
		bucket = None
		try:
			#lookup ID in table and stream the resource into the output
			bucket = boto3.resource('s3').Bucket(CACHE_NAMESPACE)
			bucket.download_fileobj(name, output_f)
			cache_log.info("cache hit on {name}, retrieved.".format(name=name))
		except (DataNotFoundError, NoCredentialsError, BotoCoreError, ClientError) as e:
			if _is_cache_miss(e):
				cache_log.info("cache miss on {name}".format(name=name))
			else:
				cache_log.error(e)
			#if we couldn't find the data, we need to run the command to generate it
			if not command:
				raise NoCacheNoCommandException("No cached result for this id, and no command given to generate.")
			logging.getLogger('snp-cache.cmd').info(command)
			# NOTE(review): `command` is executed by the shell exactly as given;
			# callers must only pass trusted command strings.
			try:
				cached = subprocess.check_output(command, shell=True)
			except subprocess.CalledProcessError as e:
				print(e.output, file=sys.stderr)
				return e.returncode
			try:
				if bucket is not None:
					bucket.upload_fileobj(BytesIO(cached), name)
			except (ClientError, BotoCoreError) as e:
				# A failed cache write must not prevent delivering the result.
				cache_log.error('Error writing to cache:')
				cache_log.error(e)
			finally:
				output_f.write(cached)
	return 0




if __name__ == '__main__':
	# Command-line entry point: parse arguments, configure logging, run main()
	# and use its return value as the process exit status.
	parser = argparse.ArgumentParser(description="lookup result for file in data table, or compute and install")
	parser.add_argument('table', type=str, help="first component of the cache key")
	parser.add_argument('id', type=str, help="second component of the cache key")
	parser.add_argument('-c', dest='command', help="shell command whose stdout is the result when the cache lookup fails")
	parser.add_argument('-o', dest='output', help="file to write the result to (default: standard output)")
	# os.devnull equals '/dev/null' on POSIX and stays valid on other platforms.
	parser.add_argument('-l', dest='logging', default=os.devnull, help="file to write log messages to (default: discard them)")
	params = parser.parse_args()

	logging.basicConfig(filename=params.logging, level=logging.INFO)

	# sys.exit instead of quit(): quit is injected by the `site` module for
	# interactive use and is not guaranteed to exist in every interpreter setup.
	sys.exit(main(**vars(params)))