kkonganti@0
|
1 """Ontology finder and visualizer"""
|
kkonganti@0
|
2
|
kkonganti@0
|
3 import copy, json, logging, requests, time
|
kkonganti@0
|
4 import pygraphviz as pgv
|
kkonganti@0
|
5
|
kkonganti@0
|
# Raise the 'urllib3' logger threshold so only WARNING and above are emitted from it.
logging.getLogger('urllib3').setLevel(logging.WARNING)
|
kkonganti@0
|
7
|
kkonganti@0
|
8
|
kkonganti@0
|
9 # TODO: figure out what to do with root Thing:Thing
|
kkonganti@0
|
class Ontology_accession:
    '''Base class for defining attributes and behavior of single ontology accessions.

    Accessions are written ``label:ontology_id`` (whitespace and punctuation
    are okay in the label), for example ``apple:FOODON_00002473``.  The part
    after the last ':' is the short form; everything before it is the label.

    Relatives (parents, children, ancestors, descendants) are looked up in the
    EBI OLS web API on first request and then kept on the instance.  Until
    they are fetched each attribute holds the string 'not assigned yet'; a
    lookup that finds nothing stores the list ``['none found']``.

    Create instances with :meth:`make_instance` so that one accession string
    maps to one shared object (membership tests elsewhere rely on identity).
    '''
    existing_ontologies = {}

    @staticmethod
    def make_instance(acc):
        '''Use instead of default __init__ to enforce one instance per ontology.

        Returns the instance already registered for ``acc`` or creates,
        registers and returns a new one.
        '''
        try:
            return Ontology_accession.existing_ontologies[acc]
        except KeyError:
            new_term = Ontology_accession(acc)
            Ontology_accession.existing_ontologies[acc] = new_term
            return new_term

    def __init__(self, acc):
        '''Parse ``acc``; if it carries no label, look the label up in OLS.

        If ontology is not recognized, just use short form, ex THING.
        '''
        def_split = acc.split(':')
        # The short form is always the LAST ':'-separated field.  Indexing
        # field 1 (as was done before) fails for label-less accessions and
        # picks part of the label when the label itself contains ':'.
        short_form = def_split[-1]
        self.label = ':'.join(def_split[:-1])
        self.id = short_form.replace('_', ':')
        self.parents = 'not assigned yet'
        self.children = 'not assigned yet'
        self.ancestors = 'not assigned yet'
        self.descendants = 'not assigned yet'
        self.graph_nodes = 'not assigned yet'
        self.graph_fill = False
        self.ontology = short_form.split('_')[0]
        if self.label == '':
            self._get_label()

    def _api_results(self, input_list, return_list):
        '''Append one instance per non-obsolete OLS term record to ``return_list``.

        Ignore obsolete terms, not currently checking for 'term_replaced_by'.
        ``return_list`` is extended in place and also returned.
        '''
        for x_term in input_list:
            if x_term['is_obsolete']:
                continue
            new_term = x_term['label'] + ':' + x_term['short_form']
            return_list.append(Ontology_accession.make_instance(new_term))
        return return_list

    def _relative_edges(self, family_member):
        '''Return ``(relatives, edges)`` joining this term to its parents or children.

        Expects ``get_family(family_member)`` to have been called already.
        Edges are ``(parent_label, child_label)`` tuples.  Returns
        ``(None, [])`` when ``family_member`` is neither 'parents' nor
        'children' or when no relatives were found.
        '''
        if family_member not in ('parents', 'children'):
            return None, []
        relatives = getattr(self, family_member)
        if relatives == ['none found']:
            return None, []
        if len(relatives) > 5:
            time.sleep(0.05)
        own_label = self._graph_label()
        if family_member == 'parents':
            edges = [(y._graph_label(), own_label) for y in relatives]
        else:
            edges = [(own_label, y._graph_label()) for y in relatives]
        return relatives, edges

    def _add_edges(self, family_member, family_list, edge_set, round_num):
        '''Add edges to graph for at most ``round_num`` further generations.

        Returns a new list; ``edge_set`` itself is not modified.  Nothing is
        added when ``edge_set`` is empty or ``round_num`` is not positive.
        '''
        if edge_set == [] or round_num <= 0:
            return edge_set
        for x in family_list:
            x.get_family(family_member)
            relatives, new_edges = x._relative_edges(family_member)
            if relatives is None:
                continue
            edge_set = edge_set + [z for z in new_edges if z not in edge_set]
            edge_set = x._add_edges(family_member, relatives, edge_set, round_num-1)
        return edge_set

    def _draw_graph(self, o_file, node_color, edge_color):
        '''Draw and save the graph held in ``self.graph_nodes`` to ``o_file``.

        This term's own node is filled light blue, all others light grey.
        '''
        own_label = self._graph_label()
        ontol_graph = pgv.AGraph(name='ontology_graph')
        ontol_graph.add_node(own_label)
        for x in self.graph_nodes:
            ontol_graph.add_edge(x[0], x[1])
        ontol_graph.node_attr.update(shape='box',
                                     style='rounded,filled',
                                     fillcolor='lightgrey',
                                     color=node_color)
        ontol_graph.edge_attr.update(shape='normal',
                                     color=edge_color,
                                     dir='back')
        ontol_graph.get_node(own_label).attr.update(fillcolor='lightblue')
        # TODO: determine best algorithm: neato, fdp, nop, twopi; tried circo; not dot, sfdp
        ontol_graph.draw(o_file, prog='twopi')

    def _expand_edge(self, family_member, family_list, edge_set, old_set='', stop_terms=False):
        '''Add edges to graph until no new edge is found.

        ``old_set`` is the edge list as it was before the caller's last pass;
        recursion only descends while the list is still growing, which is what
        ends the walk.  Terms listed in ``stop_terms`` (when it is a list) are
        not expanded.  Returns a new list; ``edge_set`` is not modified.
        '''
        while old_set != edge_set:
            # Edge lists are only ever rebuilt with '+', never edited in
            # place, so a shallow snapshot is enough for the comparison.
            old_set = list(edge_set)
            for x in family_list:
                if x == 'none found':
                    break
                if isinstance(stop_terms, list) and x in stop_terms:
                    # Skip only the stop term.  Using 'break' here dropped the
                    # remaining siblings too, so the result depended on the
                    # arbitrary order of the relatives list.
                    continue
                x.get_family(family_member)
                relatives, new_edges = x._relative_edges(family_member)
                if relatives is None:
                    continue
                edge_set = edge_set + [z for z in new_edges if z not in edge_set]
                edge_set = x._expand_edge(family_member, relatives, edge_set,
                                          old_set, stop_terms)
        return edge_set

    def _get_label(self, max_retries=5):
        '''Retrieve the label OLS holds for this id; updates instance.

        ``self.label`` becomes 'unk' when the request fails, when the
        response has no matching term, or when the response is still not
        valid JSON after ``max_retries`` attempts.
        '''
        query_url = 'http://www.ebi.ac.uk/ols/api/terms?obo_id={}'.format(self.id)
        for _ in range(max_retries):
            ols_resp = self._get_request(query_url)
            if ols_resp is None:
                logging.warning(f'Did not retrieve PURL for {self.id}')
                self.label = 'unk'
                return
            try:
                self.label = ols_resp.json()['_embedded']['terms'][0]['label']
                return
            except (KeyError, IndexError):
                logging.warning(f'Did not find label for {self.id} in OLS')
                self.label = 'unk'
                return
            except json.decoder.JSONDecodeError:
                # Retry a bounded number of times; the earlier unbounded
                # recursion could only end in RecursionError.
                time.sleep(0.05)
        logging.warning(f'Did not get valid JSON for {self.id} from OLS')
        self.label = 'unk'

    def _get_request(self, request_url, max_retries=5):
        '''Retrieve URL; returns the response, or None after ``max_retries`` failures.'''
        while max_retries > 0:
            try:
                return requests.get(request_url)
            except requests.exceptions.RequestException:
                # Only request failures are retried; KeyboardInterrupt and
                # programming errors are no longer swallowed.
                time.sleep(0.05)
                max_retries -= 1
        return None

    def _graph_label(self):
        '''Format a graph label: id, a literal backslash-n, then the label.'''
        return self.id + '\\n' + self.label

    def _next_page(self, url_link, return_list):
        '''Get next page of search results.

        Returns ``(next_link, return_list)``; ``next_link`` is False when the
        request failed or the page has no 'next' link.
        '''
        next_resp = self._get_request(url_link)
        if next_resp is None:
            logging.warning(f'Did not retrieve URL for {url_link} during API search')
            return False, return_list
        page = next_resp.json()
        try:
            next_link = page['_links']['next']['href']
        except KeyError:
            next_link = False
        return_list = self._api_results(page['_embedded']['terms'], return_list)
        return next_link, return_list

    def check_label(self):
        '''Check if given definition is correct for an id; returns Boolean or str `unk`.

        The label held before the call is compared with the one OLS reports.
        Because the lookup goes through ``_get_label``, ``self.label`` holds
        the OLS label (or 'unk') afterwards.
        '''
        given_label = self.label
        self._get_label()
        if self.label == 'unk':
            return self.label
        return given_label == self.label

    def get_family(self, family_member):
        '''Returns list of parents, ancestors, children or descendants.

        The first call for a given ``family_member`` queries OLS and stores
        the de-duplicated result on the instance; later calls return that
        stored list.  ``['none found']`` is stored when the request fails,
        the status code is above 200, or no non-obsolete term is returned.
        '''
        family_names = ('parents', 'children', 'ancestors', 'descendants')
        if family_member in family_names:
            cached = getattr(self, family_member)
            if cached != 'not assigned yet':
                return cached

        ontology_prefix = self.id.split(':')[0].lower()
        if ontology_prefix == 'gaz':
            # GAZ terms are resolved through their IRI first; the hierarchy
            # link is then read from the term record.
            query_url = 'https://www.ebi.ac.uk/ols/api/ontologies/gaz/terms?iri='
            query_url += 'http://purl.obolibrary.org/obo/' + self.id.replace(':', '_')
            ols_resp = self._get_request(query_url)
            if ols_resp is None:
                logging.warning(
                    f'Did not get URL for {query_url} during search for {family_member}')
                qry_url = None
            else:
                try:
                    qry_url = ols_resp.json()['_embedded']['terms'][0]['_links']\
                                      ['hierarchical'+family_member.title()]['href']
                except (KeyError, IndexError):
                    # No such term or no such hierarchy link on the record.
                    qry_url = None
        else:
            query_url = 'http://www.ebi.ac.uk/ols/api/ontologies/{}/{}?id={}'
            qry_url = query_url.format(ontology_prefix, family_member, self.id)

        result_list = ['none found']
        if qry_url is not None:
            ols_resp = self._get_request(qry_url)
            if ols_resp is None:
                logging.warning(
                    f'Did not get URL for {qry_url} during search for {family_member}')
            elif ols_resp.status_code <= 200:
                first_page = ols_resp.json()
                if first_page['page']['totalElements'] > 0:
                    result_list = self._api_results(first_page['_embedded']['terms'], [])
                    if first_page['page']['totalPages'] > 1:
                        next_url = first_page['_links']['next']['href']
                        while next_url:
                            next_url, result_list = self._next_page(next_url, result_list)
                    if not result_list:
                        # Every returned term was obsolete.
                        result_list = ['none found']

        # De-duplicate while keeping the API order, and hand back the very
        # list that is cached so first and later calls agree.
        result_list = list(dict.fromkeys(result_list))
        if family_member in family_names:
            setattr(self, family_member, result_list)
        return result_list

    def bin_term(self, bin_package):
        '''Categorize term into given bins as Ontology_package.

        Builds ``label:SHORT_FORM`` strings for this term and its ancestors
        and returns those that occur in ``bin_package.ontologies``.
        '''
        self.get_family('ancestors')
        lineage = [self]
        if self.ancestors != ['none found']:
            lineage = lineage + self.ancestors
        ancestor_labels = [x.label + ':' + x.id.replace(':', '_') for x in lineage]
        return [x for x in ancestor_labels if x in bin_package.ontologies]

    def visualize_term(self, o_file, node_color='black', edge_color='black',
                       fill_out=False, stop_terms=False, draw_graph=True):
        '''Visualize one term.

        Collects edges to the direct parents and children.  An int
        ``fill_out`` adds that many generations in total; ``fill_out=True``
        expands until nothing new is found, not expanding ``stop_terms``.
        The edges are stored in ``self.graph_nodes`` and reused on later
        calls with the same ``fill_out`` (``stop_terms`` is not part of that
        check).  The graph is written to ``o_file`` when ``draw_graph``.
        '''
        stale = (self.graph_nodes == 'not assigned yet'
                 or self.graph_fill != fill_out
                 # True == 1 in Python, yet they select different expansions.
                 or type(self.graph_fill) is not type(fill_out))
        if stale:
            self.get_family('parents')
            self.get_family('children')
            own_label = self._graph_label()
            edge_set1, edge_set2 = [], []
            if self.parents != ['none found']:
                edge_set1 = [(x._graph_label(), own_label) for x in self.parents]
            if self.children != ['none found']:
                edge_set2 = [(own_label, x._graph_label()) for x in self.children]
            # Exact type test on purpose: bool is a subclass of int and must
            # fall through to the full-expansion branch.
            if type(fill_out) == int:
                edge_set1 = self._add_edges('parents', self.parents, edge_set1, fill_out-1)
                edge_set2 = self._add_edges('children', self.children, edge_set2, fill_out-1)
            elif fill_out == True:
                edge_set1 = self._expand_edge('parents', self.parents, edge_set1,
                                              '', stop_terms)
                edge_set2 = self._expand_edge('children', self.children, edge_set2,
                                              '', stop_terms)
            self.graph_nodes = list(dict.fromkeys(edge_set1 + edge_set2))
            # Record what the stored edges were built with; this was never
            # updated before, so the reuse check compared against False forever.
            self.graph_fill = fill_out
        if draw_graph:
            self._draw_graph(o_file, node_color, edge_color)
