view snp-cache.py @ 27:1044c9a132c5

planemo upload commit 7f6183b769772449fbcee903686b8d5ec5b7439f-dirty
author jpayne
date Wed, 31 Jan 2018 13:24:56 -0500
parents 6adaecff5f2b
children 9022b00a9198
line wrap: on
line source
#! /usr/bin/env python

import boto3
from botocore.exceptions import ClientError, DataNotFoundError, NoCredentialsError, BotoCoreError

import argparse
import subprocess
import contextlib
import logging
import io
import shutil
import os, sys
from builtins import open as _open
from copy import copy
from functools import partial
from itertools import tee
from io import BytesIO
from threading import Thread

CACHE_NAMESPACE = 'cfsan-galaxytrakr-cache'

class NoCacheNoCommandException(Exception):
	"raised when the cache has no entry for an id and no command was supplied to generate one"

@contextlib.contextmanager
def open(filename=None, mode='r'):
	"""Context manager yielding a binary stream for `filename`, or stdout.

	With a falsy `filename` the stream is `sys.stdout.buffer`; it is flushed
	on a clean exit and never closed, since this function does not own it.

	With a `filename` the file is opened as a raw `io.FileIO` in `mode`,
	wrapped in a `BufferedReader` when `mode` contains 'r' or a
	`BufferedWriter` when it contains 'w' (any other mode yields the raw
	file). The stream is flushed on a clean exit and always closed.

	Raises whatever `io.FileIO` raises if the file cannot be opened; in that
	case nothing is closed, so stdout is left usable.
	"""
	if not filename:
		stream = sys.stdout.buffer
		yield stream
		stream.flush()
		return

	# Open before entering the try/finally: if this raises there is nothing
	# of ours to close, and we must not fall back to closing stdout.
	stream = io.FileIO(filename, mode)
	if 'r' in mode:
		stream = io.BufferedReader(stream)
	elif 'w' in mode:
		stream = io.BufferedWriter(stream)
	try:
		yield stream
		stream.flush()
	finally:
		stream.close()

# class stream_over(io.IOBase):
# 	"a file-like object that works as a tee, for API's that accept a file-like"
# 	def __init__(self, output_streams, input_stream=None):
# 		self.streams = output_streams
# 		self.input = input_stream

# 	def writable(self, *a, **k):
# 		return all([s.writeable(*a, **k) for s in self.streams])
	
# 	def write(self, *a, **k):
# 		[s.write(*a, **k) for s in self.streams]

# 	def writelines(self, *a, **k):
# 		[s.writelines(*a, **k) for s in self.streams]

# 	def flush(self, *a, **k):
# 		[s.flush(*a, **k) for s in self.streams]

# 	def close(self, *a, **k):
# 		if self.input:
# 			self.input.close()
# 		[s.close(*a, **k) for s in self.streams]

# 	def read(self, *a, **k):
# 		if self.input:
# 			bts = self.input.read(*a, **k)
# 			self.write(bts)
# 			return bts
# 		raise ValueError("Not created with a readable stream; read ops not supported.")

# 	def readlines(self, *a, **k):
# 		if self.input:
# 			return self.input.readlines(*a, **k)
# 		raise ValueError("Not created with a readable stream; read ops not supported.")

# 	def seekable(self):
# 		return False

# @contextlib.contextmanager
# def multiwrite(*streams):
# 	multistream = stream_over(streams)
# 	yield multistream
# 	multistream.flush()
# 	multistream.close()

def stream_to(input_stream, output_stream):
	"""Copy every line from `input_stream` to `output_stream`.

	`input_stream` may be any iterable of lines (a file-like object or a
	plain iterator); lines are consumed one at a time rather than read into
	memory all at once. The first 70 characters of each of the first 8 lines
	are logged at INFO level on the 'strm' logger as a preview.
	"""
	preview_log = logging.getLogger('strm')
	for i, line in enumerate(input_stream):
		if i < 8:
			preview_log.info(str(line[:70]))
		output_stream.write(line)
		


