view snp-cache.py @ 2:66ca2fd8592a

planemo upload commit b'7f6183b769772449fbcee903686b8d5ec5b7439f\n'-dirty
author jpayne
date Wed, 24 Jan 2018 15:07:00 -0500
parents eefdd97a6749
children 66f988a9666f
line wrap: on
line source
#! /usr/bin/env python3.6

import boto3
from botocore.exceptions import ClientError, DataNotFoundError, NoCredentialsError, BotoCoreError

import argparse
import subprocess
import contextlib
import logging
import io
import shutil
import os, sys
from builtins import open as _open
from copy import copy
from functools import partial
from itertools import tee
from io import BytesIO
from threading import Thread

# Name of the S3 bucket that backs the cache; main() passes it to boto3's Bucket().
CACHE_NAMESPACE = 'cfsan-galaxytrakr-cache'

class NoCacheNoCommandException(Exception):
	"""Raised when nothing is cached for an id and no command was given to generate it."""

@contextlib.contextmanager
def open(filename=None, mode='r'):
	"""Yield a binary stream for ``filename``, or stdout when no filename is given.

	Lets callers use a single ``with`` statement whether the data goes to a
	file or to ``sys.stdout``.

	filename -- path to open. When falsy, ``sys.stdout.buffer`` is yielded
	            regardless of ``mode`` and is left open on exit.
	mode     -- mode handed to ``io.FileIO``. A mode containing 'r' is wrapped
	            in a BufferedReader, one containing 'w' in a BufferedWriter;
	            any other mode yields the raw FileIO object.

	The stream is flushed when the ``with`` body finishes normally. A stream
	opened here is always closed on exit; stdout is never closed.
	"""
	if not filename:
		stream = sys.stdout.buffer
		yield stream
		stream.flush()
		return
	# Open before entering the try block: if this raises there is nothing of
	# ours to close, and in particular stdout must not be closed by mistake.
	stream = io.FileIO(filename, mode)
	try:
		if 'r' in mode:
			stream = io.BufferedReader(stream)
		elif 'w' in mode:
			stream = io.BufferedWriter(stream)
		yield stream
		stream.flush()
	finally:
		stream.close()

# class stream_over(io.IOBase):
# 	"a file-like object that works as a tee, for API's that accept a file-like"
# 	def __init__(self, output_streams, input_stream=None):
# 		self.streams = output_streams
# 		self.input = input_stream

# 	def writable(self, *a, **k):
# 		return all([s.writable(*a, **k) for s in self.streams])
	
# 	def write(self, *a, **k):
# 		[s.write(*a, **k) for s in self.streams]

# 	def writelines(self, *a, **k):
# 		[s.writelines(*a, **k) for s in self.streams]

# 	def flush(self, *a, **k):
# 		[s.flush(*a, **k) for s in self.streams]

# 	def close(self, *a, **k):
# 		if self.input:
# 			self.input.close()
# 		[s.close(*a, **k) for s in self.streams]

# 	def read(self, *a, **k):
# 		if self.input:
# 			bts = self.input.read(*a, **k)
# 			self.write(bts)
# 			return bts
# 		raise ValueError("Not created with a readable stream; read ops not supported.")

# 	def readlines(self, *a, **k):
# 		if self.input:
# 			return self.input.readlines(*a, **k)
# 		raise ValueError("Not created with a readable stream; read ops not supported.")

# 	def seekable(self):
# 		return False

# @contextlib.contextmanager
# def multiwrite(*streams):
# 	multistream = stream_over(streams)
# 	yield multistream
# 	multistream.flush()
# 	multistream.close()

def stream_to(input_stream, output_stream):
	"""Copy input_stream to output_stream line by line.

	All lines are read up front with ``readlines()``. The first 70 characters
	of each of the first eight lines are logged at INFO level on the 'strm'
	logger just before that line is written.
	"""
	position = 0
	for chunk in input_stream.readlines():
		if position < 8:
			preview = str(chunk[:70])
			logging.getLogger('strm').info(preview)
		output_stream.write(chunk)
		position += 1
		


