jpayne@68: from __future__ import annotations jpayne@68: import tempfile jpayne@68: from textwrap import dedent jpayne@68: import shutil jpayne@68: import subprocess jpayne@68: import operator jpayne@68: import os jpayne@68: import sys jpayne@68: import random jpayne@68: import string jpayne@68: import pprint jpayne@68: from itertools import islice jpayne@68: from multiprocessing import Pool jpayne@68: import gzip jpayne@68: from typing import Any, Callable, Iterable, Iterator, Literal, Optional, TYPE_CHECKING, cast jpayne@68: import pysam jpayne@68: from warnings import warn jpayne@68: import pathlib jpayne@68: from pathlib import Path jpayne@68: jpayne@68: from .helpers import ( jpayne@68: get_tempdir, jpayne@68: _tags, jpayne@68: call_bedtools, jpayne@68: _flatten_list, jpayne@68: _check_sequence_stderr, jpayne@68: isBAM, jpayne@68: isBGZIP, jpayne@68: isGZIP, jpayne@68: BEDToolsError, jpayne@68: pybedtoolsError, jpayne@68: _call_randomintersect, jpayne@68: SplitOutput, jpayne@68: FisherOutput, jpayne@68: ) jpayne@68: from . import helpers jpayne@68: from .cbedtools import ( jpayne@68: IntervalFile, jpayne@68: IntervalIterator, jpayne@68: Interval, jpayne@68: create_interval_from_list, jpayne@68: BedToolsFileError, jpayne@68: ) jpayne@68: import pybedtools jpayne@68: from . import settings jpayne@68: from . import filenames jpayne@68: jpayne@68: if TYPE_CHECKING: jpayne@68: import pandas as pd jpayne@68: import matplotlib.colors as mcolors jpayne@68: jpayne@68: _implicit_registry = {} jpayne@68: _other_registry = {} jpayne@68: _bam_registry = {} jpayne@68: jpayne@68: jpayne@68: def _jaccard_output_to_dict(s, **kwargs) -> dict: jpayne@68: """ jpayne@68: jaccard method doesn't return an interval file, rather, it returns a short jpayne@68: summary of results. Here, we simply parse it into a dict for convenience. jpayne@68: """ jpayne@68: if isinstance(s, str): jpayne@68: _s = open(s).read() jpayne@68: elif hasattr(s, "next") or hasattr(s, "__next__"): jpayne@68: _s = "".join([i for i in s]) jpayne@68: else: jpayne@68: raise ValueError("Unexpected object %r" % s) jpayne@68: header, data = _s.splitlines() jpayne@68: header = header.split() jpayne@68: data = data.split() jpayne@68: data[0] = int(data[0]) jpayne@68: data[1] = int(data[1]) jpayne@68: data[2] = float(data[2]) jpayne@68: data[3] = int(data[3]) jpayne@68: return dict(list(zip(header, data))) jpayne@68: jpayne@68: jpayne@68: def _reldist_output_handler(s, **kwargs): jpayne@68: """ jpayne@68: reldist, if called with -detail, returns a valid BED file with the relative jpayne@68: distance as the last field. In that case, return the BedTool immediately. jpayne@68: If not -detail, then the results are a table, in which case here we parse jpayne@68: into a dict for convenience. jpayne@68: """ jpayne@68: if "detail" in kwargs: jpayne@68: return BedTool(s) jpayne@68: if isinstance(s, str): jpayne@68: iterable = open(s) jpayne@68: if hasattr(s, "next"): jpayne@68: iterable = s jpayne@68: header = next(iterable).split() jpayne@68: results = {} jpayne@68: for h in header: jpayne@68: results[h] = [] jpayne@68: for i in iterable: jpayne@68: reldist, count, total, fraction = i.split() jpayne@68: data = [float(reldist), int(count), int(total), float(fraction)] jpayne@68: for h, d in zip(header, data): jpayne@68: results[h].append(d) jpayne@68: return results jpayne@68: jpayne@68: jpayne@68: def _wraps( jpayne@68: prog: Optional[str] = None, jpayne@68: implicit: Optional[str] = None, jpayne@68: bam: Optional[str] = None, jpayne@68: other: Optional[str] = None, jpayne@68: uses_genome: bool = False, jpayne@68: make_tempfile_for: Optional[str] = None, jpayne@68: check_stderr:Optional[Callable]=None, jpayne@68: add_to_bedtool:Optional[dict] = None, jpayne@68: nonbam: Optional[Literal["ALL"] | str | list[str]] = None, jpayne@68: force_bam: bool = False, jpayne@68: genome_none_if: Optional[list[str]] =None, jpayne@68: genome_if: Optional[list[str]] = None, jpayne@68: genome_ok_if: Optional[list[str]] = None, jpayne@68: does_not_return_bedtool: Optional[Callable] =None, jpayne@68: arg_order: Optional[list[str]] = None, jpayne@68: ): jpayne@68: """ jpayne@68: Do-it-all wrapper, to be used as a decorator. jpayne@68: jpayne@68: *prog* is the name of the BEDTools program that will be called. The help jpayne@68: for this program will also be added to the decorated method's docstring. jpayne@68: jpayne@68: *implicit* is the BEDTools program arg that should be filled in jpayne@68: automatically. jpayne@68: jpayne@68: *bam* will disable the implicit substitution if *bam* is in the kwargs. jpayne@68: This is typically 'abam' or 'ibam' if the program accepts BAM input. jpayne@68: jpayne@68: *other* is the BEDTools program arg that is passed in as the second input, jpayne@68: if supported. Within the semantics of BEDTools, the typical case will be jpayne@68: that if implicit='a' then other='b'; if implicit='i' then other=None. jpayne@68: jpayne@68: *uses_genome*, if True, will check for 'g' and/or 'genome' args and jpayne@68: retrieve the corresponding genome files as needed. jpayne@68: jpayne@68: *make_tempfile_for* is used for the sequence methods and indicates which jpayne@68: kwarg should have a tempfile made for it if it's not provided ('fo' for the jpayne@68: sequence methods) jpayne@68: jpayne@68: *check_stderr*, if not None, is a function that accepts a string (which jpayne@68: will be anything written to stdout when calling the wrapped program). This jpayne@68: function should return True if the string is OK, and False if it should jpayne@68: truly be considered an error. This is needed for wrapping fastaFromBed, jpayne@68: which will report to stderr that it's creating an index file. jpayne@68: jpayne@68: *add_to_bedtool* is used for sequence methods. It is a dictionary mapping jpayne@68: kwargs to attributes to be created in the resulting BedTool. Typically it jpayne@68: is {'fo':'seqfn'} which will add the resulting sequence name to the jpayne@68: BedTool's .seqfn attribute. If *add_to_bedtool* is not None, then the jpayne@68: returned BedTool will be *self* with the added attribute. If a key is jpayne@68: "stdout" (e.g., {"stdout": attr_name}), then save the stdout of the command jpayne@68: as a tempfile and store the tempfile's name in the attribute. This is jpayne@68: required for linksBed and bedToIgv. jpayne@68: jpayne@68: *nonbam* is a kwarg that even if the input file was a BAM, the output will jpayne@68: *not* be BAM format. For example, the `-bed` arg for intersectBed will jpayne@68: cause the output to be in BED format, not BAM. If not None, this can be a jpayne@68: string, a list of strings, or the special string "ALL", which means that jpayne@68: the wrapped program will never return BAM output. jpayne@68: jpayne@68: *force_bam*, if True, will force the output to be BAM. This is used for jpayne@68: bedToBam. jpayne@68: jpayne@68: *genome_none_if* is a list of arguments that will ignore the requirement jpayne@68: for a genome. This is needed for window_maker, where -b and -g are jpayne@68: mutually exclusive. jpayne@68: jpayne@68: *genome_ok_if* is a list of arguments that, if they are in jpayne@68: *genome_none_if*, are still OK to pass in. This is needed for bedtool jpayne@68: genomecov, where -g is not needed if -ibam is specified...but it's still OK jpayne@68: if the user passes a genome arg. jpayne@68: jpayne@68: *genome_if* is a list of arguments that will trigger the requirement for jpayne@68: a genome; otherwise no genome needs to be specified. jpayne@68: jpayne@68: *does_not_return_bedtool*, if not None, should be a function that handles jpayne@68: the returned output. Its signature should be ``func(output, kwargs)``, jpayne@68: where `output` is the output from the [possibly streaming] call to BEDTools jpayne@68: and `kwargs` are passed verbatim from the wrapped method call. Some jpayne@68: examples of methods that use this are jaccard, reldist, fisher, and split jpayne@68: methods. jpayne@68: jpayne@68: *arg_order*, if not None, is a sorted list of arguments. This is used by jpayne@68: handle_kwargs() to deal with things like issues 81 and 345, where some jpayne@68: BEDTools programs are sensitive to argument order. jpayne@68: """ jpayne@68: jpayne@68: # NOTE: We are calling each BEDTools program to get its help and adding jpayne@68: # that to the docstring of each method. This is run at import time. However jpayne@68: # if BEDTools is not on the path at import time, `not_implemented` is set jpayne@68: # to True and isn't reset later until the module is reloaded. jpayne@68: # jpayne@68: # helpers.set_bedtools_path therefore will trigger a module reload. jpayne@68: not_implemented = False jpayne@68: jpayne@68: # Call the program with -h to get help, which prints to stderr. jpayne@68: try: jpayne@68: p = subprocess.Popen( jpayne@68: helpers._version_2_15_plus_names(prog) + ["-h"], jpayne@68: stdout=subprocess.PIPE, jpayne@68: stderr=subprocess.PIPE, jpayne@68: ) jpayne@68: help_str = p.communicate()[1].decode() jpayne@68: jpayne@68: # underscores throw off ReStructuredText syntax of docstrings, so jpayne@68: # replace 'em jpayne@68: help_str = help_str.replace("_", "**") jpayne@68: jpayne@68: # indent jpayne@68: help_str = help_str.split("\n") jpayne@68: help_str = ["\n\n**Original BEDTools help:**::"] + ["\t" + i for i in help_str] jpayne@68: help_str = "\n".join(help_str) + "\n" jpayne@68: jpayne@68: # If the program can't be found, then we'll eventually replace the method jpayne@68: # with a version that does nothing but raise a NotImplementedError (plus jpayne@68: # a helpful message). jpayne@68: except OSError: jpayne@68: help_str = ( jpayne@68: '"%s" does not appear to be installed ' jpayne@68: "or on the path, so this method is " jpayne@68: "disabled. Please install a more recent " jpayne@68: "version of BEDTools and re-import to " jpayne@68: "use this method." % prog jpayne@68: ) jpayne@68: not_implemented = True jpayne@68: jpayne@68: def decorator(func): jpayne@68: """ jpayne@68: Accepts a function to be wrapped; discards the original and returns a jpayne@68: new, rebuilt-from-scratch function based on the kwargs passed to jpayne@68: _wraps(). jpayne@68: """ jpayne@68: # Register the implicit (as well as bam and other) args in the global jpayne@68: # registry. BedTool.handle_kwargs() will access these at runtime. The jpayne@68: # registry is keyed by program name (like intersectBed). jpayne@68: _implicit_registry[prog] = implicit jpayne@68: if other is not None: jpayne@68: _other_registry[prog] = other jpayne@68: if bam is not None: jpayne@68: _bam_registry[prog] = bam jpayne@68: jpayne@68: # Here's where we replace an unable-to-be-found program's method with jpayne@68: # one that only returns a NotImplementedError jpayne@68: if not_implemented: jpayne@68: jpayne@68: def not_implemented_func(*args, **kwargs): jpayne@68: raise NotImplementedError(help_str) jpayne@68: jpayne@68: return not_implemented_func jpayne@68: jpayne@68: _add_doc = [] jpayne@68: if implicit: jpayne@68: _add_doc.append( jpayne@68: dedent( jpayne@68: """ jpayne@68: For convenience, the file or stream this BedTool points to jpayne@68: is implicitly passed as the `-%s` argument to `%s` jpayne@68: """ jpayne@68: % (implicit, prog) jpayne@68: ) jpayne@68: ) jpayne@68: jpayne@68: if uses_genome: jpayne@68: _add_doc.append( jpayne@68: dedent( jpayne@68: """ jpayne@68: There are two alternatives for supplying a genome. Use jpayne@68: `g="genome.filename"` if you have a genome's chrom sizes jpayne@68: saved as a file. This is the what BEDTools expects when jpayne@68: using it from the command line. Alternatively, use the jpayne@68: `genome="assembly.name"` (for example, `genome="hg19"`) to jpayne@68: use chrom sizes for that assembly without having to manage jpayne@68: a separate file. The `genome` argument triggers a call jpayne@68: `pybedtools.chromsizes`, so see that method for more jpayne@68: details. jpayne@68: """ jpayne@68: ) jpayne@68: ) jpayne@68: jpayne@68: def wrapped(self, *args, **kwargs): jpayne@68: """ jpayne@68: A newly created function that will be returned by the _wraps() jpayne@68: decorator jpayne@68: """ jpayne@68: jpayne@68: # Only one non-keyword argument is supported; this is then assumed jpayne@68: # to be "other" (e.g., `-b` for intersectBed) jpayne@68: if len(args) > 0: jpayne@68: assert len(args) == 1 jpayne@68: kwargs[other] = args[0] jpayne@68: jpayne@68: # Add the implicit values to kwargs. If the current BedTool is jpayne@68: # BAM, it will automatically be passed to the appropriate jpayne@68: # BAM-support arg (like `-abam`). But this also allows the user to jpayne@68: # explicitly specify the abam kwarg, which will override the jpayne@68: # auto-substitution. jpayne@68: # Note: here, `implicit` is something like "a"; `bam` is something jpayne@68: # like "abam" jpayne@68: if ( jpayne@68: (implicit not in kwargs) jpayne@68: and (bam not in kwargs) jpayne@68: and (implicit is not None) jpayne@68: ): jpayne@68: if not self._isbam: jpayne@68: kwargs[implicit] = self.fn jpayne@68: else: jpayne@68: # It is a bam file. If this program supports BAM as the jpayne@68: # first input, then we set it here jpayne@68: if bam is not None: jpayne@68: kwargs[bam] = self.fn jpayne@68: jpayne@68: # Otherwise, BEDTools can't currently handle it, so raise jpayne@68: # an exception. jpayne@68: else: jpayne@68: raise pybedtoolsError( jpayne@68: '"%s" currently can\'t handle BAM ' jpayne@68: "input, please use bam_to_bed() first." % prog jpayne@68: ) jpayne@68: jpayne@68: # Should this function handle genome files? jpayne@68: check_for_genome = uses_genome jpayne@68: if uses_genome: jpayne@68: if genome_none_if: jpayne@68: for i in genome_none_if: jpayne@68: if i in kwargs or i == implicit: jpayne@68: check_for_genome = False jpayne@68: jpayne@68: # for genomecov, if -ibam then -g is optional. So it's OK jpayne@68: # for the user to provide genome or g kwargs, even if jpayne@68: # -ibam. jpayne@68: if genome_ok_if: jpayne@68: for i in genome_ok_if: jpayne@68: if i in kwargs or i == implicit: jpayne@68: if ("g" in kwargs) or ("genome" in kwargs): jpayne@68: check_for_genome = True jpayne@68: if genome_if: jpayne@68: check_for_genome = False jpayne@68: for i in genome_if: jpayne@68: if (i in kwargs) or (i == implicit): jpayne@68: check_for_genome = True jpayne@68: if check_for_genome: jpayne@68: kwargs = self.check_genome(**kwargs) jpayne@68: jpayne@68: # For sequence methods, we may need to make a tempfile that will jpayne@68: # hold the resulting sequence. For example, fastaFromBed needs to jpayne@68: # make a tempfile for 'fo' if no 'fo' was explicitly specified by jpayne@68: # the user. jpayne@68: if make_tempfile_for is not None: jpayne@68: if make_tempfile_for not in kwargs: jpayne@68: kwargs[make_tempfile_for] = self._tmp() jpayne@68: jpayne@68: # At runtime, this will parse the kwargs, convert streams to jpayne@68: # tempfiles if needed, and return all the goodies jpayne@68: cmds, tmp, stdin = self.handle_kwargs(prog=prog, jpayne@68: arg_order=arg_order, jpayne@68: **kwargs) jpayne@68: jpayne@68: # Decide whether the output is BAM format or not. jpayne@68: result_is_bam = False jpayne@68: jpayne@68: # By default, if the current BedTool is BAM, then the result should jpayne@68: # be, too. jpayne@68: if self._isbam: jpayne@68: result_is_bam = True jpayne@68: jpayne@68: # If nonbam is "ALL", then this method will never return BAM jpayne@68: # output. jpayne@68: if nonbam == "ALL": jpayne@68: result_is_bam = False jpayne@68: jpayne@68: # If any of the `nonbam` args are found in kwargs, then result is jpayne@68: # not a BAM. Side note: the _nonbam name mangling is necessary to jpayne@68: # keep the nonbam arg passed into the original _wraps() decorator jpayne@68: # in scope. jpayne@68: if nonbam is not None and nonbam != "ALL": jpayne@68: if isinstance(nonbam, str): jpayne@68: _nonbam = [nonbam] jpayne@68: else: jpayne@68: _nonbam = nonbam jpayne@68: for i in _nonbam: jpayne@68: if i in kwargs: jpayne@68: result_is_bam = False jpayne@68: break jpayne@68: jpayne@68: if force_bam: jpayne@68: result_is_bam = True jpayne@68: jpayne@68: decode_output = not result_is_bam jpayne@68: jpayne@68: # Do the actual call jpayne@68: stream = call_bedtools( jpayne@68: cmds, jpayne@68: tmp, jpayne@68: stdin=stdin, jpayne@68: check_stderr=check_stderr, jpayne@68: decode_output=decode_output, jpayne@68: ) jpayne@68: jpayne@68: if does_not_return_bedtool: jpayne@68: return does_not_return_bedtool(stream, **kwargs) jpayne@68: jpayne@68: # Post-hoc editing of the BedTool -- for example, this is used for jpayne@68: # the sequence methods to add a `seqfn` attribute to the resulting jpayne@68: # BedTool. jpayne@68: if add_to_bedtool is not None: jpayne@68: for kw, attr in list(add_to_bedtool.items()): jpayne@68: if kw == "stdout": jpayne@68: value = stream jpayne@68: else: jpayne@68: value = kwargs[kw] jpayne@68: setattr(self, attr, value) jpayne@68: result = self jpayne@68: else: jpayne@68: result = BedTool(stream) jpayne@68: jpayne@68: result._isbam = result_is_bam jpayne@68: result._cmds = cmds jpayne@68: del kwargs jpayne@68: return result jpayne@68: jpayne@68: # Now add the edited docstring (original Python docstring plus BEDTools jpayne@68: # help) to the newly created method above jpayne@68: if func.__doc__ is None: jpayne@68: orig = "" jpayne@68: else: jpayne@68: orig = func.__doc__ jpayne@68: jpayne@68: wrapped.__doc__ = orig + "\n".join(_add_doc) + help_str jpayne@68: jpayne@68: # Add the original method's name to a new attribute so we can access it jpayne@68: # when logging history jpayne@68: wrapped._name = func.__name__ # type: ignore jpayne@68: jpayne@68: return wrapped jpayne@68: jpayne@68: return decorator jpayne@68: jpayne@68: jpayne@68: def _log_to_history(method: Callable): jpayne@68: """ jpayne@68: Decorator to add a method and its kwargs to the history. jpayne@68: jpayne@68: Assumes that you only add this decorator to bedtool instances that jpayne@68: return other bedtool instances jpayne@68: """ jpayne@68: jpayne@68: def decorated(self, *args, **kwargs): jpayne@68: jpayne@68: # this calls the actual method in the first place; *result* is jpayne@68: # whatever you get back jpayne@68: result = method(self, *args, **kwargs) jpayne@68: jpayne@68: # add appropriate tags jpayne@68: parent_tag = self._tag jpayne@68: result_tag = result._tag jpayne@68: jpayne@68: history_step = HistoryStep( jpayne@68: method, args, kwargs, self, parent_tag, result_tag jpayne@68: ) jpayne@68: jpayne@68: # only add the current history to the new bedtool if there's jpayne@68: # something to add jpayne@68: if len(self.history) > 0: jpayne@68: result.history.append(self.history) jpayne@68: jpayne@68: # but either way, add this history step to the result. jpayne@68: result.history.append(history_step) jpayne@68: jpayne@68: return result jpayne@68: jpayne@68: decorated.__doc__ = method.__doc__ jpayne@68: return decorated jpayne@68: jpayne@68: jpayne@68: class BedTool(object): jpayne@68: TEMPFILES = filenames.TEMPFILES jpayne@68: jpayne@68: def __init__(self, fn: Optional[Any] = None, jpayne@68: from_string: bool = False, jpayne@68: remote: bool = False): jpayne@68: """ jpayne@68: Wrapper around Aaron Quinlan's ``BEDtools`` suite of programs jpayne@68: (https://github.com/arq5x/bedtools); also contains many useful jpayne@68: methods for more detailed work with BED files. jpayne@68: jpayne@68: *fn* is typically the name of a BED-like file, but can also be jpayne@68: one of the following: jpayne@68: jpayne@68: * a string filename jpayne@68: * another BedTool object jpayne@68: * an iterable of Interval objects jpayne@68: * an open file object jpayne@68: * a "file contents" string (see below) jpayne@68: jpayne@68: If *from_string* is True, then you can pass a string that contains jpayne@68: the contents of the BedTool you want to create. This will treat all jpayne@68: spaces as TABs and write to tempfile, treating whatever you pass as jpayne@68: *fn* as the contents of the bed file. This also strips empty lines. jpayne@68: jpayne@68: Typical usage is to point to an existing file:: jpayne@68: jpayne@68: a = BedTool('a.bed') jpayne@68: jpayne@68: But you can also create one from scratch from a string:: jpayne@68: jpayne@68: >>> s = ''' jpayne@68: ... chrX 1 100 jpayne@68: ... chrX 25 800 jpayne@68: ... ''' jpayne@68: >>> a = BedTool(s, from_string=True) jpayne@68: jpayne@68: Or use examples that come with pybedtools:: jpayne@68: jpayne@68: >>> example_files = pybedtools.list_example_files() jpayne@68: >>> assert 'a.bed' in example_files jpayne@68: >>> a = pybedtools.example_bedtool('a.bed') jpayne@68: jpayne@68: """ jpayne@68: if remote: jpayne@68: raise ValueError( jpayne@68: "Remote BAM no longer supported (since BEDTools does not " "support it)" jpayne@68: ) jpayne@68: self.remote = remote jpayne@68: self._isbam = False jpayne@68: self._bam_header = "" jpayne@68: self._cmds = [] jpayne@68: if from_string: jpayne@68: if fn is None or not isinstance(fn, str): jpayne@68: raise ValueError("from_string=True requires a string to parse") jpayne@68: bed_contents = fn jpayne@68: fn = self._tmp() jpayne@68: fout = open(fn, "w") jpayne@68: for line in bed_contents.splitlines(): jpayne@68: if len(line.strip()) == 0: jpayne@68: continue jpayne@68: line = "\t".join(line.split()) + "\n" jpayne@68: fout.write(line) jpayne@68: fout.close() jpayne@68: jpayne@68: else: jpayne@68: # if fn is a Path object, we have to use its string representation jpayne@68: if isinstance(fn, pathlib.PurePath): jpayne@68: fn = str(fn) jpayne@68: jpayne@68: # our work is already done jpayne@68: if isinstance(fn, BedTool): jpayne@68: fn = fn.fn jpayne@68: jpayne@68: # from_string=False, so assume it's a filename jpayne@68: elif isinstance(fn, str): jpayne@68: if remote: jpayne@68: self._isbam = True jpayne@68: else: jpayne@68: if not os.path.exists(fn): jpayne@68: msg = 'File "%s" does not exist' % fn jpayne@68: raise FileNotFoundError(msg) jpayne@68: self._isbam = isBAM(fn) jpayne@68: jpayne@68: # TODO: we dont' really need this, but it's added here for jpayne@68: # compatibility with existing tests jpayne@68: if self._isbam: jpayne@68: header = pysam.Samfile(fn).header.to_dict() jpayne@68: # For example: jpayne@68: # { jpayne@68: # 'HD': {'VN': '1.0', 'SO': 'coordinate'}, jpayne@68: # 'SQ': [ jpayne@68: # {'LN': 23011544, jpayne@68: # 'SN': 'chr2L'}, jpayne@68: # {'LN': 21146708, jpayne@68: # 'SN': 'chr2R'}, jpayne@68: # {'LN': 24543557, jpayne@68: # 'SN': 'chr3L'}, jpayne@68: # {'LN': 27905053, jpayne@68: # 'SN': 'chr3R'}, jpayne@68: # {'LN': 1351857, jpayne@68: # 'SN': 'chr4'}, jpayne@68: # {'LN': 22422827, jpayne@68: # 'SN': 'chrX'} jpayne@68: # ] jpayne@68: # } jpayne@68: jpayne@68: txt_header = [] jpayne@68: for k, v in header.items(): jpayne@68: if isinstance(v, list): jpayne@68: for i in v: jpayne@68: if isinstance(i, dict): jpayne@68: txt_header.append( jpayne@68: "\t".join( jpayne@68: ["@" + k] jpayne@68: + [ jpayne@68: ":".join(map(str, j)) jpayne@68: for j in sorted(i.items(), reverse=True) jpayne@68: ] jpayne@68: ) jpayne@68: ) jpayne@68: elif isinstance(i, str): jpayne@68: txt_header.append(i) jpayne@68: jpayne@68: elif isinstance(v, dict): jpayne@68: txt_header.append( jpayne@68: "\t".join( jpayne@68: ["@" + k] jpayne@68: + [ jpayne@68: ":".join(map(str, j)) jpayne@68: for j in sorted(v.items(), reverse=True) jpayne@68: ] jpayne@68: ) jpayne@68: ) jpayne@68: else: jpayne@68: raise ValueError("unhandled type in BAM header") jpayne@68: self._bam_header = "\n".join(txt_header) + "\n" jpayne@68: jpayne@68: # If tuple or list, then save as file first jpayne@68: # (fixes #73) jpayne@68: elif isinstance(fn, (list, tuple)): jpayne@68: fn = BedTool(iter(fn)).saveas().fn jpayne@68: jpayne@68: # Otherwise assume iterator, say an open file as from jpayne@68: # subprocess.PIPE jpayne@68: else: jpayne@68: fn = fn jpayne@68: jpayne@68: self.fn = fn jpayne@68: tag = "".join([random.choice(string.ascii_lowercase) for _ in range(8)]) jpayne@68: self._tag = tag jpayne@68: _tags[tag] = self jpayne@68: self._hascounts = False jpayne@68: self._file_type = None jpayne@68: self.seqfn = None jpayne@68: self.fastq = None jpayne@68: self.igv_script = None jpayne@68: self.links_html = None jpayne@68: self.history = History() jpayne@68: jpayne@68: @classmethod jpayne@68: def from_dataframe( jpayne@68: cls, jpayne@68: df: pd.DataFrame, jpayne@68: outfile: Optional[str] =None, jpayne@68: sep: str ="\t", jpayne@68: header: bool =False, jpayne@68: na_rep:str =".", jpayne@68: index:bool =False, jpayne@68: **kwargs jpayne@68: ) -> BedTool: jpayne@68: """ jpayne@68: Creates a BedTool from a pandas.DataFrame. jpayne@68: jpayne@68: If `outfile` is None, a temporary file will be used. Otherwise it can jpayne@68: be a specific filename or an open file handle. Additional kwargs will jpayne@68: be passed to `pandas.DataFrame.to_csv`. jpayne@68: jpayne@68: The fields of the resulting BedTool will match the order of columns in jpayne@68: the dataframe. jpayne@68: """ jpayne@68: try: jpayne@68: import pandas jpayne@68: except ImportError: jpayne@68: raise ImportError("pandas must be installed to use dataframes") jpayne@68: if outfile is None: jpayne@68: outfile = cls._tmp() jpayne@68: default_kwargs = dict(sep=sep, header=header, na_rep=na_rep, index=index) jpayne@68: default_kwargs.update(kwargs) jpayne@68: df.to_csv(outfile, **default_kwargs) jpayne@68: jpayne@68: if isinstance(outfile, str): jpayne@68: fn = outfile jpayne@68: else: jpayne@68: try: jpayne@68: fn = outfile.name jpayne@68: except AttributeError: jpayne@68: raise ValueError( jpayne@68: "`outfile` is not a string and doesn't have a `name` attribute. " jpayne@68: "Unable to determine filename." jpayne@68: ) jpayne@68: return BedTool(fn) jpayne@68: jpayne@68: def split(self, func: Callable, *args, **kwargs) -> BedTool: jpayne@68: """ jpayne@68: Split each feature using a user-defined function. jpayne@68: jpayne@68: Calls the provided function `func` with each interval. In contrast to jpayne@68: `each` (which does something similar), this method expects `func` to jpayne@68: return an *iterable* of Interval objects. jpayne@68: jpayne@68: args and kwargs are passed directly to `func`. jpayne@68: jpayne@68: Returns a new BedTool. jpayne@68: """ jpayne@68: jpayne@68: def generator(): jpayne@68: for orig_interval in self: jpayne@68: for interval in func(orig_interval, *args, **kwargs): jpayne@68: yield interval jpayne@68: jpayne@68: return BedTool(generator()) jpayne@68: jpayne@68: def truncate_to_chrom(self, genome: str | dict) -> BedTool: jpayne@68: """ jpayne@68: Ensure all features fall within chromosome limits. jpayne@68: jpayne@68: Some peak-callers extend peaks such that the boundaries overstep jpayne@68: chromosome coordinates. Upon uploading such a file to a genome browser jpayne@68: like UCSC, this results in an error like:: jpayne@68: jpayne@68: Error line 101 of custom track: chromEnd larger than chrom chr2 jpayne@68: size jpayne@68: jpayne@68: Use this method to clean your file, truncating any out-of-bounds jpayne@68: features to fit within the chromosome coordinates of `genome`. jpayne@68: jpayne@68: `genome` can be either an assembly name ('dm3') or a dictionary where jpayne@68: keys are chrom and values are (start, stop) tuples. jpayne@68: """ jpayne@68: if isinstance(genome, dict): jpayne@68: chromdict = genome jpayne@68: else: jpayne@68: assert isinstance(genome, str) jpayne@68: chromdict = helpers.chromsizes(genome) jpayne@68: jpayne@68: tmp = self._tmp() jpayne@68: with open(tmp, "w") as fout: jpayne@68: for chrom, coords in list(chromdict.items()): jpayne@68: start, stop = coords jpayne@68: start = str(start) jpayne@68: stop = str(stop) jpayne@68: fout.write("\t".join([chrom, start, stop]) + "\n") jpayne@68: return self.intersect(tmp) jpayne@68: jpayne@68: def tabix_intervals(self, interval_or_string: Interval | str, check_coordinates: bool=False) -> BedTool: jpayne@68: """ jpayne@68: Retrieve all intervals within coordinates from a "tabixed" BedTool. jpayne@68: jpayne@68: Given either a string in "chrom:start-stop" format, or an interval-like jpayne@68: object with chrom, start, stop attributes, return a *streaming* BedTool jpayne@68: of the features in this BedTool that overlap the provided interval. jpayne@68: jpayne@68: If the coordinates are invalid, an empty generator is returned unless jpayne@68: `check_coordinates=True` in which case a ValueError will be raised. jpayne@68: """ jpayne@68: if not self._tabixed(): jpayne@68: raise ValueError( jpayne@68: "This BedTool has not been indexed for tabix " jpayne@68: "-- please use the .tabix() method" jpayne@68: ) jpayne@68: jpayne@68: # tabix expects 1-based coords, but BEDTools works with jpayne@68: # zero-based. pybedtools and pysam also work with zero-based. So we can jpayne@68: # pass zero-based directly to the pysam tabix interface. jpayne@68: tbx = pysam.TabixFile(self.fn) jpayne@68: jpayne@68: # If an interval is passed, use its coordinates directly jpayne@68: if isinstance(interval_or_string, Interval): jpayne@68: interval: Interval = interval_or_string jpayne@68: chrom, start, end = interval.chrom, interval.start, interval.stop jpayne@68: # Parse string directly instead of relying on Interval, in order to jpayne@68: # permit full chromosome fetching jpayne@68: else: jpayne@68: match = helpers.coord_re.search(interval_or_string) jpayne@68: # Assume string is contig if it doesn't fit chrom:start-end format jpayne@68: if match is None: jpayne@68: chrom = interval_or_string jpayne@68: start, end = None, None jpayne@68: # Otherwise parse the coordinates jpayne@68: else: jpayne@68: chrom, start, end = match.group(1, 2, 3) jpayne@68: start, end = int(start), int(end) jpayne@68: jpayne@68: # Fetch results. jpayne@68: try: jpayne@68: results = tbx.fetch(str(chrom), start, end) jpayne@68: except ValueError: jpayne@68: if check_coordinates: jpayne@68: raise jpayne@68: else: jpayne@68: results = [] jpayne@68: jpayne@68: # pysam.ctabix.TabixIterator does not include newlines when yielding so jpayne@68: # we need to add them. jpayne@68: def gen(): jpayne@68: for i in results: jpayne@68: yield i + "\n" jpayne@68: jpayne@68: # xref #190 jpayne@68: x = BedTool(gen()).saveas() jpayne@68: tbx.close() jpayne@68: return x jpayne@68: jpayne@68: def tabix_contigs(self): jpayne@68: """ jpayne@68: Returns a list of contigs from the tabix index. jpayne@68: """ jpayne@68: if not self._tabixed(): jpayne@68: raise ValueError( jpayne@68: "This BedTool has not been indexed for tabix " jpayne@68: "-- please use the .tabix() method" jpayne@68: ) jpayne@68: jpayne@68: tbx = pysam.TabixFile(self.fn) jpayne@68: return tbx.contigs jpayne@68: jpayne@68: def tabix(self, in_place: bool = True, force: bool = False, is_sorted: bool = False) -> BedTool: jpayne@68: """ jpayne@68: Prepare a BedTool for use with Tabix. jpayne@68: jpayne@68: Returns a new BedTool that has been BGZIP compressed jpayne@68: and indexed by tabix. jpayne@68: jpayne@68: Parameters jpayne@68: ---------- jpayne@68: jpayne@68: in_place : bool jpayne@68: If True (default), then assume the file is already sorted and jpayne@68: replace the existing file with the BGZIPed version. jpayne@68: jpayne@68: force : bool jpayne@68: If True (default is False), then overwrite both the index and the jpayne@68: BGZIP file. jpayne@68: jpayne@68: is_sorted : bool jpayne@68: If True (default is False), then assume the file is already sorted jpayne@68: so that BedTool.bgzip() doesn't have to do that work. jpayne@68: """ jpayne@68: # Return quickly if nothing to do jpayne@68: if self._tabixed() and not force: jpayne@68: return self jpayne@68: jpayne@68: # Make sure it's BGZIPed jpayne@68: fn = self.bgzip(in_place=in_place, force=force, is_sorted=is_sorted) jpayne@68: if self.file_type is not None and self.file_type not in ["bam", "empty"]: jpayne@68: pysam.tabix_index(fn, force=force, preset=self.file_type) # type: ignore jpayne@68: return BedTool(fn) jpayne@68: jpayne@68: def _tabixed(self): jpayne@68: """ jpayne@68: Verifies that we're working with a tabixed file: a string filename jpayne@68: pointing to a BGZIPed file with a .tbi file in the same dir. jpayne@68: """ jpayne@68: if ( jpayne@68: isinstance(self.fn, str) jpayne@68: and isBGZIP(self.fn) jpayne@68: and os.path.exists(self.fn + ".tbi") jpayne@68: ): jpayne@68: return True jpayne@68: jpayne@68: def bgzip(self, in_place: bool = True, force: bool = False, is_sorted: bool = False) -> str: jpayne@68: """ jpayne@68: Helper function for more control over "tabixed" BedTools. jpayne@68: jpayne@68: Checks to see if we already have a BGZIP file; if not then prepare one. jpayne@68: Always leaves the original file alone. You can always just make a jpayne@68: BedTool out of an already sorted and BGZIPed file to avoid this step. jpayne@68: jpayne@68: `in_place` will put the BGZIPed file in the same dir (possibly after jpayne@68: sorting to tempfile). jpayne@68: jpayne@68: If `is_sorted`, then assume the file is already sorted. Otherwise call jpayne@68: bedtools sort with the `-header` option. jpayne@68: jpayne@68: `force` will overwrite without asking. jpayne@68: """ jpayne@68: # It may already be BGZIPed... jpayne@68: if isinstance(self.fn, str) and not force: jpayne@68: if isBGZIP(self.fn): jpayne@68: return self.fn jpayne@68: jpayne@68: # If not in_place, then make a tempfile for the BGZIPed version jpayne@68: if not in_place: jpayne@68: # Get tempfile name, sorted or not jpayne@68: if not is_sorted: jpayne@68: fn = self.sort(header=True).fn jpayne@68: else: jpayne@68: fn = self._tmp() jpayne@68: jpayne@68: # Register for later deletion jpayne@68: outfn = fn + ".gz" jpayne@68: BedTool.TEMPFILES.append(outfn) jpayne@68: jpayne@68: # Creates tempfile.gz jpayne@68: pysam.tabix_compress(fn, outfn, force=force) jpayne@68: return outfn jpayne@68: jpayne@68: # Otherwise, make sure the BGZIPed version has a similar name to the jpayne@68: # current BedTool's file jpayne@68: if in_place: jpayne@68: if not is_sorted: jpayne@68: fn = self.sort(header=True).saveas().fn jpayne@68: else: jpayne@68: fn = self.fn jpayne@68: outfn = self.fn + ".gz" jpayne@68: pysam.tabix_compress(fn, outfn, force=force) jpayne@68: return outfn jpayne@68: jpayne@68: def delete_temporary_history(self, ask: bool = True, raw_input_func=None): jpayne@68: """ jpayne@68: Use at your own risk! This method will delete temp files. You will be jpayne@68: prompted for deletion of files unless you specify *ask=False*. jpayne@68: jpayne@68: Deletes all temporary files created during the history of this BedTool jpayne@68: up to but not including the file this current BedTool points to. jpayne@68: jpayne@68: Any filenames that are in the history and have the following pattern jpayne@68: will be deleted:: jpayne@68: jpayne@68: /pybedtools.*.tmp jpayne@68: jpayne@68: (where is the result from get_tempdir() and is by default jpayne@68: "/tmp") jpayne@68: jpayne@68: Any files that don't have this format will be left alone. jpayne@68: jpayne@68: (*raw_input_func* is used for testing) jpayne@68: """ jpayne@68: flattened_history = _flatten_list(self.history) jpayne@68: to_delete = [] jpayne@68: tempdir = get_tempdir() jpayne@68: for i in flattened_history: jpayne@68: fn = i.fn jpayne@68: if fn.startswith(os.path.join(os.path.abspath(tempdir), "pybedtools")): jpayne@68: if fn.endswith(".tmp"): jpayne@68: to_delete.append(fn) jpayne@68: jpayne@68: if raw_input_func is None: jpayne@68: raw_input_func = input jpayne@68: jpayne@68: str_fns = "\n\t".join(to_delete) jpayne@68: if ask: jpayne@68: answer = raw_input_func("Delete these files?\n\t%s\n(y/N) " % str_fns) jpayne@68: jpayne@68: if not answer.lower()[0] == "y": jpayne@68: print("OK, not deleting.") jpayne@68: return jpayne@68: for fn in to_delete: jpayne@68: os.unlink(fn) jpayne@68: return jpayne@68: jpayne@68: def filter(self, func: Callable, *args, **kwargs) -> BedTool: jpayne@68: """ jpayne@68: Filter features by user-defined function. jpayne@68: jpayne@68: Takes a function *func* that is called for each feature in the jpayne@68: `BedTool` object and returns only those for which the function returns jpayne@68: True. jpayne@68: jpayne@68: *args and **kwargs are passed directly to *func*. jpayne@68: jpayne@68: Returns a streaming BedTool; if you want the filename then use the jpayne@68: .saveas() method. jpayne@68: jpayne@68: >>> a = pybedtools.example_bedtool('a.bed') jpayne@68: >>> subset = a.filter(lambda b: b.chrom == 'chr1' and b.start < 150) jpayne@68: >>> len(a), len(subset) jpayne@68: (4, 2) jpayne@68: jpayne@68: so it has extracted 2 records from the original 4. jpayne@68: jpayne@68: """ jpayne@68: return BedTool((f for f in self if func(f, *args, **kwargs))) jpayne@68: jpayne@68: def field_count(self, n:int=10) -> int: jpayne@68: """ jpayne@68: Number of fields in each line of this BedTool (checks `n` lines) jpayne@68: jpayne@68: Return the number of fields in the features this file contains. Checks jpayne@68: the first *n* features. jpayne@68: jpayne@68: >>> a = pybedtools.example_bedtool('a.bed') jpayne@68: >>> a.field_count() jpayne@68: 6 jpayne@68: """ jpayne@68: if self.file_type == "empty": jpayne@68: return 0 jpayne@68: i = 0 jpayne@68: fields = set([]) jpayne@68: for feat in self: jpayne@68: if i > n: jpayne@68: break jpayne@68: i += 1 jpayne@68: # TODO: make this more efficient. jpayne@68: fields.update([len(feat.fields)]) jpayne@68: assert len(fields) == 1, fields jpayne@68: return list(fields)[0] jpayne@68: jpayne@68: def each(self, func: Callable, *args, **kwargs) -> BedTool: jpayne@68: """ jpayne@68: Modify each feature with a user-defined function. jpayne@68: jpayne@68: Applies user-defined function *func* to each feature. *func* must jpayne@68: accept an Interval as its first argument; *args and **kwargs will be jpayne@68: passed to *func*. jpayne@68: jpayne@68: *func* must return an Interval object OR a value that evaluates to jpayne@68: False, in which case the original feature will be removed from the jpayne@68: output. This way, an additional "filter" call is not necessary. jpayne@68: jpayne@68: >>> def truncate_feature(feature, limit=0): jpayne@68: ... feature.score = str(len(feature)) jpayne@68: ... if len(feature) > limit: jpayne@68: ... feature.stop = feature.start + limit jpayne@68: ... feature.name = feature.name + '.short' jpayne@68: ... return feature jpayne@68: jpayne@68: >>> a = pybedtools.example_bedtool('a.bed') jpayne@68: >>> b = a.each(truncate_feature, limit=100) jpayne@68: >>> print(b) #doctest: +NORMALIZE_WHITESPACE jpayne@68: chr1 1 100 feature1 99 + jpayne@68: chr1 100 200 feature2 100 + jpayne@68: chr1 150 250 feature3.short 350 - jpayne@68: chr1 900 950 feature4 50 + jpayne@68: jpayne@68: jpayne@68: """ jpayne@68: jpayne@68: def _generator(): jpayne@68: for f in self: jpayne@68: result = func(f, *args, **kwargs) jpayne@68: if result: jpayne@68: yield result jpayne@68: jpayne@68: return BedTool(_generator()) jpayne@68: jpayne@68: def introns(self, gene: str = "gene", exon: str = "exon") -> BedTool: jpayne@68: """ jpayne@68: Create intron features (requires specific input format). jpayne@68: jpayne@68: NOTE: this method assumes a simple file with non-overlapping exons. For jpayne@68: more sophisticated features, consider the gffutils package instead. jpayne@68: jpayne@68: Given a BED12 or a GFF with exons, create a new `BedTool` with just jpayne@68: introns. The output is a bed6 file with the score column (5) being one jpayne@68: of 'intron'/'utr5'/'utr3' jpayne@68: """ jpayne@68: # iterate over all the features in the gene. jpayne@68: s = self.sort() jpayne@68: if self.file_type == "gff": jpayne@68: exon_iter = BedTool((f for f in s if f[2] == exon)).saveas() jpayne@68: gene_iter = BedTool((f for f in s if f[2] == gene)).saveas() jpayne@68: jpayne@68: elif self.file_type == "bed": jpayne@68: if s.field_count() == 12: jpayne@68: exon_iter = s.bed6().saveas() jpayne@68: gene_iter = s.saveas() jpayne@68: else: jpayne@68: # TODO: bed6. groupby on name and find smallest start, jpayne@68: # largest stop. jpayne@68: exon_iter = s jpayne@68: gene_iter = None jpayne@68: raise NotImplementedError( jpayne@68: ".introns() only supported for bed12" "and GFF" jpayne@68: ) jpayne@68: jpayne@68: else: jpayne@68: raise NotImplementedError(".introns() only supported for BED and GFF") jpayne@68: jpayne@68: with open(BedTool._tmp(), "w") as fh: jpayne@68: # group on the name. jpayne@68: exon_intervals = IntervalFile(exon_iter.fn) jpayne@68: for g in gene_iter: jpayne@68: # search finds all, but we just want the ones that completely jpayne@68: # overlap this gene. jpayne@68: exons = [ jpayne@68: e jpayne@68: for e in exon_intervals.search(g, same_strand=True) jpayne@68: if e.start >= g.start and e.end <= g.end jpayne@68: ] jpayne@68: jpayne@68: for i, exon_instance in enumerate(exons): jpayne@68: exon_instance: pybedtools.Interval jpayne@68: # 5' utr between gene start and first intron jpayne@68: if i == 0 and exon_instance.start > g.start: jpayne@68: utr = {"+": "utr5", "-": "utr3"}[g.strand] jpayne@68: print( jpayne@68: "%s\t%i\t%i\t%s\t%s\t%s" jpayne@68: % (g.chrom, g.start, exon_instance.start, g.name, utr, g.strand), jpayne@68: file=fh, jpayne@68: ) jpayne@68: elif i == len(exons) - 1 and exon_instance.end < g.end: jpayne@68: utr = {"+": "utr3", "-": "utr5"}[g.strand] jpayne@68: print( jpayne@68: "%s\t%i\t%i\t%s\t%s\t%s" jpayne@68: % (g.chrom, exon_instance.end, g.end, g.name, utr, g.strand), jpayne@68: file=fh, jpayne@68: ) jpayne@68: elif i != len(exons) - 1: jpayne@68: istart = exon_instance.end jpayne@68: iend = exons[i + 1].start jpayne@68: print( jpayne@68: "%s\t%i\t%i\t%s\tintron\t%s" jpayne@68: % (g.chrom, istart, iend, g.name, g.strand), jpayne@68: file=fh, jpayne@68: ) jpayne@68: return BedTool(fh.name) jpayne@68: jpayne@68: def features(self): jpayne@68: """ jpayne@68: Returns an iterable of features jpayne@68: """ jpayne@68: if hasattr(self, "next") or hasattr(self, "__next__"): jpayne@68: return self jpayne@68: return iter(self) jpayne@68: jpayne@68: FileType = Literal['bed', 'vcf', 'gff', 'bam', 'sam', 'empty'] jpayne@68: jpayne@68: @property jpayne@68: def file_type(self) -> Optional[FileType]: jpayne@68: """ jpayne@68: Return the type of the current file. One of ('bed','vcf','gff', 'bam', jpayne@68: 'sam', 'empty'). jpayne@68: jpayne@68: >>> a = pybedtools.example_bedtool('a.bed') jpayne@68: >>> print(a.file_type) jpayne@68: bed jpayne@68: """ jpayne@68: if not isinstance(self.fn, str): jpayne@68: raise ValueError( jpayne@68: "Checking file_type not supported for " jpayne@68: "non-file BedTools. Use .saveas() to " jpayne@68: "save as a temp file first." jpayne@68: ) jpayne@68: if self._isbam: jpayne@68: self._file_type = "bam" jpayne@68: else: jpayne@68: try: jpayne@68: self._file_type = next(iter(self)).file_type jpayne@68: except StopIteration: jpayne@68: self._file_type = "empty" jpayne@68: jpayne@68: return self._file_type jpayne@68: jpayne@68: def cut(self, indexes: list[int], stream: bool = False) -> BedTool: jpayne@68: """ jpayne@68: Analogous to unix `cut`. jpayne@68: jpayne@68: Similar to unix `cut` except indexes are 0-based, must be a list jpayne@68: and the columns are returned in the order requested. jpayne@68: jpayne@68: This method returns a BedTool of results, which means that the indexes jpayne@68: returned must be valid GFF/GTF/BED/SAM features. jpayne@68: jpayne@68: If you would like arbitrary columns -- say, just chrom and featuretype jpayne@68: of a GFF, which would not comprise a valid feature -- then instead of jpayne@68: this method, simply use indexes on each feature, e.g, jpayne@68: jpayne@68: >>> gff = pybedtools.example_bedtool('d.gff') jpayne@68: >>> results = [(f[0], f[2]) for f in gff] jpayne@68: jpayne@68: In addition, `indexes` can contain keys of the GFF/GTF attributes, in jpayne@68: which case the values are returned. e.g. 'gene_name' will return the jpayne@68: corresponding name from a GTF, or 'start' will return the start jpayne@68: attribute of a BED Interval. jpayne@68: """ jpayne@68: if stream: jpayne@68: return BedTool(([f[attr] for attr in indexes] for f in self)) jpayne@68: else: jpayne@68: with open(self._tmp(), "w") as fh: jpayne@68: for f in self: jpayne@68: print("\t".join(map(str, [f[attr] for attr in indexes])), file=fh) jpayne@68: return BedTool(fh.name) jpayne@68: jpayne@68: @classmethod jpayne@68: def _tmp(cls) -> str: jpayne@68: """ jpayne@68: Makes a tempfile and registers it in the BedTool.TEMPFILES class jpayne@68: variable. Adds a "pybedtools." prefix and ".tmp" extension for easy jpayne@68: deletion if you forget to call pybedtools.cleanup(). jpayne@68: """ jpayne@68: tmpfn = tempfile.NamedTemporaryFile( jpayne@68: prefix=settings.tempfile_prefix, jpayne@68: suffix=settings.tempfile_suffix, jpayne@68: delete=False, jpayne@68: ) jpayne@68: tmpfn = tmpfn.name jpayne@68: cls.TEMPFILES.append(tmpfn) jpayne@68: return tmpfn jpayne@68: jpayne@68: def __iter__(self): jpayne@68: """ jpayne@68: Dispatches the right iterator depending on how this BedTool was jpayne@68: created jpayne@68: """ jpayne@68: if self._isbam: jpayne@68: # Note: BAM class takes filename or stream, so self.fn is OK jpayne@68: # here jpayne@68: return BAM(self.fn) jpayne@68: jpayne@68: # Plain ol' filename jpayne@68: if isinstance(self.fn, str): jpayne@68: if not os.path.exists(self.fn): jpayne@68: raise BedToolsFileError("{0} does not exist".format(self.fn)) jpayne@68: if isGZIP(self.fn): jpayne@68: return IntervalIterator(gzip.open(self.fn, "rt")) jpayne@68: else: jpayne@68: return IntervalIterator(open(self.fn, "r")) jpayne@68: # Any other kind of input (streaming string from stdout; iterable of jpayne@68: # Intervals, iterable of (chrom, start, stop) tuples, etc are handled jpayne@68: # appropriately by IntervalIterator. jpayne@68: else: jpayne@68: return IntervalIterator(self.fn) jpayne@68: jpayne@68: @property jpayne@68: def intervals(self): jpayne@68: if isinstance(self.fn, str): jpayne@68: return IntervalFile(self.fn) jpayne@68: else: jpayne@68: raise ValueError("Please convert to a file-based BedTool using saveas") jpayne@68: jpayne@68: def __repr__(self): jpayne@68: if isinstance(self.fn, str): jpayne@68: if os.path.exists(self.fn) or self.remote: jpayne@68: return "" % self.fn jpayne@68: else: jpayne@68: return "" % self.fn jpayne@68: elif isinstance(self.fn, BedTool): jpayne@68: return repr(self.fn) jpayne@68: else: jpayne@68: return "" % repr(self.fn) jpayne@68: jpayne@68: def __str__(self): jpayne@68: """ jpayne@68: Returns the string representation of the whole `BedTool` jpayne@68: """ jpayne@68: items = [] jpayne@68: for i in iter(self): jpayne@68: i = str(i) jpayne@68: if isinstance(i, bytes): jpayne@68: i = i.decode("UTF-8") jpayne@68: items.append(i) jpayne@68: return "".join(items) jpayne@68: jpayne@68: def __len__(self): jpayne@68: return self.count() jpayne@68: jpayne@68: def __eq__(self, other: object) -> bool: jpayne@68: if isinstance(other, BedTool): jpayne@68: if not isinstance(self.fn, str) or not isinstance( jpayne@68: other.fn, str jpayne@68: ): jpayne@68: raise NotImplementedError( jpayne@68: "Testing equality only supported for" jpayne@68: " BedTools that point to files" jpayne@68: ) jpayne@68: elif not isinstance(other, str): jpayne@68: raise NotImplementedError( jpayne@68: "Testing equality only supported for" jpayne@68: " BedTools that point to files or str of content" jpayne@68: ) jpayne@68: return str(self) == str(other) jpayne@68: jpayne@68: def __ne__(self, other:object): jpayne@68: return not self.__eq__(other) jpayne@68: jpayne@68: def __getitem__(self, key: slice|int): jpayne@68: if isinstance(key, slice): jpayne@68: return islice(self, key.start, key.stop, key.step) jpayne@68: elif isinstance(key, int): jpayne@68: return list(islice(self, key, key + 1))[0] jpayne@68: else: jpayne@68: raise ValueError( jpayne@68: "Only slices or integers allowed for indexing " "into a BedTool" jpayne@68: ) jpayne@68: jpayne@68: def __add__(self, other: BedTool) -> BedTool: jpayne@68: try: jpayne@68: result = self.intersect(other, u=True) jpayne@68: except BEDToolsError as e: jpayne@68: # BEDTools versions <2.20 would raise BEDToolsError jpayne@68: if (self.file_type == "empty") or (other.file_type == "empty"): jpayne@68: result = pybedtools.BedTool("", from_string=True) jpayne@68: else: jpayne@68: raise e jpayne@68: return result jpayne@68: jpayne@68: def __sub__(self, other: BedTool) -> BedTool: jpayne@68: result = None jpayne@68: jpayne@68: try: jpayne@68: result = self.intersect(other, v=True) jpayne@68: except BEDToolsError: jpayne@68: # BEDTools versions <2.20 would raise BEDToolsError jpayne@68: jpayne@68: if (self.file_type == "empty") and (other.file_type == "empty"): jpayne@68: result = pybedtools.BedTool("", from_string=True) jpayne@68: elif other.file_type == "empty": jpayne@68: result = self.saveas() jpayne@68: elif self.file_type == "empty": jpayne@68: result = pybedtools.BedTool("", from_string=True) jpayne@68: if result is None: jpayne@68: raise ValueError("Subtraction operation failed.") jpayne@68: jpayne@68: return result jpayne@68: jpayne@68: def head(self, n: int = 10, as_string: bool = False): jpayne@68: """ jpayne@68: Prints the first *n* lines or returns them if as_string is True jpayne@68: jpayne@68: Note that this only opens the underlying file (gzipped or not), so it jpayne@68: does not check to see if the file is a valid BED file. jpayne@68: jpayne@68: >>> a = pybedtools.example_bedtool('a.bed') jpayne@68: >>> a.head(2) #doctest: +NORMALIZE_WHITESPACE jpayne@68: chr1 1 100 feature1 0 + jpayne@68: chr1 100 200 feature2 0 + jpayne@68: jpayne@68: jpayne@68: """ jpayne@68: if not isinstance(self.fn, str): jpayne@68: raise NotImplementedError( jpayne@68: "head() not supported for non file-based BedTools" jpayne@68: ) jpayne@68: if as_string: jpayne@68: return "".join(str(line) for line in self[:n]) jpayne@68: if self._isbam: jpayne@68: raise NotImplementedError("head() not supported for BAM") jpayne@68: else: jpayne@68: if isGZIP(self.fn): jpayne@68: openfunc = gzip.open jpayne@68: openmode = "rt" jpayne@68: else: jpayne@68: openfunc = open jpayne@68: openmode = "r" jpayne@68: with openfunc(self.fn, openmode) as fin: jpayne@68: for i, line in enumerate(fin): jpayne@68: if i == (n): jpayne@68: break jpayne@68: print(line, end=" ") jpayne@68: jpayne@68: def set_chromsizes(self, chromsizes: str | dict): jpayne@68: """ jpayne@68: Prepare BedTool for operations that require chromosome coords. jpayne@68: jpayne@68: Set the chromsizes for this genome. If *chromsizes* is a string, it jpayne@68: will be considered a genome assembly name. If that assembly name is jpayne@68: not available in pybedtools.genome_registry, then it will be searched jpayne@68: for on the UCSC Genome Browser. jpayne@68: jpayne@68: Example usage: jpayne@68: jpayne@68: >>> hg19 = pybedtools.chromsizes('hg19') jpayne@68: >>> a = pybedtools.example_bedtool('a.bed') jpayne@68: >>> a = a.set_chromsizes(hg19) jpayne@68: >>> print(a.chromsizes['chr1']) jpayne@68: (0, 249250621) jpayne@68: jpayne@68: """ jpayne@68: if isinstance(chromsizes, str): jpayne@68: self.chromsizes = pybedtools.chromsizes(chromsizes) jpayne@68: elif isinstance(chromsizes, dict): jpayne@68: self.chromsizes = chromsizes jpayne@68: else: jpayne@68: raise ValueError( jpayne@68: "Need to specify chromsizes either as a string" jpayne@68: " (assembly name) or a dictionary" jpayne@68: ) jpayne@68: return self jpayne@68: jpayne@68: def _collapse( jpayne@68: self, jpayne@68: iterable: Iterable, jpayne@68: fn: Optional[str] = None, jpayne@68: trackline: Optional[str] = None, jpayne@68: in_compressed: bool = False, jpayne@68: out_compressed: bool = False, jpayne@68: ) -> str: jpayne@68: """ jpayne@68: Collapses an iterable into file *fn* (or a new tempfile if *fn* is jpayne@68: None). jpayne@68: jpayne@68: Returns the newly created filename. jpayne@68: jpayne@68: Parameters jpayne@68: ---------- jpayne@68: jpayne@68: iterable : iter jpayne@68: Any iterable object whose items can be converted to an Interval. jpayne@68: jpayne@68: fn : str jpayne@68: Output filename, if None then creates a temp file for output jpayne@68: jpayne@68: trackline : str jpayne@68: If not None, string to be added to the top of the output. Newline jpayne@68: will be added. jpayne@68: jpayne@68: in_compressed : bool jpayne@68: Indicates whether the input is compressed jpayne@68: jpayne@68: out_compressed : bool jpayne@68: Indicates whether the output should be compressed jpayne@68: """ jpayne@68: if fn is None: jpayne@68: fn = self._tmp() jpayne@68: jpayne@68: in_open_func = gzip.open if in_compressed else open jpayne@68: out_open_func = gzip.open if out_compressed else open jpayne@68: jpayne@68: # special case: if BAM-format BedTool is provided, no trackline should jpayne@68: # be supplied, and don't iterate -- copy the file wholesale jpayne@68: if isinstance(iterable, BedTool) and iterable._isbam: jpayne@68: if trackline: jpayne@68: raise ValueError( jpayne@68: "trackline provided, but input is a BAM " jpayne@68: "file, which takes no track line" jpayne@68: ) jpayne@68: with open(fn, "wb") as out_: jpayne@68: out_.write(open(self.fn, "rb").read()) jpayne@68: return fn jpayne@68: jpayne@68: # If we're just working with filename-based BedTool objects, just copy jpayne@68: # the files directly jpayne@68: if isinstance(iterable, BedTool) and isinstance(iterable.fn, str): jpayne@68: with out_open_func(fn, "wt") as out_: jpayne@68: if sys.version_info > (3,0): jpayne@68: in_ = in_open_func(iterable.fn, "rt", errors="ignore") jpayne@68: else: jpayne@68: in_ = in_open_func(iterable.fn, "rt") jpayne@68: if trackline: jpayne@68: out_.write(trackline.strip() + "\n") jpayne@68: out_.writelines(in_) jpayne@68: in_.close() jpayne@68: else: jpayne@68: with out_open_func(fn, "wt") as out_: jpayne@68: for i in iterable: jpayne@68: if isinstance(i, (list, tuple)): jpayne@68: i = create_interval_from_list(list(i)) jpayne@68: out_.write(str(i)) jpayne@68: return fn jpayne@68: jpayne@68: def handle_kwargs(self, prog:str, arg_order: Optional[list[str]] = None, **kwargs): jpayne@68: """ jpayne@68: Handle most cases of BEDTool program calls, but leave the specifics jpayne@68: up to individual methods. jpayne@68: jpayne@68: *prog* is a BEDTools program name, e.g., 'intersectBed'. jpayne@68: jpayne@68: *arg_order* lists any arguments that are sensitive to order. Everything jpayne@68: else will be reverse-sorted. jpayne@68: jpayne@68: *kwargs* are passed directly from the calling method (like jpayne@68: self.intersect). jpayne@68: jpayne@68: This method figures out, given how this BedTool was constructed, what jpayne@68: to send to BEDTools programs -- for example, an open file to stdin with jpayne@68: the `-` argument, or a filename with the `-a` argument. jpayne@68: """ jpayne@68: pybedtools.logger.debug( jpayne@68: "BedTool.handle_kwargs() got these kwargs:\n%s", pprint.pformat(kwargs) jpayne@68: ) jpayne@68: jpayne@68: # If you pass in a list, how should it be converted to a BedTools arg? jpayne@68: default_list_delimiter = " " jpayne@68: list_delimiters = { jpayne@68: "annotateBed": " ", jpayne@68: "getOverlap": ",", jpayne@68: "groupBy": ",", jpayne@68: "multiIntersectBed": " ", jpayne@68: "mergeBed": ",", jpayne@68: "intersectBed": " ", jpayne@68: "mapBed": ",", jpayne@68: } jpayne@68: stdin = None jpayne@68: jpayne@68: # ----------------------------------------------------------------- jpayne@68: # Decide how to send instream1 to BEDTools. If there's no implicit jpayne@68: # instream1 arg, then do nothing. jpayne@68: # jpayne@68: try: jpayne@68: # e.g., 'a' for intersectBed jpayne@68: if self._isbam: jpayne@68: inarg1 = _bam_registry[prog] jpayne@68: else: jpayne@68: inarg1 = _implicit_registry[prog] jpayne@68: jpayne@68: # e.g., self.fn or 'a.bed' or an iterator... jpayne@68: instream1 = kwargs[inarg1] jpayne@68: jpayne@68: # If it's a BedTool, then get underlying stream jpayne@68: if isinstance(instream1, BedTool): jpayne@68: instream1 = instream1.fn jpayne@68: jpayne@68: # Filename? No pipe, just provide the file jpayne@68: if isinstance(instream1, str): jpayne@68: kwargs[inarg1] = instream1 jpayne@68: stdin = None jpayne@68: jpayne@68: # Open file? Pipe it jpayne@68: # elif isinstance(instream1, file): jpayne@68: # kwargs[inarg1] = 'stdin' jpayne@68: # stdin = instream1 jpayne@68: jpayne@68: # A generator or iterator: pipe it as a generator of lines jpayne@68: else: jpayne@68: kwargs[inarg1] = "stdin" jpayne@68: stdin = (str(i) for i in instream1) jpayne@68: except KeyError: jpayne@68: pass jpayne@68: jpayne@68: # ----------------------------------------------------------------- jpayne@68: # Decide how to send instream2 to BEDTools. jpayne@68: try: jpayne@68: # e.g., 'b' for intersectBed jpayne@68: inarg2 = _other_registry[prog] jpayne@68: jpayne@68: # e.g., another BedTool jpayne@68: instream2 = kwargs[inarg2] jpayne@68: jpayne@68: # Get stream if BedTool jpayne@68: if isinstance(instream2, BedTool): jpayne@68: instream2 = instream2.fn jpayne@68: jpayne@68: # Filename jpayne@68: if isinstance(instream2, str): jpayne@68: kwargs[inarg2] = instream2 jpayne@68: jpayne@68: # If it's a list of strings, then we need to figure out if it's jpayne@68: # a list of filenames or a list of intervals (see issue #156) jpayne@68: # jpayne@68: # Several options: jpayne@68: # jpayne@68: # - assume intervals have tabs but filenames don't jpayne@68: # - assume that, upon being split on tabs, an interval is >=3 fields jpayne@68: # - try creating an interval out of the first thing, success means interval jpayne@68: # jpayne@68: # The last seems the most robust. It does allow filenames with jpayne@68: # tabs; deciding whether or not such filenames are a good idea is jpayne@68: # left to the user. jpayne@68: # jpayne@68: elif isinstance(instream2, (list, tuple)) and isinstance( jpayne@68: instream2[0], str jpayne@68: ): jpayne@68: try: jpayne@68: _ = create_interval_from_list(instream2[0].split("\t")) jpayne@68: kwargs[inarg2] = self._collapse(instream2) jpayne@68: except IndexError: jpayne@68: kwargs[inarg2] = instream2 jpayne@68: jpayne@68: # Otherwise we need to collapse it in order to send to BEDTools jpayne@68: # programs jpayne@68: else: jpayne@68: kwargs[inarg2] = self._collapse(instream2) jpayne@68: jpayne@68: except KeyError: jpayne@68: pass jpayne@68: jpayne@68: # If stream not specified, then a tempfile will be created jpayne@68: if kwargs.pop("stream", None): jpayne@68: tmp = None jpayne@68: else: jpayne@68: output = kwargs.pop("output", None) jpayne@68: if output: jpayne@68: tmp = output jpayne@68: else: jpayne@68: tmp = self._tmp() jpayne@68: jpayne@68: additional_args = kwargs.pop("additional_args", None) jpayne@68: jpayne@68: # Parse the kwargs into BEDTools-ready args jpayne@68: cmds = [prog] jpayne@68: jpayne@68: # arg_order mechanism added to fix #345 jpayne@68: if arg_order is None: jpayne@68: arg_order = [] jpayne@68: jpayne@68: for arg in arg_order: jpayne@68: if arg in kwargs: jpayne@68: val = kwargs.pop(arg) jpayne@68: cmds.append("-" + arg) jpayne@68: cmds.append(val) jpayne@68: jpayne@68: # The reverse-sort is a temp fix for issue #81 jpayne@68: for key, value in sorted(list(kwargs.items()), reverse=True): jpayne@68: if isinstance(value, bool): jpayne@68: if value: jpayne@68: cmds.append("-" + key) jpayne@68: else: jpayne@68: continue jpayne@68: elif isinstance(value, list) or isinstance(value, tuple): jpayne@68: value = list(map(str, value)) jpayne@68: try: jpayne@68: delim = list_delimiters[prog] jpayne@68: except KeyError: jpayne@68: delim = default_list_delimiter jpayne@68: jpayne@68: if delim == " ": jpayne@68: cmds.append("-" + key) jpayne@68: cmds.extend(value) jpayne@68: jpayne@68: # make comma-separated list if that's what's needed jpayne@68: else: jpayne@68: cmds.append("-" + key) jpayne@68: cmds.append(delim.join(value)) jpayne@68: jpayne@68: else: jpayne@68: cmds.append("-" + key) jpayne@68: cmds.append(str(value)) jpayne@68: jpayne@68: if additional_args: jpayne@68: cmds.append(additional_args) jpayne@68: jpayne@68: return cmds, tmp, stdin jpayne@68: jpayne@68: def check_genome(self, **kwargs): jpayne@68: """ jpayne@68: Handles the different ways of specifying a genome in kwargs: jpayne@68: jpayne@68: g='genome.file' specifies a file directly jpayne@68: genome='dm3' gets the file from genome registry jpayne@68: self.chromsizes could be a dict.\ jpayne@68: """ jpayne@68: jpayne@68: # If both g and genome are missing, assume self.chromsizes jpayne@68: if ("g" not in kwargs) and ("genome" not in kwargs): jpayne@68: if hasattr(self, "chromsizes"): jpayne@68: kwargs["g"] = self.chromsizes jpayne@68: else: jpayne@68: raise ValueError( jpayne@68: 'No genome specified. Use the "g" or ' jpayne@68: '"genome" kwargs, or use the ' jpayne@68: ".set_chromsizes() method" jpayne@68: ) jpayne@68: jpayne@68: # If both specified, rather than make an implicit decision, raise an jpayne@68: # exception jpayne@68: if "g" in kwargs and "genome" in kwargs: jpayne@68: raise ValueError('Cannot specify both "g" and "genome"') jpayne@68: jpayne@68: # Something like genome='dm3' was specified jpayne@68: if "g" not in kwargs and "genome" in kwargs: jpayne@68: if isinstance(kwargs["genome"], dict): jpayne@68: genome_dict = kwargs["genome"] jpayne@68: else: jpayne@68: genome_dict = pybedtools.chromsizes(kwargs["genome"]) jpayne@68: genome_file = pybedtools.chromsizes_to_file(genome_dict) jpayne@68: kwargs["g"] = genome_file jpayne@68: del kwargs["genome"] jpayne@68: jpayne@68: # By the time we get here, 'g' is specified. jpayne@68: jpayne@68: # If a dict was provided, convert to tempfile here jpayne@68: if isinstance(kwargs["g"], dict): jpayne@68: kwargs["g"] = pybedtools.chromsizes_to_file(kwargs["g"]) jpayne@68: jpayne@68: if not os.path.exists(kwargs["g"]): jpayne@68: msg = 'Genome file "%s" does not exist' % (kwargs["g"]) jpayne@68: raise FileNotFoundError(msg) jpayne@68: jpayne@68: return kwargs jpayne@68: jpayne@68: @_log_to_history jpayne@68: def remove_invalid(self): jpayne@68: """ jpayne@68: Remove invalid features that may break BEDTools programs. jpayne@68: jpayne@68: >>> a = pybedtools.BedTool("chr1 10 100\\nchr1 10 1", jpayne@68: ... from_string=True) jpayne@68: >>> print(a.remove_invalid()) #doctest: +NORMALIZE_WHITESPACE jpayne@68: chr1 10 100 jpayne@68: jpayne@68: jpayne@68: """ jpayne@68: tmp = self._tmp() jpayne@68: jpayne@68: # If it's a file-based BedTool -- which is likely, if we're trying to jpayne@68: # remove invalid features -- then we need to parse it line by line. jpayne@68: if isinstance(self.fn, str): jpayne@68: i = IntervalIterator(open(self.fn, "r")) jpayne@68: else: jpayne@68: tmp = self.saveas() jpayne@68: i = IntervalIterator(open(tmp.fn, "r")) jpayne@68: jpayne@68: def _generator(): jpayne@68: while True: jpayne@68: try: jpayne@68: feature = next(i) jpayne@68: if feature.start <= feature.stop: jpayne@68: yield feature jpayne@68: else: jpayne@68: continue jpayne@68: except pybedtools.MalformedBedLineError: jpayne@68: continue jpayne@68: except OverflowError: jpayne@68: # This can happen if coords are negative jpayne@68: continue jpayne@68: except IndexError: jpayne@68: continue jpayne@68: except StopIteration: jpayne@68: break jpayne@68: jpayne@68: return BedTool(_generator()) jpayne@68: jpayne@68: def all_hits(self, interval: Interval, same_strand: bool = False, overlap: float = 0.0): jpayne@68: """ jpayne@68: Return all intervals that overlap `interval`. jpayne@68: jpayne@68: Calls the `all_hits` method of an IntervalFile to return all intervals jpayne@68: in this current BedTool that overlap `interval`. jpayne@68: jpayne@68: Require that overlaps have the same strand with same_strand=True. jpayne@68: jpayne@68: Notes: jpayne@68: If this current BedTool is generator-based, it will be jpayne@68: converted into a file first. jpayne@68: jpayne@68: If this current BedTool refers to a BAM file, it will be jpayne@68: converted to a BED file first using default arguments. If you jpayne@68: don't want this to happen, please convert to BED first before jpayne@68: using this method. jpayne@68: """ jpayne@68: if not isinstance(interval, Interval): jpayne@68: raise ValueError("Need an Interval instance") jpayne@68: fn = self.fn jpayne@68: if not isinstance(fn, str): jpayne@68: fn = self.saveas().fn jpayne@68: if self._isbam: jpayne@68: fn = self.bam_to_bed().fn jpayne@68: interval_file = pybedtools.IntervalFile(fn) jpayne@68: return interval_file.all_hits(interval, same_strand, overlap) jpayne@68: jpayne@68: def any_hits(self, interval: Interval, same_strand: bool = False, overlap: float=0.0): jpayne@68: """ jpayne@68: Return whether or not any intervals overlap `interval`. jpayne@68: jpayne@68: Calls the `any_hits` method of an IntervalFile. If there were any hits jpayne@68: within `interval` in this BedTool, then return 1; otherwise 0. jpayne@68: jpayne@68: Require that overlaps have the same strand with same_strand=True. jpayne@68: jpayne@68: Notes: jpayne@68: If this current BedTool is generator-based, it will be jpayne@68: converted into a file first. jpayne@68: jpayne@68: If this current BedTool refers to a BAM file, it will be jpayne@68: converted to a BED file first using default arguments. If you jpayne@68: don't want this to happen, please convert to BED first before jpayne@68: using this method. jpayne@68: """ jpayne@68: if not isinstance(interval, Interval): jpayne@68: raise ValueError("Need an Interval instance") jpayne@68: fn = self.fn jpayne@68: if not isinstance(fn, str): jpayne@68: fn = self.saveas().fn jpayne@68: if self._isbam: jpayne@68: fn = self.bam_to_bed().fn jpayne@68: interval_file = pybedtools.IntervalFile(fn) jpayne@68: return interval_file.any_hits(interval, same_strand, overlap) jpayne@68: jpayne@68: def count_hits(self, interval: Interval, same_strand: bool = False, overlap: float=0.0) -> int: jpayne@68: """ jpayne@68: Return the number of intervals that overlap `interval`. jpayne@68: jpayne@68: Calls the `count_hits` method of an IntervalFile. Returns the number jpayne@68: of valid hits in this BedTool that overlap `interval`. jpayne@68: jpayne@68: Require that overlaps have the same strand with same_strand=True. jpayne@68: jpayne@68: Notes: jpayne@68: If this current BedTool is generator-based, it will be jpayne@68: converted into a file first. jpayne@68: jpayne@68: If this current BedTool refers to a BAM file, it will be jpayne@68: converted to a BED file first using default arguments. If you jpayne@68: don't want this to happen, please convert to BED first before jpayne@68: using this method. jpayne@68: """ jpayne@68: if not isinstance(interval, Interval): jpayne@68: raise ValueError("Need an Interval instance") jpayne@68: fn = self.fn jpayne@68: if not isinstance(fn, str): jpayne@68: fn = self.saveas().fn jpayne@68: if self._isbam: jpayne@68: fn = self.bam_to_bed().fn jpayne@68: interval_file = pybedtools.IntervalFile(fn) jpayne@68: return interval_file.count_hits(interval, same_strand, overlap) jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps(prog="bed12ToBed6", implicit="i", bam=None, other=None) jpayne@68: def bed6(self, *args, **kwargs) -> BedTool: # type: ignore jpayne@68: """ jpayne@68: Wraps `bedtools bed12tobed6`. jpayne@68: """ jpayne@68: jpayne@68: # Alias for backward compatibility jpayne@68: bed12tobed6 = bed6 jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps(prog="bamToBed", implicit="i", other=None, nonbam="ALL", bam="i") jpayne@68: def bam_to_bed(self, *args, **kwargs) -> BedTool: # type: ignore jpayne@68: """ jpayne@68: Wraps `bedtools bamtobed`. jpayne@68: """ jpayne@68: jpayne@68: # Alias for backward compatibility jpayne@68: bamtobed = bam_to_bed jpayne@68: jpayne@68: @_wraps(prog="bedToBam", implicit="i", uses_genome=True, force_bam=True) jpayne@68: def _bed_to_bam(self, *args, **kwargs): jpayne@68: """ jpayne@68: Wraps bedToBam and is called internally for BED/GFF/VCF files by jpayne@68: self.to_bam (which needs to do something different for SAM files...) jpayne@68: """ jpayne@68: jpayne@68: @_log_to_history jpayne@68: def to_bam(self, **kwargs): jpayne@68: """ jpayne@68: Wraps `bedtools bedtobam` jpayne@68: jpayne@68: If self.fn is in BED/VCF/GFF format, call BEDTools' bedToBam. If jpayne@68: self.fn is in SAM format, then create a header out of the genome file jpayne@68: and then convert using `samtools`. jpayne@68: """ jpayne@68: if self.file_type == "bam": jpayne@68: return self jpayne@68: if self.file_type in ("bed", "gff", "vcf"): jpayne@68: return self._bed_to_bam(**kwargs) jpayne@68: jpayne@68: # TODO: to maintain backwards compatibility we go from Interval to jpayne@68: # AlignedSegment. jpayne@68: if self.file_type == "sam": jpayne@68: jpayne@68: # Use pysam, but construct the header out of a provided genome jpayne@68: # file. jpayne@68: jpayne@68: # construct a genome out of whatever kwargs were passed in jpayne@68: kwargs = self.check_genome(**kwargs) jpayne@68: jpayne@68: # Build a header that we can use for the output BAM file. jpayne@68: genome = dict(i.split() for i in open(kwargs["g"])) jpayne@68: SQ = [] jpayne@68: ref_ids = {} jpayne@68: text_header = ["@HD\tVN:1.0"] jpayne@68: jpayne@68: for i, (k, v) in enumerate(genome.items()): jpayne@68: SQ.append(dict(SN=k, LN=int(v))) jpayne@68: ref_ids[k] = i jpayne@68: text_header.append("@SQ\tSN:{0}\tLN:{1}".format(k, v)) jpayne@68: jpayne@68: # And the text-format header jpayne@68: text_header = "\n".join(text_header) + "\n" jpayne@68: jpayne@68: # The strategy is to write an actual SAM file to disk, along with jpayne@68: # a header, and then read that back in. jpayne@68: # jpayne@68: # Painfully inefficient, but this will change once all py2 tests jpayne@68: # pass. jpayne@68: sam_tmp = self._tmp() jpayne@68: bam_tmp = self._tmp() jpayne@68: with open(sam_tmp, "w") as fout: jpayne@68: fout.write(text_header) jpayne@68: for interval in self: jpayne@68: fout.write("\t".join(map(str, interval.fields)) + "\n") jpayne@68: jpayne@68: samfile = pysam.AlignmentFile(sam_tmp, "r") jpayne@68: bamfile = pysam.AlignmentFile(bam_tmp, "wb", template=samfile) jpayne@68: for alignment in samfile: jpayne@68: bamfile.write(alignment) jpayne@68: jpayne@68: samfile.close() jpayne@68: bamfile.close() jpayne@68: new_bedtool = BedTool(bam_tmp) jpayne@68: new_bedtool._isbam = True jpayne@68: return new_bedtool jpayne@68: jpayne@68: # Alias for backward compatibility jpayne@68: bedtobam = to_bam jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps(prog="intersectBed", implicit="a", other="b", bam="abam", jpayne@68: nonbam="bed", arg_order=["a", "abam"]) jpayne@68: def intersect(self, *args, **kwargs) -> BedTool: # type: ignore jpayne@68: """ jpayne@68: Wraps `bedtools intersect`. jpayne@68: """ jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps( jpayne@68: prog="fastaFromBed", jpayne@68: implicit="bed", jpayne@68: bam=None, jpayne@68: other="fi", jpayne@68: make_tempfile_for="fo", jpayne@68: check_stderr=_check_sequence_stderr, jpayne@68: add_to_bedtool={"fo": "seqfn"}, jpayne@68: ) jpayne@68: def sequence(self, *args, **kwargs) -> BedTool: # type: ignore jpayne@68: ''' jpayne@68: Wraps `bedtools getfasta`. jpayne@68: jpayne@68: *fi* is passed in by the user; *bed* is automatically passed in as the jpayne@68: bedfile of this object; *fo* by default is a temp file. Use jpayne@68: save_seqs() to save as a file. jpayne@68: jpayne@68: The end result is that this BedTool will assign a value to the attribute , self.seqfn, jpayne@68: that points to the new fasta file. jpayne@68: jpayne@68: Example usage: jpayne@68: jpayne@68: >>> a = pybedtools.BedTool(""" jpayne@68: ... chr1 1 10 jpayne@68: ... chr1 50 55""", from_string=True) jpayne@68: >>> fasta = pybedtools.example_filename('test.fa') jpayne@68: >>> a = a.sequence(fi=fasta) jpayne@68: >>> print(open(a.seqfn).read()) jpayne@68: >chr1:1-10 jpayne@68: GATGAGTCT jpayne@68: >chr1:50-55 jpayne@68: CCATC jpayne@68: jpayne@68: jpayne@68: ''' jpayne@68: jpayne@68: # Alias for backwards compatibility jpayne@68: getfasta = sequence jpayne@68: jpayne@68: @staticmethod jpayne@68: def seq(loc, fasta) -> str: jpayne@68: """ jpayne@68: Return just the sequence from a region string or a single location jpayne@68: >>> fn = pybedtools.example_filename('test.fa') jpayne@68: >>> BedTool.seq('chr1:2-10', fn) jpayne@68: 'GATGAGTCT' jpayne@68: >>> BedTool.seq(('chr1', 1, 10), fn) jpayne@68: 'GATGAGTCT' jpayne@68: """ jpayne@68: if isinstance(loc, str): jpayne@68: chrom, start_end = loc.split(":") jpayne@68: start, end = list(map(int, start_end.split("-"))) jpayne@68: start -= 1 jpayne@68: else: jpayne@68: chrom, start, end = loc[0], loc[1], loc[2] jpayne@68: jpayne@68: loc = BedTool("%s\t%i\t%i" % (chrom, start, end), from_string=True) jpayne@68: lseq = loc.sequence(fi=fasta) jpayne@68: return "".join([l.rstrip() for l in open(lseq.seqfn, "r") if l[0] != ">"]) jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps( jpayne@68: prog="nucBed", implicit="bed", other="fi", check_stderr=_check_sequence_stderr jpayne@68: ) jpayne@68: def nucleotide_content(self) -> BedTool: # type: ignore jpayne@68: """ jpayne@68: Wraps `bedtools nuc`. jpayne@68: jpayne@68: Profiles nucleotide content. The returned BED file contains extra jpayne@68: information about the nucleotide content jpayne@68: """ jpayne@68: jpayne@68: # Alias for backwards compatibility jpayne@68: nuc = nucleotide_content jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps(prog="multiBamCov", implicit="bed") jpayne@68: def multi_bam_coverage(self) -> BedTool: # type: ignore jpayne@68: """ jpayne@68: Wraps `bedtools multicov`. jpayne@68: jpayne@68: Pass a list of sorted and indexed BAM files as `bams` jpayne@68: """ jpayne@68: jpayne@68: # Alias for backwards compatibility jpayne@68: multicov = multi_bam_coverage jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps(prog="subtractBed", implicit="a", other="b", bam=None) jpayne@68: def subtract(self, *args, **kwargs) -> BedTool: # type: ignore jpayne@68: """ jpayne@68: Wraps `bedtools subtract`. jpayne@68: jpayne@68: Subtracts from another BED file and returns a new BedTool object. jpayne@68: jpayne@68: Example usage: jpayne@68: jpayne@68: >>> a = pybedtools.example_bedtool('a.bed') jpayne@68: >>> b = pybedtools.example_bedtool('b.bed') jpayne@68: jpayne@68: Do a "stranded" subtraction: jpayne@68: jpayne@68: >>> c = a.subtract(b, s=True) jpayne@68: jpayne@68: Require 50% of features in `a` to overlap: jpayne@68: jpayne@68: >>> c = a.subtract(b, f=0.5) jpayne@68: """ jpayne@68: if "a" not in kwargs: jpayne@68: kwargs["a"] = self.fn jpayne@68: jpayne@68: if "b" not in kwargs: jpayne@68: if len(args) > 0: jpayne@68: kwargs["b"] = args[0] jpayne@68: else: jpayne@68: raise ValueError("Must specify a BED file to subtract, either as a positional argument or as the 'b' keyword argument.") jpayne@68: jpayne@68: cmds, tmp, stdin = self.handle_kwargs(prog="subtractBed", **kwargs) jpayne@68: stream = call_bedtools(cmds, tmp, stdin=stdin) jpayne@68: return BedTool(stream) jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps(prog="slopBed", implicit="i", other=None, bam=None, uses_genome=True) jpayne@68: def slop(self, *args, **kwargs) -> BedTool: # type: ignore jpayne@68: """ jpayne@68: Wraps `bedtools slop`. jpayne@68: """ jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps(prog="shiftBed", implicit="i", other=None, bam=None, uses_genome=True) jpayne@68: def shift(self, *args, **kwargs) -> BedTool: # type: ignore jpayne@68: """ jpayne@68: Wraps `bedtools shift`. jpayne@68: jpayne@68: Shift each feature by user-defined number of bases. Returns a new BedTool object. jpayne@68: jpayne@68: Example usage: jpayne@68: jpayne@68: >>> a = pybedtools.example_bedtool('a.bed') jpayne@68: jpayne@68: Shift every feature by 5bp: jpayne@68: jpayne@68: >>> b = a.shift(genome='hg19', s=5) jpayne@68: >>> print(b) #doctest: +NORMALIZE_WHITESPACE jpayne@68: chr1 6 105 feature1 0 + jpayne@68: chr1 105 205 feature2 0 + jpayne@68: chr1 155 505 feature3 0 - jpayne@68: chr1 905 955 feature4 0 + jpayne@68: jpayne@68: jpayne@68: Shift features on the '+' strand by -1bp and on '-' strand by +3bp: jpayne@68: jpayne@68: >>> b = a.shift(genome='hg19', p=-1, m=3) jpayne@68: >>> print(b) #doctest: +NORMALIZE_WHITESPACE jpayne@68: chr1 0 99 feature1 0 + jpayne@68: chr1 99 199 feature2 0 + jpayne@68: chr1 153 503 feature3 0 - jpayne@68: chr1 899 949 feature4 0 + jpayne@68: jpayne@68: jpayne@68: # Disabling, see https://github.com/arq5x/bedtools2/issues/807 jpayne@68: Shift features by a fraction of their length (0.50): jpayne@68: jpayne@68: #>>> b = a.shift(genome='hg19', pct=True, s=0.50) jpayne@68: #>>> print(b) #doctest: +NORMALIZE_WHITESPACE jpayne@68: #chr1 50 149 feature1 0 + jpayne@68: #chr1 150 250 feature2 0 + jpayne@68: #chr1 325 675 feature3 0 - jpayne@68: #chr1 925 975 feature4 0 + jpayne@68: # jpayne@68: jpayne@68: """ jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps(prog="mergeBed", implicit="i", other=None, bam=None) jpayne@68: def merge(self, *args, **kwargs) -> BedTool: # type: ignore jpayne@68: """ jpayne@68: Wraps `bedtools merge`. jpayne@68: jpayne@68: Merge overlapping features together. Returns a new BedTool object. jpayne@68: jpayne@68: Example usage: jpayne@68: jpayne@68: >>> a = pybedtools.example_bedtool('a.bed') jpayne@68: jpayne@68: Merge: jpayne@68: jpayne@68: >>> c = a.merge() jpayne@68: jpayne@68: Allow merging of features 500 bp apart: jpayne@68: jpayne@68: >>> c = a.merge(d=500) jpayne@68: jpayne@68: """ jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps(prog="closestBed", implicit="a", other="b", bam=None) jpayne@68: def closest(self, *args, **kwargs) -> BedTool: # type: ignore jpayne@68: """ jpayne@68: Wraps `bedtools closest`. jpayne@68: jpayne@68: Return a new BedTool object containing closest features in *b*. Note jpayne@68: that the resulting file is no longer a valid BED format; use the jpayne@68: special "_closest" methods to work with the resulting file. jpayne@68: jpayne@68: Example usage:: jpayne@68: jpayne@68: a = BedTool('in.bed') jpayne@68: jpayne@68: # get the closest feature in 'other.bed' on the same strand jpayne@68: b = a.closest('other.bed', s=True) jpayne@68: jpayne@68: """ jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps(prog="windowBed", implicit="a", other="b", bam="abam", nonbam="bed") jpayne@68: def window(self, *args, **kwargs) -> BedTool: # type: ignore jpayne@68: """ jpayne@68: Wraps `bedtools window`. jpayne@68: jpayne@68: Example usage:: jpayne@68: jpayne@68: >>> a = pybedtools.example_bedtool('a.bed') jpayne@68: >>> b = pybedtools.example_bedtool('b.bed') jpayne@68: >>> print(a.window(b, w=1000)) #doctest: +NORMALIZE_WHITESPACE jpayne@68: chr1 1 100 feature1 0 + chr1 155 200 feature5 0 - jpayne@68: chr1 1 100 feature1 0 + chr1 800 901 feature6 0 + jpayne@68: chr1 100 200 feature2 0 + chr1 155 200 feature5 0 - jpayne@68: chr1 100 200 feature2 0 + chr1 800 901 feature6 0 + jpayne@68: chr1 150 500 feature3 0 - chr1 155 200 feature5 0 - jpayne@68: chr1 150 500 feature3 0 - chr1 800 901 feature6 0 + jpayne@68: chr1 900 950 feature4 0 + chr1 155 200 feature5 0 - jpayne@68: chr1 900 950 feature4 0 + chr1 800 901 feature6 0 + jpayne@68: jpayne@68: """ jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps(prog="shuffleBed", implicit="i", other=None, bam=None, uses_genome=True) jpayne@68: def shuffle(self, *args, **kwargs) -> BedTool: # type: ignore jpayne@68: """ jpayne@68: Wraps `bedtools shuffle`. jpayne@68: jpayne@68: Example usage: jpayne@68: jpayne@68: >>> a = pybedtools.example_bedtool('a.bed') jpayne@68: >>> seed = 1 # so this test always returns the same results jpayne@68: >>> b = a.shuffle(genome='hg19', chrom=True, seed=seed) jpayne@68: >>> print(b) #doctest: +NORMALIZE_WHITESPACE jpayne@68: chr1 123081365 123081464 feature1 0 + jpayne@68: chr1 243444570 243444670 feature2 0 + jpayne@68: chr1 194620241 194620591 feature3 0 - jpayne@68: chr1 172792873 172792923 feature4 0 + jpayne@68: jpayne@68: """ jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps(prog="sortBed", implicit="i", uses_genome=True, genome_if=["g", "genome"]) jpayne@68: def sort(self, *args, **kwargs) -> BedTool: # type: ignore jpayne@68: """ jpayne@68: Wraps `bedtools sort`. jpayne@68: jpayne@68: Note that chromosomes are sorted lexicographically, so chr12 will come jpayne@68: before chr9. jpayne@68: jpayne@68: Example usage: jpayne@68: jpayne@68: >>> a = pybedtools.BedTool(''' jpayne@68: ... chr9 300 400 jpayne@68: ... chr1 100 200 jpayne@68: ... chr1 1 50 jpayne@68: ... chr12 1 100 jpayne@68: ... chr9 500 600 jpayne@68: ... ''', from_string=True) jpayne@68: >>> print(a.sort()) #doctest: +NORMALIZE_WHITESPACE jpayne@68: chr1 1 50 jpayne@68: chr1 100 200 jpayne@68: chr12 1 100 jpayne@68: chr9 300 400 jpayne@68: chr9 500 600 jpayne@68: jpayne@68: """ jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps(prog="annotateBed", implicit="i") jpayne@68: def annotate(self, *args, **kwargs) -> BedTool: # type: ignore jpayne@68: """ jpayne@68: Wraps `bedtools annotate`. jpayne@68: jpayne@68: Annotate this BedTool with a list of other files. jpayne@68: Example usage: jpayne@68: jpayne@68: >>> a = pybedtools.example_bedtool('a.bed') jpayne@68: >>> b_fn = pybedtools.example_filename('b.bed') jpayne@68: >>> print(a.annotate(files=b_fn)) #doctest: +NORMALIZE_WHITESPACE jpayne@68: chr1 1 100 feature1 0 + 0.000000 jpayne@68: chr1 100 200 feature2 0 + 0.450000 jpayne@68: chr1 150 500 feature3 0 - 0.128571 jpayne@68: chr1 900 950 feature4 0 + 0.020000 jpayne@68: jpayne@68: """ jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps(prog="flankBed", implicit="i", uses_genome=True) jpayne@68: def flank(self, *args, **kwargs) -> BedTool: jpayne@68: """ jpayne@68: Wraps `bedtools flank`. jpayne@68: jpayne@68: Example usage: jpayne@68: jpayne@68: >>> a = pybedtools.example_bedtool('a.bed') jpayne@68: >>> print(a.flank(genome='hg19', b=100)) #doctest: +NORMALIZE_WHITESPACE jpayne@68: chr1 0 1 feature1 0 + jpayne@68: chr1 100 200 feature1 0 + jpayne@68: chr1 0 100 feature2 0 + jpayne@68: chr1 200 300 feature2 0 + jpayne@68: chr1 50 150 feature3 0 - jpayne@68: chr1 500 600 feature3 0 - jpayne@68: chr1 800 900 feature4 0 + jpayne@68: chr1 950 1050 feature4 0 + jpayne@68: jpayne@68: jpayne@68: """ jpayne@68: kwargs = self.check_genome(**kwargs) jpayne@68: jpayne@68: if "i" not in kwargs: jpayne@68: kwargs["i"] = self.fn jpayne@68: jpayne@68: cmds, tmp, stdin = self.handle_kwargs(prog="flankBed", **kwargs) jpayne@68: stream = call_bedtools(cmds, tmp, stdin=stdin) jpayne@68: return BedTool(stream) jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps( jpayne@68: prog="genomeCoverageBed", jpayne@68: implicit="i", jpayne@68: bam="ibam", jpayne@68: genome_none_if=["ibam"], jpayne@68: genome_ok_if=["ibam"], jpayne@68: uses_genome=True, jpayne@68: nonbam="ALL", jpayne@68: ) jpayne@68: def genome_coverage(self, *args, **kwargs): jpayne@68: """ jpayne@68: Wraps `bedtools genomecov`. jpayne@68: jpayne@68: Note that some invocations of `bedtools genomecov` do not result in jpayne@68: a properly-formatted BED file. For example, the default behavior is to jpayne@68: report a histogram of coverage. Iterating over the resulting, jpayne@68: non-BED-format file will raise exceptions in pybedtools' parser. jpayne@68: jpayne@68: Consider using the `BedTool.to_dataframe` method to convert these jpayne@68: non-BED files into a pandas DataFrame for further use. jpayne@68: jpayne@68: Example usage: jpayne@68: jpayne@68: BAM file input does not require a genome: jpayne@68: jpayne@68: >>> a = pybedtools.example_bedtool('x.bam') jpayne@68: >>> b = a.genome_coverage(bg=True) jpayne@68: >>> b.head(3) #doctest: +NORMALIZE_WHITESPACE jpayne@68: chr2L 9329 9365 1 jpayne@68: chr2L 10212 10248 1 jpayne@68: chr2L 10255 10291 1 jpayne@68: jpayne@68: Other input does require a genome: jpayne@68: jpayne@68: >>> a = pybedtools.example_bedtool('x.bed') jpayne@68: >>> b = a.genome_coverage(bg=True, genome='dm3') jpayne@68: >>> b.head(3) #doctest: +NORMALIZE_WHITESPACE jpayne@68: chr2L 9329 9365 1 jpayne@68: chr2L 10212 10248 1 jpayne@68: chr2L 10255 10291 1 jpayne@68: jpayne@68: Non-BED format results: jpayne@68: >>> a = pybedtools.example_bedtool('x.bed') jpayne@68: >>> b = a.genome_coverage(genome='dm3') jpayne@68: >>> df = b.to_dataframe(names=['chrom', 'depth', 'n', 'chromsize', 'fraction']) jpayne@68: """ jpayne@68: jpayne@68: # Alias for backwards compatibility jpayne@68: genomecov = genome_coverage jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps(prog="coverageBed", implicit="a", other="b", bam="abam", nonbam="ALL") jpayne@68: def coverage(self, *args, **kwargs) -> BedTool: # type: ignore jpayne@68: """ jpayne@68: Wraps `bedtools coverage`. jpayne@68: jpayne@68: Note that starting in version 2.24.0, BEDTools swapped the semantics of jpayne@68: the "a" and "b" files. jpayne@68: jpayne@68: Example usage: jpayne@68: jpayne@68: >>> a = pybedtools.example_bedtool('a.bed') jpayne@68: >>> b = pybedtools.example_bedtool('b.bed') jpayne@68: >>> c = b.coverage(a) jpayne@68: >>> c.head(3) #doctest: +NORMALIZE_WHITESPACE jpayne@68: chr1 155 200 feature5 0 - 2 45 45 1.0000000 jpayne@68: chr1 800 901 feature6 0 + 1 1 101 0.0099010 jpayne@68: """ jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps( jpayne@68: prog="maskFastaFromBed", jpayne@68: implicit="bed", jpayne@68: other="fi", jpayne@68: make_tempfile_for="fo", jpayne@68: add_to_bedtool={"fo": "seqfn"}, jpayne@68: check_stderr=_check_sequence_stderr, jpayne@68: ) jpayne@68: def mask_fasta(self, *args, **kwargs) -> BedTool: # type: ignore jpayne@68: """ jpayne@68: Wraps `bedtools maskfasta`. jpayne@68: jpayne@68: Masks a fasta file at the positions in a BED file and saves result as jpayne@68: 'out' and stores the filename in seqfn. jpayne@68: jpayne@68: >>> a = pybedtools.BedTool('chr1 100 110', from_string=True) jpayne@68: >>> fasta_fn = pybedtools.example_filename('test.fa') jpayne@68: >>> a = a.mask_fasta(fi=fasta_fn, fo='masked.fa.example') jpayne@68: >>> b = a.slop(b=2, genome='hg19') jpayne@68: >>> b = b.sequence(fi=a.seqfn) jpayne@68: >>> print(open(b.seqfn).read()) jpayne@68: >chr1:98-112 jpayne@68: TTNNNNNNNNNNAT jpayne@68: jpayne@68: >>> os.unlink('masked.fa.example') jpayne@68: >>> if os.path.exists('masked.fa.example.fai'): jpayne@68: ... os.unlink('masked.fa.example.fai') jpayne@68: """ jpayne@68: jpayne@68: # Alias for backwards compatibility jpayne@68: maskfasta = mask_fasta jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps(prog="complementBed", implicit="i", uses_genome=True) jpayne@68: def complement(self, *args, **kwargs) -> BedTool: # type: ignore jpayne@68: """ jpayne@68: Wraps `bedtools complement`. jpayne@68: Example usage: jpayne@68: jpayne@68: >>> a = pybedtools.example_bedtool('a.bed') jpayne@68: >>> a.complement(genome='hg19').head(5) #doctest: +NORMALIZE_WHITESPACE jpayne@68: chr1 0 1 jpayne@68: chr1 500 900 jpayne@68: chr1 950 249250621 jpayne@68: chr2 0 243199373 jpayne@68: chr3 0 198022430 jpayne@68: """ jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps(prog="getOverlap", implicit="i") jpayne@68: def overlap(self, *args, **kwargs) -> BedTool: # type: ignore jpayne@68: """ jpayne@68: Wraps `bedtools overlap`. jpayne@68: jpayne@68: Example usage: jpayne@68: jpayne@68: >>> a = pybedtools.example_bedtool('a.bed') jpayne@68: >>> b = pybedtools.example_bedtool('b.bed') jpayne@68: >>> c = a.window(b, w=10).overlap(cols=[2,3,8,9]) jpayne@68: >>> print(c) #doctest: +NORMALIZE_WHITESPACE jpayne@68: chr1 100 200 feature2 0 + chr1 155 200 feature5 0 - 45 jpayne@68: chr1 150 500 feature3 0 - chr1 155 200 feature5 0 - 45 jpayne@68: chr1 900 950 feature4 0 + chr1 800 901 feature6 0 + 1 jpayne@68: jpayne@68: """ jpayne@68: jpayne@68: # TODO: needs test files and doctests written jpayne@68: @_log_to_history jpayne@68: @_wraps(prog="pairToBed", implicit="a", other="b", bam="abam", nonbam="bedpe") jpayne@68: def pair_to_bed(self, *args, **kwargs) -> BedTool: # type: ignore jpayne@68: """ jpayne@68: Wraps `bedtools pairtobed`. jpayne@68: """ jpayne@68: jpayne@68: # Alias for backwards compatibility jpayne@68: pairtobed = pair_to_bed jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps(prog="pairToPair", implicit="a", other="b") jpayne@68: def pair_to_pair(self) -> BedTool: # type: ignore jpayne@68: """ jpayne@68: Wraps `bedtools pairtopair`. jpayne@68: """ jpayne@68: jpayne@68: # Alias for backwards compatibility jpayne@68: pairtopair = pair_to_pair jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps(prog="groupBy", implicit="i") jpayne@68: def groupby(self, *args, **kwargs) -> BedTool: jpayne@68: """ jpayne@68: Wraps `bedtools groupby`. jpayne@68: jpayne@68: Example usage: jpayne@68: jpayne@68: >>> a = pybedtools.example_bedtool('gdc.gff') jpayne@68: >>> b = pybedtools.example_bedtool('gdc.bed') jpayne@68: >>> c = a.intersect(b, c=True) jpayne@68: >>> d = c.groupby(g=[1, 4, 5], c=10, o=['sum']) jpayne@68: >>> print(d) #doctest: +NORMALIZE_WHITESPACE jpayne@68: chr2L 41 70 0 jpayne@68: chr2L 71 130 2 jpayne@68: chr2L 131 170 4 jpayne@68: chr2L 171 200 0 jpayne@68: chr2L 201 220 1 jpayne@68: chr2L 41 130 2 jpayne@68: chr2L 171 220 1 jpayne@68: chr2L 41 220 7 jpayne@68: chr2L 161 230 6 jpayne@68: chr2L 41 220 7 jpayne@68: jpayne@68: jpayne@68: """ jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps(prog="tagBam", implicit="i", bam="i") jpayne@68: def tag_bam(self, *args, **kwargs) -> pysam.AlignmentFile: # type: ignore jpayne@68: """ jpayne@68: Wraps `bedtools tag`. jpayne@68: jpayne@68: `files` and `labels` should lists of equal length. jpayne@68: jpayne@68: """ jpayne@68: if "i" in kwargs: jpayne@68: if "labels" in kwargs: jpayne@68: if len(kwargs["i"]) != len(kwargs["labels"]): jpayne@68: raise ValueError("files and labels must be lists of equal length") jpayne@68: jpayne@68: jpayne@68: # Alias for backwards compatibility jpayne@68: tag = tag_bam jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps(prog="mapBed", implicit="a", other="b") jpayne@68: def map(self, *args, **kwargs) -> BedTool: # type: ignore jpayne@68: """ jpayne@68: Wraps `bedtools map`; See also :meth:`BedTool.each`. jpayne@68: """ jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps(prog="multiIntersectBed", uses_genome=True, genome_if=["empty"]) jpayne@68: def multi_intersect(self, *args, **kwargs) -> BedTool: # type: ignore jpayne@68: """ jpayne@68: Wraps `bedtools multiintersect`. jpayne@68: jpayne@68: Provide a list of filenames as the "i" argument. e.g. if you already jpayne@68: have BedTool objects then use their `.fn` attribute, like this:: jpayne@68: jpayne@68: >>> x = pybedtools.BedTool() jpayne@68: >>> a = pybedtools.example_bedtool('a.bed') jpayne@68: >>> b = pybedtools.example_bedtool('b.bed') jpayne@68: >>> result = x.multi_intersect(i=[a.fn, b.fn]) jpayne@68: >>> print(result) #doctest: +NORMALIZE_WHITESPACE jpayne@68: chr1 1 155 1 1 1 0 jpayne@68: chr1 155 200 2 1,2 1 1 jpayne@68: chr1 200 500 1 1 1 0 jpayne@68: chr1 800 900 1 2 0 1 jpayne@68: chr1 900 901 2 1,2 1 1 jpayne@68: chr1 901 950 1 1 1 0 jpayne@68: jpayne@68: jpayne@68: """ jpayne@68: jpayne@68: # Alias for backwards compatibility jpayne@68: multiinter = multi_intersect jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps(prog="randomBed", uses_genome=True) jpayne@68: def random(self, *args, **kwargs) -> BedTool: # type: ignore jpayne@68: """ jpayne@68: Wraps `bedtools random`. jpayne@68: jpayne@68: Since this method does not operate on an existing file, create jpayne@68: a BedTool with no arguments and then call this method, e.g., jpayne@68: jpayne@68: >>> x = BedTool() jpayne@68: >>> y = x.random(l=100, n=10, genome='hg19') jpayne@68: """ jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps("bedpeToBam", implicit="i", uses_genome=True, force_bam=True) jpayne@68: def bedpe_to_bam(self, *args, **kwargs) -> pysam.AlignmentFile: # type: ignore jpayne@68: """ jpayne@68: Wraps `bedtools bedpetobam`. jpayne@68: """ jpayne@68: jpayne@68: # Alias for backwards compatibility jpayne@68: bedpetobam = bedpe_to_bam jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps(prog="clusterBed", implicit="i") jpayne@68: def cluster(self, *args, **kwargs) -> BedTool: # type: ignore jpayne@68: """ jpayne@68: Wraps `bedtools cluster`. jpayne@68: """ jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps(prog="unionBedGraphs") jpayne@68: def union_bedgraphs(self, *args, **kwargs) -> BedTool: # type: ignore jpayne@68: """ jpayne@68: Wraps `bedtools unionbedg`. jpayne@68: jpayne@68: Warning: using the `header=True` kwarg will result in a file that is jpayne@68: not in true BED format, which may break downstream analysis. jpayne@68: """ jpayne@68: if "header" in kwargs: jpayne@68: if kwargs["header"] is True: jpayne@68: warn("Using header=True with unionbedg will result in a file that is not in true BED format, which may break downstream analysis.") jpayne@68: jpayne@68: jpayne@68: # Alias for backwards compatibility jpayne@68: unionbedg = union_bedgraphs jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps(prog="windowMaker", uses_genome=True, genome_none_if=["b"], other="b", arg_order=["w"]) jpayne@68: def window_maker(self, *args, **kwargs) -> BedTool: # type: ignore jpayne@68: """ jpayne@68: Wraps `bedtools makewindows`. jpayne@68: """ jpayne@68: jpayne@68: # Alias for backwards compatibility jpayne@68: makewindows = window_maker jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps(prog="expandCols", implicit="i") jpayne@68: def expand(self, *args, **kwargs) -> BedTool: # type: ignore jpayne@68: """ jpayne@68: Wraps `bedtools expand` jpayne@68: """ jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps(prog="linksBed", implicit="i", add_to_bedtool={"stdout": "links_html"}) jpayne@68: def links(self, *args, **kwargs) -> BedTool: # type: ignore jpayne@68: """ jpayne@68: Wraps `linksBed`. jpayne@68: jpayne@68: The resulting BedTool will assign a value to the attribute `links_html`. This jpayne@68: attribute is a temp filename containing the HTML links. jpayne@68: """ jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps(prog="bedToIgv", implicit="i", add_to_bedtool={"stdout": "igv_script"}) jpayne@68: def igv(self, *args, **kwargs) -> BedTool: # type: ignore jpayne@68: """ jpayne@68: Wraps `bedtools igv`. jpayne@68: jpayne@68: The resulting BedTool will assign a value to the attribute `igv_script`. This jpayne@68: attribute is a temp filename containing the IGV script. jpayne@68: """ jpayne@68: jpayne@68: @_log_to_history jpayne@68: @_wraps( jpayne@68: prog="bamToFastq", jpayne@68: implicit="i", jpayne@68: bam="i", jpayne@68: make_tempfile_for="fq", jpayne@68: add_to_bedtool={"fq": "fastq"}, jpayne@68: ) jpayne@68: def bam_to_fastq(self, *args, **kwargs) -> BedTool: # type: ignore jpayne@68: """ jpayne@68: Wraps `bedtools bamtofastq`. jpayne@68: jpayne@68: The `fq` argument is required. jpayne@68: jpayne@68: The resulting BedTool will assign a value to the attribute `fastq`. jpayne@68: """ jpayne@68: jpayne@68: # Alias for backwards compatibility jpayne@68: bamtofastq = bam_to_fastq jpayne@68: jpayne@68: @_wraps( jpayne@68: prog="jaccard", jpayne@68: implicit="a", jpayne@68: other="b", jpayne@68: does_not_return_bedtool=_jaccard_output_to_dict, jpayne@68: ) jpayne@68: def jaccard(self, *args, **kwargs) -> dict[str, Any]: # type: ignore jpayne@68: """ jpayne@68: Returns a dictionary with keys (intersection, union, jaccard). jpayne@68: """ jpayne@68: jpayne@68: @_wraps( jpayne@68: prog="reldist", jpayne@68: implicit="a", jpayne@68: other="b", jpayne@68: does_not_return_bedtool=_reldist_output_handler, jpayne@68: ) jpayne@68: def reldist(self, *args, **kwargs) -> BedTool | dict[str, Any]: # type: ignore jpayne@68: """ jpayne@68: If detail=False, then return a dictionary with keys (reldist, count, jpayne@68: total, fraction), which is the summary of the bedtools reldist. jpayne@68: jpayne@68: Otherwise return a BedTool, with the relative distance for each jpayne@68: interval in A in the last column. jpayne@68: """ jpayne@68: if "detail" in kwargs: jpayne@68: if kwargs["detail"] is False: jpayne@68: warn("Using detail=False with reldist will return a dictionary with keys (reldist, count, total, fraction), which is the summary of the bedtools reldist." jpayne@68: "Not a BedTool object.") jpayne@68: jpayne@68: jpayne@68: @_wraps(prog="sample", implicit="i", bam="i") jpayne@68: def sample(self, *args, **kwargs) -> BedTool: # type: ignore jpayne@68: """ jpayne@68: Wraps 'sample'. jpayne@68: """ jpayne@68: jpayne@68: @_wraps( jpayne@68: prog="fisher", jpayne@68: implicit="a", jpayne@68: other="b", jpayne@68: uses_genome=True, jpayne@68: does_not_return_bedtool=FisherOutput, jpayne@68: ) jpayne@68: def fisher(self, *args, **kwargs) -> FisherOutput: # type: ignore jpayne@68: """ jpayne@68: Wraps 'fisher'. Returns an object representing the output. jpayne@68: jpayne@68: >>> a = pybedtools.example_bedtool('a.bed') jpayne@68: >>> b = pybedtools.example_bedtool('b.bed') jpayne@68: >>> f = a.fisher(b, genome='hg19') jpayne@68: >>> print(f) # doctest: +NORMALIZE_WHITESPACE jpayne@68: # Number of query intervals: 4 jpayne@68: # Number of db intervals: 2 jpayne@68: # Number of overlaps: 3 jpayne@68: # Number of possible intervals (estimated): 13958448 jpayne@68: # phyper(3 - 1, 4, 13958448 - 4, 2, lower.tail=F) jpayne@68: # Contingency Table Of Counts jpayne@68: #_________________________________________ jpayne@68: # | in -b | not in -b | jpayne@68: # in -a | 3 | 1 | jpayne@68: # not in -a | 0 | 13958444 | jpayne@68: #_________________________________________ jpayne@68: # p-values for fisher's exact test jpayne@68: left right two-tail ratio jpayne@68: 1 8.8247e-21 8.8247e-21 inf jpayne@68: jpayne@68: jpayne@68: jpayne@68: >>> f.table['not in -a']['in -b'] jpayne@68: 0 jpayne@68: jpayne@68: >>> f.table['not in -a']['not in -b'] jpayne@68: 13958444 jpayne@68: jpayne@68: >>> f.table['in -a']['in -b'] jpayne@68: 3 jpayne@68: jpayne@68: >>> f.table['in -a']['not in -b'] jpayne@68: 1 jpayne@68: jpayne@68: >>> f.two_tail jpayne@68: 8.8247e-21 jpayne@68: """ jpayne@68: jpayne@68: @_wraps(prog="split", implicit="i", does_not_return_bedtool=SplitOutput) jpayne@68: def splitbed(self, *args, **kwargs) -> SplitOutput: # type: ignore jpayne@68: """ jpayne@68: Wraps 'bedtools split'. jpayne@68: jpayne@68: BedTool objects have long had a `split` method which splits intervals jpayne@68: according to a custom function. Now that BEDTools has a `split` tool, jpayne@68: the method name conflicts. To maintain backwards compatibility, the jpayne@68: method wrapping the BEDTools command is called `splitbed`. jpayne@68: jpayne@68: Since this tool does not return a single BED file, the method parses jpayne@68: the output and returns a SplitOutput object, which includes an jpayne@68: attribute, `bedtools`, that is a list of BedTool objects created from jpayne@68: the split files. jpayne@68: jpayne@68: To keep the working directory clean, you may want to consider using jpayne@68: `prefix=BedTool._tmp()` to get a temp file that will be deleted when jpayne@68: Python exits cleanly. jpayne@68: jpayne@68: >>> a = pybedtools.example_bedtool('a.bed') jpayne@68: >>> s = a.splitbed(n=2, p="split") jpayne@68: >>> assert len(a) == 4, len(a) jpayne@68: >>> assert len(s.bedtools) == 2 jpayne@68: >>> print(s.bedtools[0]) # doctest: +NORMALIZE_WHITESPACE jpayne@68: chr1 150 500 feature3 0 - jpayne@68: jpayne@68: >>> print(s.bedtools[1]) # doctest: +NORMALIZE_WHITESPACE jpayne@68: chr1 100 200 feature2 0 + jpayne@68: chr1 1 100 feature1 0 + jpayne@68: chr1 900 950 feature4 0 + jpayne@68: jpayne@68: """ jpayne@68: jpayne@68: @_wraps(prog="spacing", implicit="i") jpayne@68: def spacing(self, *args, **kwargs) -> BedTool: # type: ignore jpayne@68: """ jpayne@68: Wraps `bedtools spacing` jpayne@68: jpayne@68: >>> a = pybedtools.example_bedtool('a.bed') jpayne@68: >>> print(a.spacing()) # doctest: +NORMALIZE_WHITESPACE jpayne@68: chr1 1 100 feature1 0 + . jpayne@68: chr1 100 200 feature2 0 + 0 jpayne@68: chr1 150 500 feature3 0 - -1 jpayne@68: chr1 900 950 feature4 0 + 400 jpayne@68: """ jpayne@68: jpayne@68: def count(self) -> int: jpayne@68: """ jpayne@68: Count the number features in this BedTool. jpayne@68: jpayne@68: Number of features in BED file. Does the same thing as len(self), which jpayne@68: actually just calls this method. jpayne@68: jpayne@68: Only counts the actual features. Ignores any track lines, browser jpayne@68: lines, lines starting with a "#", or blank lines. jpayne@68: jpayne@68: Example usage: jpayne@68: jpayne@68: >>> a = pybedtools.example_bedtool('a.bed') jpayne@68: >>> a.count() jpayne@68: 4 jpayne@68: """ jpayne@68: if hasattr(self, "next") or hasattr(self, "__next__"): jpayne@68: return sum(1 for _ in self) jpayne@68: return sum(1 for _ in iter(self)) jpayne@68: jpayne@68: def print_sequence(self) -> str: jpayne@68: """ jpayne@68: Print the sequence that was retrieved by BedTool.sequence. jpayne@68: See usage example in :meth:`BedTool.sequence`. jpayne@68: jpayne@68: Returns: jpayne@68: str: The sequence as a string. jpayne@68: jpayne@68: Raises: jpayne@68: ValueError: If the sequence has not been generated using .sequence(fasta). jpayne@68: """ jpayne@68: if self.seqfn is None: jpayne@68: raise ValueError("Use .sequence(fasta) to get the sequence first") jpayne@68: f = open(self.seqfn) jpayne@68: s = f.read() jpayne@68: f.close() jpayne@68: return s jpayne@68: jpayne@68: def save_seqs(self, fn:str) -> BedTool: jpayne@68: """ jpayne@68: Save sequences, after calling BedTool.sequence. jpayne@68: jpayne@68: In order to use this function, you need to have called jpayne@68: the :meth:`BedTool.sequence()` method. jpayne@68: jpayne@68: A new BedTool object is returned which references the newly saved file. jpayne@68: jpayne@68: Example usage: jpayne@68: jpayne@68: >>> a = pybedtools.BedTool(''' jpayne@68: ... chr1 1 10 jpayne@68: ... chr1 50 55''', from_string=True) jpayne@68: >>> fasta = pybedtools.example_filename('test.fa') jpayne@68: >>> a = a.sequence(fi=fasta) jpayne@68: >>> print(open(a.seqfn).read()) jpayne@68: >chr1:1-10 jpayne@68: GATGAGTCT jpayne@68: >chr1:50-55 jpayne@68: CCATC jpayne@68: jpayne@68: >>> b = a.save_seqs('example.fa') jpayne@68: >>> assert open(b.fn).read() == open(a.fn).read() jpayne@68: >>> if os.path.exists('example.fa'): jpayne@68: ... os.unlink('example.fa') jpayne@68: """ jpayne@68: jpayne@68: if self.seqfn is None: jpayne@68: raise ValueError("Use .sequence(fasta) to get the sequence first") jpayne@68: jpayne@68: with open(fn, "w") as fout: jpayne@68: if self.seqfn is None: jpayne@68: raise ValueError("Use .sequence(fasta) to get the sequence first") jpayne@68: with open(self.seqfn) as seqfile: jpayne@68: fout.write(seqfile.read()) jpayne@68: jpayne@68: new_bedtool = BedTool(self.fn) jpayne@68: new_bedtool.seqfn = fn jpayne@68: return new_bedtool jpayne@68: jpayne@68: def randomstats( jpayne@68: self, jpayne@68: other: BedTool, jpayne@68: iterations: int, jpayne@68: new: bool = False, jpayne@68: genome_fn: Optional[str] = None, jpayne@68: include_distribution: bool = False, jpayne@68: **kwargs jpayne@68: ) -> dict[str, Any]: jpayne@68: """ jpayne@68: Dictionary of results from many randomly shuffled intersections. jpayne@68: jpayne@68: Sends args and kwargs to :meth:`BedTool.randomintersection` and jpayne@68: compiles results into a dictionary with useful stats. Requires jpayne@68: numpy. jpayne@68: jpayne@68: If `include_distribution` is True, then the dictionary will include the jpayne@68: full distribution; otherwise, the distribution is deleted and cleaned jpayne@68: up to save on memory usage. jpayne@68: jpayne@68: This is one possible way of assigning significance to overlaps between jpayne@68: two files. See, for example: jpayne@68: jpayne@68: Negre N, Brown CD, Shah PK, Kheradpour P, Morrison CA, et al. 2010 jpayne@68: A Comprehensive Map of Insulator Elements for the Drosophila jpayne@68: Genome. PLoS Genet 6(1): e1000814. doi:10.1371/journal.pgen.1000814 jpayne@68: jpayne@68: Example usage: jpayne@68: jpayne@68: Make chromsizes a very small genome for this example: jpayne@68: jpayne@68: >>> chromsizes = {'chr1':(1,1000)} jpayne@68: >>> a = pybedtools.example_bedtool('a.bed').set_chromsizes(chromsizes) jpayne@68: >>> b = pybedtools.example_bedtool('b.bed') jpayne@68: >>> try: jpayne@68: ... results = a.randomstats(b, 100, debug=True) jpayne@68: ... except ImportError: jpayne@68: ... pass jpayne@68: jpayne@68: *results* is a dictionary that you can inspect. jpayne@68: jpayne@68: (Note that the following examples are not run as part of the doctests jpayne@68: to avoid forcing users to install NumPy just to pass tests) jpayne@68: jpayne@68: The actual overlap:: jpayne@68: jpayne@68: print(results['actual']) jpayne@68: 3 jpayne@68: jpayne@68: The median of all randomized overlaps:: jpayne@68: jpayne@68: print(results['median randomized']) jpayne@68: 2.0 jpayne@68: jpayne@68: The percentile of the actual overlap in the distribution of randomized jpayne@68: overlaps, which can be used to get an empirical p-value:: jpayne@68: jpayne@68: print(results['percentile']) jpayne@68: 90.0 jpayne@68: """ jpayne@68: if ("intersect_kwargs" not in kwargs) or (kwargs["intersect_kwargs"] is None): jpayne@68: kwargs["intersect_kwargs"] = {"u": True} jpayne@68: try: jpayne@68: import numpy as np jpayne@68: except ImportError: jpayne@68: raise ImportError("Need to install NumPy for stats...") jpayne@68: jpayne@68: def percentileofscore(a, score): jpayne@68: """ jpayne@68: copied from scipy.stats.percentileofscore, to avoid dependency on jpayne@68: scipy. jpayne@68: """ jpayne@68: a = np.array(a) jpayne@68: n = len(a) jpayne@68: jpayne@68: if not (np.any(a == score)): jpayne@68: a = np.append(a, score) jpayne@68: a_len = np.array(list(range(len(a)))) jpayne@68: else: jpayne@68: a_len = np.array(list(range(len(a)))) + 1.0 jpayne@68: jpayne@68: a = np.sort(a) jpayne@68: idx = tuple([a == score]) jpayne@68: pct = (np.mean(a_len[idx]) / n) * 100.0 jpayne@68: return pct jpayne@68: jpayne@68: if isinstance(other, str): jpayne@68: other = BedTool(other) jpayne@68: else: jpayne@68: assert isinstance( jpayne@68: other, BedTool jpayne@68: ), "Either filename or another BedTool instance required" jpayne@68: jpayne@68: # Actual (unshuffled) counts. jpayne@68: i_kwargs = kwargs["intersect_kwargs"] jpayne@68: actual = len(self.intersect(other, **i_kwargs)) jpayne@68: jpayne@68: # List of counts from randomly shuffled versions. jpayne@68: # Length of counts == *iterations*. jpayne@68: jpayne@68: if not new: jpayne@68: distribution = self.randomintersection( jpayne@68: other, iterations=iterations, **kwargs jpayne@68: ) jpayne@68: else: jpayne@68: # use new mechanism jpayne@68: if genome_fn is None: jpayne@68: raise ValueError( jpayne@68: "`genome_fn` must be provided if using the " jpayne@68: "new _randomintersection mechanism" jpayne@68: ) jpayne@68: distribution = self._randomintersection( jpayne@68: other, iterations=iterations, genome_fn=genome_fn, **kwargs jpayne@68: ) jpayne@68: jpayne@68: distribution = np.array(list(distribution)) jpayne@68: jpayne@68: # Median of distribution jpayne@68: med_count = np.median(distribution) jpayne@68: jpayne@68: n = float(len(distribution)) jpayne@68: jpayne@68: frac_above = sum(distribution > actual) / n jpayne@68: frac_below = sum(distribution < actual) / n jpayne@68: jpayne@68: normalized = actual / med_count jpayne@68: jpayne@68: lower_thresh = 2.5 jpayne@68: upper_thresh = 97.5 jpayne@68: lower, upper = np.percentile(distribution, [lower_thresh, upper_thresh]) jpayne@68: jpayne@68: actual_percentile = percentileofscore(distribution, actual) jpayne@68: d = { jpayne@68: "iterations": iterations, jpayne@68: "actual": actual, jpayne@68: "file_a": self.fn, jpayne@68: "file_b": other.fn, jpayne@68: self.fn: len(self), jpayne@68: other.fn: len(other), jpayne@68: "self": len(self), jpayne@68: "other": len(other), jpayne@68: "frac randomized above actual": frac_above, jpayne@68: "frac randomized below actual": frac_below, jpayne@68: "median randomized": med_count, jpayne@68: "normalized": normalized, jpayne@68: "percentile": actual_percentile, jpayne@68: "lower_%sth" % lower_thresh: lower, jpayne@68: "upper_%sth" % upper_thresh: upper, jpayne@68: } jpayne@68: if include_distribution: jpayne@68: d["distribution"] = distribution jpayne@68: else: jpayne@68: del distribution jpayne@68: return d jpayne@68: jpayne@68: def random_op(self, *args, **kwargs): jpayne@68: """ jpayne@68: For backwards compatibility; see BedTool.parallel_apply instead. jpayne@68: """ jpayne@68: return self.parallel_apply(*args, **kwargs) jpayne@68: jpayne@68: def parallel_apply( jpayne@68: self, jpayne@68: iterations: int, jpayne@68: func: Callable, jpayne@68: func_args: Iterable, jpayne@68: func_kwargs: dict, jpayne@68: processes: int=1, jpayne@68: _orig_pool: Optional[Pool] = None # type: ignore jpayne@68: ): jpayne@68: """ jpayne@68: Generalized method for applying a function in parallel. jpayne@68: jpayne@68: Typically used when having to do many random shufflings. jpayne@68: jpayne@68: `func_args` and `func_kwargs` will be passed to `func` each time in jpayne@68: `iterations`, and these iterations will be split across `processes` jpayne@68: processes. jpayne@68: jpayne@68: Notes on the function, `func`: jpayne@68: jpayne@68: * the function should manually remove any tempfiles created. This jpayne@68: is because the BedTool.TEMPFILES list of auto-created tempfiles jpayne@68: does not share state across processes, so things will not get jpayne@68: cleaned up automatically as they do in a single-process jpayne@68: pybedtools session. jpayne@68: jpayne@68: * this includes deleting any "chromsizes" or genome files -- jpayne@68: generally it will be best to require a genome filename in jpayne@68: `func_kwargs` if you'll be using any BedTool methods that accept jpayne@68: the `g` kwarg. jpayne@68: jpayne@68: * the function should be a module-level function (rather than a jpayne@68: class method) because class methods can't be pickled across jpayne@68: process boundaries jpayne@68: jpayne@68: * the function can have any signature and have any return value jpayne@68: jpayne@68: `_orig_pool` can be a previously-created multiprocessing.Pool instance; jpayne@68: otherwise, a new Pool will be created with `processes` jpayne@68: """ jpayne@68: if processes == 1: jpayne@68: for _ in range(iterations): jpayne@68: yield func(*func_args, **func_kwargs) jpayne@68: raise StopIteration jpayne@68: jpayne@68: if _orig_pool: jpayne@68: p = _orig_pool jpayne@68: else: jpayne@68: p = Pool(processes) jpayne@68: iterations_each = [iterations / processes] * processes jpayne@68: iterations_each[-1] += iterations % processes jpayne@68: jpayne@68: # FYI some useful info on apply_async: jpayne@68: # http://stackoverflow.com/questions/8533318/ jpayne@68: # python-multiprocessing-pool-when-to-use-apply-apply-async-or-map jpayne@68: # jpayne@68: # Here, we don't care about the order, and don't want the subprocesses jpayne@68: # to block. jpayne@68: results = [ jpayne@68: p.apply_async(func, func_args, func_kwargs) for _ in range(iterations) jpayne@68: ] jpayne@68: for r in results: jpayne@68: yield r.get() jpayne@68: raise StopIteration jpayne@68: jpayne@68: def random_jaccard( jpayne@68: self, jpayne@68: other: BedTool, jpayne@68: genome_fn: Optional[str] = None, jpayne@68: iterations: Optional[int] = None, jpayne@68: processes: int = 1, jpayne@68: _orig_pool: Optional[Pool] = None, jpayne@68: shuffle_kwargs: Optional[dict[str, Any]] = None, jpayne@68: jaccard_kwargs: Optional[dict[str, Any]] = None, jpayne@68: ) -> list: jpayne@68: """ jpayne@68: Computes the naive Jaccard statistic (intersection divided by union). jpayne@68: jpayne@68: .. note:: jpayne@68: jpayne@68: If you don't need the randomization functionality of this method, jpayne@68: you can use the simpler BedTool.jaccard method instead. jpayne@68: jpayne@68: See Favorov et al. (2012) PLoS Comput Biol 8(5): e1002529 for more jpayne@68: info on the Jaccard statistic for intersections. jpayne@68: jpayne@68: If `iterations` is None, then do not perform random shufflings. jpayne@68: jpayne@68: If `iterations` is an integer, perform `iterations` random shufflings, jpayne@68: each time computing the Jaccard statistic to build an empirical jpayne@68: distribution. `genome_fn` will also be needed; optional `processes` jpayne@68: will split the iterations across multiple CPUs. jpayne@68: jpayne@68: Returns a tuple of the observed Jaccard statistic and a list of the jpayne@68: randomized statistics (which will be an empty list if `iterations` was jpayne@68: None). jpayne@68: """ jpayne@68: if shuffle_kwargs is None: jpayne@68: shuffle_kwargs = {} jpayne@68: if jaccard_kwargs is None: jpayne@68: jaccard_kwargs = {} jpayne@68: if not genome_fn: jpayne@68: raise ValueError("Need a genome filename in order to perform randomization") jpayne@68: return list( jpayne@68: self.parallel_apply( jpayne@68: iterations=iterations if iterations else 1, jpayne@68: func=pybedtools.stats.random_jaccard, jpayne@68: func_args=(self, other), jpayne@68: func_kwargs=dict( jpayne@68: genome_fn=genome_fn, jpayne@68: shuffle_kwargs=shuffle_kwargs, jpayne@68: jaccard_kwargs=jaccard_kwargs, jpayne@68: ), jpayne@68: processes=processes, jpayne@68: _orig_pool=_orig_pool, jpayne@68: ) jpayne@68: ) jpayne@68: jpayne@68: def _randomintersection( jpayne@68: self, jpayne@68: other: BedTool, jpayne@68: iterations: int, jpayne@68: genome_fn: str, jpayne@68: intersect_kwargs: Optional[dict[str, Any]] = None, jpayne@68: _orig_pool:Optional[Pool] = None, jpayne@68: shuffle_kwargs: Optional[dict[str, Any]] = None, jpayne@68: processes: int = 1, jpayne@68: ): jpayne@68: """ jpayne@68: Re-implementation of BedTool.randomintersection using the new jpayne@68: `random_op` method jpayne@68: """ jpayne@68: if shuffle_kwargs is None: jpayne@68: shuffle_kwargs = {} jpayne@68: if intersect_kwargs is None: jpayne@68: intersect_kwargs = dict(u=True) jpayne@68: if not genome_fn: jpayne@68: raise ValueError("Need a genome filename in order to perform randomization") jpayne@68: return list( jpayne@68: self.parallel_apply( jpayne@68: iterations=iterations, jpayne@68: func=pybedtools.stats.random_intersection, jpayne@68: func_args=(self, other), jpayne@68: func_kwargs=dict( jpayne@68: genome_fn=genome_fn, jpayne@68: shuffle_kwargs=shuffle_kwargs, jpayne@68: intersect_kwargs=intersect_kwargs, jpayne@68: ), jpayne@68: processes=processes, jpayne@68: _orig_pool=_orig_pool, jpayne@68: ) jpayne@68: ) jpayne@68: jpayne@68: def randomintersection_bp( jpayne@68: self, jpayne@68: other: BedTool, jpayne@68: iterations: int, jpayne@68: genome_fn: str, jpayne@68: intersect_kwargs: Optional[dict[str, Any]] = None, jpayne@68: shuffle_kwargs: Optional[dict[str, Any]] = None, jpayne@68: processes: int = 1, jpayne@68: _orig_pool:Optional[Pool] = None, jpayne@68: ) -> list[int]: jpayne@68: """ jpayne@68: Like randomintersection, but return the bp overlap instead of the jpayne@68: number of intersecting intervals. jpayne@68: """ jpayne@68: if shuffle_kwargs is None: jpayne@68: shuffle_kwargs = {} jpayne@68: if intersect_kwargs is None: jpayne@68: intersect_kwargs = {} jpayne@68: if not genome_fn: jpayne@68: raise ValueError("Need a genome filename in order to perform randomization") jpayne@68: return list( jpayne@68: self.parallel_apply( jpayne@68: iterations=iterations, jpayne@68: func=pybedtools.stats.random_intersection_bp, jpayne@68: func_args=(self, other), jpayne@68: func_kwargs=dict( jpayne@68: genome_fn=genome_fn, jpayne@68: shuffle_kwargs=shuffle_kwargs, jpayne@68: intersect_kwargs=intersect_kwargs, jpayne@68: ), jpayne@68: processes=processes, jpayne@68: _orig_pool=_orig_pool, jpayne@68: ) jpayne@68: ) jpayne@68: jpayne@68: def randomintersection( jpayne@68: self, jpayne@68: other: BedTool, jpayne@68: iterations: int, jpayne@68: intersect_kwargs: Optional[dict[str, Any]] = None, jpayne@68: shuffle_kwargs: Optional[dict[str, Any]] = None, jpayne@68: debug: bool = False, jpayne@68: report_iterations: bool = False, jpayne@68: processes: Optional[int] = None, jpayne@68: _orig_processes: Optional[int] = None, jpayne@68: ) -> Iterator[int]: jpayne@68: """ jpayne@68: Perform `iterations` shufflings, each time intersecting with `other`. jpayne@68: jpayne@68: Returns a generator of integers where each integer is the number of jpayne@68: intersections of a shuffled file with *other*. This distribution can jpayne@68: be used in downstream analysis for things like empirical p-values. jpayne@68: jpayne@68: *intersect_kwargs* and *shuffle_kwargs* are passed to self.intersect() jpayne@68: and self.shuffle() respectively. By default for intersect, u=True is jpayne@68: specified -- but s=True might be a useful option for strand-specific jpayne@68: work. jpayne@68: jpayne@68: Useful kwargs for *shuffle_kwargs* are chrom, excl, or incl. If you jpayne@68: use the "seed" kwarg, that seed will be used *each* time shuffleBed is jpayne@68: called -- so all your randomization results will be identical for each jpayne@68: iteration. To get around this and to allow for tests, debug=True will jpayne@68: set the seed to the iteration number. You may also break up the jpayne@68: intersections across multiple processes with *processes* > 1. jpayne@68: jpayne@68: Example usage: jpayne@68: jpayne@68: >>> chromsizes = {'chr1':(0, 1000)} jpayne@68: >>> a = pybedtools.example_bedtool('a.bed') jpayne@68: >>> a = a.set_chromsizes(chromsizes) jpayne@68: >>> b = pybedtools.example_bedtool('b.bed') jpayne@68: >>> results = a.randomintersection(b, 10, debug=True) jpayne@68: >>> print(list(results)) jpayne@68: [1, 0, 1, 2, 4, 2, 2, 1, 2, 4] jpayne@68: jpayne@68: """ jpayne@68: if processes is not None: jpayne@68: p = Pool(processes) jpayne@68: iterations_each = [iterations // processes] * processes jpayne@68: iterations_each[-1] += iterations % processes jpayne@68: results = [ jpayne@68: p.apply_async( jpayne@68: _call_randomintersect, jpayne@68: (self, other, it), jpayne@68: dict( jpayne@68: intersect_kwargs=intersect_kwargs, jpayne@68: shuffle_kwargs=shuffle_kwargs, jpayne@68: debug=debug, jpayne@68: report_iterations=report_iterations, jpayne@68: _orig_processes=processes, jpayne@68: ), jpayne@68: ) jpayne@68: for it in iterations_each jpayne@68: ] jpayne@68: for r in results: jpayne@68: for value in r.get(): jpayne@68: yield value jpayne@68: raise StopIteration jpayne@68: jpayne@68: if shuffle_kwargs is None: jpayne@68: shuffle_kwargs = {} jpayne@68: if intersect_kwargs is None: jpayne@68: intersect_kwargs = {"u": True} jpayne@68: jpayne@68: if "u" not in intersect_kwargs: jpayne@68: intersect_kwargs["u"] = True jpayne@68: jpayne@68: resort = intersect_kwargs.get("sorted", False) jpayne@68: jpayne@68: for i in range(iterations): jpayne@68: if debug: jpayne@68: shuffle_kwargs["seed"] = i jpayne@68: if report_iterations: jpayne@68: if _orig_processes > 1: jpayne@68: msg = "\rapprox (total across %s processes): %s" % ( jpayne@68: _orig_processes, jpayne@68: i * _orig_processes, jpayne@68: ) jpayne@68: else: jpayne@68: msg = "\r%s" % i jpayne@68: sys.stderr.write(msg) jpayne@68: sys.stderr.flush() jpayne@68: jpayne@68: # Re-sort if sorted=True in kwargs jpayne@68: if resort: jpayne@68: tmp0 = self.shuffle(**shuffle_kwargs) jpayne@68: tmp = tmp0.sort() jpayne@68: else: jpayne@68: tmp = self.shuffle(**shuffle_kwargs) jpayne@68: jpayne@68: tmp2 = tmp.intersect(other, stream=True, **intersect_kwargs) jpayne@68: jpayne@68: yield len(tmp2) jpayne@68: jpayne@68: # Close the open stdouts from subprocess.Popen calls. Note: doing jpayne@68: # this in self.__del__ doesn't fix the open file limit bug; it jpayne@68: # needs to be done here. jpayne@68: # if resort: jpayne@68: # tmp0.fn.close() jpayne@68: # tmp.fn.close() jpayne@68: tmp2.fn.close() jpayne@68: del tmp jpayne@68: del tmp2 jpayne@68: jpayne@68: @_log_to_history jpayne@68: def cat(self, *others: Iterable[str|Path|BedTool], **kwargs) -> BedTool: jpayne@68: """ jpayne@68: Concatenate interval files together. jpayne@68: jpayne@68: Concatenates two BedTool objects (or an object and a file) and does an jpayne@68: optional post-merge of the features. jpayne@68: jpayne@68: *postmerge=True* by default; use *postmerge=False* if you want to keep jpayne@68: features separate. jpayne@68: jpayne@68: *force_truncate=False* by default; *force_truncate=True* to truncate jpayne@68: all files to chrom, start, stop. jpayne@68: jpayne@68: When *force_truncate=False* and *postmerge=False*, the output will jpayne@68: contain the smallest number of fields observed across all inputs. This jpayne@68: maintains compatibility with BEDTools programs, which assume constant jpayne@68: number of fields in all lines of a file. jpayne@68: jpayne@68: Other kwargs are sent to :meth:`BedTool.merge` (and assuming that jpayne@68: *postmerge=True*). jpayne@68: jpayne@68: Example usage: jpayne@68: jpayne@68: >>> a = pybedtools.example_bedtool('a.bed') jpayne@68: >>> b = pybedtools.example_bedtool('b.bed') jpayne@68: >>> print(a.cat(b)) #doctest: +NORMALIZE_WHITESPACE jpayne@68: chr1 1 500 jpayne@68: chr1 800 950 jpayne@68: jpayne@68: >>> print(a.cat(*[b,b], jpayne@68: ... postmerge=False)) #doctest: +NORMALIZE_WHITESPACE jpayne@68: chr1 1 100 feature1 0 + jpayne@68: chr1 100 200 feature2 0 + jpayne@68: chr1 150 500 feature3 0 - jpayne@68: chr1 900 950 feature4 0 + jpayne@68: chr1 155 200 feature5 0 - jpayne@68: chr1 800 901 feature6 0 + jpayne@68: chr1 155 200 feature5 0 - jpayne@68: chr1 800 901 feature6 0 + jpayne@68: jpayne@68: """ jpayne@68: same_type = None jpayne@68: same_field_num = None jpayne@68: field_nums = set() jpayne@68: jpayne@68: assert len(others) > 0, "You must specify at least one other bedfile!" jpayne@68: other_beds = [] jpayne@68: for other in others: jpayne@68: if isinstance(other, (str, Path)): jpayne@68: other = BedTool(other) jpayne@68: else: jpayne@68: assert isinstance( jpayne@68: other, BedTool jpayne@68: ), "Either filename or another BedTool instance required" jpayne@68: other_beds.append(other) jpayne@68: jpayne@68: # postmerge and force_truncate don't get passed on to merge jpayne@68: postmerge = kwargs.pop("postmerge", True) jpayne@68: force_truncate = kwargs.pop("force_truncate", False) jpayne@68: stream_merge = kwargs.get("stream", False) jpayne@68: if stream_merge and postmerge: jpayne@68: raise ValueError( jpayne@68: "The post-merge step in the `cat()` method " jpayne@68: "performs a sort, which uses stream=True. Using " jpayne@68: "stream=True for the merge as well will result in a " jpayne@68: "deadlock!" jpayne@68: ) jpayne@68: jpayne@68: # if filetypes and field counts are the same, don't truncate jpayne@68: if not force_truncate: jpayne@68: try: jpayne@68: filetypes = set( jpayne@68: [self.file_type] + [i.file_type for i in other_beds] jpayne@68: ).difference(["empty"]) jpayne@68: field_nums = ( jpayne@68: set([self.field_count()] + [i.field_count() for i in other_beds]) jpayne@68: .difference([None]) jpayne@68: .difference([0]) jpayne@68: ) jpayne@68: same_field_num = len(field_nums) == 1 jpayne@68: same_type = len(set(filetypes)) == 1 jpayne@68: except ValueError: jpayne@68: raise ValueError( jpayne@68: "Can't check filetype or field count -- " jpayne@68: "is one of the files you're merging a 'streaming' " jpayne@68: "BedTool? If so, use .saveas() to save to file first" jpayne@68: ) jpayne@68: jpayne@68: tmp = self._tmp() jpayne@68: jpayne@68: if not force_truncate and same_type and same_field_num: jpayne@68: with open(tmp, "w") as TMP: jpayne@68: for f in self: jpayne@68: TMP.write(str(f)) jpayne@68: for other in other_beds: jpayne@68: for f in other: jpayne@68: TMP.write(str(f)) jpayne@68: jpayne@68: # Types match, so we can use the min number of fields observed across jpayne@68: # all inputs jpayne@68: elif not force_truncate and same_type: jpayne@68: minfields = min(field_nums) jpayne@68: with open(tmp, "w") as TMP: jpayne@68: for f in self: jpayne@68: TMP.write("\t".join(f.fields[:minfields]) + "\n") jpayne@68: for other in other_beds: jpayne@68: for f in other: jpayne@68: TMP.write("\t".join(f.fields[:minfields]) + "\n") jpayne@68: jpayne@68: # Otherwise, use the zero-based chrom/start/stop to create a BED3, jpayne@68: # which will work when catting a GFF and a BED together. jpayne@68: else: jpayne@68: with open(tmp, "w") as TMP: jpayne@68: for f in self: jpayne@68: TMP.write("%s\t%i\t%i\n" % (f.chrom, f.start, f.end)) jpayne@68: for other in other_beds: jpayne@68: for f in other: jpayne@68: TMP.write("%s\t%i\t%i\n" % (f.chrom, f.start, f.end)) jpayne@68: jpayne@68: c = BedTool(tmp) jpayne@68: if postmerge: jpayne@68: d = c.sort(stream=True).merge(**kwargs) jpayne@68: jpayne@68: # Explicitly delete -- needed when using multiprocessing jpayne@68: os.unlink(tmp) jpayne@68: return d jpayne@68: else: jpayne@68: return c jpayne@68: jpayne@68: @_log_to_history jpayne@68: def saveas(self, fn: Optional[str] = None, trackline: Optional[str] = None, compressed: Optional[bool] = None) -> BedTool: jpayne@68: """ jpayne@68: Make a copy of the BedTool. jpayne@68: jpayne@68: Optionally adds `trackline` to the beginning of the file. jpayne@68: jpayne@68: Optionally compresses output using gzip. jpayne@68: jpayne@68: if the filename extension is .gz, or compressed=True, jpayne@68: the output is compressed using gzip jpayne@68: jpayne@68: Returns a new BedTool for the newly saved file. jpayne@68: jpayne@68: A newline is automatically added to the trackline if it does not jpayne@68: already have one. jpayne@68: jpayne@68: Example usage: jpayne@68: jpayne@68: >>> a = pybedtools.example_bedtool('a.bed') jpayne@68: >>> b = a.saveas('other.bed') jpayne@68: >>> b.fn jpayne@68: 'other.bed' jpayne@68: >>> print(b == a) jpayne@68: True jpayne@68: jpayne@68: >>> b = a.saveas('other.bed', trackline="name='test run' color=0,55,0") jpayne@68: >>> open(b.fn).readline() jpayne@68: "name='test run' color=0,55,0\\n" jpayne@68: >>> if os.path.exists('other.bed'): jpayne@68: ... os.unlink('other.bed') jpayne@68: """ jpayne@68: if fn is None: jpayne@68: fn = self._tmp() jpayne@68: jpayne@68: # Default to compressed if extension is .gz jpayne@68: if compressed is None: jpayne@68: __, extension = os.path.splitext(fn) jpayne@68: if extension == ".gz": jpayne@68: compressed = True jpayne@68: else: jpayne@68: compressed = False jpayne@68: jpayne@68: in_compressed = isinstance(self.fn, str) and isGZIP(self.fn) jpayne@68: jpayne@68: fn = self._collapse( jpayne@68: self, jpayne@68: fn=fn, jpayne@68: trackline=trackline, jpayne@68: in_compressed=in_compressed, jpayne@68: out_compressed=compressed, jpayne@68: ) jpayne@68: return BedTool(fn) jpayne@68: jpayne@68: @_log_to_history jpayne@68: def moveto(self, fn: Optional[str]=None) -> BedTool: jpayne@68: """ jpayne@68: Move to a new filename (can be much quicker than BedTool.saveas()) jpayne@68: jpayne@68: Move BED file to new filename, `fn`. jpayne@68: jpayne@68: Returns a new BedTool for the new file. jpayne@68: jpayne@68: Example usage: jpayne@68: jpayne@68: >>> # make a copy so we don't mess up the example file jpayne@68: >>> a = pybedtools.example_bedtool('a.bed').saveas() jpayne@68: >>> a_contents = str(a) jpayne@68: >>> b = a.moveto('other.bed') jpayne@68: >>> b.fn jpayne@68: 'other.bed' jpayne@68: >>> b == a_contents jpayne@68: True jpayne@68: """ jpayne@68: if not isinstance(self.fn, str): jpayne@68: fn = self._collapse(self, fn=fn) jpayne@68: else: jpayne@68: shutil.move(self.fn, fn) jpayne@68: return BedTool(fn) jpayne@68: jpayne@68: @_log_to_history jpayne@68: def random_subset(self, n: Optional[int] = None, f: Optional[float] = None, seed: Optional[float|int]=None) -> BedTool: jpayne@68: """ jpayne@68: Return a BedTool containing a random subset. jpayne@68: jpayne@68: NOTE: using `n` will be slower and use more memory than using `f`. jpayne@68: jpayne@68: Parameters jpayne@68: ---------- jpayne@68: jpayne@68: n : int jpayne@68: Number of features to return. Only one of `n` or `f` can be provided. jpayne@68: jpayne@68: f : float, 0 <= f <= 1 jpayne@68: Fraction of features to return. Cannot be provided with `n`. jpayne@68: jpayne@68: seed : float or int jpayne@68: Set random.seed jpayne@68: jpayne@68: Example jpayne@68: ------- jpayne@68: jpayne@68: >>> seed = 0 # only for test, otherwise use None jpayne@68: jpayne@68: `n` will always give the same number of returned features, but will be jpayne@68: slower since it is creating an index and then shuffling it. jpayne@68: jpayne@68: >>> a = pybedtools.example_bedtool('a.bed') jpayne@68: >>> b = a.random_subset(n=2) jpayne@68: >>> len(b) jpayne@68: 2 jpayne@68: jpayne@68: Using a fraction `f` will be faster but depending on seed will result jpayne@68: in slightly different total numbers. jpayne@68: jpayne@68: >>> a = pybedtools.example_bedtool('x.bam') jpayne@68: >>> len(a) jpayne@68: 45593 jpayne@68: >>> b = a.random_subset(f=0.4, seed=seed) jpayne@68: >>> len(b) jpayne@68: 18316 jpayne@68: jpayne@68: Check that we have approximately the right fraction jpayne@68: >>> print('{0:.2f}'.format(len(b) / len(a))) jpayne@68: 0.40 jpayne@68: jpayne@68: """ jpayne@68: if ((n is None) and (f is None)) or ((n is not None) and (f is not None)): jpayne@68: raise ValueError("Exactly one of `n` or `f` must be provided") jpayne@68: jpayne@68: tmpfn = self._tmp() jpayne@68: if seed is not None: jpayne@68: random.seed(seed) jpayne@68: jpayne@68: if n: jpayne@68: idxs = list(range(len(self))) jpayne@68: random.shuffle(idxs) jpayne@68: idxs = idxs[:n] jpayne@68: with open(tmpfn, "w") as tmp: jpayne@68: for i, feature in enumerate(self): jpayne@68: if i in idxs: jpayne@68: tmp.write(str(feature)) jpayne@68: jpayne@68: elif f: jpayne@68: with open(tmpfn, "w") as tmp: jpayne@68: for i in self: jpayne@68: if random.random() <= f: jpayne@68: tmp.write(str(i)) jpayne@68: jpayne@68: return BedTool(tmpfn) jpayne@68: jpayne@68: def total_coverage(self) -> int: jpayne@68: """ jpayne@68: Return the total number of bases covered by this interval file. jpayne@68: jpayne@68: Does a self.merge() first to remove potentially multiple-counting jpayne@68: bases. jpayne@68: jpayne@68: Example usage: jpayne@68: jpayne@68: >>> a = pybedtools.example_bedtool('a.bed') jpayne@68: jpayne@68: This does a merge() first, so this is what the total coverage is jpayne@68: counting: jpayne@68: jpayne@68: >>> print(a.merge()) #doctest: +NORMALIZE_WHITESPACE jpayne@68: chr1 1 500 jpayne@68: chr1 900 950 jpayne@68: jpayne@68: jpayne@68: >>> print(a.total_coverage()) jpayne@68: 549 jpayne@68: """ jpayne@68: b = self.merge() jpayne@68: total_bp = 0 jpayne@68: for feature in b.features(): jpayne@68: total_bp += len(feature) jpayne@68: return total_bp jpayne@68: jpayne@68: @_log_to_history jpayne@68: def with_attrs(self, **kwargs) -> BedTool: jpayne@68: """ jpayne@68: Helper method for adding attributes in the middle of a pipeline. jpayne@68: jpayne@68: Given arbitrary keyword arguments, turns the keys and values into jpayne@68: attributes. Useful for labeling BedTools at creation time. jpayne@68: jpayne@68: Example usage: jpayne@68: jpayne@68: >>> # add a "label" attribute to each BedTool jpayne@68: >>> a = pybedtools.example_bedtool('a.bed')\ jpayne@68: .with_attrs(label='transcription factor 1') jpayne@68: >>> b = pybedtools.example_bedtool('b.bed')\ jpayne@68: .with_attrs(label='transcription factor 2') jpayne@68: >>> for i in [a, b]: jpayne@68: ... print('{0} features for {1}'.format(i.count(), i.label)) jpayne@68: 4 features for transcription factor 1 jpayne@68: 2 features for transcription factor 2 jpayne@68: jpayne@68: """ jpayne@68: for key, value in list(kwargs.items()): jpayne@68: setattr(self, key, value) jpayne@68: return self jpayne@68: jpayne@68: def as_intervalfile(self) -> IntervalFile: jpayne@68: """ jpayne@68: Returns an IntervalFile of this BedTool for low-level interface. jpayne@68: """ jpayne@68: if not isinstance(self.fn, str): jpayne@68: fn = self._collapse(self.fn) jpayne@68: else: jpayne@68: fn = self.fn jpayne@68: return IntervalFile(fn) jpayne@68: jpayne@68: def liftover(self, chainfile: str, unmapped: Optional[str] = None, liftover_args: str = "") -> BedTool: jpayne@68: """ jpayne@68: Returns a new BedTool of the liftedOver features, saving the unmapped jpayne@68: ones as `unmapped`. If `unmapped` is None, then discards the unmapped jpayne@68: features. jpayne@68: jpayne@68: `liftover_args` is a string of additional args that is passed, jpayne@68: verbatim, to liftOver. jpayne@68: jpayne@68: Needs `liftOver` from UCSC to be on the path and a `chainfile` jpayne@68: downloaded from UCSC. jpayne@68: """ jpayne@68: result = BedTool._tmp() jpayne@68: if unmapped is None: jpayne@68: unmapped = BedTool._tmp() jpayne@68: cmds = ["liftOver", liftover_args, self.fn, chainfile, result, unmapped] jpayne@68: os.system(" ".join(cmds)) jpayne@68: return BedTool(result) jpayne@68: jpayne@68: def absolute_distance(self, other: BedTool, closest_kwargs: Optional[dict[str, Any]]=None, use_midpoints: bool=False) -> Iterator[int]: jpayne@68: """ jpayne@68: Returns an iterator of the *absolute* distances between features in jpayne@68: self and other. jpayne@68: jpayne@68: If `use_midpoints` is True, then only use the midpoints of features jpayne@68: (which will return values where features are overlapping). Otherwise, jpayne@68: when features overlap the value will always be zero. jpayne@68: jpayne@68: `closest_kwargs` are passed to self.closest(); either `d` or jpayne@68: 'D` are required in order to get back distance values (`d=True` is jpayne@68: default) jpayne@68: """ jpayne@68: from .featurefuncs import midpoint jpayne@68: jpayne@68: if closest_kwargs is None: jpayne@68: closest_kwargs = {"d": True} jpayne@68: jpayne@68: if "D" not in closest_kwargs: jpayne@68: closest_kwargs.update(dict(d=True)) jpayne@68: jpayne@68: if use_midpoints: jpayne@68: mid_self = self.each(midpoint).saveas() jpayne@68: mid_other = other.each(midpoint).saveas() jpayne@68: c = mid_self.closest(mid_other, stream=True, **closest_kwargs) jpayne@68: else: jpayne@68: c = self.closest(other, stream=True, **closest_kwargs) jpayne@68: for i in c: jpayne@68: yield int(i[-1]) jpayne@68: jpayne@68: def relative_distance(self, other: BedTool, genome:Optional[dict|str] =None, g: Optional[str]=None) -> Iterator[float]: jpayne@68: """ jpayne@68: Returns an iterator of relative distances between features in self and jpayne@68: other. jpayne@68: jpayne@68: First computes the midpoints of self and other, then returns distances jpayne@68: of each feature in `other` relative to the distance between `self` jpayne@68: features. jpayne@68: jpayne@68: Requires either `genome` (dictionary of chromsizes or assembly name) or jpayne@68: `g` (filename of chromsizes file). jpayne@68: """ jpayne@68: g_dict = {} jpayne@68: if (genome is None) and (g is None): jpayne@68: raise ValueError("Need either `genome` or `g` arg for relative distance") jpayne@68: elif genome and g: jpayne@68: raise ValueError("Please specify only one of `genome` or `g`") jpayne@68: elif genome: jpayne@68: g_dict = dict(genome=genome) jpayne@68: elif g: jpayne@68: g_dict = dict(g=g) jpayne@68: jpayne@68: from .featurefuncs import midpoint jpayne@68: jpayne@68: # This gets the space between features in self. jpayne@68: c = self.each(midpoint).complement(**g_dict) jpayne@68: jpayne@68: hits = c.intersect(other, wao=True, stream=True) # TODO: should this be other or mid_other? jpayne@68: for i in hits: jpayne@68: yield float(i[-1]) / len(i) jpayne@68: jpayne@68: def colormap_normalize(self, jpayne@68: vmin: Optional[float|int]=None, jpayne@68: vmax: Optional[float|int]=None, jpayne@68: percentile: bool=False, jpayne@68: log: bool=False) -> mcolors.LogNorm|mcolors.Normalize: jpayne@68: """ jpayne@68: Returns a normalization instance for use by featurefuncs.add_color(). jpayne@68: jpayne@68: Parameters jpayne@68: ---------- jpayne@68: vmin, vmax : float, int, or None jpayne@68: `vmin` and `vmax` set the colormap bounds; if None then jpayne@68: these will be determined from the scores in the BED file. jpayne@68: jpayne@68: log : bool jpayne@68: If True, put the scores on a log scale; of course be careful jpayne@68: if you have negative scores jpayne@68: jpayne@68: percentile : bool jpayne@68: If True, interpret vmin and vmax as a percentile in the range jpayne@68: [0,100] rather than absolute values. jpayne@68: """ jpayne@68: field_count = self.field_count() jpayne@68: if (self.file_type != "bed") or (field_count < 5): jpayne@68: raise ValueError("colorizing only works for BED files with score " "fields") jpayne@68: try: jpayne@68: import matplotlib.colors as mcolors jpayne@68: except ImportError: jpayne@68: raise ImportError("matplotlib.colors must be installed to use colormap_normalize") jpayne@68: jpayne@68: try: jpayne@68: import numpy as np jpayne@68: except ImportError: jpayne@68: raise ImportError("numpy must be installed to use colormap_normalize") jpayne@68: jpayne@68: if log: jpayne@68: norm = mcolors.LogNorm() jpayne@68: else: jpayne@68: norm = mcolors.Normalize() jpayne@68: jpayne@68: scores = np.array([i.score for i in self], dtype=float) jpayne@68: scores = scores[np.isfinite(scores)] jpayne@68: norm.autoscale(scores) jpayne@68: jpayne@68: if vmin is not None: jpayne@68: if percentile: jpayne@68: vmin = float(np.percentile(scores, vmin)) jpayne@68: norm.vmin = vmin jpayne@68: if vmax is not None: jpayne@68: if percentile: jpayne@68: vmax = float(np.percentile(scores, vmax)) jpayne@68: norm.vmax = vmax jpayne@68: jpayne@68: return norm jpayne@68: jpayne@68: def at(self, inds: list[int]) -> BedTool: jpayne@68: """ jpayne@68: Returns a new BedTool with only intervals at lines `inds` jpayne@68: jpayne@68: Parameters jpayne@68: ---------- jpayne@68: inds : List[int] jpayne@68: List of line numbers jpayne@68: jpayne@68: Returns jpayne@68: ------- jpayne@68: BedTool jpayne@68: New BedTool with only intervals at `inds` jpayne@68: """ jpayne@68: length = len(inds) jpayne@68: jpayne@68: def _gen(): jpayne@68: k = 0 jpayne@68: for i, feature in enumerate(self): jpayne@68: if i == inds[k]: jpayne@68: yield feature jpayne@68: k += 1 jpayne@68: if k == length: jpayne@68: break jpayne@68: jpayne@68: return BedTool(_gen()).saveas() jpayne@68: jpayne@68: def to_dataframe(self, disable_auto_names: bool = False, *args, **kwargs) -> pd.DataFrame: jpayne@68: """ jpayne@68: Create a pandas.DataFrame, passing args and kwargs to pandas.read_csv jpayne@68: The separator kwarg `sep` is given a tab `\\t` as value by default. jpayne@68: jpayne@68: Parameters jpayne@68: ---------- jpayne@68: disable_auto_names : bool jpayne@68: By default, the created dataframe fills in column names jpayne@68: automatically according to the detected filetype (e.g., "chrom", jpayne@68: "start", "end" for a BED3 file). Set this argument to True to jpayne@68: disable this behavior. jpayne@68: """ jpayne@68: # Complain if BAM or if not a file jpayne@68: if self._isbam: jpayne@68: raise ValueError("BAM not supported for converting to DataFrame") jpayne@68: if not isinstance(self.fn, str): jpayne@68: raise ValueError("use .saveas() to make sure self.fn is a file") jpayne@68: jpayne@68: try: jpayne@68: import pandas jpayne@68: except ImportError: jpayne@68: raise ImportError("pandas must be installed to convert to pandas.DataFrame") jpayne@68: # Otherwise we're good: jpayne@68: names = kwargs.get("names", None) jpayne@68: if names is None and not disable_auto_names: jpayne@68: try: jpayne@68: _names = settings._column_names[self.file_type][: self.field_count()] jpayne@68: if len(_names) < self.field_count(): jpayne@68: warn( jpayne@68: "Default names for filetype %s are:\n%s\nbut file has " jpayne@68: "%s fields; you can supply custom names with the " jpayne@68: "`names` kwarg" % (self.file_type, _names, self.field_count()) jpayne@68: ) jpayne@68: _names = None jpayne@68: except KeyError: jpayne@68: _names = None jpayne@68: kwargs["names"] = _names jpayne@68: jpayne@68: if os.path.isfile(self.fn) and os.path.getsize(self.fn) > 0: jpayne@68: return pandas.read_csv(self.fn, *args, sep="\t", **kwargs) # type: ignore jpayne@68: else: jpayne@68: return pandas.DataFrame() jpayne@68: jpayne@68: def tail(self, lines:int = 10, as_string: bool = False) -> Optional[str]: jpayne@68: """ jpayne@68: Like `head`, but prints last 10 lines of the file by default. jpayne@68: jpayne@68: To avoid consuming iterables, this only works with file-based, non-BAM jpayne@68: BedTool objects. jpayne@68: jpayne@68: Use `as_string=True` to return a string. jpayne@68: """ jpayne@68: if self._isbam: jpayne@68: raise ValueError("tail() not yet implemented for BAM files") jpayne@68: if not isinstance(self.fn, str): jpayne@68: raise ValueError( jpayne@68: "tail() not implemented for non-file-based " jpayne@68: "BedTool objects. Please use saveas() first." jpayne@68: ) jpayne@68: bufsize = 8192 jpayne@68: offset = bufsize jpayne@68: f = open(self.fn, "rb") jpayne@68: jpayne@68: # whence=2 arg means relative to end (i.e., go to the end) jpayne@68: f.seek(0, 2) jpayne@68: file_size = f.tell() jpayne@68: data = [] jpayne@68: while True: jpayne@68: if file_size < bufsize: jpayne@68: offset = file_size jpayne@68: f.seek(-offset, 2) jpayne@68: chunk = f.read(offset) jpayne@68: data.extend(chunk.splitlines(True)) jpayne@68: if len(data) >= lines or offset == file_size: jpayne@68: break jpayne@68: offset += bufsize jpayne@68: jpayne@68: result = "".join([i.decode() for i in data[-lines:]]) jpayne@68: if as_string: jpayne@68: return result jpayne@68: else: jpayne@68: print(result) jpayne@68: jpayne@68: jpayne@68: class BAM(object): jpayne@68: def __init__(self, stream): jpayne@68: """ jpayne@68: Wraps pysam.Samfile so that it yields pybedtools.Interval objects when jpayne@68: iterated over. jpayne@68: jpayne@68: The pysam.Samfile can be accessed via the .pysam_bamfile attribute. jpayne@68: """ jpayne@68: self.stream = stream jpayne@68: if not isinstance(self.stream, str): jpayne@68: raise ValueError("Only files are supported, not streams") jpayne@68: self.pysam_bamfile = pysam.Samfile(self.stream) jpayne@68: jpayne@68: def _aligned_segment_to_interval(self, r): jpayne@68: jpayne@68: if r.rname >= 0: jpayne@68: rname = self.pysam_bamfile.get_reference_name(r.rname) jpayne@68: else: jpayne@68: rname = "*" jpayne@68: jpayne@68: if r.rnext >= 0: jpayne@68: if r.rnext == r.rname: jpayne@68: rnext = "=" jpayne@68: else: jpayne@68: rnext = self.pysam_bamfile.get_reference_name(r.rnext) jpayne@68: else: jpayne@68: rnext = "*" jpayne@68: jpayne@68: # SAM spec says if unavailable should be set to 0. Pysam sets to -1. jpayne@68: jpayne@68: if r.pnext <= 0: jpayne@68: pnext = "0" jpayne@68: else: jpayne@68: # +1 here because cbedtools.pyx expects SAM -- which is 1-based -- jpayne@68: # but pysam uses 0-based. jpayne@68: pnext = str(r.pnext + 1) jpayne@68: jpayne@68: if r.cigarstring: jpayne@68: cigarstring = r.cigarstring jpayne@68: else: jpayne@68: cigarstring = "*" jpayne@68: jpayne@68: # Rudimentary support. jpayne@68: # TODO: remove when refactoring to new BAM iterating jpayne@68: tags = [] jpayne@68: for k, v in r.tags: jpayne@68: if isinstance(v, int): jpayne@68: t = "i" jpayne@68: elif isinstance(v, float): jpayne@68: t = "f" jpayne@68: else: jpayne@68: t = "Z" jpayne@68: tags.append("{0}:{1}:{2}".format(k, t, v)) jpayne@68: jpayne@68: tags = "\t".join(tags) jpayne@68: jpayne@68: if r.seq: jpayne@68: seq = r.seq jpayne@68: else: jpayne@68: seq = "*" jpayne@68: jpayne@68: if r.qual: jpayne@68: qual = r.qual jpayne@68: else: jpayne@68: qual = "*" jpayne@68: jpayne@68: fields = [ jpayne@68: r.qname, jpayne@68: str(r.flag), jpayne@68: rname, jpayne@68: # +1 here because cbedtools.pyx expects SAM -- which is 1-based -- jpayne@68: # but pysam uses 0-based. jpayne@68: str(r.pos + 1), jpayne@68: str(r.mapq), jpayne@68: cigarstring, jpayne@68: rnext, jpayne@68: pnext, jpayne@68: str(r.tlen), jpayne@68: seq, jpayne@68: qual, jpayne@68: ] jpayne@68: if tags: jpayne@68: fields.append(tags) jpayne@68: jpayne@68: if None in fields: jpayne@68: raise ValueError("Found 'None' in fields: %s" % fields) jpayne@68: return create_interval_from_list(fields) jpayne@68: jpayne@68: def __iter__(self): jpayne@68: return self jpayne@68: jpayne@68: # TODO: this is PAINFUL but it ensures that existing tests work. Once all jpayne@68: # tests work, the new behavior will be to yield pysam AlignedSegment jpayne@68: # objects directly. jpayne@68: def __next__(self): jpayne@68: return self._aligned_segment_to_interval(next(self.pysam_bamfile)) jpayne@68: jpayne@68: def next(self): jpayne@68: return self.__next__() jpayne@68: jpayne@68: jpayne@68: class History(list): jpayne@68: def __init__(self): jpayne@68: """ jpayne@68: Represents one or many HistorySteps. Mostly used for nicely formatting jpayne@68: a series of HistorySteps. jpayne@68: """ jpayne@68: list.__init__(self) jpayne@68: jpayne@68: jpayne@68: class HistoryStep(object): jpayne@68: def __init__(self, method, args, kwargs, bedtool_instance, parent_tag, result_tag): jpayne@68: """ jpayne@68: Class to represent one step in the history. jpayne@68: jpayne@68: Mostly used for its __repr__ method, to try and exactly replicate code jpayne@68: that can be pasted to re-do history steps jpayne@68: """ jpayne@68: try: jpayne@68: self.method = method._name jpayne@68: except AttributeError: jpayne@68: self.method = method.__name__ jpayne@68: jpayne@68: self.args = args jpayne@68: self.kwargs = kwargs jpayne@68: self.fn = bedtool_instance.fn jpayne@68: self.parent_tag = parent_tag jpayne@68: self.result_tag = result_tag jpayne@68: jpayne@68: def _clean_arg(self, arg: str|BedTool) -> str: jpayne@68: """ jpayne@68: Wrap strings in quotes and convert bedtool instances to filenames. jpayne@68: """ jpayne@68: if isinstance(arg, BedTool): jpayne@68: arg = arg.fn jpayne@68: if isinstance(arg, str): jpayne@68: arg = '"%s"' % arg jpayne@68: return arg jpayne@68: jpayne@68: def __repr__(self): jpayne@68: # Still not sure whether to use pybedtools.bedtool() or bedtool() jpayne@68: s = "" jpayne@68: s += " " jpayne@68: if os.path.exists(self.fn): jpayne@68: s += 'BedTool("%(fn)s").%(method)s(%%s%%s)' % self.__dict__ jpayne@68: else: jpayne@68: s += 'BedTool("MISSING FILE: %(fn)s")' % self.__dict__ jpayne@68: s += ".%(method)s(%%s%%s)" % self.__dict__ jpayne@68: jpayne@68: # Format args and kwargs jpayne@68: args_string = ",".join(map(self._clean_arg, self.args)) jpayne@68: kwargs_string = ",".join( jpayne@68: ["%s=%s" % (i[0], self._clean_arg(i[1])) for i in list(self.kwargs.items())] jpayne@68: ) jpayne@68: # stick a comma on the end if there's something here jpayne@68: if len(args_string) > 0: jpayne@68: args_string += ", " jpayne@68: jpayne@68: s = s % (args_string, kwargs_string) jpayne@68: s += ", parent tag: %s" % self.parent_tag jpayne@68: s += ", result tag: %s" % self.result_tag jpayne@68: return s jpayne@68: jpayne@68: jpayne@68: def example_bedtool(fn): jpayne@68: """ jpayne@68: Return a bedtool using a bed file from the pybedtools examples directory. jpayne@68: Use :func:`list_example_files` to see a list of files that are included. jpayne@68: """ jpayne@68: fn = os.path.join(filenames.data_dir(), fn) jpayne@68: if not os.path.exists(fn): jpayne@68: msg = "%s does not exist" % fn jpayne@68: raise FileNotFoundError(msg) jpayne@68: return BedTool(fn) jpayne@68: jpayne@68: jpayne@68: if __name__ == "__main__": jpayne@68: import doctest jpayne@68: jpayne@68: doctest.testmod(optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)