import sys
import multiprocessing
from . import helpers
import pybedtools


def _parallel_wrap(
    orig_bedtool,
    shuffle_kwargs,
    genome_fn,
    method,
    method_args,
    method_kwargs,
    sort=False,
    shuffle=True,
    reduce_func=None,
):
    """
    Given a BedTool object `orig_bedtool`, call its `method` with
    `method_args` and `method_kwargs` and then call `reduce_func` on the
    result.

    See the `parallel_apply` docstring for details.
    """

    # note: be careful about cleaning up tempfiles
    if not shuffle:
        to_use = orig_bedtool
    else:
        shuffled = orig_bedtool.shuffle(g=genome_fn, **shuffle_kwargs)
        if sort:
            to_use = shuffled.sort()
            helpers.close_or_delete(shuffled)
        else:
            to_use = shuffled

    result = getattr(to_use, method)(*method_args, **method_kwargs)

    if shuffle:
        helpers.close_or_delete(to_use)

    if reduce_func:
        reduced = reduce_func(result)
        if isinstance(result, pybedtools.BedTool):
            helpers.close_or_delete(result)
        return reduced
    else:
        return result


def parallel_apply(
    orig_bedtool,
    method,
    genome=None,
    genome_fn=None,
    method_args=None,
    method_kwargs=None,
    shuffle_kwargs=None,
    shuffle=True,
    reduce_func=None,
    processes=1,
    sort=False,
    _orig_pool=None,
    iterations=1000,
    debug=False,
    report_iterations=False,
):
    """
    Call an arbitrary BedTool method many times in parallel.

    An example use-case is to generate a null distribution of intersections,
    and then compare this to the actual intersections.

    **Important:** due to a known file handle leak in BedTool.__len__, it's
    best to simply check the number of lines in the file, as in the below
    function. This works because BEDTools programs strip any non-interval
    lines in the results.

    >>> # set up example BedTools
    >>> a = pybedtools.example_bedtool('a.bed')
    >>> b = pybedtools.example_bedtool('b.bed')

    >>> # Method of `a` to call:
    >>> method = 'intersect'

    >>> # Kwargs provided to `a.intersect` each iteration
    >>> method_kwargs = dict(b=b, u=True)

    >>> # Function that will be called on the results of
    >>> # `a.intersect(**method_kwargs)`.
    >>> def reduce_func(x):
    ...     return sum(1 for _ in open(x.fn))

    >>> # Create a small artificial genome for this test (generally you'd
    >>> # use an assembly name, like "hg19"):
    >>> genome = dict(chr1=(0, 1000))

    >>> # Do 10 iterations using 1 process for this test (generally you'd
    >>> # use 1000+ iterations, and as many processes as you have CPUs)
    >>> results = pybedtools.parallel.parallel_apply(a, method, genome=genome,
    ...     method_kwargs=method_kwargs, iterations=10, processes=1,
    ...     reduce_func=reduce_func, debug=True, report_iterations=True)

    >>> # get results
    >>> print(list(results))
    [1, 0, 1, 2, 4, 2, 2, 1, 2, 4]

    >>> # We can compare this to the actual intersection:
    >>> reduce_func(a.intersect(**method_kwargs))
    3

    Alternatively, we could use the `a.jaccard` method, which already does the
    reduction to a dictionary. However, the Jaccard method requires the input
    to be sorted. Here, we specify `sort=True` to sort each shuffled BedTool
    before calling its `jaccard` method.

    >>> from pybedtools.parallel import parallel_apply
    >>> a = pybedtools.example_bedtool('a.bed')
    >>> results = parallel_apply(a, method='jaccard', method_args=(b,),
    ...     genome=genome, iterations=3, processes=1, sort=True, debug=True)
    >>> for i in results:
    ...     print(sorted(i.items()))
    [('intersection', 12), ('jaccard', 0.0171184), ('n_intersections', 1), ('union', 701)]
    [('intersection', 0), ('jaccard', 0.0), ('n_intersections', 0), ('union', 527)]
    [('intersection', 73), ('jaccard', 0.137996), ('n_intersections', 1), ('union', 529)]

    Parameters
    ----------
    orig_bedtool : BedTool

    method : str
        The method of `orig_bedtool` to run

    method_args : tuple
        Passed directly to getattr(orig_bedtool, method)()

    method_kwargs : dict
        Passed directly to getattr(orig_bedtool, method)()

    shuffle : bool
        If True, then `orig_bedtool` will be shuffled at each iteration and
        that shuffled version's `method` will be called with `method_args` and
        `method_kwargs`.

    shuffle_kwargs : dict
        If `shuffle` is True, these are passed to `orig_bedtool.shuffle()`.
        You do not need to pass the genome here; that's handled separately by
        the `genome` and `genome_fn` kwargs.

    iterations : int
        Number of iterations to perform.

    genome : string or dict
        If a string, it is assumed to be an assembly name (e.g., "hg19"); the
        chromsizes for that assembly are looked up and converted to
        a filename. If a dict, it is converted to a filename directly.

    genome_fn : str
        Mutually exclusive with `genome`; `genome_fn` must be an existing
        filename containing the chromsizes. Use the `genome` kwarg instead if
        you'd rather supply an assembly name or dict.

    reduce_func : callable
        Function or other callable object that accepts, as its only argument,
        the results from `orig_bedtool.method()`. For example, if you care
        about the number of results, you can use `reduce_func=len`.

    processes : int
        Number of processes to run. If `processes=1`, then multiprocessing is
        not used (making it much easier to debug). This argument is ignored if
        `_orig_pool` is provided.

    sort : bool
        If both `shuffle` and `sort` are True, the shuffled BedTool will be
        sorted before use. Use this if `method` requires sorted input.

    _orig_pool : multiprocessing.Pool instance
        If provided, uses `_orig_pool` instead of creating one. In this case,
        `processes` will be ignored.

    debug : bool
        If True, use the current iteration index as the seed to shuffle,
        making runs reproducible.

    report_iterations : bool
        If True, report the number of completed iterations to stderr.
jpayne@68: """ jpayne@68: jpayne@68: shuffle_kwargs = shuffle_kwargs or {} jpayne@68: method_args = method_args or () jpayne@68: if not isinstance(method_args, list) and not isinstance(method_args, tuple): jpayne@68: raise ValueError( jpayne@68: "method_args must be a list or tuple, got %s" % type(method_args) jpayne@68: ) jpayne@68: method_kwargs = method_kwargs or {} jpayne@68: jpayne@68: if genome_fn and genome: jpayne@68: raise ValueError("only of of genome_fn or genome should be provided") jpayne@68: jpayne@68: if shuffle: jpayne@68: if not genome_fn: jpayne@68: if not genome: jpayne@68: raise ValueError( jpayne@68: "shuffle=True, so either genome_fn" " or genome must be provided" jpayne@68: ) jpayne@68: genome_fn = pybedtools.chromsizes_to_file(genome) jpayne@68: jpayne@68: _parallel_wrap_kwargs = dict( jpayne@68: orig_bedtool=orig_bedtool, jpayne@68: shuffle_kwargs=shuffle_kwargs, jpayne@68: genome_fn=genome_fn, jpayne@68: method=method, jpayne@68: method_args=method_args, jpayne@68: method_kwargs=method_kwargs, jpayne@68: shuffle=shuffle, jpayne@68: reduce_func=reduce_func, jpayne@68: sort=sort, jpayne@68: ) jpayne@68: jpayne@68: def add_seed(i, kwargs): jpayne@68: if debug and shuffle: jpayne@68: kwargs["shuffle_kwargs"]["seed"] = i jpayne@68: return kwargs jpayne@68: jpayne@68: if processes == 1: jpayne@68: for it in range(iterations): jpayne@68: yield _parallel_wrap(**add_seed(it, _parallel_wrap_kwargs)) jpayne@68: return jpayne@68: jpayne@68: if _orig_pool: jpayne@68: p = _orig_pool jpayne@68: else: jpayne@68: p = multiprocessing.Pool(processes) jpayne@68: jpayne@68: results = [ jpayne@68: p.apply_async(_parallel_wrap, (), add_seed(it, _parallel_wrap_kwargs)) jpayne@68: for it in range(iterations) jpayne@68: ] jpayne@68: for i, r in enumerate(results): jpayne@68: yield r.get() jpayne@68: if report_iterations: jpayne@68: sys.stderr.write("%s\r" % i) jpayne@68: sys.stderr.flush()