def _is_cache_miss(error):
	"""Return True when ``error`` means the key is simply absent from the cache."""
	if isinstance(error, DataNotFoundError):
		return True
	if isinstance(error, ClientError):
		# S3 reports a missing object as a ClientError whose error code is
		# '404' (HEAD request) or 'NoSuchKey' (GET request).
		code = error.response.get('Error', {}).get('Code')
		return str(code) in ('404', 'NoSuchKey')
	return False


def main(table, id, command=None, output=None, *a, **k):
	"""Write the cached result for ``table/id`` to ``output``, generating it if needed.

	The S3 bucket CACHE_NAMESPACE is asked for the key ``"<table>/<id>"``. On a
	hit the object is streamed straight to the output. On a miss, or on any
	other boto error, ``command`` is run through the shell; its stdout is
	written to the output and then uploaded to the cache on a best-effort basis.

	table   -- first half of the cache key (surrounding whitespace is stripped)
	id      -- second half of the cache key (surrounding whitespace is stripped)
	command -- shell command whose stdout is the result; optional
	output  -- path of the file to write; stdout is used when falsy
	*a, **k -- ignored; they absorb extra command-line parameters

	Returns 0 on success, or the command's exit status if the command fails.
	Raises NoCacheNoCommandException when the cache lookup fails and no
	command was given.
	"""
	cache_log = logging.getLogger('snp-cache.cache')
	id = id.strip()
	table = table.strip()
	name = f"{table}/{id}"
	with open(output, 'wb') as output_f:
		# Stays None if the resource cannot even be created, so the fallback
		# below knows there is no bucket to upload to.
		bucket = None
		try:
			bucket = boto3.resource('s3').Bucket(CACHE_NAMESPACE)
			bucket.download_fileobj(name, output_f)
		except (ClientError, BotoCoreError) as e:
			# BotoCoreError covers DataNotFoundError and NoCredentialsError;
			# ClientError is a separate hierarchy and carries the S3 404.
			failure = e
			if _is_cache_miss(e):
				cache_log.info(f"cache miss on {name}")
			else:
				cache_log.error(e)
		else:
			cache_log.info(f"cache hit on {name}, retrieved.")
			return 0

		# No usable cached copy: run the command to generate the data.
		if not command:
			raise NoCacheNoCommandException("No cached result for this id, and no command given to generate.") from failure
		logging.getLogger('snp-cache.cmd').info(command)
		try:
			# NOTE: shell=True runs the command string exactly as supplied by
			# the caller; it must come from a trusted source.
			cached = subprocess.check_output(command, shell=True)
		except subprocess.CalledProcessError as e:
			# Relay whatever the command printed on stdout before it failed.
			sys.stderr.write((e.output or b'').decode(errors='replace'))
			return e.returncode

		# Deliver the result first so a cache-write problem can never lose it.
		output_f.write(cached)
		if bucket is not None:
			try:
				bucket.upload_fileobj(BytesIO(cached), name)
			except (ClientError, BotoCoreError) as e:
				cache_log.error('Error writing to cache:')
				cache_log.error(e)
	return 0




if __name__ == '__main__':
	# Command-line entry point: parse the arguments, set up logging, run main()
	# and use its return value as the process exit status.
	parser = argparse.ArgumentParser(description="lookup result for file in data table, or compute and install")
	parser.add_argument('table', type=str, help="first half of the cache key")
	parser.add_argument('id', type=str, help="second half of the cache key")
	parser.add_argument('-c', dest='command', help="shell command that generates the result when it is not cached")
	parser.add_argument('-o', dest='output', help="file to write the result to (default: stdout)")
	parser.add_argument('-l', dest='logging', default=os.devnull, help="log file (default: discard log output)")
	params = parser.parse_args()

	logging.basicConfig(filename=params.logging, level=logging.INFO)

	# vars(params) also carries the 'logging' entry; main() absorbs it via **k.
	# sys.exit is used because quit() is a site helper meant for the
	# interactive prompt and is not guaranteed to exist.
	sys.exit(main(**vars(params)))