jpayne@69: import io jpayne@69: import os jpayne@69: import re jpayne@69: import abc jpayne@69: import csv jpayne@69: import sys jpayne@69: import email jpayne@69: import pathlib jpayne@69: import zipfile jpayne@69: import operator jpayne@69: import functools jpayne@69: import itertools jpayne@69: import collections jpayne@69: jpayne@69: from configparser import ConfigParser jpayne@69: from contextlib import suppress jpayne@69: from importlib import import_module jpayne@69: from importlib.abc import MetaPathFinder jpayne@69: from itertools import starmap jpayne@69: jpayne@69: jpayne@69: __all__ = [ jpayne@69: 'Distribution', jpayne@69: 'DistributionFinder', jpayne@69: 'PackageNotFoundError', jpayne@69: 'distribution', jpayne@69: 'distributions', jpayne@69: 'entry_points', jpayne@69: 'files', jpayne@69: 'metadata', jpayne@69: 'requires', jpayne@69: 'version', jpayne@69: ] jpayne@69: jpayne@69: jpayne@69: class PackageNotFoundError(ModuleNotFoundError): jpayne@69: """The package was not found.""" jpayne@69: jpayne@69: jpayne@69: class EntryPoint( jpayne@69: collections.namedtuple('EntryPointBase', 'name value group')): jpayne@69: """An entry point as defined by Python packaging conventions. jpayne@69: jpayne@69: See `the packaging docs on entry points jpayne@69: `_ jpayne@69: for more information. jpayne@69: """ jpayne@69: jpayne@69: pattern = re.compile( jpayne@69: r'(?P[\w.]+)\s*' jpayne@69: r'(:\s*(?P[\w.]+))?\s*' jpayne@69: r'(?P\[.*\])?\s*$' jpayne@69: ) jpayne@69: """ jpayne@69: A regular expression describing the syntax for an entry point, jpayne@69: which might look like: jpayne@69: jpayne@69: - module jpayne@69: - package.module jpayne@69: - package.module:attribute jpayne@69: - package.module:object.attribute jpayne@69: - package.module:attr [extra1, extra2] jpayne@69: jpayne@69: Other combinations are possible as well. jpayne@69: jpayne@69: The expression is lenient about whitespace around the ':', jpayne@69: following the attr, and following any extras. jpayne@69: """ jpayne@69: jpayne@69: def load(self): jpayne@69: """Load the entry point from its definition. If only a module jpayne@69: is indicated by the value, return that module. Otherwise, jpayne@69: return the named object. jpayne@69: """ jpayne@69: match = self.pattern.match(self.value) jpayne@69: module = import_module(match.group('module')) jpayne@69: attrs = filter(None, (match.group('attr') or '').split('.')) jpayne@69: return functools.reduce(getattr, attrs, module) jpayne@69: jpayne@69: @property jpayne@69: def extras(self): jpayne@69: match = self.pattern.match(self.value) jpayne@69: return list(re.finditer(r'\w+', match.group('extras') or '')) jpayne@69: jpayne@69: @classmethod jpayne@69: def _from_config(cls, config): jpayne@69: return [ jpayne@69: cls(name, value, group) jpayne@69: for group in config.sections() jpayne@69: for name, value in config.items(group) jpayne@69: ] jpayne@69: jpayne@69: @classmethod jpayne@69: def _from_text(cls, text): jpayne@69: config = ConfigParser(delimiters='=') jpayne@69: # case sensitive: https://stackoverflow.com/q/1611799/812183 jpayne@69: config.optionxform = str jpayne@69: try: jpayne@69: config.read_string(text) jpayne@69: except AttributeError: # pragma: nocover jpayne@69: # Python 2 has no read_string jpayne@69: config.readfp(io.StringIO(text)) jpayne@69: return EntryPoint._from_config(config) jpayne@69: jpayne@69: def __iter__(self): jpayne@69: """ jpayne@69: Supply iter so one may construct dicts of EntryPoints easily. jpayne@69: """ jpayne@69: return iter((self.name, self)) jpayne@69: jpayne@69: def __reduce__(self): jpayne@69: return ( jpayne@69: self.__class__, jpayne@69: (self.name, self.value, self.group), jpayne@69: ) jpayne@69: jpayne@69: jpayne@69: class PackagePath(pathlib.PurePosixPath): jpayne@69: """A reference to a path in a package""" jpayne@69: jpayne@69: def read_text(self, encoding='utf-8'): jpayne@69: with self.locate().open(encoding=encoding) as stream: jpayne@69: return stream.read() jpayne@69: jpayne@69: def read_binary(self): jpayne@69: with self.locate().open('rb') as stream: jpayne@69: return stream.read() jpayne@69: jpayne@69: def locate(self): jpayne@69: """Return a path-like object for this path""" jpayne@69: return self.dist.locate_file(self) jpayne@69: jpayne@69: jpayne@69: class FileHash: jpayne@69: def __init__(self, spec): jpayne@69: self.mode, _, self.value = spec.partition('=') jpayne@69: jpayne@69: def __repr__(self): jpayne@69: return ''.format(self.mode, self.value) jpayne@69: jpayne@69: jpayne@69: class Distribution: jpayne@69: """A Python distribution package.""" jpayne@69: jpayne@69: @abc.abstractmethod jpayne@69: def read_text(self, filename): jpayne@69: """Attempt to load metadata file given by the name. jpayne@69: jpayne@69: :param filename: The name of the file in the distribution info. jpayne@69: :return: The text if found, otherwise None. jpayne@69: """ jpayne@69: jpayne@69: @abc.abstractmethod jpayne@69: def locate_file(self, path): jpayne@69: """ jpayne@69: Given a path to a file in this distribution, return a path jpayne@69: to it. jpayne@69: """ jpayne@69: jpayne@69: @classmethod jpayne@69: def from_name(cls, name): jpayne@69: """Return the Distribution for the given package name. jpayne@69: jpayne@69: :param name: The name of the distribution package to search for. jpayne@69: :return: The Distribution instance (or subclass thereof) for the named jpayne@69: package, if found. jpayne@69: :raises PackageNotFoundError: When the named package's distribution jpayne@69: metadata cannot be found. jpayne@69: """ jpayne@69: for resolver in cls._discover_resolvers(): jpayne@69: dists = resolver(DistributionFinder.Context(name=name)) jpayne@69: dist = next(dists, None) jpayne@69: if dist is not None: jpayne@69: return dist jpayne@69: else: jpayne@69: raise PackageNotFoundError(name) jpayne@69: jpayne@69: @classmethod jpayne@69: def discover(cls, **kwargs): jpayne@69: """Return an iterable of Distribution objects for all packages. jpayne@69: jpayne@69: Pass a ``context`` or pass keyword arguments for constructing jpayne@69: a context. jpayne@69: jpayne@69: :context: A ``DistributionFinder.Context`` object. jpayne@69: :return: Iterable of Distribution objects for all packages. jpayne@69: """ jpayne@69: context = kwargs.pop('context', None) jpayne@69: if context and kwargs: jpayne@69: raise ValueError("cannot accept context and kwargs") jpayne@69: context = context or DistributionFinder.Context(**kwargs) jpayne@69: return itertools.chain.from_iterable( jpayne@69: resolver(context) jpayne@69: for resolver in cls._discover_resolvers() jpayne@69: ) jpayne@69: jpayne@69: @staticmethod jpayne@69: def at(path): jpayne@69: """Return a Distribution for the indicated metadata path jpayne@69: jpayne@69: :param path: a string or path-like object jpayne@69: :return: a concrete Distribution instance for the path jpayne@69: """ jpayne@69: return PathDistribution(pathlib.Path(path)) jpayne@69: jpayne@69: @staticmethod jpayne@69: def _discover_resolvers(): jpayne@69: """Search the meta_path for resolvers.""" jpayne@69: declared = ( jpayne@69: getattr(finder, 'find_distributions', None) jpayne@69: for finder in sys.meta_path jpayne@69: ) jpayne@69: return filter(None, declared) jpayne@69: jpayne@69: @property jpayne@69: def metadata(self): jpayne@69: """Return the parsed metadata for this Distribution. jpayne@69: jpayne@69: The returned object will have keys that name the various bits of jpayne@69: metadata. See PEP 566 for details. jpayne@69: """ jpayne@69: text = ( jpayne@69: self.read_text('METADATA') jpayne@69: or self.read_text('PKG-INFO') jpayne@69: # This last clause is here to support old egg-info files. Its jpayne@69: # effect is to just end up using the PathDistribution's self._path jpayne@69: # (which points to the egg-info file) attribute unchanged. jpayne@69: or self.read_text('') jpayne@69: ) jpayne@69: return email.message_from_string(text) jpayne@69: jpayne@69: @property jpayne@69: def version(self): jpayne@69: """Return the 'Version' metadata for the distribution package.""" jpayne@69: return self.metadata['Version'] jpayne@69: jpayne@69: @property jpayne@69: def entry_points(self): jpayne@69: return EntryPoint._from_text(self.read_text('entry_points.txt')) jpayne@69: jpayne@69: @property jpayne@69: def files(self): jpayne@69: """Files in this distribution. jpayne@69: jpayne@69: :return: List of PackagePath for this distribution or None jpayne@69: jpayne@69: Result is `None` if the metadata file that enumerates files jpayne@69: (i.e. RECORD for dist-info or SOURCES.txt for egg-info) is jpayne@69: missing. jpayne@69: Result may be empty if the metadata exists but is empty. jpayne@69: """ jpayne@69: file_lines = self._read_files_distinfo() or self._read_files_egginfo() jpayne@69: jpayne@69: def make_file(name, hash=None, size_str=None): jpayne@69: result = PackagePath(name) jpayne@69: result.hash = FileHash(hash) if hash else None jpayne@69: result.size = int(size_str) if size_str else None jpayne@69: result.dist = self jpayne@69: return result jpayne@69: jpayne@69: return file_lines and list(starmap(make_file, csv.reader(file_lines))) jpayne@69: jpayne@69: def _read_files_distinfo(self): jpayne@69: """ jpayne@69: Read the lines of RECORD jpayne@69: """ jpayne@69: text = self.read_text('RECORD') jpayne@69: return text and text.splitlines() jpayne@69: jpayne@69: def _read_files_egginfo(self): jpayne@69: """ jpayne@69: SOURCES.txt might contain literal commas, so wrap each line jpayne@69: in quotes. jpayne@69: """ jpayne@69: text = self.read_text('SOURCES.txt') jpayne@69: return text and map('"{}"'.format, text.splitlines()) jpayne@69: jpayne@69: @property jpayne@69: def requires(self): jpayne@69: """Generated requirements specified for this Distribution""" jpayne@69: reqs = self._read_dist_info_reqs() or self._read_egg_info_reqs() jpayne@69: return reqs and list(reqs) jpayne@69: jpayne@69: def _read_dist_info_reqs(self): jpayne@69: return self.metadata.get_all('Requires-Dist') jpayne@69: jpayne@69: def _read_egg_info_reqs(self): jpayne@69: source = self.read_text('requires.txt') jpayne@69: return source and self._deps_from_requires_text(source) jpayne@69: jpayne@69: @classmethod jpayne@69: def _deps_from_requires_text(cls, source): jpayne@69: section_pairs = cls._read_sections(source.splitlines()) jpayne@69: sections = { jpayne@69: section: list(map(operator.itemgetter('line'), results)) jpayne@69: for section, results in jpayne@69: itertools.groupby(section_pairs, operator.itemgetter('section')) jpayne@69: } jpayne@69: return cls._convert_egg_info_reqs_to_simple_reqs(sections) jpayne@69: jpayne@69: @staticmethod jpayne@69: def _read_sections(lines): jpayne@69: section = None jpayne@69: for line in filter(None, lines): jpayne@69: section_match = re.match(r'\[(.*)\]$', line) jpayne@69: if section_match: jpayne@69: section = section_match.group(1) jpayne@69: continue jpayne@69: yield locals() jpayne@69: jpayne@69: @staticmethod jpayne@69: def _convert_egg_info_reqs_to_simple_reqs(sections): jpayne@69: """ jpayne@69: Historically, setuptools would solicit and store 'extra' jpayne@69: requirements, including those with environment markers, jpayne@69: in separate sections. More modern tools expect each jpayne@69: dependency to be defined separately, with any relevant jpayne@69: extras and environment markers attached directly to that jpayne@69: requirement. This method converts the former to the jpayne@69: latter. See _test_deps_from_requires_text for an example. jpayne@69: """ jpayne@69: def make_condition(name): jpayne@69: return name and 'extra == "{name}"'.format(name=name) jpayne@69: jpayne@69: def parse_condition(section): jpayne@69: section = section or '' jpayne@69: extra, sep, markers = section.partition(':') jpayne@69: if extra and markers: jpayne@69: markers = '({markers})'.format(markers=markers) jpayne@69: conditions = list(filter(None, [markers, make_condition(extra)])) jpayne@69: return '; ' + ' and '.join(conditions) if conditions else '' jpayne@69: jpayne@69: for section, deps in sections.items(): jpayne@69: for dep in deps: jpayne@69: yield dep + parse_condition(section) jpayne@69: jpayne@69: jpayne@69: class DistributionFinder(MetaPathFinder): jpayne@69: """ jpayne@69: A MetaPathFinder capable of discovering installed distributions. jpayne@69: """ jpayne@69: jpayne@69: class Context: jpayne@69: """ jpayne@69: Keyword arguments presented by the caller to jpayne@69: ``distributions()`` or ``Distribution.discover()`` jpayne@69: to narrow the scope of a search for distributions jpayne@69: in all DistributionFinders. jpayne@69: jpayne@69: Each DistributionFinder may expect any parameters jpayne@69: and should attempt to honor the canonical jpayne@69: parameters defined below when appropriate. jpayne@69: """ jpayne@69: jpayne@69: name = None jpayne@69: """ jpayne@69: Specific name for which a distribution finder should match. jpayne@69: A name of ``None`` matches all distributions. jpayne@69: """ jpayne@69: jpayne@69: def __init__(self, **kwargs): jpayne@69: vars(self).update(kwargs) jpayne@69: jpayne@69: @property jpayne@69: def path(self): jpayne@69: """ jpayne@69: The path that a distribution finder should search. jpayne@69: jpayne@69: Typically refers to Python package paths and defaults jpayne@69: to ``sys.path``. jpayne@69: """ jpayne@69: return vars(self).get('path', sys.path) jpayne@69: jpayne@69: @property jpayne@69: def pattern(self): jpayne@69: return '.*' if self.name is None else re.escape(self.name) jpayne@69: jpayne@69: @abc.abstractmethod jpayne@69: def find_distributions(self, context=Context()): jpayne@69: """ jpayne@69: Find distributions. jpayne@69: jpayne@69: Return an iterable of all Distribution instances capable of jpayne@69: loading the metadata for packages matching the ``context``, jpayne@69: a DistributionFinder.Context instance. jpayne@69: """ jpayne@69: jpayne@69: jpayne@69: class MetadataPathFinder(DistributionFinder): jpayne@69: @classmethod jpayne@69: def find_distributions(cls, context=DistributionFinder.Context()): jpayne@69: """ jpayne@69: Find distributions. jpayne@69: jpayne@69: Return an iterable of all Distribution instances capable of jpayne@69: loading the metadata for packages matching ``context.name`` jpayne@69: (or all names if ``None`` indicated) along the paths in the list jpayne@69: of directories ``context.path``. jpayne@69: """ jpayne@69: found = cls._search_paths(context.pattern, context.path) jpayne@69: return map(PathDistribution, found) jpayne@69: jpayne@69: @classmethod jpayne@69: def _search_paths(cls, pattern, paths): jpayne@69: """Find metadata directories in paths heuristically.""" jpayne@69: return itertools.chain.from_iterable( jpayne@69: cls._search_path(path, pattern) jpayne@69: for path in map(cls._switch_path, paths) jpayne@69: ) jpayne@69: jpayne@69: @staticmethod jpayne@69: def _switch_path(path): jpayne@69: PYPY_OPEN_BUG = False jpayne@69: if not PYPY_OPEN_BUG or os.path.isfile(path): # pragma: no branch jpayne@69: with suppress(Exception): jpayne@69: return zipfile.Path(path) jpayne@69: return pathlib.Path(path) jpayne@69: jpayne@69: @classmethod jpayne@69: def _matches_info(cls, normalized, item): jpayne@69: template = r'{pattern}(-.*)?\.(dist|egg)-info' jpayne@69: manifest = template.format(pattern=normalized) jpayne@69: return re.match(manifest, item.name, flags=re.IGNORECASE) jpayne@69: jpayne@69: @classmethod jpayne@69: def _matches_legacy(cls, normalized, item): jpayne@69: template = r'{pattern}-.*\.egg[\\/]EGG-INFO' jpayne@69: manifest = template.format(pattern=normalized) jpayne@69: return re.search(manifest, str(item), flags=re.IGNORECASE) jpayne@69: jpayne@69: @classmethod jpayne@69: def _search_path(cls, root, pattern): jpayne@69: if not root.is_dir(): jpayne@69: return () jpayne@69: normalized = pattern.replace('-', '_') jpayne@69: return (item for item in root.iterdir() jpayne@69: if cls._matches_info(normalized, item) jpayne@69: or cls._matches_legacy(normalized, item)) jpayne@69: jpayne@69: jpayne@69: class PathDistribution(Distribution): jpayne@69: def __init__(self, path): jpayne@69: """Construct a distribution from a path to the metadata directory. jpayne@69: jpayne@69: :param path: A pathlib.Path or similar object supporting jpayne@69: .joinpath(), __div__, .parent, and .read_text(). jpayne@69: """ jpayne@69: self._path = path jpayne@69: jpayne@69: def read_text(self, filename): jpayne@69: with suppress(FileNotFoundError, IsADirectoryError, KeyError, jpayne@69: NotADirectoryError, PermissionError): jpayne@69: return self._path.joinpath(filename).read_text(encoding='utf-8') jpayne@69: read_text.__doc__ = Distribution.read_text.__doc__ jpayne@69: jpayne@69: def locate_file(self, path): jpayne@69: return self._path.parent / path jpayne@69: jpayne@69: jpayne@69: def distribution(distribution_name): jpayne@69: """Get the ``Distribution`` instance for the named package. jpayne@69: jpayne@69: :param distribution_name: The name of the distribution package as a string. jpayne@69: :return: A ``Distribution`` instance (or subclass thereof). jpayne@69: """ jpayne@69: return Distribution.from_name(distribution_name) jpayne@69: jpayne@69: jpayne@69: def distributions(**kwargs): jpayne@69: """Get all ``Distribution`` instances in the current environment. jpayne@69: jpayne@69: :return: An iterable of ``Distribution`` instances. jpayne@69: """ jpayne@69: return Distribution.discover(**kwargs) jpayne@69: jpayne@69: jpayne@69: def metadata(distribution_name): jpayne@69: """Get the metadata for the named package. jpayne@69: jpayne@69: :param distribution_name: The name of the distribution package to query. jpayne@69: :return: An email.Message containing the parsed metadata. jpayne@69: """ jpayne@69: return Distribution.from_name(distribution_name).metadata jpayne@69: jpayne@69: jpayne@69: def version(distribution_name): jpayne@69: """Get the version string for the named package. jpayne@69: jpayne@69: :param distribution_name: The name of the distribution package to query. jpayne@69: :return: The version string for the package as defined in the package's jpayne@69: "Version" metadata key. jpayne@69: """ jpayne@69: return distribution(distribution_name).version jpayne@69: jpayne@69: jpayne@69: def entry_points(): jpayne@69: """Return EntryPoint objects for all installed packages. jpayne@69: jpayne@69: :return: EntryPoint objects for all installed packages. jpayne@69: """ jpayne@69: eps = itertools.chain.from_iterable( jpayne@69: dist.entry_points for dist in distributions()) jpayne@69: by_group = operator.attrgetter('group') jpayne@69: ordered = sorted(eps, key=by_group) jpayne@69: grouped = itertools.groupby(ordered, by_group) jpayne@69: return { jpayne@69: group: tuple(eps) jpayne@69: for group, eps in grouped jpayne@69: } jpayne@69: jpayne@69: jpayne@69: def files(distribution_name): jpayne@69: """Return a list of files for the named package. jpayne@69: jpayne@69: :param distribution_name: The name of the distribution package to query. jpayne@69: :return: List of files composing the distribution. jpayne@69: """ jpayne@69: return distribution(distribution_name).files jpayne@69: jpayne@69: jpayne@69: def requires(distribution_name): jpayne@69: """ jpayne@69: Return a list of requirements for the named package. jpayne@69: jpayne@69: :return: An iterator of requirements, suitable for jpayne@69: packaging.requirement.Requirement. jpayne@69: """ jpayne@69: return distribution(distribution_name).requires