jpayne@68: import io
jpayne@68: import os
jpayne@68: import re
jpayne@68: import abc
jpayne@68: import csv
jpayne@68: import sys
jpayne@68: import email
jpayne@68: import pathlib
jpayne@68: import zipfile
jpayne@68: import operator
jpayne@68: import functools
jpayne@68: import itertools
jpayne@68: import collections
jpayne@68:
jpayne@68: from configparser import ConfigParser
jpayne@68: from contextlib import suppress
jpayne@68: from importlib import import_module
jpayne@68: from importlib.abc import MetaPathFinder
jpayne@68: from itertools import starmap
jpayne@68:
jpayne@68:
jpayne@68: __all__ = [
jpayne@68: 'Distribution',
jpayne@68: 'DistributionFinder',
jpayne@68: 'PackageNotFoundError',
jpayne@68: 'distribution',
jpayne@68: 'distributions',
jpayne@68: 'entry_points',
jpayne@68: 'files',
jpayne@68: 'metadata',
jpayne@68: 'requires',
jpayne@68: 'version',
jpayne@68: ]
jpayne@68:
jpayne@68:
jpayne@68: class PackageNotFoundError(ModuleNotFoundError):
jpayne@68: """The package was not found."""
jpayne@68:
jpayne@68:
jpayne@68: class EntryPoint(
jpayne@68: collections.namedtuple('EntryPointBase', 'name value group')):
jpayne@68: """An entry point as defined by Python packaging conventions.
jpayne@68:
jpayne@68: See `the packaging docs on entry points
jpayne@68: `_
jpayne@68: for more information.
jpayne@68: """
jpayne@68:
jpayne@68: pattern = re.compile(
jpayne@68: r'(?P[\w.]+)\s*'
jpayne@68: r'(:\s*(?P[\w.]+))?\s*'
jpayne@68: r'(?P\[.*\])?\s*$'
jpayne@68: )
jpayne@68: """
jpayne@68: A regular expression describing the syntax for an entry point,
jpayne@68: which might look like:
jpayne@68:
jpayne@68: - module
jpayne@68: - package.module
jpayne@68: - package.module:attribute
jpayne@68: - package.module:object.attribute
jpayne@68: - package.module:attr [extra1, extra2]
jpayne@68:
jpayne@68: Other combinations are possible as well.
jpayne@68:
jpayne@68: The expression is lenient about whitespace around the ':',
jpayne@68: following the attr, and following any extras.
jpayne@68: """
jpayne@68:
jpayne@68: def load(self):
jpayne@68: """Load the entry point from its definition. If only a module
jpayne@68: is indicated by the value, return that module. Otherwise,
jpayne@68: return the named object.
jpayne@68: """
jpayne@68: match = self.pattern.match(self.value)
jpayne@68: module = import_module(match.group('module'))
jpayne@68: attrs = filter(None, (match.group('attr') or '').split('.'))
jpayne@68: return functools.reduce(getattr, attrs, module)
jpayne@68:
jpayne@68: @property
jpayne@68: def extras(self):
jpayne@68: match = self.pattern.match(self.value)
jpayne@68: return list(re.finditer(r'\w+', match.group('extras') or ''))
jpayne@68:
jpayne@68: @classmethod
jpayne@68: def _from_config(cls, config):
jpayne@68: return [
jpayne@68: cls(name, value, group)
jpayne@68: for group in config.sections()
jpayne@68: for name, value in config.items(group)
jpayne@68: ]
jpayne@68:
jpayne@68: @classmethod
jpayne@68: def _from_text(cls, text):
jpayne@68: config = ConfigParser(delimiters='=')
jpayne@68: # case sensitive: https://stackoverflow.com/q/1611799/812183
jpayne@68: config.optionxform = str
jpayne@68: try:
jpayne@68: config.read_string(text)
jpayne@68: except AttributeError: # pragma: nocover
jpayne@68: # Python 2 has no read_string
jpayne@68: config.readfp(io.StringIO(text))
jpayne@68: return EntryPoint._from_config(config)
jpayne@68:
jpayne@68: def __iter__(self):
jpayne@68: """
jpayne@68: Supply iter so one may construct dicts of EntryPoints easily.
jpayne@68: """
jpayne@68: return iter((self.name, self))
jpayne@68:
jpayne@68: def __reduce__(self):
jpayne@68: return (
jpayne@68: self.__class__,
jpayne@68: (self.name, self.value, self.group),
jpayne@68: )
jpayne@68:
jpayne@68:
jpayne@68: class PackagePath(pathlib.PurePosixPath):
jpayne@68: """A reference to a path in a package"""
jpayne@68:
jpayne@68: def read_text(self, encoding='utf-8'):
jpayne@68: with self.locate().open(encoding=encoding) as stream:
jpayne@68: return stream.read()
jpayne@68:
jpayne@68: def read_binary(self):
jpayne@68: with self.locate().open('rb') as stream:
jpayne@68: return stream.read()
jpayne@68:
jpayne@68: def locate(self):
jpayne@68: """Return a path-like object for this path"""
jpayne@68: return self.dist.locate_file(self)
jpayne@68:
jpayne@68:
jpayne@68: class FileHash:
jpayne@68: def __init__(self, spec):
jpayne@68: self.mode, _, self.value = spec.partition('=')
jpayne@68:
jpayne@68: def __repr__(self):
jpayne@68: return ''.format(self.mode, self.value)
jpayne@68:
jpayne@68:
jpayne@68: class Distribution:
jpayne@68: """A Python distribution package."""
jpayne@68:
jpayne@68: @abc.abstractmethod
jpayne@68: def read_text(self, filename):
jpayne@68: """Attempt to load metadata file given by the name.
jpayne@68:
jpayne@68: :param filename: The name of the file in the distribution info.
jpayne@68: :return: The text if found, otherwise None.
jpayne@68: """
jpayne@68:
jpayne@68: @abc.abstractmethod
jpayne@68: def locate_file(self, path):
jpayne@68: """
jpayne@68: Given a path to a file in this distribution, return a path
jpayne@68: to it.
jpayne@68: """
jpayne@68:
jpayne@68: @classmethod
jpayne@68: def from_name(cls, name):
jpayne@68: """Return the Distribution for the given package name.
jpayne@68:
jpayne@68: :param name: The name of the distribution package to search for.
jpayne@68: :return: The Distribution instance (or subclass thereof) for the named
jpayne@68: package, if found.
jpayne@68: :raises PackageNotFoundError: When the named package's distribution
jpayne@68: metadata cannot be found.
jpayne@68: """
jpayne@68: for resolver in cls._discover_resolvers():
jpayne@68: dists = resolver(DistributionFinder.Context(name=name))
jpayne@68: dist = next(dists, None)
jpayne@68: if dist is not None:
jpayne@68: return dist
jpayne@68: else:
jpayne@68: raise PackageNotFoundError(name)
jpayne@68:
jpayne@68: @classmethod
jpayne@68: def discover(cls, **kwargs):
jpayne@68: """Return an iterable of Distribution objects for all packages.
jpayne@68:
jpayne@68: Pass a ``context`` or pass keyword arguments for constructing
jpayne@68: a context.
jpayne@68:
jpayne@68: :context: A ``DistributionFinder.Context`` object.
jpayne@68: :return: Iterable of Distribution objects for all packages.
jpayne@68: """
jpayne@68: context = kwargs.pop('context', None)
jpayne@68: if context and kwargs:
jpayne@68: raise ValueError("cannot accept context and kwargs")
jpayne@68: context = context or DistributionFinder.Context(**kwargs)
jpayne@68: return itertools.chain.from_iterable(
jpayne@68: resolver(context)
jpayne@68: for resolver in cls._discover_resolvers()
jpayne@68: )
jpayne@68:
jpayne@68: @staticmethod
jpayne@68: def at(path):
jpayne@68: """Return a Distribution for the indicated metadata path
jpayne@68:
jpayne@68: :param path: a string or path-like object
jpayne@68: :return: a concrete Distribution instance for the path
jpayne@68: """
jpayne@68: return PathDistribution(pathlib.Path(path))
jpayne@68:
jpayne@68: @staticmethod
jpayne@68: def _discover_resolvers():
jpayne@68: """Search the meta_path for resolvers."""
jpayne@68: declared = (
jpayne@68: getattr(finder, 'find_distributions', None)
jpayne@68: for finder in sys.meta_path
jpayne@68: )
jpayne@68: return filter(None, declared)
jpayne@68:
jpayne@68: @property
jpayne@68: def metadata(self):
jpayne@68: """Return the parsed metadata for this Distribution.
jpayne@68:
jpayne@68: The returned object will have keys that name the various bits of
jpayne@68: metadata. See PEP 566 for details.
jpayne@68: """
jpayne@68: text = (
jpayne@68: self.read_text('METADATA')
jpayne@68: or self.read_text('PKG-INFO')
jpayne@68: # This last clause is here to support old egg-info files. Its
jpayne@68: # effect is to just end up using the PathDistribution's self._path
jpayne@68: # (which points to the egg-info file) attribute unchanged.
jpayne@68: or self.read_text('')
jpayne@68: )
jpayne@68: return email.message_from_string(text)
jpayne@68:
jpayne@68: @property
jpayne@68: def version(self):
jpayne@68: """Return the 'Version' metadata for the distribution package."""
jpayne@68: return self.metadata['Version']
jpayne@68:
jpayne@68: @property
jpayne@68: def entry_points(self):
jpayne@68: return EntryPoint._from_text(self.read_text('entry_points.txt'))
jpayne@68:
jpayne@68: @property
jpayne@68: def files(self):
jpayne@68: """Files in this distribution.
jpayne@68:
jpayne@68: :return: List of PackagePath for this distribution or None
jpayne@68:
jpayne@68: Result is `None` if the metadata file that enumerates files
jpayne@68: (i.e. RECORD for dist-info or SOURCES.txt for egg-info) is
jpayne@68: missing.
jpayne@68: Result may be empty if the metadata exists but is empty.
jpayne@68: """
jpayne@68: file_lines = self._read_files_distinfo() or self._read_files_egginfo()
jpayne@68:
jpayne@68: def make_file(name, hash=None, size_str=None):
jpayne@68: result = PackagePath(name)
jpayne@68: result.hash = FileHash(hash) if hash else None
jpayne@68: result.size = int(size_str) if size_str else None
jpayne@68: result.dist = self
jpayne@68: return result
jpayne@68:
jpayne@68: return file_lines and list(starmap(make_file, csv.reader(file_lines)))
jpayne@68:
jpayne@68: def _read_files_distinfo(self):
jpayne@68: """
jpayne@68: Read the lines of RECORD
jpayne@68: """
jpayne@68: text = self.read_text('RECORD')
jpayne@68: return text and text.splitlines()
jpayne@68:
jpayne@68: def _read_files_egginfo(self):
jpayne@68: """
jpayne@68: SOURCES.txt might contain literal commas, so wrap each line
jpayne@68: in quotes.
jpayne@68: """
jpayne@68: text = self.read_text('SOURCES.txt')
jpayne@68: return text and map('"{}"'.format, text.splitlines())
jpayne@68:
jpayne@68: @property
jpayne@68: def requires(self):
jpayne@68: """Generated requirements specified for this Distribution"""
jpayne@68: reqs = self._read_dist_info_reqs() or self._read_egg_info_reqs()
jpayne@68: return reqs and list(reqs)
jpayne@68:
jpayne@68: def _read_dist_info_reqs(self):
jpayne@68: return self.metadata.get_all('Requires-Dist')
jpayne@68:
jpayne@68: def _read_egg_info_reqs(self):
jpayne@68: source = self.read_text('requires.txt')
jpayne@68: return source and self._deps_from_requires_text(source)
jpayne@68:
jpayne@68: @classmethod
jpayne@68: def _deps_from_requires_text(cls, source):
jpayne@68: section_pairs = cls._read_sections(source.splitlines())
jpayne@68: sections = {
jpayne@68: section: list(map(operator.itemgetter('line'), results))
jpayne@68: for section, results in
jpayne@68: itertools.groupby(section_pairs, operator.itemgetter('section'))
jpayne@68: }
jpayne@68: return cls._convert_egg_info_reqs_to_simple_reqs(sections)
jpayne@68:
jpayne@68: @staticmethod
jpayne@68: def _read_sections(lines):
jpayne@68: section = None
jpayne@68: for line in filter(None, lines):
jpayne@68: section_match = re.match(r'\[(.*)\]$', line)
jpayne@68: if section_match:
jpayne@68: section = section_match.group(1)
jpayne@68: continue
jpayne@68: yield locals()
jpayne@68:
jpayne@68: @staticmethod
jpayne@68: def _convert_egg_info_reqs_to_simple_reqs(sections):
jpayne@68: """
jpayne@68: Historically, setuptools would solicit and store 'extra'
jpayne@68: requirements, including those with environment markers,
jpayne@68: in separate sections. More modern tools expect each
jpayne@68: dependency to be defined separately, with any relevant
jpayne@68: extras and environment markers attached directly to that
jpayne@68: requirement. This method converts the former to the
jpayne@68: latter. See _test_deps_from_requires_text for an example.
jpayne@68: """
jpayne@68: def make_condition(name):
jpayne@68: return name and 'extra == "{name}"'.format(name=name)
jpayne@68:
jpayne@68: def parse_condition(section):
jpayne@68: section = section or ''
jpayne@68: extra, sep, markers = section.partition(':')
jpayne@68: if extra and markers:
jpayne@68: markers = '({markers})'.format(markers=markers)
jpayne@68: conditions = list(filter(None, [markers, make_condition(extra)]))
jpayne@68: return '; ' + ' and '.join(conditions) if conditions else ''
jpayne@68:
jpayne@68: for section, deps in sections.items():
jpayne@68: for dep in deps:
jpayne@68: yield dep + parse_condition(section)
jpayne@68:
jpayne@68:
jpayne@68: class DistributionFinder(MetaPathFinder):
jpayne@68: """
jpayne@68: A MetaPathFinder capable of discovering installed distributions.
jpayne@68: """
jpayne@68:
jpayne@68: class Context:
jpayne@68: """
jpayne@68: Keyword arguments presented by the caller to
jpayne@68: ``distributions()`` or ``Distribution.discover()``
jpayne@68: to narrow the scope of a search for distributions
jpayne@68: in all DistributionFinders.
jpayne@68:
jpayne@68: Each DistributionFinder may expect any parameters
jpayne@68: and should attempt to honor the canonical
jpayne@68: parameters defined below when appropriate.
jpayne@68: """
jpayne@68:
jpayne@68: name = None
jpayne@68: """
jpayne@68: Specific name for which a distribution finder should match.
jpayne@68: A name of ``None`` matches all distributions.
jpayne@68: """
jpayne@68:
jpayne@68: def __init__(self, **kwargs):
jpayne@68: vars(self).update(kwargs)
jpayne@68:
jpayne@68: @property
jpayne@68: def path(self):
jpayne@68: """
jpayne@68: The path that a distribution finder should search.
jpayne@68:
jpayne@68: Typically refers to Python package paths and defaults
jpayne@68: to ``sys.path``.
jpayne@68: """
jpayne@68: return vars(self).get('path', sys.path)
jpayne@68:
jpayne@68: @property
jpayne@68: def pattern(self):
jpayne@68: return '.*' if self.name is None else re.escape(self.name)
jpayne@68:
jpayne@68: @abc.abstractmethod
jpayne@68: def find_distributions(self, context=Context()):
jpayne@68: """
jpayne@68: Find distributions.
jpayne@68:
jpayne@68: Return an iterable of all Distribution instances capable of
jpayne@68: loading the metadata for packages matching the ``context``,
jpayne@68: a DistributionFinder.Context instance.
jpayne@68: """
jpayne@68:
jpayne@68:
jpayne@68: class MetadataPathFinder(DistributionFinder):
jpayne@68: @classmethod
jpayne@68: def find_distributions(cls, context=DistributionFinder.Context()):
jpayne@68: """
jpayne@68: Find distributions.
jpayne@68:
jpayne@68: Return an iterable of all Distribution instances capable of
jpayne@68: loading the metadata for packages matching ``context.name``
jpayne@68: (or all names if ``None`` indicated) along the paths in the list
jpayne@68: of directories ``context.path``.
jpayne@68: """
jpayne@68: found = cls._search_paths(context.pattern, context.path)
jpayne@68: return map(PathDistribution, found)
jpayne@68:
jpayne@68: @classmethod
jpayne@68: def _search_paths(cls, pattern, paths):
jpayne@68: """Find metadata directories in paths heuristically."""
jpayne@68: return itertools.chain.from_iterable(
jpayne@68: cls._search_path(path, pattern)
jpayne@68: for path in map(cls._switch_path, paths)
jpayne@68: )
jpayne@68:
jpayne@68: @staticmethod
jpayne@68: def _switch_path(path):
jpayne@68: PYPY_OPEN_BUG = False
jpayne@68: if not PYPY_OPEN_BUG or os.path.isfile(path): # pragma: no branch
jpayne@68: with suppress(Exception):
jpayne@68: return zipfile.Path(path)
jpayne@68: return pathlib.Path(path)
jpayne@68:
jpayne@68: @classmethod
jpayne@68: def _matches_info(cls, normalized, item):
jpayne@68: template = r'{pattern}(-.*)?\.(dist|egg)-info'
jpayne@68: manifest = template.format(pattern=normalized)
jpayne@68: return re.match(manifest, item.name, flags=re.IGNORECASE)
jpayne@68:
jpayne@68: @classmethod
jpayne@68: def _matches_legacy(cls, normalized, item):
jpayne@68: template = r'{pattern}-.*\.egg[\\/]EGG-INFO'
jpayne@68: manifest = template.format(pattern=normalized)
jpayne@68: return re.search(manifest, str(item), flags=re.IGNORECASE)
jpayne@68:
jpayne@68: @classmethod
jpayne@68: def _search_path(cls, root, pattern):
jpayne@68: if not root.is_dir():
jpayne@68: return ()
jpayne@68: normalized = pattern.replace('-', '_')
jpayne@68: return (item for item in root.iterdir()
jpayne@68: if cls._matches_info(normalized, item)
jpayne@68: or cls._matches_legacy(normalized, item))
jpayne@68:
jpayne@68:
jpayne@68: class PathDistribution(Distribution):
jpayne@68: def __init__(self, path):
jpayne@68: """Construct a distribution from a path to the metadata directory.
jpayne@68:
jpayne@68: :param path: A pathlib.Path or similar object supporting
jpayne@68: .joinpath(), __div__, .parent, and .read_text().
jpayne@68: """
jpayne@68: self._path = path
jpayne@68:
jpayne@68: def read_text(self, filename):
jpayne@68: with suppress(FileNotFoundError, IsADirectoryError, KeyError,
jpayne@68: NotADirectoryError, PermissionError):
jpayne@68: return self._path.joinpath(filename).read_text(encoding='utf-8')
jpayne@68: read_text.__doc__ = Distribution.read_text.__doc__
jpayne@68:
jpayne@68: def locate_file(self, path):
jpayne@68: return self._path.parent / path
jpayne@68:
jpayne@68:
jpayne@68: def distribution(distribution_name):
jpayne@68: """Get the ``Distribution`` instance for the named package.
jpayne@68:
jpayne@68: :param distribution_name: The name of the distribution package as a string.
jpayne@68: :return: A ``Distribution`` instance (or subclass thereof).
jpayne@68: """
jpayne@68: return Distribution.from_name(distribution_name)
jpayne@68:
jpayne@68:
jpayne@68: def distributions(**kwargs):
jpayne@68: """Get all ``Distribution`` instances in the current environment.
jpayne@68:
jpayne@68: :return: An iterable of ``Distribution`` instances.
jpayne@68: """
jpayne@68: return Distribution.discover(**kwargs)
jpayne@68:
jpayne@68:
jpayne@68: def metadata(distribution_name):
jpayne@68: """Get the metadata for the named package.
jpayne@68:
jpayne@68: :param distribution_name: The name of the distribution package to query.
jpayne@68: :return: An email.Message containing the parsed metadata.
jpayne@68: """
jpayne@68: return Distribution.from_name(distribution_name).metadata
jpayne@68:
jpayne@68:
jpayne@68: def version(distribution_name):
jpayne@68: """Get the version string for the named package.
jpayne@68:
jpayne@68: :param distribution_name: The name of the distribution package to query.
jpayne@68: :return: The version string for the package as defined in the package's
jpayne@68: "Version" metadata key.
jpayne@68: """
jpayne@68: return distribution(distribution_name).version
jpayne@68:
jpayne@68:
jpayne@68: def entry_points():
jpayne@68: """Return EntryPoint objects for all installed packages.
jpayne@68:
jpayne@68: :return: EntryPoint objects for all installed packages.
jpayne@68: """
jpayne@68: eps = itertools.chain.from_iterable(
jpayne@68: dist.entry_points for dist in distributions())
jpayne@68: by_group = operator.attrgetter('group')
jpayne@68: ordered = sorted(eps, key=by_group)
jpayne@68: grouped = itertools.groupby(ordered, by_group)
jpayne@68: return {
jpayne@68: group: tuple(eps)
jpayne@68: for group, eps in grouped
jpayne@68: }
jpayne@68:
jpayne@68:
jpayne@68: def files(distribution_name):
jpayne@68: """Return a list of files for the named package.
jpayne@68:
jpayne@68: :param distribution_name: The name of the distribution package to query.
jpayne@68: :return: List of files composing the distribution.
jpayne@68: """
jpayne@68: return distribution(distribution_name).files
jpayne@68:
jpayne@68:
jpayne@68: def requires(distribution_name):
jpayne@68: """
jpayne@68: Return a list of requirements for the named package.
jpayne@68:
jpayne@68: :return: An iterator of requirements, suitable for
jpayne@68: packaging.requirement.Requirement.
jpayne@68: """
jpayne@68: return distribution(distribution_name).requires