|
kkonganti@0
|
251
|
kkonganti@0
|
252
|
kkonganti@0
|
class Ontology_package:
    '''Associate or package Ontology_accession objects together.

    ``lcp`` (lowest common parents) and ``hcc`` (highest common children)
    hold the string 'not assigned yet' until computed with :meth:`get_lcp` /
    :meth:`get_hcc` or assigned with :meth:`set_lcp` / :meth:`set_hcc`.
    '''
    def __init__(self, package_label, ontol_list):
        self.label = package_label
        self.ontologies = ontol_list
        self.bins = []
        self.lcp = 'not assigned yet'
        self.hcc = 'not assigned yet'
        self._lcp_state = (True, [])
        self._hcc_state = (True, [])
        self._bin_state = []
        self.graph_nodes = 'not assigned yet'
        self.graph_state = False
        # visualize_terms reads graph_fill; it was never initialised, so a
        # second call raised AttributeError.
        self.graph_fill = False

    @staticmethod
    def _term_labels(terms):
        '''Return graph labels of the term objects in ``terms``.

        Skips string placeholders such as 'none found'; returns [] when
        ``terms`` itself is still the 'not assigned yet' string.
        '''
        if isinstance(terms, str):
            return []
        return [x._graph_label() for x in terms if not isinstance(x, str)]

    def _common_family(self, family_member, incl_terms, excl_terms):
        '''Find common family members.

        Starts from each package term's direct relatives and keeps widening
        every candidate list by the relatives of its members until the lists
        share at least one entry.  Returns [] when nothing new can be added
        and the lists still have nothing in common.
        '''
        included = [x for x in self.ontologies if x.id not in excl_terms]
        # Copy: get_family may hand back the list cached on the term, which
        # must not be extended here.
        family_candidates = {x: list(x.get_family(family_member)) for x in included}
        common_members = self._common_list(family_candidates, incl_terms)
        while common_members == []:
            grew = False
            for ontol_term in included:
                if len(self.ontologies) > 30:
                    time.sleep(0.05)
                known = family_candidates[ontol_term]
                seen = set(known)
                original_list = list(known)
                for family_ontol in original_list:
                    if len(original_list) > 30:
                        time.sleep(0.05)
                    lookup = getattr(family_ontol, 'get_family', None)
                    # A 'none found' placeholder has no relatives of its own.
                    relatives = ['none found'] if lookup is None else lookup(family_member)
                    for relative in relatives:
                        if relative not in seen:
                            seen.add(relative)
                            known.append(relative)
                            grew = True
            if not grew:
                break
            # Re-evaluate after widening; without this the loop never ended.
            common_members = self._common_list(family_candidates, incl_terms)
        return common_members

    def _common_list(self, input_dic, incl_terms):
        '''Compare input dictionary keys and list.

        Each key together with its values forms one set; the intersection of
        all sets is taken.  With ``incl_terms`` the keys found in the
        intersection are returned if there are any; otherwise the
        intersection minus all keys is returned.  An empty dictionary gives [].
        '''
        if not input_dic:
            return []
        term_sets = [{ontol_key, *ontol_vals} for ontol_key, ontol_vals in input_dic.items()]
        common_set = set.intersection(*term_sets)
        if incl_terms:
            common_keys = [x for x in common_set if x in input_dic]
            if common_keys:
                return common_keys
        return list(common_set - set(input_dic))

    def _draw_graph(self, o_file, node_color, edge_color, show_lcp, show_hcc):
        '''Draw and save graph.

        Package terms are filled light blue; ``lcp`` / ``hcc`` terms present
        in the graph are filled beige when requested; other nodes light grey.
        '''
        ontol_graph = pgv.AGraph(name='ontology_graph')
        own_labels = self._term_labels(self.ontologies)
        for label in own_labels:
            ontol_graph.add_node(label)
        for x in self.graph_nodes:
            ontol_graph.add_edge(x[0], x[1])
        ontol_graph.node_attr.update(shape='box', style='rounded,filled',
                                     fillcolor='lightgrey', color=node_color)
        ontol_graph.edge_attr.update(shape='normal', color=edge_color, dir='back')
        highlighted = []
        if show_lcp:
            highlighted += self._term_labels(self.lcp)
        if show_hcc:
            highlighted += self._term_labels(self.hcc)
        for label in highlighted:
            # A common parent/child need not be among the collected edges.
            if ontol_graph.has_node(label):
                ontol_graph.get_node(label).attr.update(fillcolor='beige')
        for label in own_labels:
            ontol_graph.get_node(label).attr.update(fillcolor='lightblue')
        ontol_graph.draw(o_file, prog='dot')

    def _list_hierarchy(self, input_list, input_position):
        '''Get lowest or highest terms.

        For 'lowest' a term is dropped when it occurs among the ancestors of
        another listed term; for 'highest' when it occurs among the
        descendants.  ``input_list`` is reduced in place and returned.
        '''
        if input_list == ['none found']:
            return input_list
        family_name = {'lowest': 'ancestors', 'highest': 'descendants'}[input_position]
        family_lists = {}
        for input_term in input_list:
            if len(input_list) > 30:
                time.sleep(0.05)
            if input_term == 'none found':
                # Keep this a list: with a bare string the membership test
                # below raised TypeError for term objects.
                family_lists[input_term] = ['none found']
            else:
                family_lists[input_term] = input_term.get_family(family_name)
        while True:
            remove_terms = []
            for input_term in input_list:
                if any(input_term in family_lists[f_l] for f_l in family_lists):
                    del family_lists[input_term]
                    remove_terms.append(input_term)
            if not remove_terms:
                break
            for x_term in remove_terms:
                input_list.remove(x_term)
        return input_list

    def _trim_tips(self):
        '''Prune dangling ends of ``self.graph_nodes``.

        Repeatedly removes edges that start at a node with no incoming edge
        or end at a node with no outgoing edge, unless that node is a package
        term or one of ``self.lcp``, until nothing changes.
        '''
        tip_nodes = set(self._term_labels(self.ontologies) + self._term_labels(self.lcp))
        edges = self.graph_nodes
        while True:
            left_nodes = {x[0] for x in edges}
            right_nodes = {x[1] for x in edges}
            top_nodes = (left_nodes - right_nodes) - tip_nodes
            bot_nodes = (right_nodes - left_nodes) - tip_nodes
            kept = [x for x in edges if x[0] not in top_nodes and x[1] not in bot_nodes]
            if kept == edges:
                break
            edges = kept
        self.graph_nodes = kept

    def get_lcp(self, incl_terms=True, excl_terms=None):  # TODO: missing excl_terms
        '''Find lowest common parent(s); can include input terms as lcp,
        exclude terms by obo id; saves results in lcp attribute.

        Does nothing when ``lcp`` was already set for the same arguments.
        '''
        excl_terms = [] if excl_terms is None else excl_terms
        if self._lcp_state == (incl_terms, excl_terms) and self.lcp != 'not assigned yet':
            return
        common_members = self._common_family('parents', incl_terms, excl_terms)
        common_members = self._list_hierarchy(common_members, 'lowest')
        if common_members != []:
            self.lcp = common_members
            self._lcp_state = (incl_terms, excl_terms)

    def get_hcc(self, incl_terms=True, excl_terms=None):
        '''Get highest common child(ren); can include input terms as hcc;
        exclude terms by obo id; saves results in hcc attribute.

        Does nothing when ``hcc`` was already set for the same arguments.
        '''
        excl_terms = [] if excl_terms is None else excl_terms
        if self._hcc_state == (incl_terms, excl_terms) and self.hcc != 'not assigned yet':
            return
        common_members = self._common_family('children', incl_terms, excl_terms)
        common_members = self._list_hierarchy(common_members, 'highest')
        if common_members != []:
            self.hcc = common_members
            self._hcc_state = (incl_terms, excl_terms)

    def set_lcp(self, lcp_acc, incl_terms=True, excl_terms=None):
        '''Assign ``lcp`` directly and record the arguments it stands for.'''
        self.lcp = lcp_acc
        self._lcp_state = (incl_terms, [] if excl_terms is None else excl_terms)

    def set_hcc(self, hcc_acc, incl_terms=True, excl_terms=None):
        '''Assign ``hcc`` directly and record the arguments it stands for.'''
        self.hcc = hcc_acc
        self._hcc_state = (incl_terms, [] if excl_terms is None else excl_terms)

    def bin_terms(self, bin_package):
        '''Categorize terms by those in Ontology_package; saves results in bins attribute.

        Does nothing when called again with the same ``bin_package`` object.
        '''
        if self._bin_state is bin_package:
            return
        package_bins = []
        for x in self.ontologies:
            package_bins.extend(x.bin_term(bin_package))
        self.bins = list(dict.fromkeys(package_bins))
        # Remember the package so the early return above can take effect.
        self._bin_state = bin_package

    def visualize_terms(self, o_file, fill_out=False, show_lcp=False, show_hcc=False,
                        node_color='black', edge_color='black',
                        lcp_stop=False, hcc_stop=False, trim_nodes=False):
        '''Visualize terms.

        Merges the edges of every package term (see
        ``Ontology_accession.visualize_term``) into ``self.graph_nodes``,
        reusing the stored edges when ``fill_out`` is unchanged.  With
        ``lcp_stop`` / ``hcc_stop`` the terms in ``lcp`` / ``hcc`` are skipped
        and passed on as stop terms.  Graphs with more than 150 edges are
        logged as a table instead of drawn.
        '''
        stale = (self.graph_nodes == 'not assigned yet'
                 or self.graph_fill != fill_out
                 # True == 1 in Python, yet they select different expansions.
                 or type(self.graph_fill) is not type(fill_out))
        if stale:
            stop_terms = []
            if lcp_stop and not isinstance(self.lcp, str):
                stop_terms = stop_terms + list(self.lcp)
            if hcc_stop and not isinstance(self.hcc, str):
                stop_terms = stop_terms + list(self.hcc)
            self.graph_nodes = []
            for x in self.ontologies:
                if x in stop_terms:
                    continue
                # An empty stop list behaves like "no stop terms" downstream.
                x.visualize_term(o_file, fill_out=fill_out,
                                 stop_terms=stop_terms or False, draw_graph=False)
                self.graph_nodes.extend([z for z in x.graph_nodes
                                         if z not in self.graph_nodes])
            self.graph_fill = fill_out
        if trim_nodes:
            # Runs to a fixpoint, so repeating it on stored edges is harmless.
            self._trim_tips()
        if len(self.graph_nodes) > 150:
            edge_string = '\n'.join(['Parent node\tChild node']
                                    + ['\t'.join(edge_tuple)
                                       for edge_tuple in self.graph_nodes])
            logging.info(f'Not drawing graph with {len(self.graph_nodes)} edges:'
                         f'\n\n{edge_string}\n')
        else:
            self._draw_graph(o_file, node_color, edge_color, show_lcp, show_hcc)
|