def _is_cache_miss(error):
	"true when a boto error only means the requested key is not in the bucket"
	if isinstance(error, DataNotFoundError):
		return True
	# A ClientError carries the service's error code in its parsed response;
	# a missing S3 object is reported this way rather than as DataNotFoundError.
	response = getattr(error, 'response', None) or {}
	code = str(response.get('Error', {}).get('Code', ''))
	return code in ('404', 'NoSuchKey', 'NotFound')


def main(table, id, command=None, output=None, *a, **k):
	"""Fetch the cached result for `table`/`id`, or compute and cache it.

	The S3 object `{table}/{id}` in the CACHE_NAMESPACE bucket is downloaded
	into `output` (stdout when `output` is falsy). If the download fails for
	any boto reason, `command` is run through the shell; its stdout is
	uploaded to the cache on a best-effort basis and written to `output`.

	Extra positional and keyword arguments are accepted and ignored, so the
	whole argparse namespace can be splatted in.

	Returns 0 on success, or the command's exit status if it fails (its
	captured output is then printed to stderr).
	Raises NoCacheNoCommandException when the cache lookup fails and no
	`command` was given.
	"""
	id = id.strip()
	table = table.strip()
	name = f"{table}/{id}"
	cache_log = logging.getLogger('snp-cache.cache')
	with open(output, 'wb') as output_f:
		# Stays None when the bucket handle itself cannot be created, so the
		# upload below can be skipped instead of hitting an unbound name.
		s3 = None
		#lookup ID in table and get a FH to the resource
		try:
			s3 = boto3.resource('s3').Bucket(CACHE_NAMESPACE)
			s3.download_fileobj(name, output_f)
		except (DataNotFoundError, NoCredentialsError, BotoCoreError, ClientError) as e:
			if _is_cache_miss(e):
				cache_log.info(f"cache miss on {name}")
			else:
				cache_log.error(e)
		else:
			cache_log.info(f"cache hit on {name}, retrieved.")
			return 0

		#if we couldn't find the data, we need to run the command to generate it
		if not command:
			raise NoCacheNoCommandException("No cached result for this id, and no command given to generate.")
		logging.getLogger('snp-cache.cmd').info(command)
		# NOTE: the command is executed by the shell exactly as given; it must
		# come from a trusted caller.
		try:
			cached = subprocess.check_output(command, shell=True)
		except subprocess.CalledProcessError as e:
			print(e.output, file=sys.stderr)
			return e.returncode

		# A download that failed midway may have left partial bytes in the
		# output file; drop them so it holds only the command's result.
		if output and output_f.seekable():
			output_f.seek(0)
			output_f.truncate()

		try:
			if s3 is not None:
				s3.upload_fileobj(BytesIO(cached), name)
		except (ClientError, BotoCoreError) as e:
			cache_log.error('Error writing to cache:')
			cache_log.error(e)
		finally:
			# The result is delivered even if caching it failed.
			output_f.write(cached)
	return 0




if __name__ == '__main__':
	# Command-line entry point: parse arguments, configure logging, run main()
	# and use its return value as the process exit status.
	parser = argparse.ArgumentParser(description="lookup result for file in data table, or compute and install")
	parser.add_argument('table', type=str, help="first part of the cache key (key is table/id)")
	parser.add_argument('id', type=str, help="second part of the cache key (key is table/id)")
	parser.add_argument('-c', dest='command', help="shell command to run when the cache lookup fails")
	parser.add_argument('-o', dest='output', help="file to write the result to (default: stdout)")
	# os.devnull is '/dev/null' on POSIX and the equivalent device elsewhere.
	parser.add_argument('-l', dest='logging', default=os.devnull, help="log file (default: discard log output)")
	params = parser.parse_args()

	logging.basicConfig(filename=params.logging, level=logging.INFO)

	# sys.exit rather than quit(): quit is injected by the site module for
	# interactive sessions and is not guaranteed to exist in every run mode.
	sys.exit(main(**vars(params)))