Mercurial > repos > kkonganti > cfsan_lexmapr2
comparison lexmapr/ontology_reasoner.py @ 0:f5c39d0447be
"planemo upload"
author | kkonganti |
---|---|
date | Wed, 31 Aug 2022 14:32:07 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:f5c39d0447be |
---|---|
1 """Ontology finder and visualizer""" | |
2 | |
3 import copy, json, logging, requests, time | |
4 import pygraphviz as pgv | |
5 | |
6 logging.getLogger('urllib3').setLevel(logging.WARNING) | |
7 | |
8 | |
9 # TODO: figure out what to do with root Thing:Thing | |
class Ontology_accession:
    '''Attributes and behavior of a single ontology accession.

    Accessions are given as ``definition:ontology_id`` (the definition may
    contain whitespace and punctuation), ex ``apple:FOODON_00002473``.
    Family attributes (parents, children, ancestors, descendants) hold the
    string 'not assigned yet' until looked up, then either a list of
    Ontology_accession instances or the list ['none found'].
    '''
    # Cache used by make_instance: accession string -> instance
    existing_ontologies = {}
    # Family names that get_family stores on the instance
    _FAMILY_KINDS = ('parents', 'children', 'ancestors', 'descendants')

    @staticmethod
    def make_instance(acc):
        '''Use instead of default __init__ to enforce one instance per ontology'''
        known = Ontology_accession.existing_ontologies
        if acc not in known:
            known[acc] = Ontology_accession(acc)
        return known[acc]

    def __init__(self, acc):
        '''Split acc into label and id; if no label is given, look it up in OLS.

        The ontology prefix is taken from the id part (the text after the
        last colon), so accessions without a label (ex THING) or with colons
        inside the label are handled as well.
        '''
        def_split = acc.split(':')
        self.label = ':'.join(def_split[:-1])
        self.id = def_split[-1].replace('_', ':')
        self.parents = 'not assigned yet'
        self.children = 'not assigned yet'
        self.ancestors = 'not assigned yet'
        self.descendants = 'not assigned yet'
        self.graph_nodes = 'not assigned yet'
        self.graph_fill = False
        # Previously read def_split[1], which failed when acc had no colon and
        # picked a label fragment when the label itself contained a colon.
        self.ontology = def_split[-1].split('_')[0]
        if self.label == '':
            self._get_label()

    def _api_results(self, input_list, return_list):
        '''Append an instance for every non-obsolete term of input_list to return_list.

        Obsolete terms are ignored; term_replaced_by is not currently checked.
        Returns return_list (which is also modified in place).
        '''
        for x_term in input_list:
            if x_term['is_obsolete']:
                continue
            new_term = x_term['label'] + ':' + x_term['short_form']
            return_list.append(Ontology_accession.make_instance(new_term))
        return return_list

    @staticmethod
    def _merge_edges(edge_set, new_edges):
        '''Return a new list: edge_set plus the edges of new_edges not yet present'''
        merged = list(edge_set)
        seen = set(merged)
        for edge in new_edges:
            if edge not in seen:
                seen.add(edge)
                merged.append(edge)
        return merged

    def _family_edges(self, family_member, relatives):
        '''Build (parent label, child label) tuples linking this term to relatives'''
        own_label = self._graph_label()
        if family_member == 'parents':
            return [(y._graph_label(), own_label) for y in relatives]
        return [(own_label, y._graph_label()) for y in relatives]

    def _edge_relatives(self, family_member):
        '''Look up family_member and return the relatives to draw, or None.

        None is returned when family_member is not parents/children or when
        the lookup gave ['none found'].
        '''
        self.get_family(family_member)
        if family_member not in ('parents', 'children'):
            return None
        relatives = getattr(self, family_member)
        if relatives == ['none found']:
            return None
        if len(relatives) > 5:
            # Short pause before following a large family
            time.sleep(0.05)
        return relatives

    def _add_edges(self, family_member, family_list, edge_set, round_num):
        '''Add edges to graph for round_num further levels of family_member.

        Returns the extended edge list; an empty edge_set is returned as is.
        '''
        if edge_set == [] or round_num <= 0:
            return edge_set
        for x in family_list:
            relatives = x._edge_relatives(family_member)
            if relatives is None:
                continue
            new_edges = x._family_edges(family_member, relatives)
            edge_set = self._merge_edges(edge_set, new_edges)
            edge_set = x._add_edges(family_member, relatives, edge_set, round_num-1)
        return edge_set

    def _draw_graph(self, o_file, node_color, edge_color):
        '''Draw and save the graph'''
        ontol_graph = pgv.AGraph(name='ontology_graph')
        ontol_graph.add_node(self._graph_label())
        for x in self.graph_nodes:
            ontol_graph.add_edge(x[0], x[1])
        ontol_graph.node_attr.update(shape='box',
                                     style='rounded,filled',
                                     fillcolor='lightgrey',
                                     color=node_color)
        ontol_graph.edge_attr.update(shape='normal',
                                     color=edge_color,
                                     dir='back')
        # Highlight the term the graph was built for
        ontol_graph.get_node(self._graph_label()).attr.update(fillcolor='lightblue')
        # TODO: determine best algorithm: neato, fdp, nop, twopi; tried circo; not dot, sfdp
        ontol_graph.draw(o_file, prog='twopi')

    def _expand_edge(self, family_member, family_list, edge_set, old_set='', stop_terms=False):
        '''Add edges to graph, following family_member until no new edges appear.

        stop_terms, when a list, names terms that are not expanded.
        Returns the extended edge list.
        '''
        while old_set != edge_set:
            # Snapshot to detect whether this pass added anything
            old_set = list(edge_set)
            for x in family_list:
                if x == 'none found':
                    break
                # NOTE(review): break (not continue) also skips the remaining
                # siblings after a stop term; kept as in the original.
                if isinstance(stop_terms, list) and x in stop_terms:
                    break
                relatives = x._edge_relatives(family_member)
                if relatives is None:
                    continue
                new_edges = x._family_edges(family_member, relatives)
                edge_set = self._merge_edges(edge_set, new_edges)
                edge_set = x._expand_edge(family_member, relatives, edge_set,
                                          old_set, stop_terms)
        return edge_set

    def _get_label(self, max_retries=5):
        '''Retrieve the label for this id from OLS; updates instance.

        The label becomes 'unk' when the request fails, when OLS has no term
        for the id, or when the response is still not valid JSON after
        max_retries attempts.
        '''
        query_url = 'http://www.ebi.ac.uk/ols/api/terms?obo_id={}'.format(self.id)
        for _ in range(max_retries):
            ols_resp = self._get_request(query_url)
            if ols_resp is None:
                logging.warning(f'Did not retrieve PURL for {self.id}')
                self.label = 'unk'
                return
            try:
                self.label = ols_resp.json()['_embedded']['terms'][0]['label']
                return
            except (KeyError, IndexError):
                logging.warning(f'Did not find label for {self.id} in OLS')
                self.label = 'unk'
                return
            except ValueError:
                # JSON decode errors are ValueError subclasses; retry a bounded
                # number of times instead of recursing without limit.
                time.sleep(0.05)
        logging.warning(f'Did not get valid JSON for {self.id} from OLS')
        self.label = 'unk'

    def _get_request(self, request_url, max_retries=5):
        '''Retrieve URL; returns the response, or None after max_retries failures'''
        while max_retries > 0:
            try:
                return requests.get(request_url)
            except Exception:
                # Not a bare except: KeyboardInterrupt/SystemExit must propagate
                time.sleep(0.05)
                max_retries -= 1
        return None

    def _graph_label(self):
        '''Format a graph label: id, a literal backslash-n, then the label'''
        return self.id+'\\n'+self.label

    def _next_page(self, url_link, return_list):
        '''Get next page of search results.

        Returns (next page URL or False, return_list with the page's terms added).
        '''
        next_resp = self._get_request(url_link)
        if next_resp is None:
            logging.warning(f'Did not retrieve URL for {url_link} during API search')
            return (False, return_list)
        try:
            payload = next_resp.json()
        except ValueError:
            logging.warning(f'Did not retrieve URL for {url_link} during API search')
            return (False, return_list)
        try:
            next_link = payload['_links']['next']['href']
        except KeyError:
            next_link = False
        page_terms = payload.get('_embedded', {}).get('terms', [])
        return (next_link, self._api_results(page_terms, return_list))

    def check_label(self):
        '''Check if given definition is correct for an id; returns Boolean or str `unk`.

        The label is re-fetched from OLS, so afterwards self.label holds the
        OLS label (or 'unk'), not necessarily the label originally given.
        '''
        given_label = self.label
        self._get_label()
        if self.label == 'unk':
            return self.label
        return self.label == given_label

    def _family_url(self, family_member):
        '''Return the OLS URL listing family_member of this term, or None.

        GAZ terms need an extra request to find their hierarchical link;
        None is returned when that request or link is unavailable.
        '''
        prefix = self.id.split(':')[0].lower()
        if prefix != 'gaz':
            query_url = 'http://www.ebi.ac.uk/ols/api/ontologies/{}/{}?id={}'
            return query_url.format(prefix, family_member, self.id)
        query_url = 'https://www.ebi.ac.uk/ols/api/ontologies/gaz/terms?iri='
        query_url += 'http://purl.obolibrary.org/obo/' + self.id.replace(':', '_')
        ols_resp = self._get_request(query_url)
        if ols_resp is None:
            return None
        try:
            term_links = ols_resp.json()['_embedded']['terms'][0]['_links']
            return term_links['hierarchical'+family_member.title()]['href']
        except (KeyError, IndexError, ValueError):
            return None

    def _query_family(self, family_member):
        '''Query OLS for family_member; returns a list of terms or ['none found']'''
        qry_url = self._family_url(family_member)
        if qry_url is None:
            logging.warning(f'Did not get URL for {self.id} during search for {family_member}')
            return ['none found']
        ols_resp = self._get_request(qry_url)
        if ols_resp is None:
            logging.warning(f'Did not get URL for {qry_url} during search for {family_member}')
            return ['none found']
        if ols_resp.status_code > 200:
            return ['none found']
        try:
            payload = ols_resp.json()
            total_elements = payload['page']['totalElements']
            total_pages = payload['page']['totalPages']
        except (KeyError, ValueError):
            logging.warning(f'Did not get URL for {qry_url} during search for {family_member}')
            return ['none found']
        if total_elements <= 0:
            return ['none found']
        result_list = self._api_results(payload['_embedded']['terms'], [])
        if total_pages > 1:
            next_url = payload['_links']['next']['href']
            while next_url:
                next_url, result_list = self._next_page(next_url, result_list)
        # Every hit may have been obsolete; report that as nothing found
        return result_list or ['none found']

    def get_family(self, family_member):
        '''Returns list of parents, ancestors, children or descendants.

        The first lookup queries OLS and stores the de-duplicated result on
        the matching attribute; later calls reuse it. A copy is returned so
        callers can modify the list without changing the stored one.
        '''
        is_known = family_member in self._FAMILY_KINDS
        if is_known:
            stored = getattr(self, family_member)
            if stored != 'not assigned yet':
                return list(stored)
        # dict.fromkeys drops duplicates while keeping first-seen order
        result_list = list(dict.fromkeys(self._query_family(family_member)))
        if is_known:
            setattr(self, family_member, result_list)
        return list(result_list)

    def bin_term(self, bin_package):
        '''Categorize term into given bins as Ontology_package.

        Returns the label:id strings of this term and its ancestors that
        occur in bin_package.ontologies.
        '''
        self.get_family('ancestors')
        lineage = [self]
        if self.ancestors != ['none found']:
            lineage = lineage + self.ancestors
        ancestor_labels = [x.label + ':' + x.id.replace(':', '_') for x in lineage]
        return [x for x in ancestor_labels if x in bin_package.ontologies]

    def visualize_term(self, o_file, node_color='black', edge_color='black',
                       fill_out=False, stop_terms=False, draw_graph=True):
        '''Visualize one term.

        fill_out: False draws parents and children only, an int adds that
        many levels in total, True follows parents/children until no new
        edges are found (stop_terms, when a list, are not expanded).
        Edges are stored in graph_nodes and reused while fill_out is unchanged.
        '''
        # Compare type as well so that True and 1 are not treated as the same mode
        same_mode = (type(self.graph_fill) is type(fill_out)
                     and self.graph_fill == fill_out)
        if self.graph_nodes == 'not assigned yet' or not same_mode:
            self.get_family('parents')
            self.get_family('children')
            edge_set1, edge_set2 = [], []
            if self.parents != ['none found']:
                edge_set1 = self._family_edges('parents', self.parents)
            if self.children != ['none found']:
                edge_set2 = self._family_edges('children', self.children)
            if isinstance(fill_out, int) and not isinstance(fill_out, bool):
                edge_set1 = self._add_edges('parents', self.parents, edge_set1, fill_out-1)
                edge_set2 = self._add_edges('children', self.children, edge_set2, fill_out-1)
            elif fill_out is True:
                edge_set1 = self._expand_edge('parents', self.parents, edge_set1, '', stop_terms)
                edge_set2 = self._expand_edge('children', self.children, edge_set2, '', stop_terms)
            self.graph_nodes = list(dict.fromkeys(edge_set1+edge_set2))
            # Remember which mode produced graph_nodes so the reuse check works
            self.graph_fill = fill_out
        if draw_graph:
            self._draw_graph(o_file, node_color, edge_color)
