annotate lexmapr/ontology_reasoner.py @ 4:819eff1bd7ac tip

"planemo upload"
author cstrittmatter
date Wed, 29 Jun 2022 15:30:52 -0400
parents f298f3e5c515
children
rev   line source
cstrittmatter@0 1 """Ontology finder and visualizer"""
cstrittmatter@0 2
cstrittmatter@0 3 import copy, json, logging, requests, time
cstrittmatter@0 4 import pygraphviz as pgv
cstrittmatter@0 5
cstrittmatter@0 6 logging.getLogger('urllib3').setLevel(logging.WARNING)
cstrittmatter@0 7
cstrittmatter@0 8
cstrittmatter@0 9 # TODO: figure out what to do with root Thing:Thing
class Ontology_accession:
    '''Single ontology accession with cached lookups of its relatives in OLS.

    An accession string has the form ``label:ONTOLOGY_id``; the label may
    itself contain whitespace, punctuation and colons, only the text after the
    last colon is treated as the ontology id (ex ``apple:FOODON_00002473``).

    Family attributes (parents, children, ancestors, descendants) hold the
    string 'not assigned yet' until looked up, afterwards either a list of
    Ontology_accession objects or the list ['none found'].
    '''
    # Registry used by make_instance so each accession string maps to one object
    existing_ontologies = {}
    # Family names that get_family stores on the instance
    _FAMILY_MEMBERS = ('parents', 'children', 'ancestors', 'descendants')

    @staticmethod
    def make_instance(acc):
        '''Use instead of default __init__ to enforce one instance per ontology'''
        try:
            return Ontology_accession.existing_ontologies[acc]
        except KeyError:
            # Only construct on a miss; construction may query OLS for a label
            Ontology_accession.existing_ontologies[acc] = Ontology_accession(acc)
            return Ontology_accession.existing_ontologies[acc]

    def __init__(self, acc):
        '''Split acc into label and id; if no label is given, look it up.

        If ontology is not recognized, just use short form, ex THING
        '''
        def_split = acc.split(':')
        self.label = ':'.join(def_split[:-1])
        self.id = def_split[-1].replace('_', ':')
        self.parents = 'not assigned yet'
        self.children = 'not assigned yet'
        self.ancestors = 'not assigned yet'
        self.descendants = 'not assigned yet'
        self.graph_nodes = 'not assigned yet'
        self.graph_fill = False
        # Take the prefix from the id part (last field) so that labels that
        # contain ':' or accessions without any label are handled
        self.ontology = def_split[-1].split('_')[0]
        if self.label == '':
            self._get_label()

    def _api_results(self, input_list, return_list):
        '''Append an instance for every non-obsolete term in input_list to
        return_list and return it; not currently checking for 'term_replaced_by'
        '''
        for x_term in input_list:
            if x_term['is_obsolete']:
                continue
            new_term = x_term['label'] + ':' + x_term['short_form']
            return_list.append(Ontology_accession.make_instance(new_term))
        return return_list

    def _family_edges(self, family_member):
        '''Return (parent label, child label) tuples linking this term to its
        already retrieved parents or children'''
        own_label = self._graph_label()
        relatives = getattr(self, family_member)
        if family_member == 'parents':
            return [(y._graph_label(), own_label) for y in relatives]
        return [(own_label, y._graph_label()) for y in relatives]

    def _add_edges(self, family_member, family_list, edge_set, round_num):
        '''Add edges to graph for round_num further generations.

        Returns edge_set extended (as a new list) with the edges between each
        term of family_list and its parents or children, recursing with
        round_num-1; an empty edge_set is returned unchanged.
        '''
        if edge_set == [] or round_num <= 0:
            return edge_set
        for x in family_list:
            x.get_family(family_member)
            if family_member not in ('parents', 'children'):
                continue
            relatives = getattr(x, family_member)
            if relatives == ['none found']:
                continue
            if len(relatives) > 5:
                time.sleep(0.05)
            new_edges = x._family_edges(family_member)
            edge_set = edge_set + [z for z in new_edges if z not in edge_set]
            edge_set = x._add_edges(family_member, relatives, edge_set, round_num-1)
        return edge_set

    def _draw_graph(self, o_file, node_color, edge_color):
        '''Draw and save the graph held in self.graph_nodes to o_file'''
        ontol_graph = pgv.AGraph(name='ontology_graph')
        # Add the term itself so that it is drawn even without any edges
        ontol_graph.add_node(self._graph_label())
        for x in self.graph_nodes:
            ontol_graph.add_edge(x[0], x[1])
        ontol_graph.node_attr.update(shape='box',
                                     style='rounded,filled',
                                     fillcolor='lightgrey',
                                     color=node_color)
        ontol_graph.edge_attr.update(shape='normal',
                                     color=edge_color,
                                     dir='back')
        ontol_graph.get_node(self._graph_label()).attr.update(fillcolor='lightblue')
        # TODO: determine best algorithm: neato, fdp, nop, twopi; tried circo; not dot, sfdp
        ontol_graph.draw(o_file, prog='twopi')

    def _expand_edge(self, family_member, family_list, edge_set, old_set='', stop_terms=False):
        '''Add edges to graph until a full pass adds nothing new.

        Walks the parents or children of every term in family_list
        recursively; old_set is the edge list as it was before the previous
        pass and stop_terms, when a list, holds terms that are not expanded.
        '''
        while old_set != edge_set:
            old_set = list(edge_set)
            for x in family_list:
                if x == 'none found':
                    break
                # NOTE(review): break also skips the siblings that follow a
                # stop term in family_list; confirm `continue` is not intended
                if isinstance(stop_terms, list) and x in stop_terms:
                    break
                x.get_family(family_member)
                if family_member not in ('parents', 'children'):
                    continue
                relatives = getattr(x, family_member)
                if relatives == ['none found']:
                    continue
                if len(relatives) > 5:
                    time.sleep(0.05)
                new_edges = x._family_edges(family_member)
                edge_set = edge_set + [z for z in new_edges if z not in edge_set]
                edge_set = x._expand_edge(family_member, relatives, edge_set,
                                          old_set, stop_terms)
        return edge_set

    def _get_label(self, max_retries=5):
        '''Retrieve the label for the id from OLS; updates instance.

        The label is set to 'unk' when the request fails, when the response
        holds no term, or when no valid JSON is received in max_retries tries.
        '''
        query_url = 'http://www.ebi.ac.uk/ols/api/terms?obo_id={}'.format(self.id)
        for _ in range(max_retries):
            ols_resp = self._get_request(query_url)
            if ols_resp is None:
                logging.warning(f'Did not retrieve PURL for {self.id}')
                self.label = 'unk'
                return
            try:
                self.label = ols_resp.json()['_embedded']['terms'][0]['label']
                return
            except (KeyError, IndexError):
                logging.warning(f'Did not find label for {self.id} in OLS')
                self.label = 'unk'
                return
            except ValueError:
                # JSON decode errors are ValueError subclasses; ask again
                time.sleep(0.05)
        logging.warning(f'Did not get valid JSON for {self.id} from OLS')
        self.label = 'unk'

    def _get_request(self, request_url, max_retries=5):
        '''Retrieve URL; returns the response or None after max_retries failures'''
        while max_retries > 0:
            try:
                return requests.get(request_url)
            except requests.exceptions.RequestException:
                time.sleep(0.05)
                max_retries -= 1
        return None

    def _graph_label(self):
        '''Format a graph label: id, a literal backslash-n, then the label'''
        return self.id+'\\n'+self.label

    def _next_page(self, url_link, return_list):
        '''Get next page of search results.

        Returns (link to the following page or False, return_list extended
        with the terms of this page).
        '''
        next_resp = self._get_request(url_link)
        if next_resp is None:
            logging.warning(f'Did not retrieve URL for {url_link} during API search')
            return (False, return_list)
        page_json = next_resp.json()
        try:
            next_link = page_json['_links']['next']['href']
        except KeyError:
            next_link = False
        return_list = self._api_results(page_json['_embedded']['terms'], return_list)
        return (next_link, return_list)

    def check_label(self):
        '''Check if given definition is correct for an id; returns Boolean or str `unk`.

        The label of the instance is replaced by the one retrieved from OLS
        (or by 'unk' when it could not be retrieved).
        '''
        given_label = self.label
        self._get_label()
        if self.label == 'unk':
            return self.label
        return self.label == given_label

    def _query_family(self, family_member):
        '''Ask OLS for the requested relatives; returns a list of instances
        or ['none found']'''
        ontology = self.id.split(':')[0].lower()
        if ontology == 'gaz':
            # For GAZ the link to the relatives is read from the term record
            query_url = 'https://www.ebi.ac.uk/ols/api/ontologies/gaz/terms?iri='
            query_url += 'http://purl.obolibrary.org/obo/' + self.id.replace(':', '_')
            term_resp = self._get_request(query_url)
            if term_resp is None:
                logging.warning(f'Did not get URL for {query_url} during search for {family_member}')
                return ['none found']
            try:
                qry_url = term_resp.json()['_embedded']['terms'][0]['_links']\
                                   ['hierarchical'+family_member.title()]['href']
            except (KeyError, IndexError):
                logging.warning(f'Did not find {family_member} link for {self.id} in OLS')
                return ['none found']
        else:
            query_url = 'http://www.ebi.ac.uk/ols/api/ontologies/{}/{}?id={}'
            qry_url = query_url.format(ontology, family_member, self.id)

        ols_resp = self._get_request(qry_url)
        if ols_resp is None:
            logging.warning(f'Did not get URL for {qry_url} during search for {family_member}')
            return ['none found']
        if ols_resp.status_code > 200:
            return ['none found']
        resp_json = ols_resp.json()
        if not resp_json['page']['totalElements'] > 0:
            return ['none found']
        result_list = self._api_results(resp_json['_embedded']['terms'], [])
        if resp_json['page']['totalPages'] > 1:
            next_url = resp_json['_links']['next']['href']
            while next_url:
                next_url, result_list = self._next_page(next_url, result_list)
        return result_list

    def get_family(self, family_member):
        '''Returns list of parents, ancestors, children or descendants.

        The first lookup queries OLS and stores the result, without
        duplicates, in the attribute named by family_member; later calls
        reuse it. A copy is returned so callers may modify the list freely.
        '''
        cacheable = family_member in self._FAMILY_MEMBERS
        if cacheable:
            cached = getattr(self, family_member)
            if cached != 'not assigned yet':
                return list(cached)
        # dict.fromkeys drops duplicates while keeping the order of the results
        result_list = list(dict.fromkeys(self._query_family(family_member)))
        if cacheable:
            setattr(self, family_member, result_list)
        return list(result_list)

    def bin_term(self, bin_package):
        '''Categorize term into given bins as Ontology_package.

        Returns the `label:ONTOLOGY_id` strings of the term and its ancestors
        that are listed in bin_package.ontologies.
        '''
        self.get_family('ancestors')
        lineage = [self]
        if self.ancestors != ['none found']:
            lineage = lineage + self.ancestors
        ancestor_labels = [x.label + ':' + x.id.replace(':', '_') for x in lineage]
        return [x for x in ancestor_labels if x in bin_package.ontologies]

    def visualize_term(self, o_file, node_color='black', edge_color='black',
                       fill_out=False, stop_terms=False, draw_graph=True):
        '''Visualize one term.

        Collects the edges to parents and children in self.graph_nodes;
        an integer fill_out adds that many generations, fill_out=True expands
        until nothing new is found (not expanding terms in the list
        stop_terms). The edges are reused while fill_out and stop_terms are
        unchanged. The graph is written to o_file if draw_graph is set.
        '''
        # The type is part of the key because False == 0 and True == 1
        graph_key = (type(fill_out), fill_out, stop_terms)
        if self.graph_nodes == 'not assigned yet' \
                or getattr(self, '_graph_key', None) != graph_key:
            self.get_family('parents')
            self.get_family('children')
            edge_set1, edge_set2 = [], []
            if self.parents != ['none found']:
                edge_set1 = self._family_edges('parents')
            if self.children != ['none found']:
                edge_set2 = self._family_edges('children')
            if isinstance(fill_out, int) and not isinstance(fill_out, bool):
                edge_set1 = self._add_edges('parents', self.parents, edge_set1, fill_out-1)
                edge_set2 = self._add_edges('children', self.children, edge_set2, fill_out-1)
            elif fill_out == True:
                edge_set1 = self._expand_edge('parents', self.parents, edge_set1, '', stop_terms)
                edge_set2 = self._expand_edge('children', self.children, edge_set2, '', stop_terms)
            self.graph_nodes = list(dict.fromkeys(edge_set1+edge_set2))
            self.graph_fill = fill_out
            self._graph_key = graph_key
        if draw_graph:
            self._draw_graph(o_file, node_color, edge_color)
