view lexmapr/pipeline.py @ 0:f5c39d0447be

"planemo upload"
author kkonganti
date Wed, 31 Aug 2022 14:32:07 -0400
parents
children
line wrap: on
line source
"""Pipeline script"""

import csv, datetime, logging, re, sqlite3, sys
import lexmapr.create_databases as cdkp
import lexmapr.ontology_reasoner as ontr
import lexmapr.pipeline_helpers as helpers
import lexmapr.pipeline_resources as pipeline_resources
import lexmapr.run_summary as summarize
from itertools import permutations
from collections import OrderedDict
from nltk.tokenize import word_tokenize
from lexmapr.definitions import not_provided, arg_bins, ontol_db, ontol_interest


# TODO: deal with ambiguous terms: nut milk, dairy (building ENVO_00003862 v dairy food product)
# TODO: combine ScorLex.csv and misspellings.csv and edit _add_predefined_resources function
# TODO: decide what to do with same synonym, ex custom-customize vs custom-tradition
# TODO: make web on database instead of pulling relationships from API?
# TODO: remove synonyms over time from SynLex and resource_synonyms.csv; edit get_synonyms

def run(run_args):
    '''Main text processing and mapping pipeline'''

    # Add information from EMBL and predefined_resources folder
    t0 = datetime.datetime.now()
    global ontol_interest
    if run_args.embl_ontol:
        ontol_interest = run_args.embl_ontol

    print('\nBuilding databases...')
    cdkp.get_synonyms(run_args.remake_cache, ontol_interest)
    cdkp.get_resource_ids(run_args.remake_cache, ontol_interest)
    lookup_table = pipeline_resources.get_predefined_resources()
    t1 = datetime.datetime.now()
    print(f'\tDone! {t1-t0} passed'.ljust(60) + '\nMapping terms...')
    logging.info(f'Database build/confirm: {t1}')

    # Apply other arguments and initiate mapping cache
    term_cache = {'':'\t\t\t\t',}
    output_fields = ['Sample_Id',
                     'Sample_Desc',
                     'Processed_Sample',
                     'Annotated_Sample',
                     'Matched_Components']
    if run_args.full:
        output_fields += ['Match_Status (Macro Level)',
                          'Match_Status (Micro Level)',
                          'Sample_Transformations']
        term_cache[''] += '\t\t'
    else:
        output_fields += ['Match_Status (Macro Level)']

    if run_args.bin:
        global arg_bins
        if run_args.user_bin is not None:
            arg_bins = run_args.user_bin
        for x in arg_bins:
            arg_bins[x] = ontr.Ontology_package(x, arg_bins[x])
        term_cache[''] += '\t'*len(arg_bins)
        output_fields += list(arg_bins.keys())
    else:
        arg_bins = {}

    OUT_file = open(run_args.output, 'w') if run_args.output else sys.stdout
    if OUT_file is sys.stdout:
        OUT_file.write('\n')
    OUT_file.write('\t'.join(output_fields))

    IN_file = open(run_args.input, 'r')
    if run_args.input[-4:] == '.csv':
        fr_reader = csv.reader(IN_file, delimiter=',')
    elif run_args.input[-4:] == '.tsv':
        fr_reader = csv.reader(IN_file, delimiter='\t')
    next(fr_reader)
    
    # Connect to primary database
    conn = sqlite3.connect(ontol_db)
    c = conn.cursor()

    # Iterate over samples in input file
    for sample_row in fr_reader:
        sample_id = '\n' + sample_row[0].strip() + '\t' + ','.join(sample_row[1:])
        original_sample = ' '.join(sample_row[1:]).strip()
        cleaned_sample = ''
        cleaned_annotated = ''
        macro_status = 'No Match'
        matched_components = []
        synonym_match = []
        micro_status = []
        bin_class = {x:[] for x in arg_bins}
        ancestors = set()
        sample_conversion_status = {}
        synonym_map = {}
        treated_sample = helpers.punctuation_treatment(original_sample)

        # Determine if sample in predefined list of null values
        if treated_sample in not_provided:
            write_line = '\t' + treated_sample + '\tNot annotated\t\t' + macro_status
            OUT_file.write(sample_id + write_line)
            if run_args.full:
                OUT_file.write('\t' + str(micro_status) + '\t' + str(sample_conversion_status))
            if run_args.bin:
                OUT_file.write('\t'*len(bin_class))
            continue

        # Remove negated words and some compound words, apply corrections
        proc_sample = helpers.further_cleanup(treated_sample)
        proc_sample, micro_status = helpers.process_sample(proc_sample,lookup_table,micro_status)

        # Try finding processed sample in cache
        try:
            OUT_file.write(sample_id+term_cache[proc_sample])
            continue
        except(KeyError):
            pass

        # Attempt full term matches with and without suffixes
        if OUT_file is not sys.stdout:
            print('\tMatching '+sample_row[0].strip()+' '+ \
                                '{:<40}'.format(proc_sample[:40]).ljust(60), end='\r')

        full_term_match = helpers.map_term(treated_sample, lookup_table, c)
        if full_term_match == []:
            full_term_match = helpers.map_term(proc_sample, lookup_table, c)
            if full_term_match != []:
                micro_status.insert(0, 'Used Processed Sample')
        if full_term_match == [] and 'FOODON' in ontol_interest:
            full_term_match = helpers.map_term(proc_sample, lookup_table, c, True)
            if full_term_match != []:
                micro_status.insert(0, 'Used Processed Sample')

        # Attempt full term match with cleaned sample using suffixes
        if full_term_match == []:
            for sw_token in word_tokenize(proc_sample):
                if helpers.is_date(sw_token) or helpers.is_number(sw_token):
                    continue
                lemma, micro_status = helpers.singularize_token(sw_token, lookup_table,
                                                               micro_status, c)
                if not sw_token == lemma:
                    sample_conversion_status[sw_token] = lemma
                cleaned_sample = helpers.get_cleaned_sample(cleaned_sample, lemma, lookup_table)
                # Not de-duplicating tokens because can't account for all legitimate double names

            full_term_match = helpers.map_term(cleaned_sample, lookup_table, c)
            if full_term_match == [] and 'FOODON' in ontol_interest:
                full_term_match = helpers.map_term(cleaned_sample, lookup_table, c, True)
            if full_term_match != []:
                micro_status.insert(0, 'Used Cleaned Sample')

        # Combine the matched terms
        if full_term_match != []:
            for x in full_term_match:
                matched_components.append(x['term'] + ':' + x['id'])
                macro_status = 'Full Term Match'
                micro_status += x['status']

        # Try matching permutations if full term match fails
        # Functions mostly retained from v 0.7
        if macro_status == 'No Match':
            covered_tokens = set()
            for i in range(5, 0, -1):
                for gram_chunk in helpers.get_gram_chunks(cleaned_sample, i):
                    concat_gram_chunk = ' '.join(gram_chunk)
                    gram_permutations =\
                        list(OrderedDict.fromkeys(permutations(concat_gram_chunk.split())))
                    if set(gram_chunk) <= covered_tokens:
                        continue
                    for gram_permutation in gram_permutations:
                        gram_permutation_str = ' '.join(gram_permutation)
                        component_match = helpers.map_term(gram_permutation_str, lookup_table, c)
                        if not component_match and 'FOODON' in ontol_interest:
                            component_match = helpers.map_term(gram_permutation_str,
                                                         lookup_table, c, True)
                        if component_match:
                            for x in component_match:
                                matched_components.append(x['term'] + ':' + x['id'])
                                macro_status = 'Component Match'
                                micro_status += x['status']
                                covered_tokens.update(gram_chunk)

        # Try matching annotated synonyms if component match fails
        if macro_status == 'No Match':
            for clean_token in cleaned_sample.split():
                cleaned_annotated,s_m=helpers.get_annotated_sample(cleaned_annotated,clean_token)
                synonym_map.update(s_m)
        cleaned_annotated = helpers.annotation_reduce(cleaned_annotated, synonym_map)

        for x in helpers.get_annotated_synonyms(cleaned_annotated):
            synonym_match.extend(helpers.map_term(x, lookup_table, c))
        if synonym_match == [] and 'FOODON' in ontol_interest:
            for x in helpers.get_annotated_synonyms(cleaned_annotated):
                synonym_match.extend(helpers.map_term(x, lookup_table, c, True))
        if synonym_match != []:
            macro_status = 'Synonym Match'
        for x in synonym_match:
            matched_components.append(x['term'] + ':' + x['id'])
            micro_status += x['status']

        # Remove matches that are ancestral to other matches
        if run_args.no_ancestors:
            for match_term in matched_components:
                match_term = match_term.replace('NCBITAXON','NCBITaxon')
                ontol_acc = ontr.Ontology_accession.make_instance(match_term)
                ontol_anc = ontol_acc.get_family('ancestors')
                try:
                    ontol_anc.remove('none found')
                except(ValueError):
                    pass
                ancestors |= set([x.id for x in ontol_anc])

            final_matches = []
            for match_term in matched_components:
                if match_term.split(':')[-1].replace('NCBITAXON','NCBITaxon') not in ancestors:
                    final_matches.append(match_term)

        # Bin matches
        for x in arg_bins:
            for y in matched_components:
                ontol_y = ontr.Ontology_accession.make_instance(y)
                bin_class[x].extend(ontol_y.bin_term(arg_bins[x]))

        # Write to output
        if cleaned_annotated == '':
            cleaned_annotated = 'Not annotated'
        write_line = '\t' + cleaned_sample + '\t' + cleaned_annotated +\
                     '\t' + '|'.join(sorted(set(matched_components))) + '\t' + macro_status
        while re.search('  ', write_line):
            write_line = write_line.replace('  ',' ')
        term_cache[proc_sample] = write_line
        OUT_file.write(sample_id + write_line)
        if run_args.full:
            OUT_file.write('\t' + str(micro_status) + '\t' + str(sample_conversion_status))
            term_cache[proc_sample] += '\t'+str(micro_status)+'\t'+str(sample_conversion_status)
        if run_args.bin:
            for x in list(bin_class):
                OUT_file.write('\t' + '|'.join(sorted(set(bin_class[x]))).replace('[]',''))
                term_cache[proc_sample] += '\t' + '|'.join(sorted(set(bin_class[x])))


    IN_file.close()
    conn.close()
    if OUT_file is not sys.stdout:
        OUT_file.close()
    else:
        OUT_file.write('\n\n')

    # Report results to log and generate graphs
    t2 = datetime.datetime.now()
    print(f'\tDone! {t2-t1} passed'.ljust(60) + '\nReporting results...')
    if run_args.output:
        summarize.report_results(run_args.output, list(arg_bins.keys()))
        if run_args.graph == True:
            summarize.figure_folder()
            summarize.visualize_results(run_args.output, list(arg_bins.keys()))
    else:
        match_counts = summarize.report_cache(term_cache)
        if run_args.graph == True:
            summarize.figure_folder()
            summarize.visualize_cache(match_counts)

    print('\t'+f'Done! {datetime.datetime.now()-t2} passed'.ljust(60)+'\n')