kkonganti@0: """Helper functions for main pipeline""" kkonganti@0: kkonganti@0: import inflection, re, unicodedata, sqlite3 kkonganti@0: from collections import OrderedDict kkonganti@0: from itertools import combinations kkonganti@0: from dateutil.parser import parse kkonganti@0: from lexmapr.definitions import synonym_db kkonganti@0: from nltk import pos_tag kkonganti@0: from nltk.tokenize import word_tokenize kkonganti@0: from nltk.tokenize.treebank import TreebankWordDetokenizer kkonganti@0: kkonganti@0: kkonganti@0: def _lookup_correction(sample, lookup_table, lookup_x, micro_status, status_title): kkonganti@0: '''Apply corections, if available in resource''' kkonganti@0: sample = ' ' + sample + ' ' kkonganti@0: for x in lookup_table[lookup_x]: kkonganti@0: find_x = re.findall(' '+x+' ', sample) kkonganti@0: if find_x != []: kkonganti@0: micro_status.append(status_title + x) kkonganti@0: sample = sample.replace(' '+x+' ', ' '+lookup_table[lookup_x][x]+' ') kkonganti@0: return(' '.join(sample.split()), micro_status) kkonganti@0: kkonganti@0: kkonganti@0: def _remove_annotated_synonyms(input_annotations): kkonganti@0: '''Remove annotations to see original phrase''' kkonganti@0: output_sample = '' kkonganti@0: copy_char = True kkonganti@0: for x in input_annotations: kkonganti@0: if x == '{': kkonganti@0: copy_char = False kkonganti@0: elif x == '}': kkonganti@0: copy_char = True kkonganti@0: else: kkonganti@0: if copy_char == True: kkonganti@0: output_sample += x kkonganti@0: while re.search(' ', output_sample): kkonganti@0: output_sample = output_sample.replace(' ', ' ') kkonganti@0: return(output_sample) kkonganti@0: kkonganti@0: kkonganti@0: def _retrieve_map_id(search_results, c): kkonganti@0: '''Get resource id from database''' kkonganti@0: return_list = [] kkonganti@0: for x in search_results: kkonganti@0: c.execute('SELECT * FROM non_standard_resource_ids WHERE key=:key', {'key':x[1]}) kkonganti@0: for y in c.fetchall(): kkonganti@0: result_dic = {'term':y[1], 'id':y[0], 'status':[]} kkonganti@0: if not result_dic in return_list: kkonganti@0: return_list.append(result_dic) kkonganti@0: return(return_list) kkonganti@0: kkonganti@0: kkonganti@0: def _map_term_helper(term, c): kkonganti@0: '''Maps term to resource or resource permutation''' kkonganti@0: c.execute('SELECT * FROM standard_resource_labels WHERE key=:key', {'key':term}) kkonganti@0: search_results = c.fetchall() kkonganti@0: if len(search_results) == 0: kkonganti@0: c.execute('SELECT * FROM standard_resource_permutations WHERE key=:key', {'key':term}) kkonganti@0: search_results = c.fetchall() kkonganti@0: if len(search_results) != 0: kkonganti@0: return(_retrieve_map_id(search_results, c)) kkonganti@0: else: kkonganti@0: return(_retrieve_map_id(search_results, c)) kkonganti@0: return(None) kkonganti@0: kkonganti@0: kkonganti@0: def _ngrams(input_phrase, gram_value): kkonganti@0: '''Get ngrams with a given value of gram_value''' kkonganti@0: input_phrase = input_phrase.split() kkonganti@0: output = [] kkonganti@0: for i in range(len(input_phrase) - gram_value + 1): kkonganti@0: output.append(input_phrase[i:i + gram_value]) kkonganti@0: return(output) kkonganti@0: kkonganti@0: kkonganti@0: def process_sample(sample, lookup_table, micro_status): kkonganti@0: '''Apply corrections to input sample''' kkonganti@0: sample, micro_status = _lookup_correction(sample, lookup_table, 'spelling_mistakes', kkonganti@0: micro_status, 'Spelling Correction Treatment: ') kkonganti@0: sample, micro_status = _lookup_correction(sample, 


def punctuation_treatment(untreated_term):
    '''Remove punctuation from term'''
    punctuations_regex_char_class = r'[~`!@#$%^*()_\|/{}:;,.<>?]'
    ret_term = ''
    for word_token in untreated_term.split():
        # Split multi-hyphen tokens (e.g. 'ready-to-eat') into words; a single
        # hyphen is simply removed below (e.g. 'skin-on' -> 'skinon')
        if word_token.count('-') > 1:
            ret_term += word_token.replace('-', ' ') + ' '
        else:
            ret_term += word_token + ' '
    ret_term = ret_term.lower().replace('\"','').replace('\'ve','').replace('\'m','')
    ret_term = ret_term.replace('\'s','').replace('\'t','').replace('\'ll','').replace('\'re','')
    ret_term = ret_term.replace('\'','').replace('-','').replace('[','').replace(']','')
    ret_term = ret_term.replace('&',' and ').replace('+',' and ').replace('=',' is ')
    ret_term = re.sub(punctuations_regex_char_class, ' ', ret_term)
    return(' '.join(ret_term.split()))
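

# Illustrative behavior, following the rules above:
#   punctuation_treatment('Ready-To-Eat; chicken breast!')
#   -> 'ready to eat chicken breast'
#   punctuation_treatment("cow's milk & cream")
#   -> 'cow milk and cream'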


def further_cleanup(sample_text):
    '''Remove terms indicated to not be relevant and split some compound words'''
    new_text = []
    neg_words = ['no','non','not','neither','nor','without']
    stt_words = ['animal','cb','chicken','environmental','food','human','large','medium','necropsy',
                 'organic','other','poultry','product','sausage','small','stool','swab','wild']
    end_words = ['aspirate','culture','environmental','fluid','food','intestine','large','meal','medium',
                 'mixed','necropsy','other','poultry','product','research','sample','sausage','slaughter',
                 'small','swab','water','wild']
    not_replace = ['agriculture','apiculture','aquaculture','aquiculture','aviculture',
                   'coculture','hemoculture','mariculture','monoculture','sericulture',
                   'subculture','viniculture','viticulture',
                   'semifluid','subfluid','superfluid',
                   'superlarge','reenlarge','enlarge','overlarge','largemouth','larges',
                   'bonemeal','cornmeal','fishmeal','inchmeal','oatmeal','piecemeal','premeal',
                   'wholemeal','biosample','ensample','resample','subsample','backwater',
                   'another','bother','brother','foremother','frother','godmother','grandmother',
                   'housemother','mother','otherguess','otherness','othernesse','otherwhere',
                   'otherwhile','otherworld','pother','soother','smoother','smother','stepbrother',
                   'stepmother','tother',
                   'byproduct','coproduct','production','productive','subproduct',
                   'ultrasmall','smaller','smallmouth','smalltime','smallpox','smallpoxe',
                   'smallsword','smallsholder','mediumship',
                   'bathwater','bilgewater','blackwater','breakwater','cutwater','deepwater',
                   'dewater','dishwater','eyewater','firewater','floodwater','freshwater',
                   'graywater','groundwater','headwater','jerkwater','limewater','meltwater',
                   'overwater','polywater','rainwater','rosewater','saltwater','seawater',
                   'shearwater','springwater','tailwater','tidewater','underwater','wastewater',
                   'semiwild','wildcard','wildcat','wildcatter','wildcatted','wildebeest','wilded',
                   'wilder','wilderment','wilderness','wildernesse','wildest','wildfire','wildflower',
                   'wildfowl','wildfowler','wildish','wildland','wildling','wildlife','wildwood',
                   ]

    # Split compound words that start with a stt_word or end with an end_word,
    # unless the compound (or its plural) is a legitimate word in not_replace
    found_comp = []
    for comp_word in stt_words:
        found_comp.extend(re.findall(rf'({comp_word})(\w+)', sample_text))
    for comp_word in end_words:
        found_comp.extend(re.findall(rf'(\w+)({comp_word})', sample_text))
    for x in found_comp:
        if x[0]+x[1] not in not_replace and x[0]+x[1]+'s' not in not_replace:
            sample_text = sample_text.replace(x[0]+x[1], x[0]+' '+x[1])

    for sample_word in sample_text.split():
        if len(sample_word) > 1:
            new_text.append(sample_word.strip())

    # 'nor' implies a preceding 'neither'; insert one so the negation removal
    # below drops both negated terms
    if 'nor' in new_text and 'neither' not in new_text:
        word_ind = new_text.index('nor')
        new_text.insert(max(0, word_ind-2), 'neither')

    # Remove each negation word along with the term it negates
    for neg_word in neg_words:
        if neg_word in new_text:
            word_ind = new_text.index(neg_word)
            del new_text[word_ind:word_ind+2]
    return(' '.join(new_text))


def is_number(input_string):
    '''Determine whether a string is a number'''
    # Note: unicodedata.numeric only accepts a single character, so
    # multi-character strings raise TypeError and are reported as non-numbers
    try:
        unicodedata.numeric(input_string)
        return(True)
    except (TypeError, ValueError):
        return(False)


def is_date(input_string):
    '''Determine whether a string is a date or day'''
    try:
        parse(input_string)
        return(True)
    except (ValueError, OverflowError):
        return(False)
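

# Illustrative behavior:
#   is_number('7')   -> True
#   is_number('12')  -> False (multi-character string; see note in is_number)
#   is_date('2021-05-01') -> True
#   is_date('chicken')    -> False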


def singularize_token(token, lookup_table, micro_status, c):
    '''Singularize the string token, if applicable'''
    if token in lookup_table['inflection_exceptions']:
        return(token, micro_status)

    exception_tail_chars_list = ['us', 'ia', 'ta', 'ss']  # TODO: add as, is?
    for tail_chars in exception_tail_chars_list:
        if token.endswith(tail_chars):
            return(token, micro_status)

    # Do not singularize tokens that end a two-word NCBI taxon name
    taxon_names = c.execute('''SELECT * FROM standard_resource_labels WHERE key LIKE :key AND
                               value LIKE :value''',
                            {'key':'% '+token, 'value':'NCBITaxon%'}).fetchall()
    taxon_names = [x for x in taxon_names if len(x[0].split()) <= 2]
    if taxon_names != []:
        return(token, micro_status)

    lemma = inflection.singularize(token)
    micro_status.append('Inflection (Plural) Treatment: ' + token)
    return(lemma, micro_status)


def get_cleaned_sample(input_sample, token, lookup_table):
    '''Prepare the cleaned sample phrase using the input token'''
    if input_sample == '' and token not in lookup_table['stop_words']:
        return(token)
    elif token not in lookup_table['stop_words']:
        return(input_sample + ' ' + token)
    else:
        return(input_sample)


def get_annotated_sample(annotated_sample, lemma):
    '''Embed synonyms in the sample, if available'''
    # TODO: able to annotate permutations instead of just left to right?
    synonym_map = {}
    if not annotated_sample:
        annotated_sample = lemma
    else:
        annotated_sample = f'{annotated_sample} {lemma}'

    conn_syn = sqlite3.connect(synonym_db)
    d = conn_syn.cursor()
    for y in [lemma, _remove_annotated_synonyms(annotated_sample)]:
        d.execute('SELECT * FROM label_synonyms WHERE key=:key', {'key':y})
        for x in d.fetchall():
            # Literal substring check; the synonym label may contain regex chars
            if x[1] not in annotated_sample:
                annotated_sample = annotated_sample+' {'+x[1]+'}'
                synonym_map[y] = x[1]
    conn_syn.close()
    return(annotated_sample, synonym_map)


def map_term(term, lookup_table, c, consider_suffixes=False):
    '''Map term to some resource in database'''
    if consider_suffixes:
        for suffix in lookup_table['suffixes']:
            mapping = _map_term_helper(term+' '+suffix, c)
            if mapping:
                for x in mapping:
                    x['status'].insert(-2, 'Suffix Addition')
                return(mapping)
    else:
        mapping = _map_term_helper(term, c)
        if mapping:
            return(mapping)
    return([])


def annotation_reduce(annotated_sample, synonym_map):
    '''Remove annotations on shorter phrases included in longer phrases with annotations'''
    remove_list = []
    for x in synonym_map:
        for y in synonym_map:
            if x != y and (x.startswith(y) or x.endswith(y)):
                remove_list.append(y)
    for x in remove_list:
        annotated_sample = annotated_sample.replace('{'+synonym_map[x]+'}', ' ')
    return(' '.join(annotated_sample.split()))
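

# Illustrative annotation flow; the synonym values here are assumed for the
# example, real ones come from synonym_db:
#   get_annotated_sample('chicken', 'breast') could return
#   ('chicken breast {white meat}', {'chicken breast': 'white meat'})
#   annotation_reduce('chicken {fowl} breast {white meat}',
#                     {'chicken': 'fowl', 'chicken breast': 'white meat'})
#   -> 'chicken breast {white meat}'   (the sub-phrase annotation is dropped)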


def get_annotated_synonyms(input_annotations):
    '''Get list of the annotations'''
    synonym_list = []
    for x in input_annotations.split('{')[1:]:
        synonym_list.append(x.split('}')[0])
    return(synonym_list)


def get_gram_chunks(input_phrase, num):
    '''Make num-gram chunks from input'''
    input_tokens = input_phrase.split()
    # For short phrases, consider every num-token combination; for long
    # phrases, fall back to contiguous n-grams to limit the search space
    if len(input_tokens) < 15:
        return(list(combinations(input_tokens, num)))
    else:
        return(_ngrams(input_phrase, num))
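

if __name__ == '__main__':
    # Minimal smoke checks for the pure helpers above (illustrative inputs
    # only; the database-backed helpers need the lexmapr resource databases
    # and are not exercised here).
    assert punctuation_treatment('Ready-To-Eat; chicken breast!') == 'ready to eat chicken breast'
    assert _remove_annotated_synonyms('chicken {gallus gallus} breast') == 'chicken breast'
    assert get_annotated_synonyms('chicken {gallus gallus} breast {pectoralis}') == \
        ['gallus gallus', 'pectoralis']
    assert _ngrams('swab of chicken carcass', 2) == \
        [['swab', 'of'], ['of', 'chicken'], ['chicken', 'carcass']]
    assert get_gram_chunks('chicken breast raw', 2) == \
        [('chicken', 'breast'), ('chicken', 'raw'), ('breast', 'raw')]
    assert is_number('7') and not is_number('12')
    print('smoke checks passed')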