diff lexmapr/pipeline.py @ 3:be95a7ce968a tip
"planemo upload"
author | kkonganti
---|---
date | Tue, 13 Sep 2022 11:32:24 -0400
parents | 5244e7465767
children |
--- a/lexmapr/pipeline.py	Wed Aug 31 14:32:14 2022 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,261 +0,0 @@
-"""Pipeline script"""
-
-import csv, datetime, logging, re, sqlite3, sys
-import lexmapr.create_databases as cdkp
-import lexmapr.ontology_reasoner as ontr
-import lexmapr.pipeline_helpers as helpers
-import lexmapr.pipeline_resources as pipeline_resources
-import lexmapr.run_summary as summarize
-from itertools import permutations
-from collections import OrderedDict
-from nltk.tokenize import word_tokenize
-from lexmapr.definitions import not_provided, arg_bins, ontol_db, ontol_interest
-
-
-# TODO: deal with ambiguous terms: nut milk, dairy (building ENVO_00003862 v dairy food product)
-# TODO: combine ScorLex.csv and misspellings.csv and edit _add_predefined_resources function
-# TODO: decide what to do with same synonym, ex custom-customize vs custom-tradition
-# TODO: make web on database instead of pulling relationships from API?
-# TODO: remove synonyms over time from SynLex and resource_synonyms.csv; edit get_synonyms
-
-def run(run_args):
-    '''Main text processing and mapping pipeline'''
-
-    # Add information from EMBL and predefined_resources folder
-    t0 = datetime.datetime.now()
-    global ontol_interest
-    if run_args.embl_ontol:
-        ontol_interest = run_args.embl_ontol
-
-    print('\nBuilding databases...')
-    cdkp.get_synonyms(run_args.remake_cache, ontol_interest)
-    cdkp.get_resource_ids(run_args.remake_cache, ontol_interest)
-    lookup_table = pipeline_resources.get_predefined_resources()
-    t1 = datetime.datetime.now()
-    print(f'\tDone! {t1-t0} passed'.ljust(60) + '\nMapping terms...')
-    logging.info(f'Database build/confirm: {t1}')
-
-    # Apply other arguments and initiate mapping cache
-    term_cache = {'':'\t\t\t\t',}
-    output_fields = ['Sample_Id',
-                     'Sample_Desc',
-                     'Processed_Sample',
-                     'Annotated_Sample',
-                     'Matched_Components']
-    if run_args.full:
-        output_fields += ['Match_Status (Macro Level)',
-                          'Match_Status (Micro Level)',
-                          'Sample_Transformations']
-        term_cache[''] += '\t\t'
-    else:
-        output_fields += ['Match_Status (Macro Level)']
-
-    if run_args.bin:
-        global arg_bins
-        if run_args.user_bin is not None:
-            arg_bins = run_args.user_bin
-        for x in arg_bins:
-            arg_bins[x] = ontr.Ontology_package(x, arg_bins[x])
-        term_cache[''] += '\t'*len(arg_bins)
-        output_fields += list(arg_bins.keys())
-    else:
-        arg_bins = {}
-
-    OUT_file = open(run_args.output, 'w') if run_args.output else sys.stdout
-    if OUT_file is sys.stdout:
-        OUT_file.write('\n')
-    OUT_file.write('\t'.join(output_fields))
-
-    IN_file = open(run_args.input, 'r')
-    if run_args.input[-4:] == '.csv':
-        fr_reader = csv.reader(IN_file, delimiter=',')
-    elif run_args.input[-4:] == '.tsv':
-        fr_reader = csv.reader(IN_file, delimiter='\t')
-    next(fr_reader)
-
-    # Connect to primary database
-    conn = sqlite3.connect(ontol_db)
-    c = conn.cursor()
-
-    # Iterate over samples in input file
-    for sample_row in fr_reader:
-        sample_id = '\n' + sample_row[0].strip() + '\t' + ','.join(sample_row[1:])
-        original_sample = ' '.join(sample_row[1:]).strip()
-        cleaned_sample = ''
-        cleaned_annotated = ''
-        macro_status = 'No Match'
-        matched_components = []
-        synonym_match = []
-        micro_status = []
-        bin_class = {x:[] for x in arg_bins}
-        ancestors = set()
-        sample_conversion_status = {}
-        synonym_map = {}
-        treated_sample = helpers.punctuation_treatment(original_sample)
-
-        # Determine if sample in predefined list of null values
-        if treated_sample in not_provided:
-            write_line = '\t' + treated_sample + '\tNot annotated\t\t' + macro_status
-            OUT_file.write(sample_id + write_line)
-            if run_args.full:
-                OUT_file.write('\t' + str(micro_status) + '\t' + str(sample_conversion_status))
-            if run_args.bin:
-                OUT_file.write('\t'*len(bin_class))
-            continue
-
-        # Remove negated words and some compound words, apply corrections
-        proc_sample = helpers.further_cleanup(treated_sample)
-        proc_sample, micro_status = helpers.process_sample(proc_sample,lookup_table,micro_status)
-
-        # Try finding processed sample in cache
-        try:
-            OUT_file.write(sample_id+term_cache[proc_sample])
-            continue
-        except(KeyError):
-            pass
-
-        # Attempt full term matches with and without suffixes
-        if OUT_file is not sys.stdout:
-            print('\tMatching '+sample_row[0].strip()+' '+ \
-                  '{:<40}'.format(proc_sample[:40]).ljust(60), end='\r')
-
-        full_term_match = helpers.map_term(treated_sample, lookup_table, c)
-        if full_term_match == []:
-            full_term_match = helpers.map_term(proc_sample, lookup_table, c)
-            if full_term_match != []:
-                micro_status.insert(0, 'Used Processed Sample')
-        if full_term_match == [] and 'FOODON' in ontol_interest:
-            full_term_match = helpers.map_term(proc_sample, lookup_table, c, True)
-            if full_term_match != []:
-                micro_status.insert(0, 'Used Processed Sample')
-
-        # Attempt full term match with cleaned sample using suffixes
-        if full_term_match == []:
-            for sw_token in word_tokenize(proc_sample):
-                if helpers.is_date(sw_token) or helpers.is_number(sw_token):
-                    continue
-                lemma, micro_status = helpers.singularize_token(sw_token, lookup_table,
-                                                                micro_status, c)
-                if not sw_token == lemma:
-                    sample_conversion_status[sw_token] = lemma
-                cleaned_sample = helpers.get_cleaned_sample(cleaned_sample, lemma, lookup_table)
-                # Not de-duplicating tokens because can't account for all legitimate double names
-
-            full_term_match = helpers.map_term(cleaned_sample, lookup_table, c)
-            if full_term_match == [] and 'FOODON' in ontol_interest:
-                full_term_match = helpers.map_term(cleaned_sample, lookup_table, c, True)
-            if full_term_match != []:
-                micro_status.insert(0, 'Used Cleaned Sample')
-
-        # Combine the matched terms
-        if full_term_match != []:
-            for x in full_term_match:
-                matched_components.append(x['term'] + ':' + x['id'])
-                macro_status = 'Full Term Match'
-                micro_status += x['status']
-
-        # Try matching permutations if full term match fails
-        # Functions mostly retained from v 0.7
-        if macro_status == 'No Match':
-            covered_tokens = set()
-            for i in range(5, 0, -1):
-                for gram_chunk in helpers.get_gram_chunks(cleaned_sample, i):
-                    concat_gram_chunk = ' '.join(gram_chunk)
-                    gram_permutations =\
-                        list(OrderedDict.fromkeys(permutations(concat_gram_chunk.split())))
-                    if set(gram_chunk) <= covered_tokens:
-                        continue
-                    for gram_permutation in gram_permutations:
-                        gram_permutation_str = ' '.join(gram_permutation)
-                        component_match = helpers.map_term(gram_permutation_str, lookup_table, c)
-                        if not component_match and 'FOODON' in ontol_interest:
-                            component_match = helpers.map_term(gram_permutation_str,
-                                                               lookup_table, c, True)
-                        if component_match:
-                            for x in component_match:
-                                matched_components.append(x['term'] + ':' + x['id'])
-                                macro_status = 'Component Match'
-                                micro_status += x['status']
-                            covered_tokens.update(gram_chunk)
-
-        # Try matching annotated synonyms if component match fails
-        if macro_status == 'No Match':
-            for clean_token in cleaned_sample.split():
-                cleaned_annotated,s_m=helpers.get_annotated_sample(cleaned_annotated,clean_token)
-                synonym_map.update(s_m)
-            cleaned_annotated = helpers.annotation_reduce(cleaned_annotated, synonym_map)
-
-            for x in helpers.get_annotated_synonyms(cleaned_annotated):
-                synonym_match.extend(helpers.map_term(x, lookup_table, c))
-            if synonym_match == [] and 'FOODON' in ontol_interest:
-                for x in helpers.get_annotated_synonyms(cleaned_annotated):
-                    synonym_match.extend(helpers.map_term(x, lookup_table, c, True))
-            if synonym_match != []:
-                macro_status = 'Synonym Match'
-                for x in synonym_match:
-                    matched_components.append(x['term'] + ':' + x['id'])
-                    micro_status += x['status']
-
-        # Remove matches that are ancestral to other matches
-        if run_args.no_ancestors:
-            for match_term in matched_components:
-                match_term = match_term.replace('NCBITAXON','NCBITaxon')
-                ontol_acc = ontr.Ontology_accession.make_instance(match_term)
-                ontol_anc = ontol_acc.get_family('ancestors')
-                try:
-                    ontol_anc.remove('none found')
-                except(ValueError):
-                    pass
-                ancestors |= set([x.id for x in ontol_anc])
-
-            final_matches = []
-            for match_term in matched_components:
-                if match_term.split(':')[-1].replace('NCBITAXON','NCBITaxon') not in ancestors:
-                    final_matches.append(match_term)
-
-        # Bin matches
-        for x in arg_bins:
-            for y in matched_components:
-                ontol_y = ontr.Ontology_accession.make_instance(y)
-                bin_class[x].extend(ontol_y.bin_term(arg_bins[x]))
-
-        # Write to output
-        if cleaned_annotated == '':
-            cleaned_annotated = 'Not annotated'
-        write_line = '\t' + cleaned_sample + '\t' + cleaned_annotated +\
-                     '\t' + '|'.join(sorted(set(matched_components))) + '\t' + macro_status
-        while re.search('  ', write_line):
-            write_line = write_line.replace('  ', ' ')
-        term_cache[proc_sample] = write_line
-        OUT_file.write(sample_id + write_line)
-        if run_args.full:
-            OUT_file.write('\t' + str(micro_status) + '\t' + str(sample_conversion_status))
-            term_cache[proc_sample] += '\t'+str(micro_status)+'\t'+str(sample_conversion_status)
-        if run_args.bin:
-            for x in list(bin_class):
-                OUT_file.write('\t' + '|'.join(sorted(set(bin_class[x]))).replace('[]',''))
-                term_cache[proc_sample] += '\t' + '|'.join(sorted(set(bin_class[x])))
-
-
-    IN_file.close()
-    conn.close()
-    if OUT_file is not sys.stdout:
-        OUT_file.close()
-    else:
-        OUT_file.write('\n\n')
-
-    # Report results to log and generate graphs
-    t2 = datetime.datetime.now()
-    print(f'\tDone! {t2-t1} passed'.ljust(60) + '\nReporting results...')
-    if run_args.output:
-        summarize.report_results(run_args.output, list(arg_bins.keys()))
-        if run_args.graph == True:
-            summarize.figure_folder()
-            summarize.visualize_results(run_args.output, list(arg_bins.keys()))
-    else:
-        match_counts = summarize.report_cache(term_cache)
-        if run_args.graph == True:
-            summarize.figure_folder()
-            summarize.visualize_cache(match_counts)
-
-    print('\t'+f'Done! {datetime.datetime.now()-t2} passed'.ljust(60)+'\n')
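
For context, below is a minimal sketch (not part of the repository) of how the deleted run() entry point could be driven directly. It only uses the run_args attributes that the code above actually reads (input, output, full, bin, user_bin, embl_ontol, remake_cache, no_ancestors, graph); the argparse.Namespace and the file names are stand-ins for whatever CLI wrapper and data the package actually provides.

```python
# Hypothetical driver for lexmapr.pipeline.run(); attribute names mirror the
# run_args.* references in the deleted pipeline.py shown above.
from argparse import Namespace

from lexmapr import pipeline

run_args = Namespace(
    input='samples.tsv',          # hypothetical headered TSV/CSV: sample id, then description column(s)
    output='mapped_samples.tsv',  # set to None to write results to stdout instead
    full=False,                   # add micro-level status and sample-transformation columns
    bin=False,                    # classify matched terms into ontology bins
    user_bin=None,                # optional user-supplied bin definitions (replaces arg_bins)
    embl_ontol=None,              # optional override of the ontologies of interest
    remake_cache=False,           # rebuild the synonym/resource-id databases
    no_ancestors=False,           # drop matches that are ancestors of other matches
    graph=False,                  # generate summary figures after mapping
)

pipeline.run(run_args)
```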