cfsan_lexmapr2: lexmapr/pipeline.py @ changeset 1:5244e7465767
"planemo upload"
author:    kkonganti
date:      Wed, 31 Aug 2022 14:32:14 -0400
parents:   f5c39d0447be
children:  (none)
"""Pipeline script"""

import csv, datetime, logging, re, sqlite3, sys
import lexmapr.create_databases as cdkp
import lexmapr.ontology_reasoner as ontr
import lexmapr.pipeline_helpers as helpers
import lexmapr.pipeline_resources as pipeline_resources
import lexmapr.run_summary as summarize
from itertools import permutations
from collections import OrderedDict
from nltk.tokenize import word_tokenize
from lexmapr.definitions import not_provided, arg_bins, ontol_db, ontol_interest

# TODO: deal with ambiguous terms: nut milk, dairy (building ENVO_00003862 v dairy food product)
# TODO: combine ScorLex.csv and misspellings.csv and edit _add_predefined_resources function
# TODO: decide what to do with same synonym, ex custom-customize vs custom-tradition
# TODO: make web on database instead of pulling relationships from API?
# TODO: remove synonyms over time from SynLex and resource_synonyms.csv; edit get_synonyms


def run(run_args):
    '''Main text processing and mapping pipeline'''

    # Add information from EMBL and predefined_resources folder
    t0 = datetime.datetime.now()
    global ontol_interest
    if run_args.embl_ontol:
        ontol_interest = run_args.embl_ontol

    print('\nBuilding databases...')
    cdkp.get_synonyms(run_args.remake_cache, ontol_interest)
    cdkp.get_resource_ids(run_args.remake_cache, ontol_interest)
    lookup_table = pipeline_resources.get_predefined_resources()
    t1 = datetime.datetime.now()
    print(f'\tDone! {t1-t0} passed'.ljust(60) + '\nMapping terms...')
    logging.info(f'Database build/confirm: {t1}')

    # Apply other arguments and initiate mapping cache
    term_cache = {'':'\t\t\t\t',}
    output_fields = ['Sample_Id', 'Sample_Desc', 'Processed_Sample',
                     'Annotated_Sample', 'Matched_Components']
    if run_args.full:
        output_fields += ['Match_Status (Macro Level)',
                          'Match_Status (Micro Level)',
                          'Sample_Transformations']
        term_cache[''] += '\t\t'
    else:
        output_fields += ['Match_Status (Macro Level)']

    if run_args.bin:
        global arg_bins
        if run_args.user_bin is not None:
            arg_bins = run_args.user_bin
        for x in arg_bins:
            arg_bins[x] = ontr.Ontology_package(x, arg_bins[x])
        term_cache[''] += '\t'*len(arg_bins)
        output_fields += list(arg_bins.keys())
    else:
        arg_bins = {}

    OUT_file = open(run_args.output, 'w') if run_args.output else sys.stdout
    if OUT_file is sys.stdout:
        OUT_file.write('\n')
    OUT_file.write('\t'.join(output_fields))

    IN_file = open(run_args.input, 'r')
    if run_args.input[-4:] == '.csv':
        fr_reader = csv.reader(IN_file, delimiter=',')
    elif run_args.input[-4:] == '.tsv':
        fr_reader = csv.reader(IN_file, delimiter='\t')
    next(fr_reader)

    # Connect to primary database
    conn = sqlite3.connect(ontol_db)
    c = conn.cursor()

    # Iterate over samples in input file
    for sample_row in fr_reader:
        sample_id = '\n' + sample_row[0].strip() + '\t' + ','.join(sample_row[1:])
        original_sample = ' '.join(sample_row[1:]).strip()
        cleaned_sample = ''
        cleaned_annotated = ''
        macro_status = 'No Match'
        matched_components = []
        synonym_match = []
        micro_status = []
        bin_class = {x:[] for x in arg_bins}
        ancestors = set()
        sample_conversion_status = {}
        synonym_map = {}
        treated_sample = helpers.punctuation_treatment(original_sample)

        # Determine if sample in predefined list of null values
        if treated_sample in not_provided:
            write_line = '\t' + treated_sample + '\tNot annotated\t\t' + macro_status
            OUT_file.write(sample_id + write_line)
            if run_args.full:
                OUT_file.write('\t' + str(micro_status) + '\t' + str(sample_conversion_status))
            if run_args.bin:
                OUT_file.write('\t'*len(bin_class))
            continue

        # Remove negated words and some compound words, apply corrections
        proc_sample = helpers.further_cleanup(treated_sample)
        proc_sample, micro_status = helpers.process_sample(proc_sample, lookup_table, micro_status)

        # Try finding processed sample in cache
        try:
            OUT_file.write(sample_id + term_cache[proc_sample])
            continue
        except(KeyError):
            pass

        # Attempt full term matches with and without suffixes
        if OUT_file is not sys.stdout:
            print('\tMatching '+sample_row[0].strip()+' '+ \
                  '{:<40}'.format(proc_sample[:40]).ljust(60), end='\r')

        full_term_match = helpers.map_term(treated_sample, lookup_table, c)
        if full_term_match == []:
            full_term_match = helpers.map_term(proc_sample, lookup_table, c)
            if full_term_match != []:
                micro_status.insert(0, 'Used Processed Sample')
        if full_term_match == [] and 'FOODON' in ontol_interest:
            full_term_match = helpers.map_term(proc_sample, lookup_table, c, True)
            if full_term_match != []:
                micro_status.insert(0, 'Used Processed Sample')

        # Attempt full term match with cleaned sample using suffixes
        if full_term_match == []:
            for sw_token in word_tokenize(proc_sample):
                if helpers.is_date(sw_token) or helpers.is_number(sw_token):
                    continue
                lemma, micro_status = helpers.singularize_token(sw_token, lookup_table,
                                                                micro_status, c)
                if not sw_token == lemma:
                    sample_conversion_status[sw_token] = lemma
                cleaned_sample = helpers.get_cleaned_sample(cleaned_sample, lemma, lookup_table)
                # Not de-duplicating tokens because can't account for all legitimate double names

            full_term_match = helpers.map_term(cleaned_sample, lookup_table, c)
            if full_term_match == [] and 'FOODON' in ontol_interest:
                full_term_match = helpers.map_term(cleaned_sample, lookup_table, c, True)
            if full_term_match != []:
                micro_status.insert(0, 'Used Cleaned Sample')

        # Combine the matched terms
        if full_term_match != []:
            for x in full_term_match:
                matched_components.append(x['term'] + ':' + x['id'])
                macro_status = 'Full Term Match'
                micro_status += x['status']

        # Try matching permutations if full term match fails
        # Functions mostly retained from v 0.7
        if macro_status == 'No Match':
            covered_tokens = set()
            for i in range(5, 0, -1):
                for gram_chunk in helpers.get_gram_chunks(cleaned_sample, i):
                    concat_gram_chunk = ' '.join(gram_chunk)
                    gram_permutations =\
                        list(OrderedDict.fromkeys(permutations(concat_gram_chunk.split())))
                    if set(gram_chunk) <= covered_tokens:
                        continue
                    for gram_permutation in gram_permutations:
                        gram_permutation_str = ' '.join(gram_permutation)
                        component_match = helpers.map_term(gram_permutation_str, lookup_table, c)
                        if not component_match and 'FOODON' in ontol_interest:
                            component_match = helpers.map_term(gram_permutation_str,
                                                               lookup_table, c, True)
                        if component_match:
                            for x in component_match:
                                matched_components.append(x['term'] + ':' + x['id'])
                                macro_status = 'Component Match'
                                micro_status += x['status']
                            covered_tokens.update(gram_chunk)

        # Try matching annotated synonyms if component match fails
        if macro_status == 'No Match':
            for clean_token in cleaned_sample.split():
                cleaned_annotated, s_m = helpers.get_annotated_sample(cleaned_annotated, clean_token)
                synonym_map.update(s_m)
            cleaned_annotated = helpers.annotation_reduce(cleaned_annotated, synonym_map)

            for x in helpers.get_annotated_synonyms(cleaned_annotated):
                synonym_match.extend(helpers.map_term(x, lookup_table, c))
            if synonym_match == [] and 'FOODON' in ontol_interest:
                for x in helpers.get_annotated_synonyms(cleaned_annotated):
                    synonym_match.extend(helpers.map_term(x, lookup_table, c, True))
            if synonym_match != []:
                macro_status = 'Synonym Match'
                for x in synonym_match:
                    matched_components.append(x['term'] + ':' + x['id'])
                    micro_status += x['status']

        # Remove matches that are ancestral to other matches
        if run_args.no_ancestors:
            for match_term in matched_components:
                match_term = match_term.replace('NCBITAXON', 'NCBITaxon')
                ontol_acc = ontr.Ontology_accession.make_instance(match_term)
                ontol_anc = ontol_acc.get_family('ancestors')
                try:
                    ontol_anc.remove('none found')
                except(ValueError):
                    pass
                ancestors |= set([x.id for x in ontol_anc])

            final_matches = []
            for match_term in matched_components:
                if match_term.split(':')[-1].replace('NCBITAXON', 'NCBITaxon') not in ancestors:
                    final_matches.append(match_term)

        # Bin matches
        for x in arg_bins:
            for y in matched_components:
                ontol_y = ontr.Ontology_accession.make_instance(y)
                bin_class[x].extend(ontol_y.bin_term(arg_bins[x]))

        # Write to output
        if cleaned_annotated == '':
            cleaned_annotated = 'Not annotated'
        write_line = '\t' + cleaned_sample + '\t' + cleaned_annotated +\
                     '\t' + '|'.join(sorted(set(matched_components))) + '\t' + macro_status
        while re.search('  ', write_line):
            write_line = write_line.replace('  ', ' ')
        term_cache[proc_sample] = write_line
        OUT_file.write(sample_id + write_line)

        if run_args.full:
            OUT_file.write('\t' + str(micro_status) + '\t' + str(sample_conversion_status))
            term_cache[proc_sample] += '\t'+str(micro_status)+'\t'+str(sample_conversion_status)
        if run_args.bin:
            for x in list(bin_class):
                OUT_file.write('\t' + '|'.join(sorted(set(bin_class[x]))).replace('[]', ''))
                term_cache[proc_sample] += '\t' + '|'.join(sorted(set(bin_class[x])))

    IN_file.close()
    conn.close()
    if OUT_file is not sys.stdout:
        OUT_file.close()
    else:
        OUT_file.write('\n\n')

    # Report results to log and generate graphs
    t2 = datetime.datetime.now()
    print(f'\tDone! {t2-t1} passed'.ljust(60) + '\nReporting results...')
    if run_args.output:
        summarize.report_results(run_args.output, list(arg_bins.keys()))
        if run_args.graph == True:
            summarize.figure_folder()
            summarize.visualize_results(run_args.output, list(arg_bins.keys()))
    else:
        match_counts = summarize.report_cache(term_cache)
        if run_args.graph == True:
            summarize.figure_folder()
            summarize.visualize_cache(match_counts)

    print('\t'+f'Done! {datetime.datetime.now()-t2} passed'.ljust(60)+'\n')
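

# Illustrative usage sketch: a minimal way run() might be invoked, assuming the caller
# supplies an argparse-style namespace. The attribute names mirror the run_args accesses
# inside run() above (input, output, embl_ontol, remake_cache, full, bin, user_bin,
# no_ancestors, graph); the file names are hypothetical placeholders, and the actual
# lexmapr command-line entry point presumably builds these arguments itself.
if __name__ == '__main__':
    from argparse import Namespace

    example_args = Namespace(
        input='samples.tsv',        # hypothetical .csv/.tsv input with a header row
        output='mapped_terms.tsv',  # set to None to write results to stdout instead
        embl_ontol=None,            # keep the default ontologies of interest
        remake_cache=False,         # reuse cached synonym/resource databases
        full=False,                 # True adds micro-level status columns
        bin=False,                  # True classifies matches into ontology bins
        user_bin=None,              # custom bin dictionary when bin is True
        no_ancestors=False,         # True filters matches ancestral to other matches
        graph=False)                # True generates summary figures
    run(example_args)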