cstrittmatter@0 251
cstrittmatter@0 252
class Ontology_package:
    '''Associate or package Ontology_accession objects together.

    lcp and hcc hold the string 'not assigned yet' until get_lcp/get_hcc or
    set_lcp/set_hcc store a list; graph_nodes holds the same string until
    visualize_terms has collected the (parent label, child label) edges.
    '''
    def __init__(self, package_label, ontol_list):
        self.label = package_label
        self.ontologies = ontol_list
        self.bins = []
        self.lcp = 'not assigned yet'
        self.hcc = 'not assigned yet'
        # (incl_terms, excl_terms) used for the stored lcp / hcc
        self._lcp_state = (True, [])
        self._hcc_state = (True, [])
        # bin package used for the stored bins
        self._bin_state = []
        self.graph_nodes = 'not assigned yet'
        self.graph_state = False
        # fill_out value used for the stored graph_nodes
        self.graph_fill = False

    @staticmethod
    def _assigned_terms(term_value):
        '''Return the accession objects in an lcp/hcc value; the unassigned
        placeholder and 'none found' entries yield nothing'''
        if isinstance(term_value, str):
            return []
        return [x for x in term_value if not isinstance(x, str)]

    @staticmethod
    def _listed(excl_terms):
        '''Return excl_terms as a new list (None gives [], a str one entry)'''
        if excl_terms is None:
            return []
        if isinstance(excl_terms, str):
            return [excl_terms]
        return list(excl_terms)

    def _common_family(self, family_member, incl_terms, excl_terms):
        '''Find common family members.

        Starts from the direct family of every term whose id is not in
        excl_terms and adds one further generation per round until the terms
        share a member. Returns [] if there is nothing left to expand and no
        shared member was found.
        '''
        family_candidates = {}
        # Members added in the latest round; only these still need expanding
        frontier = {}
        for ontol_term in [x for x in self.ontologies if x.id not in excl_terms]:
            relatives = list(dict.fromkeys(ontol_term.get_family(family_member)))
            family_candidates[ontol_term] = relatives
            frontier[ontol_term] = list(relatives)
        common_members = self._common_list(family_candidates, incl_terms)
        while common_members == [] and any(frontier.values()):
            for ontol_term in family_candidates:
                if len(self.ontologies) > 30:
                    time.sleep(0.05)
                known = set(family_candidates[ontol_term])
                pending = frontier[ontol_term]
                frontier[ontol_term] = []
                for family_ontol in pending:
                    if len(pending) > 30:
                        time.sleep(0.05)
                    if isinstance(family_ontol, str):
                        # 'none found' placeholder has no family of its own
                        relatives = ['none found']
                    else:
                        relatives = family_ontol.get_family(family_member)
                    for relative in relatives:
                        if relative not in known:
                            known.add(relative)
                            family_candidates[ontol_term].append(relative)
                            frontier[ontol_term].append(relative)
            common_members = self._common_list(family_candidates, incl_terms)
        return common_members

    def _common_list(self, input_dic, incl_terms):
        '''Compare input dictionary keys and list.

        Each key is pooled with its values; returns what all pools share.
        With incl_terms, shared entries that are keys are returned if there
        are any; otherwise the shared entries that are not keys.
        '''
        if not input_dic:
            return []
        term_lists = [[ontol_key] + list(input_dic[ontol_key]) for ontol_key in input_dic]
        common_set = set(term_lists[0]).intersection(*term_lists[1:])
        # Report in the order of the first pool so that results are repeatable
        ordered = [x for x in dict.fromkeys(term_lists[0]) if x in common_set]
        if incl_terms:
            common_keys = [x for x in ordered if x in input_dic]
            if common_keys != []:
                return common_keys
        return [x for x in ordered if x not in input_dic]

    def _draw_graph(self, o_file, node_color, edge_color, show_lcp, show_hcc):
        '''Draw and save graph; lcp/hcc nodes present in the graph are filled
        beige when requested, the packaged terms light blue'''
        ontol_graph = pgv.AGraph(name='ontology_graph')
        for x in self.ontologies:
            ontol_graph.add_node(x._graph_label())
        for x in self.graph_nodes:
            ontol_graph.add_edge(x[0], x[1])
        ontol_graph.node_attr.update(shape='box', style='rounded,filled',
                                     fillcolor='lightgrey', color=node_color)
        ontol_graph.edge_attr.update(shape='normal', color=edge_color, dir='back')
        highlighted = []
        if show_lcp:
            highlighted += self._assigned_terms(self.lcp)
        if show_hcc:
            highlighted += self._assigned_terms(self.hcc)
        for x in highlighted:
            # lcp/hcc terms are not necessarily part of the collected edges
            if ontol_graph.has_node(x._graph_label()):
                ontol_graph.get_node(x._graph_label()).attr.update(fillcolor='beige')
        for x in self.ontologies:
            ontol_graph.get_node(x._graph_label()).attr.update(fillcolor='lightblue')
        ontol_graph.draw(o_file, prog='dot')

    def _list_hierarchy(self, input_list, input_position):
        '''Get lowest or highest terms.

        For 'lowest' every term that is among the ancestors of another
        remaining term is dropped, for 'highest' every term that is among
        the descendants of another one. Returns a new list.
        '''
        if input_list == ['none found']:
            return input_list
        if input_position == 'lowest':
            family_member = 'ancestors'
        elif input_position == 'highest':
            family_member = 'descendants'
        else:
            raise ValueError(f'Unknown position: {input_position}')
        family_lists = {}
        for input_term in input_list:
            if len(input_list) > 30:
                time.sleep(0.05)
            if input_term == 'none found':
                family_lists[input_term] = []
            else:
                family_lists[input_term] = input_term.get_family(family_member)
        remaining = list(dict.fromkeys(input_list))
        removed_any = True
        while removed_any:
            removed_any = False
            for input_term in list(remaining):
                # Only the families of terms that are still kept are consulted
                if any(input_term in f_l for f_l in family_lists.values()):
                    del family_lists[input_term]
                    remaining.remove(input_term)
                    removed_any = True
        return remaining

    def _trim_tips(self):
        '''Remove descendants of self.ontologies and parents of self.lcp.

        Repeatedly drops edges that start at a node without incoming edge or
        end at a node without outgoing edge, unless that node is one of the
        packaged terms or of self.lcp.
        '''
        tip_nodes = {x._graph_label() for x in self.ontologies}
        tip_nodes.update(x._graph_label() for x in self._assigned_terms(self.lcp))
        while True:
            left_nodes = {x[0] for x in self.graph_nodes}
            right_nodes = {x[1] for x in self.graph_nodes}
            top_nodes = left_nodes - right_nodes - tip_nodes
            bot_nodes = right_nodes - left_nodes - tip_nodes
            trimmed = [x for x in self.graph_nodes
                       if x[0] not in top_nodes and x[1] not in bot_nodes]
            if trimmed == self.graph_nodes:
                break
            self.graph_nodes = trimmed

    def get_lcp(self, incl_terms=True, excl_terms=None): # TODO: missing excl_terms
        '''Find lowest common parent(s); can include input terms as lcp,
        exclude terms by obo id; saves results in lcp attribute'''
        excl_terms = self._listed(excl_terms)
        if self._lcp_state == (incl_terms, excl_terms) and self.lcp != 'not assigned yet':
            return
        common_members = self._common_family('parents', incl_terms, excl_terms)
        common_members = self._list_hierarchy(common_members, 'lowest')
        if common_members != []:
            self.lcp = common_members
            self._lcp_state = (incl_terms, excl_terms)

    def get_hcc(self, incl_terms=True, excl_terms=None):
        '''Get highest common child(ren); can include input terms as hcc;
        exclude terms by obo id; saves results in hcc attribute'''
        excl_terms = self._listed(excl_terms)
        if self._hcc_state == (incl_terms, excl_terms) and self.hcc != 'not assigned yet':
            return
        common_members = self._common_family('children', incl_terms, excl_terms)
        common_members = self._list_hierarchy(common_members, 'highest')
        if common_members != []:
            self.hcc = common_members
            self._hcc_state = (incl_terms, excl_terms)

    def set_lcp(self, lcp_acc, incl_terms=True, excl_terms=None):
        '''Store lcp_acc as lcp together with the settings it stands for'''
        self.lcp = lcp_acc
        self._lcp_state = (incl_terms, self._listed(excl_terms))

    def set_hcc(self, hcc_acc, incl_terms=True, excl_terms=None):
        '''Store hcc_acc as hcc together with the settings it stands for'''
        self.hcc = hcc_acc
        self._hcc_state = (incl_terms, self._listed(excl_terms))

    def bin_terms(self, bin_package):
        '''Categorize terms by those in Ontology_package; saves results in bins attribute'''
        if self._bin_state is bin_package:
            return
        package_bins = []
        for x in self.ontologies:
            package_bins.extend(x.bin_term(bin_package))
        self.bins = list(dict.fromkeys(package_bins))
        # Remember the package so that the same request is not repeated
        self._bin_state = bin_package

    def visualize_terms(self, o_file, fill_out=False, show_lcp=False, show_hcc=False,
                        node_color='black', edge_color='black',
                        lcp_stop=False, hcc_stop=False, trim_nodes=False):
        '''Visualize terms.

        Merges the edges of every packaged term into self.graph_nodes; with
        lcp_stop/hcc_stop the terms in lcp/hcc are skipped and passed on as
        stop terms. The edges are reused while fill_out, lcp_stop, hcc_stop
        and trim_nodes are unchanged. Graphs with more than 150 edges are
        logged as a table instead of being drawn to o_file.
        '''
        # The type is part of the key because False == 0 and True == 1
        graph_key = (type(fill_out), fill_out,
                     bool(lcp_stop), bool(hcc_stop), bool(trim_nodes))
        if self.graph_nodes == 'not assigned yet' \
                or getattr(self, '_graph_key', None) != graph_key:
            stop_terms = []
            if lcp_stop:
                stop_terms += self._assigned_terms(self.lcp)
            if hcc_stop:
                stop_terms += self._assigned_terms(self.hcc)
            self.graph_nodes = []
            seen_edges = set()
            for x in self.ontologies:
                if lcp_stop or hcc_stop:
                    if x in stop_terms:
                        continue
                    x.visualize_term(o_file, fill_out=fill_out,
                                     stop_terms=stop_terms, draw_graph=False)
                else:
                    x.visualize_term(o_file, fill_out=fill_out, draw_graph=False)
                for z in x.graph_nodes:
                    if z not in seen_edges:
                        seen_edges.add(z)
                        self.graph_nodes.append(z)
            self.graph_fill = fill_out
            self._graph_key = graph_key
        if trim_nodes:
            self._trim_tips()
        if len(self.graph_nodes) > 150:
            edge_string = '\n'.join(['Parent node\tChild node'] +
                                    ['\t'.join(edge_tuple) for edge_tuple in self.graph_nodes])
            logging.info(f'Not drawing graph with {len(self.graph_nodes)} edges:'
                         f'\n\n{edge_string}\n')
        else:
            self._draw_graph(o_file, node_color, edge_color, show_lcp, show_hcc)