annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/site-packages/pybedtools/parallel.py @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
rev   line source
jpayne@68 1 import sys
jpayne@68 2 import multiprocessing
jpayne@68 3 from . import helpers
jpayne@68 4 import pybedtools
jpayne@68 5
jpayne@68 6
jpayne@68 7 def _parallel_wrap(
jpayne@68 8 orig_bedtool,
jpayne@68 9 shuffle_kwargs,
jpayne@68 10 genome_fn,
jpayne@68 11 method,
jpayne@68 12 method_args,
jpayne@68 13 method_kwargs,
jpayne@68 14 sort=False,
jpayne@68 15 shuffle=True,
jpayne@68 16 reduce_func=None,
jpayne@68 17 ):
jpayne@68 18 """
jpayne@68 19 Given a BedTool object `orig_bedtool`, call its `method` with `args` and
jpayne@68 20 `kwargs` and then call `reduce_func` on the results.
jpayne@68 21
jpayne@68 22 See parallel_apply docstring for details
jpayne@68 23
jpayne@68 24 """
jpayne@68 25
jpayne@68 26 # note: be careful about cleaning up tempfiles
jpayne@68 27 if not shuffle:
jpayne@68 28 to_use = orig_bedtool
jpayne@68 29 else:
jpayne@68 30 shuffled = orig_bedtool.shuffle(g=genome_fn, **shuffle_kwargs)
jpayne@68 31 if sort:
jpayne@68 32 to_use = shuffled.sort()
jpayne@68 33 helpers.close_or_delete(shuffled)
jpayne@68 34 else:
jpayne@68 35 to_use = shuffled
jpayne@68 36
jpayne@68 37 result = getattr(to_use, method)(*method_args, **method_kwargs)
jpayne@68 38
jpayne@68 39 if shuffle:
jpayne@68 40 helpers.close_or_delete(to_use)
jpayne@68 41
jpayne@68 42 if reduce_func:
jpayne@68 43 reduced = reduce_func(result)
jpayne@68 44 if isinstance(result, pybedtools.BedTool):
jpayne@68 45 helpers.close_or_delete(result)
jpayne@68 46 return reduced
jpayne@68 47 else:
jpayne@68 48 return result
jpayne@68 49
jpayne@68 50
jpayne@68 51 def parallel_apply(
jpayne@68 52 orig_bedtool,
jpayne@68 53 method,
jpayne@68 54 genome=None,
jpayne@68 55 genome_fn=None,
jpayne@68 56 method_args=None,
jpayne@68 57 method_kwargs=None,
jpayne@68 58 shuffle_kwargs=None,
jpayne@68 59 shuffle=True,
jpayne@68 60 reduce_func=None,
jpayne@68 61 processes=1,
jpayne@68 62 sort=False,
jpayne@68 63 _orig_pool=None,
jpayne@68 64 iterations=1000,
jpayne@68 65 debug=False,
jpayne@68 66 report_iterations=False,
jpayne@68 67 ):
jpayne@68 68 """
jpayne@68 69 Call an arbitrary BedTool method many times in parallel.
jpayne@68 70
jpayne@68 71 An example use-case is to generate a null distribution of intersections,
jpayne@68 72 and then compare this to the actual intersections.
jpayne@68 73
jpayne@68 74 **Important:** due to a known file handle leak in BedTool.__len__, it's
jpayne@68 75 best to simply check the number of lines in the file, as in the below
jpayne@68 76 function. This works because BEDTools programs strip any non-interval lines
jpayne@68 77 in the results.
jpayne@68 78
jpayne@68 79 >>> # set up example BedTools
jpayne@68 80 >>> a = pybedtools.example_bedtool('a.bed')
jpayne@68 81 >>> b = pybedtools.example_bedtool('b.bed')
jpayne@68 82
jpayne@68 83 >>> # Method of `a` to call:
jpayne@68 84 >>> method = 'intersect'
jpayne@68 85
jpayne@68 86 >>> # Kwargs provided to `a.intersect` each iteration
jpayne@68 87 >>> method_kwargs = dict(b=b, u=True)
jpayne@68 88
jpayne@68 89 >>> # Function that will be called on the results of
jpayne@68 90 >>> # `a.intersect(**method_kwargs)`.
jpayne@68 91 >>> def reduce_func(x):
jpayne@68 92 ... return sum(1 for _ in open(x.fn))
jpayne@68 93
jpayne@68 94 >>> # Create a small artificial genome for this test (generally you'd
jpayne@68 95 >>> # use an assembly name, like "hg19"):
jpayne@68 96 >>> genome = dict(chr1=(0, 1000))
jpayne@68 97
jpayne@68 98 >>> # Do 10 iterations using 1 process for this test (generally you'd
jpayne@68 99 >>> # use 1000+ iterations, and as many processes as you have CPUs)
jpayne@68 100 >>> results = pybedtools.parallel.parallel_apply(a, method, genome=genome,
jpayne@68 101 ... method_kwargs=method_kwargs, iterations=10, processes=1,
jpayne@68 102 ... reduce_func=reduce_func, debug=True, report_iterations=True)
jpayne@68 103
jpayne@68 104 >>> # get results
jpayne@68 105 >>> print(list(results))
jpayne@68 106 [1, 0, 1, 2, 4, 2, 2, 1, 2, 4]
jpayne@68 107
jpayne@68 108 >>> # We can compare this to the actual intersection:
jpayne@68 109 >>> reduce_func(a.intersect(**method_kwargs))
jpayne@68 110 3
jpayne@68 111
jpayne@68 112 Alternatively, we could use the `a.jaccard` method, which already does the
jpayne@68 113 reduction to a dictionary. However, the Jaccard method requires the input
jpayne@68 114 to be sorted. Here, we specify `sort=True` to sort each shuffled BedTool
jpayne@68 115 before calling its `jaccard` method.
jpayne@68 116
jpayne@68 117 >>> from pybedtools.parallel import parallel_apply
jpayne@68 118 >>> a = pybedtools.example_bedtool('a.bed')
jpayne@68 119 >>> results = parallel_apply(a, method='jaccard', method_args=(b,),
jpayne@68 120 ... genome=genome, iterations=3, processes=1, sort=True, debug=True)
jpayne@68 121 >>> for i in results:
jpayne@68 122 ... print(sorted(i.items()))
jpayne@68 123 [('intersection', 12), ('jaccard', 0.0171184), ('n_intersections', 1), ('union', 701)]
jpayne@68 124 [('intersection', 0), ('jaccard', 0.0), ('n_intersections', 0), ('union', 527)]
jpayne@68 125 [('intersection', 73), ('jaccard', 0.137996), ('n_intersections', 1), ('union', 529)]
jpayne@68 126
jpayne@68 127 Parameters
jpayne@68 128 ----------
jpayne@68 129 orig_bedtool : BedTool
jpayne@68 130
jpayne@68 131 method : str
jpayne@68 132 The method of `orig_bedtool` to run
jpayne@68 133
jpayne@68 134 method_args : tuple
jpayne@68 135 Passed directly to getattr(orig_bedtool, method)()
jpayne@68 136
jpayne@68 137 method_kwargs : dict
jpayne@68 138 Passed directly to getattr(orig_bedtool, method)()
jpayne@68 139
jpayne@68 140 shuffle : bool
jpayne@68 141 If True, then `orig_bedtool` will be shuffled at each iteration and
jpayne@68 142 that shuffled version's `method` will be called with `method_args` and
jpayne@68 143 `method_kwargs`.
jpayne@68 144
jpayne@68 145 shuffle_kwargs : dict
jpayne@68 146 If `shuffle` is True, these are passed to `orig_bedtool.shuffle()`.
jpayne@68 147 You do not need to pass the genome here; that's handled separately by
jpayne@68 148 the `genome` and `genome_fn` kwargs.
jpayne@68 149
jpayne@68 150 iterations : int
jpayne@68 151 Number of iterations to perform
jpayne@68 152
jpayne@68 153 genome : string or dict
jpayne@68 154 If string, then assume it is the assembly name (e.g., hg19) and get
jpayne@68 155 a dictionary of chromsizes for that assembly, then converts to
jpayne@68 156 a filename.
jpayne@68 157
jpayne@68 158 genome_fn : str
jpayne@68 159 Mutually exclusive with `genome`; `genome_fn` must be an existing
jpayne@68 160 filename with the chromsizes. Use the `genome` kwarg instead if you'd
jpayne@68 161 rather supply an assembly or dict.
jpayne@68 162
jpayne@68 163 reduce_func : callable
jpayne@68 164 Function or other callable object that accepts, as its only argument,
jpayne@68 165 the results from `orig_bedtool.method()`. For example, if you care
jpayne@68 166 about the number of results, then you can use `reduce_func=len`.
jpayne@68 167
jpayne@68 168 processes : int
jpayne@68 169 Number of processes to run. If `processes=1`, then multiprocessing is
jpayne@68 170 not used (making it much easier to debug). This argument is ignored if
jpayne@68 171 `_orig_pool` is provided.
jpayne@68 172
jpayne@68 173 sort : bool
jpayne@68 174 If both `shuffle` and `sort` are True, then the shuffled BedTool will
jpayne@68 175 then be sorted. Use this if `method` requires sorted input.
jpayne@68 176
jpayne@68 177 _orig_pool : multiprocessing.Pool instance
jpayne@68 178 If provided, uses `_orig_pool` instead of creating one. In this case,
jpayne@68 179 `processes` will be ignored.
jpayne@68 180
jpayne@68 181 debug : bool
jpayne@68 182 If True, then use the current iteration index as the seed to shuffle.
jpayne@68 183
jpayne@68 184 report_iterations : bool
jpayne@68 185 If True, then report the number of iterations to stderr.
jpayne@68 186 """
jpayne@68 187
jpayne@68 188 shuffle_kwargs = shuffle_kwargs or {}
jpayne@68 189 method_args = method_args or ()
jpayne@68 190 if not isinstance(method_args, list) and not isinstance(method_args, tuple):
jpayne@68 191 raise ValueError(
jpayne@68 192 "method_args must be a list or tuple, got %s" % type(method_args)
jpayne@68 193 )
jpayne@68 194 method_kwargs = method_kwargs or {}
jpayne@68 195
jpayne@68 196 if genome_fn and genome:
jpayne@68 197 raise ValueError("only of of genome_fn or genome should be provided")
jpayne@68 198
jpayne@68 199 if shuffle:
jpayne@68 200 if not genome_fn:
jpayne@68 201 if not genome:
jpayne@68 202 raise ValueError(
jpayne@68 203 "shuffle=True, so either genome_fn" " or genome must be provided"
jpayne@68 204 )
jpayne@68 205 genome_fn = pybedtools.chromsizes_to_file(genome)
jpayne@68 206
jpayne@68 207 _parallel_wrap_kwargs = dict(
jpayne@68 208 orig_bedtool=orig_bedtool,
jpayne@68 209 shuffle_kwargs=shuffle_kwargs,
jpayne@68 210 genome_fn=genome_fn,
jpayne@68 211 method=method,
jpayne@68 212 method_args=method_args,
jpayne@68 213 method_kwargs=method_kwargs,
jpayne@68 214 shuffle=shuffle,
jpayne@68 215 reduce_func=reduce_func,
jpayne@68 216 sort=sort,
jpayne@68 217 )
jpayne@68 218
jpayne@68 219 def add_seed(i, kwargs):
jpayne@68 220 if debug and shuffle:
jpayne@68 221 kwargs["shuffle_kwargs"]["seed"] = i
jpayne@68 222 return kwargs
jpayne@68 223
jpayne@68 224 if processes == 1:
jpayne@68 225 for it in range(iterations):
jpayne@68 226 yield _parallel_wrap(**add_seed(it, _parallel_wrap_kwargs))
jpayne@68 227 return
jpayne@68 228
jpayne@68 229 if _orig_pool:
jpayne@68 230 p = _orig_pool
jpayne@68 231 else:
jpayne@68 232 p = multiprocessing.Pool(processes)
jpayne@68 233
jpayne@68 234 results = [
jpayne@68 235 p.apply_async(_parallel_wrap, (), add_seed(it, _parallel_wrap_kwargs))
jpayne@68 236 for it in range(iterations)
jpayne@68 237 ]
jpayne@68 238 for i, r in enumerate(results):
jpayne@68 239 yield r.get()
jpayne@68 240 if report_iterations:
jpayne@68 241 sys.stderr.write("%s\r" % i)
jpayne@68 242 sys.stderr.flush()