view snp-cache.py @ 41:9022b00a9198

planemo upload commit 0399c6bd696435a7a99d8d8b4c237e5a78ee5856-dirty
author jpayne
date Thu, 01 Mar 2018 15:37:04 -0500
parents 6adaecff5f2b
children 11296a86e01b
line wrap: on
line source
#! /usr/bin/env python

import boto3
from botocore.exceptions import ClientError, DataNotFoundError, NoCredentialsError, BotoCoreError

import argparse
import subprocess
import contextlib
import logging
import io
import shutil
import os, sys
from builtins import open as _open
from copy import copy
from functools import partial
from itertools import tee
from io import BytesIO
from threading import Thread

CACHE_NAMESPACE = 'cfsan-galaxytrakr-cache'

class NoCacheNoCommandException(Exception):
	"""Raised when the cache lookup misses and no command was supplied to generate the result."""
	pass

@contextlib.contextmanager
def open(filename=None, mode='r'):
	"""Context manager yielding a buffered binary stream for *filename*,
	or sys.stdout's byte buffer when no filename is given.

	NOTE: intentionally shadows the builtin open() (kept as _open above)
	so sys.stdout is usable where a context manager is expected.

	:param filename: path to open, or None/'' to use standard output
	:param mode: mode string passed to io.FileIO (e.g. 'rb', 'wb')
	"""
	if filename:
		stream = io.FileIO(filename, mode)
		# Wrap the raw file in the matching buffered interface.
		if 'r' in mode:
			stream = io.BufferedReader(stream)
		elif 'w' in mode:
			stream = io.BufferedWriter(stream)
		try:
			yield stream
			stream.flush()
		finally:
			# Bug fix: close only the stream we opened. The original
			# version could close sys.stdout.buffer when io.FileIO()
			# itself raised (writer still pointed at stdout).
			stream.close()
	else:
		writer = sys.stdout.buffer
		yield writer
		writer.flush()

# class stream_over(io.IOBase):
# 	"a file-like object that works as a tee, for API's that accept a file-like"
# 	def __init__(self, output_streams, input_stream=None):
# 		self.streams = output_streams
# 		self.input = input_stream

# 	def writable(self, *a, **k):
# 		return all([s.writeable(*a, **k) for s in self.streams])
	
# 	def write(self, *a, **k):
# 		[s.write(*a, **k) for s in self.streams]

# 	def writelines(self, *a, **k):
# 		[s.writelines(*a, **k) for s in self.streams]

# 	def flush(self, *a, **k):
# 		[s.flush(*a, **k) for s in self.streams]

# 	def close(self, *a, **k):
# 		if self.input:
# 			self.input.close()
# 		[s.close(*a, **k) for s in self.streams]

# 	def read(self, *a, **k):
# 		if self.input:
# 			bts = self.input.read(*a, **k)
# 			self.write(bts)
# 			return bts
# 		raise ValueError("Not created with a readable stream; read ops not supported.")

# 	def readlines(self, *a, **k):
# 		if self.input:
# 			return self.input.readlines(*a, **k)
# 		raise ValueError("Not created with a readable stream; read ops not supported.")

# 	def seekable(self):
# 		return False

# @contextlib.contextmanager
# def multiwrite(*streams):
# 	multistream = stream_over(streams)
# 	yield multistream
# 	multistream.flush()
# 	multistream.close()

def stream_to(input_stream, output_stream):
	"""Copy *input_stream* to *output_stream* line by line.

	The first 8 lines are echoed (truncated to 70 bytes each) to the
	'strm' logger for debugging.

	:param input_stream: readable file-like object (iterable of lines)
	:param output_stream: writable file-like object
	"""
	log = logging.getLogger('strm')
	# Iterate the stream lazily instead of readlines(), which would
	# buffer the entire input in memory before writing anything.
	for i, line in enumerate(input_stream):
		if i < 8:
			log.info(str(line[:70]))
		output_stream.write(line)
		


def main(table, id, command=None, output=None, *a, **k):
	"""Look up <table>/<id> in the S3 cache bucket and write it to *output*.

	On a cache miss (or any retrieval error), run *command* in a shell,
	upload its stdout to the cache, and write it to *output* as well.

	:param table: cache table name (used as the S3 key prefix)
	:param id: item identifier within the table
	:param command: shell command that generates the result on a miss
	:param output: output file path, or None to write to stdout
	:returns: 0 on success, or the failed command's exit code
	:raises NoCacheNoCommandException: on a cache miss with no command given
	"""
	log = logging.getLogger('snp-cache.cache')
	id = id.strip()
	table = table.strip()
	name = "{table}/{id}".format(**locals())
	with open(output, 'wb') as output_f:
		# Look up the object in the cache bucket and stream it to output_f.
		try:
			s3 = boto3.resource('s3').Bucket(CACHE_NAMESPACE)
			s3.download_fileobj(name, output_f)
			log.info("cache hit on {name}, retrieved.".format(**locals()))
		except (DataNotFoundError, NoCredentialsError, BotoCoreError, ClientError) as e:
			# Bug fix: S3 reports a missing key as a ClientError whose
			# Error.Code is '404'/'NoSuchKey', not as DataNotFoundError,
			# so genuine cache misses were being logged as errors.
			miss = isinstance(e, DataNotFoundError) or (
				isinstance(e, ClientError)
				and e.response.get('Error', {}).get('Code') in ('404', 'NoSuchKey'))
			if miss:
				log.info("cache miss on {name}".format(**locals()))
			else:
				log.error(e)
			# Without a cached result we must run the command to generate it.
			if not command:
				raise NoCacheNoCommandException("No cached result for this id, and no command given to generate.")
			logging.getLogger('snp-cache.cmd').info(command)
			try:
				cached = subprocess.check_output(command, shell=True)
				try:
					s3.upload_fileobj(BytesIO(cached), name)
				except (ClientError, BotoCoreError) as e:
					log.error('Error writing to cache:')
					log.error(e)
				finally:
					# Deliver the freshly generated result even if the
					# cache upload failed — best-effort caching only.
					output_f.write(cached)
			except subprocess.CalledProcessError as e:
				print(e.output, file=sys.stderr)
				return e.returncode
	return 0




if __name__ == '__main__':
	parser = argparse.ArgumentParser(description="lookup result for file in data table, or compute and install")
	parser.add_argument('table', type=str)
	parser.add_argument('id', type=str)
	parser.add_argument('-c', dest='command', help="shell command to generate the result on a cache miss")
	parser.add_argument('-o', dest='output', help="output file (default: stdout)")
	parser.add_argument('-l', dest='logging', default='/dev/null', help="log file path")
	params = parser.parse_args()

	logging.basicConfig(filename=params.logging, level=logging.INFO)

	# Bug fix: quit() is injected by the `site` module and is not
	# guaranteed to exist (e.g. under `python -S`); sys.exit() is the
	# documented way to exit with a status code.
	sys.exit(main(**vars(params)))