251 | |
252 | |
class Ontology_package:
    '''Associate or package Ontology_accession objects together.

    lcp and hcc hold the string 'not assigned yet' until get_lcp/get_hcc or
    set_lcp/set_hcc assign a list; graph_nodes likewise until
    visualize_terms has collected the (parent label, child label) edges.
    '''

    def __init__(self, package_label, ontol_list):
        '''Store the label and the list of terms; nothing is looked up yet'''
        self.label = package_label
        self.ontologies = ontol_list
        self.bins = []
        self.lcp = 'not assigned yet'
        self.hcc = 'not assigned yet'
        self._lcp_state = (True, [])
        self._hcc_state = (True, [])
        self._bin_state = []
        self.graph_nodes = 'not assigned yet'
        self.graph_state = False
        # visualize_terms reads graph_fill, which was never initialised
        self.graph_fill = False

    @staticmethod
    def _assigned_terms(family_value):
        '''Return the term objects of an lcp/hcc value.

        Gives [] while the value is still a placeholder string and drops
        'none found' entries.
        '''
        if isinstance(family_value, str):
            return []
        return [x for x in family_value if not isinstance(x, str)]

    def _common_family(self, family_member, incl_terms, excl_terms):
        '''Find common family members.

        Walks family_member one level further per round for every term whose
        id is not in excl_terms, until the terms share a member. Returns []
        when a round adds nothing new and no common member exists.
        '''
        family_candidates = {}
        frontiers = {}
        for ontol_term in [x for x in self.ontologies if x.id not in excl_terms]:
            # Copy so that growing the candidate list never touches a list
            # owned by the term
            first_level = list(ontol_term.get_family(family_member))
            family_candidates[ontol_term] = first_level
            frontiers[ontol_term] = list(first_level)
        common_members = self._common_list(family_candidates, incl_terms)
        while common_members == []:
            grew = False
            for ontol_term in family_candidates:
                if len(self.ontologies) > 30:
                    time.sleep(0.05)
                known = set(family_candidates[ontol_term])
                frontier = frontiers[ontol_term]
                next_frontier = []
                for family_ontol in frontier:
                    if len(frontier) > 30:
                        time.sleep(0.05)
                    if isinstance(family_ontol, str):
                        # 'none found' marker: nothing further to follow
                        relatives = ['none found']
                    else:
                        relatives = family_ontol.get_family(family_member)
                    for relative in relatives:
                        if relative not in known:
                            known.add(relative)
                            family_candidates[ontol_term].append(relative)
                            next_frontier.append(relative)
                frontiers[ontol_term] = next_frontier
                grew = grew or bool(next_frontier)
            if not grew:
                # Nothing new anywhere: another round cannot change the result
                break
            # The original never refreshed this value, so the loop could not end
            common_members = self._common_list(family_candidates, incl_terms)
        return common_members

    def _common_list(self, input_dic, incl_terms):
        '''Compare input dictionary keys and list.

        Each key together with its values forms one set; returns the members
        shared by all sets. With incl_terms, shared members that are keys
        themselves are preferred; otherwise keys are excluded.
        '''
        term_lists = [[ontol_key] + list(input_dic[ontol_key]) for ontol_key in input_dic]
        if not term_lists:
            return []
        common_set = set.intersection(*map(set, term_lists))
        if incl_terms:
            common_keys = [ontol_acc for ontol_acc in common_set if ontol_acc in input_dic]
            if common_keys != []:
                return common_keys
        return list(common_set - set(input_dic))

    def _draw_graph(self, o_file, node_color, edge_color, show_lcp, show_hcc):
        '''Draw and save graph'''
        ontol_graph = pgv.AGraph(name='ontology_graph')
        for x in self.ontologies:
            ontol_graph.add_node(x._graph_label())
        for x in self.graph_nodes:
            ontol_graph.add_edge(x[0], x[1])
        ontol_graph.node_attr.update(shape='box', style='rounded,filled',
                                     fillcolor='lightgrey', color=node_color)
        ontol_graph.edge_attr.update(shape='normal', color=edge_color, dir='back')
        highlighted = []
        if show_lcp:
            highlighted += self._assigned_terms(self.lcp)
        if show_hcc:
            highlighted += self._assigned_terms(self.hcc)
        for x in highlighted:
            # Make sure the node exists before styling it
            ontol_graph.add_node(x._graph_label())
            ontol_graph.get_node(x._graph_label()).attr.update(fillcolor='beige')
        # Package terms are coloured last so they stay blue even if also lcp/hcc
        for x in self.ontologies:
            ontol_graph.get_node(x._graph_label()).attr.update(fillcolor='lightblue')
        ontol_graph.draw(o_file, prog='dot')

    def _list_hierarchy(self, input_list, input_position):
        '''Get lowest or highest terms.

        For 'lowest' a term is dropped when it is among the ancestors of
        another remaining term; for 'highest' the descendants are used.
        input_list itself is left unchanged.
        '''
        if input_list == ['none found']:
            return input_list
        if input_position == 'lowest':
            relation = 'ancestors'
        elif input_position == 'highest':
            relation = 'descendants'
        else:
            raise ValueError(f'Unknown input_position: {input_position}')
        remaining = list(input_list)
        family_lists = {}
        for input_term in remaining:
            if len(remaining) > 30:
                time.sleep(0.05)
            if isinstance(input_term, str):
                # Keep a list here: `term in 'some string'` raises TypeError
                family_lists[input_term] = ['none found']
            else:
                family_lists[input_term] = input_term.get_family(relation)
        while True:
            remove_terms = []
            for input_term in remaining:
                if input_term not in family_lists:
                    continue
                if any(input_term in members for members in family_lists.values()):
                    del family_lists[input_term]
                    remove_terms.append(input_term)
            if not remove_terms:
                break
            remaining = [x for x in remaining if x not in remove_terms]
        return remaining

    def _trim_tips(self):
        '''Remove descendants of self.ontologies and parents of self.lcp.

        Repeatedly drops edges that start at a top node or end at a bottom
        node, unless that node is a package term or an lcp term.
        '''
        if isinstance(self.graph_nodes, str):
            return
        tip_nodes = {x._graph_label() for x in self.ontologies}
        tip_nodes.update(x._graph_label() for x in self._assigned_terms(self.lcp))
        old_nodes = []
        while old_nodes != self.graph_nodes:
            old_nodes = self.graph_nodes
            left_nodes = {x[0] for x in self.graph_nodes}
            right_nodes = {x[1] for x in self.graph_nodes}
            top_nodes = (left_nodes - right_nodes) - tip_nodes
            bot_nodes = (right_nodes - left_nodes) - tip_nodes
            self.graph_nodes = [x for x in self.graph_nodes
                                if x[0] not in top_nodes and x[1] not in bot_nodes]

    def get_lcp(self, incl_terms=True, excl_terms=None):  # TODO: missing excl_terms
        '''Find lowest common parent(s); can include input terms as lcp,
        exclude terms by obo id; saves results in lcp attribute'''
        excl_terms = [] if excl_terms is None else excl_terms
        if self._lcp_state == (incl_terms, excl_terms) and self.lcp != 'not assigned yet':
            return
        common_members = self._common_family('parents', incl_terms, excl_terms)
        common_members = self._list_hierarchy(common_members, 'lowest')
        if common_members != []:
            self.lcp = common_members
        self._lcp_state = (incl_terms, excl_terms)

    def get_hcc(self, incl_terms=True, excl_terms=None):
        '''Get highest common child(ren); can include input terms as hcc;
        exclude terms by obo id; saves results in hcc attribute'''
        excl_terms = [] if excl_terms is None else excl_terms
        if self._hcc_state == (incl_terms, excl_terms) and self.hcc != 'not assigned yet':
            return
        common_members = self._common_family('children', incl_terms, excl_terms)
        common_members = self._list_hierarchy(common_members, 'highest')
        if common_members != []:
            self.hcc = common_members
        self._hcc_state = (incl_terms, excl_terms)

    def set_lcp(self, lcp_acc, incl_terms=True, excl_terms=None):
        '''Assign lcp directly and record the settings it is valid for'''
        self.lcp = lcp_acc
        self._lcp_state = (incl_terms, [] if excl_terms is None else excl_terms)

    def set_hcc(self, hcc_acc, incl_terms=True, excl_terms=None):
        '''Assign hcc directly and record the settings it is valid for'''
        self.hcc = hcc_acc
        self._hcc_state = (incl_terms, [] if excl_terms is None else excl_terms)

    def bin_terms(self, bin_package):
        '''Categorize terms by those in Ontology_package; saves results in bins attribute'''
        if self._bin_state is bin_package:
            return
        package_bins = []
        for x in self.ontologies:
            package_bins.extend(x.bin_term(bin_package))
        self.bins = list(dict.fromkeys(package_bins))
        # Remember the package so the check above can skip repeated work
        self._bin_state = bin_package

    def visualize_terms(self, o_file, fill_out=False, show_lcp=False, show_hcc=False,
                        node_color='black', edge_color='black',
                        lcp_stop=False, hcc_stop=False, trim_nodes=False):
        '''Visualize terms.

        Collects the edges of every term (reused while fill_out is unchanged),
        optionally trims the tips, then draws the graph to o_file. Graphs
        with more than 150 edges are written to the log instead of drawn.
        With lcp_stop/hcc_stop, terms in lcp/hcc are skipped and passed to
        the terms as stop_terms.
        '''
        previous_fill = getattr(self, 'graph_fill', False)
        same_mode = type(previous_fill) is type(fill_out) and previous_fill == fill_out
        if self.graph_nodes == 'not assigned yet' or not same_mode:
            stop_terms = []
            if lcp_stop:
                stop_terms += self._assigned_terms(self.lcp)
            if hcc_stop:
                stop_terms += self._assigned_terms(self.hcc)
            collected = []
            seen = set()
            for x in self.ontologies:
                if lcp_stop or hcc_stop:
                    if x in stop_terms:
                        continue
                    x.visualize_term(o_file, fill_out=fill_out,
                                     stop_terms=stop_terms, draw_graph=False)
                else:
                    x.visualize_term(o_file, fill_out=fill_out, draw_graph=False)
                for edge in x.graph_nodes:
                    if edge not in seen:
                        seen.add(edge)
                        collected.append(edge)
            self.graph_nodes = collected
            self.graph_fill = fill_out
        if trim_nodes:
            self._trim_tips()
        if len(self.graph_nodes) > 150:
            edge_lines = ['Parent node\tChild node']
            edge_lines.extend('\t'.join(edge_tuple) for edge_tuple in self.graph_nodes)
            edge_string = '\n'.join(edge_lines)
            logging.info(f'Not drawing graph with {len(self.graph_nodes)} edges:'
                         f'\n\n{edge_string}\n')
        else:
            self._draw_graph(o_file, node_color, edge_color, show_lcp, show_hcc)