Mercurial > repos > kkonganti > cfsan_lexmapr2
diff lexmapr/pipeline_helpers.py @ 0:f5c39d0447be
"planemo upload"
author | kkonganti |
---|---|
date | Wed, 31 Aug 2022 14:32:07 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lexmapr/pipeline_helpers.py Wed Aug 31 14:32:07 2022 -0400 @@ -0,0 +1,281 @@ +"""Helper functions for main pipeline""" + +import inflection, re, unicodedata, sqlite3 +from collections import OrderedDict +from itertools import combinations +from dateutil.parser import parse +from lexmapr.definitions import synonym_db +from nltk import pos_tag +from nltk.tokenize import word_tokenize +from nltk.tokenize.treebank import TreebankWordDetokenizer + + +def _lookup_correction(sample, lookup_table, lookup_x, micro_status, status_title): + '''Apply corections, if available in resource''' + sample = ' ' + sample + ' ' + for x in lookup_table[lookup_x]: + find_x = re.findall(' '+x+' ', sample) + if find_x != []: + micro_status.append(status_title + x) + sample = sample.replace(' '+x+' ', ' '+lookup_table[lookup_x][x]+' ') + return(' '.join(sample.split()), micro_status) + + +def _remove_annotated_synonyms(input_annotations): + '''Remove annotations to see original phrase''' + output_sample = '' + copy_char = True + for x in input_annotations: + if x == '{': + copy_char = False + elif x == '}': + copy_char = True + else: + if copy_char == True: + output_sample += x + while re.search(' ', output_sample): + output_sample = output_sample.replace(' ', ' ') + return(output_sample) + + +def _retrieve_map_id(search_results, c): + '''Get resource id from database''' + return_list = [] + for x in search_results: + c.execute('SELECT * FROM non_standard_resource_ids WHERE key=:key', {'key':x[1]}) + for y in c.fetchall(): + result_dic = {'term':y[1], 'id':y[0], 'status':[]} + if not result_dic in return_list: + return_list.append(result_dic) + return(return_list) + + +def _map_term_helper(term, c): + '''Maps term to resource or resource permutation''' + c.execute('SELECT * FROM standard_resource_labels WHERE key=:key', {'key':term}) + search_results = c.fetchall() + if len(search_results) == 0: + c.execute('SELECT * FROM standard_resource_permutations WHERE key=:key', {'key':term}) + search_results = c.fetchall() + if len(search_results) != 0: + return(_retrieve_map_id(search_results, c)) + else: + return(_retrieve_map_id(search_results, c)) + return(None) + + +def _ngrams(input_phrase, gram_value): + '''Get ngrams with a given value of gram_value''' + input_phrase = input_phrase.split() + output = [] + for i in range(len(input_phrase) - gram_value + 1): + output.append(input_phrase[i:i + gram_value]) + return(output) + + +def process_sample(sample, lookup_table, micro_status): + '''Apply corrections to input sample''' + sample, micro_status = _lookup_correction(sample, lookup_table, 'spelling_mistakes', + micro_status, 'Spelling Correction Treatment: ') + sample, micro_status = _lookup_correction(sample, lookup_table, 'abbreviations', + micro_status, 'Abbreviation-Acronym Treatment: ') + sample, micro_status = _lookup_correction(sample, lookup_table, 'non_english_words', + micro_status, 'Non English Language Words Treatment: ') + return(sample, micro_status) + + +def punctuation_treatment(untreated_term): + '''Remove punctuations from term''' + punctuations_regex_char_class = '[~`!@#$%^*()_\|/{}:;,.<>?]' + ret_term = '' + for word_token in untreated_term.split(): + if word_token.count('-') > 1: + ret_term += word_token.replace('-',' ') + ' ' + else: + ret_term += word_token + ' ' + ret_term = ret_term.lower().replace('\"','').replace('\'ve','').replace('\'m','') + ret_term = ret_term.replace('\'s','').replace('\'t','').replace('\'ll','').replace('\'re','') + ret_term = ret_term.replace('\'','').replace('-','').replace('[','').replace(']','') + ret_term = ret_term.replace('&',' and ').replace('+',' and ').replace('=',' is ') + ret_term = re.sub(punctuations_regex_char_class, ' ', ret_term).lower() + return(' '.join(ret_term.split())) + + +def further_cleanup(sample_text): + '''Remove terms indicated to not be relevant and some compound words''' + new_text = [] + neg_words = [r'no ',r'non',r'not',r'neither',r'nor',r'without'] + stt_words = ['animal','cb','chicken','environmental','food','human','large','medium','necropsy', + 'organic','other','poultry','product','sausage','small','stool','swab','wild',] + end_words = ['aspirate','culture','environmental','fluid','food','intestine','large','meal','medium', + 'mixed','necropsy','other','poultry','product','research','sample','sausage','slaughter', + 'small','swab','water','wild',] + not_replace = ['agriculture','apiculture','aquaculture','aquiculture','aviculture', + 'coculture','hemoculture','mariculture','monoculture','sericulture', + 'subculture','viniculture','viticulture', + 'semifluid','subfluid','superfluid', + 'superlarge','reenlarge','enlarge','overlarge','largemouth','larges', + 'bonemeal','cornmeal','fishmeal','inchmeal','oatmeal','piecemeal','premeal', + 'wholemeal','biosample','ensample','resample','subsample','backwater', + 'another','bother','brother','foremother','frother','godmother','grandmother', + 'housemother','mother','otherguess','otherness','othernesse','otherwhere', + 'otherwhile','otherworld','pother','soother','smoother','smother','stepbrother', + 'stepmother','tother', + 'byproduct','coproduct','production','productive','subproduct', + 'ultrasmall','smaller','smallmouth','smalltime','smallpox','smallpoxe', + 'smallsword','smallsholder','mediumship', + 'bathwater','bilgewater','blackwater','breakwater','cutwater','deepwater', + 'dewater','dishwater','eyewater','firewater','floodwater','freshwater', + 'graywater','groundwater','headwater','jerkwater','limewater','meltwater', + 'overwater','polywater','rainwater','rosewater','saltwater','seawater', + 'shearwater','springwater','tailwater','tidewater','underwater','wastewater', + 'semiwild','wildcard','wildcat','wildcatter','wildcatted','wildebeest','wilded', + 'wilder','wilderment','wilderness','wildernesse','wildest','wildfire','wildflower', + 'wildfowl','wildfowler','wildish','wildland','wildling','wildlife','wildwood', + ] + + found_comp = [] + for comp_word in stt_words: + found_comp.extend(re.findall(f'({comp_word})(\w+)', sample_text)) + for comp_word in end_words: + found_comp.extend(re.findall(f'(\w+)({comp_word})', sample_text)) + for x in found_comp: + if x[0]+x[1] not in not_replace and x[0]+x[1]+'s' not in not_replace: + sample_text = sample_text.replace(x[0]+x[1], x[0]+' '+x[1]) + + for sample_word in sample_text.split(): + if len(sample_word) > 1: + new_text.append(sample_word.strip()) + + if 'nor' in new_text: + if 'neither' not in new_text: + word_ind = new_text.index('nor') + new_text.insert(max[0,word_ind-2], 'neither') + + for neg_word in neg_words: + if neg_word in new_text: + word_ind = new_text.index(neg_word) + del(new_text[word_ind:word_ind+2]) + return(' '.join(new_text)) + + +def is_number(input_string): + '''Determine whether a string is a number''' + try: + unicodedata.numeric(input_string) + return(True) + except(TypeError, ValueError): + return(False) + + +def is_date(input_string): + '''Determine whether a string is a date or day''' + try: + parse(input_string) + return(True) + except(ValueError, OverflowError): + return(False) + + +def singularize_token(token, lookup_table, micro_status, c): + '''Singularize the string token, if applicable''' + if token in lookup_table['inflection_exceptions']: + return(token, micro_status) + + exception_tail_chars_list = ['us', 'ia', 'ta', 'ss'] # TODO: add as, is? + for char in exception_tail_chars_list: + if token.endswith(char): + return(token, micro_status) + + taxon_names = c.execute('''SELECT * FROM standard_resource_labels WHERE key LIKE :key AND + value LIKE :value''', + {'key':'% '+token,'value':'NCBITaxon%'}).fetchall() + remove_finds = [] + for x in taxon_names: + if len(x[0].split()) > 2: + remove_finds.append(x) + for x in remove_finds: + taxon_names.remove(x) + if taxon_names != []: + return(token, micro_status) + + lemma = inflection.singularize(token) + micro_status.append('Inflection (Plural) Treatment: ' + token) + return(lemma, micro_status) + + +def get_cleaned_sample(input_sample, token, lookup_table): + '''Prepare the cleaned sample phrase using the input token''' + if input_sample == '' and token not in lookup_table['stop_words']: + return(token) + elif token not in lookup_table['stop_words']: + return(input_sample + ' ' + token) + else: + return(input_sample) + + +def get_annotated_sample(annotated_sample, lemma): + '''Embed synonyms in the sample, if available''' + # TODO: able to annotate permuatations instead of just left to right? + synonym_map = {} + if not annotated_sample: + annotated_sample = lemma + else: + annotated_sample = f'{annotated_sample} {lemma}' + + conn_syn = sqlite3.connect(synonym_db) + d = conn_syn.cursor() + for y in [lemma, _remove_annotated_synonyms(annotated_sample)]: + d.execute('SELECT * FROM label_synonyms WHERE key=:key', {'key':y}) + for x in d.fetchall(): + if not re.search(x[1], annotated_sample): + annotated_sample = annotated_sample+' {'+x[1]+'}' + synonym_map[y] = x[1] + conn_syn.close() + return(annotated_sample, synonym_map) + + +def map_term(term, lookup_table, c, consider_suffixes=False): + '''Map term to some resource in database''' + if consider_suffixes: + for suffix in lookup_table['suffixes']: + mapping = _map_term_helper(term+' '+suffix, c) + if mapping: + for x in mapping: + x['status'].insert(-2, 'Suffix Addition') + return(mapping) + else: + mapping = _map_term_helper(term, c) + if mapping: + return(mapping) + return([]) + + +def annotation_reduce(annotated_sample, synonym_map): + '''Remove annotations on shorter phrases included in longer phrases with annotations''' + remove_list = [] + for x in list(synonym_map.keys()): + for y in list(synonym_map.keys()): + if x != y: + if x.startswith(y) or x.endswith(y) == True: + remove_list.append(y) + for x in remove_list: + annotated_sample = annotated_sample.replace('{'+synonym_map[x]+'}',' ') + return(' '.join(annotated_sample.split())) + + +def get_annotated_synonyms(input_annotations): + '''Get list of the annotations''' + synonym_list = [] + for x in input_annotations.split('{')[1:]: + synonym_list.append(x.split('}')[0]) + return(synonym_list) + + +def get_gram_chunks(input_phrase, num): + '''Make num-gram chunks from input''' + input_tokens = input_phrase.split() + if len(input_tokens) < 15: + return(list(combinations(input_tokens, num))) + else: + return(_ngrams(input_phrase, num))