"""Read resources (data files) that live inside importable packages.

A *package* is given either as a dotted name or as an already-imported
module object; a *resource* is a bare file name inside that package (no
directory components).  When the package's loader provides
``get_resource_reader()``, that reader is used; otherwise the functions
fall back to the file system location recorded in the package's spec.
"""

import os
import tempfile

from . import abc as resources_abc
from contextlib import contextmanager, suppress
from importlib import import_module
from importlib.abc import ResourceLoader
from io import BytesIO, TextIOWrapper
from pathlib import Path
from types import ModuleType
from typing import Iterable, Iterator, Optional, Set, Union   # noqa: F401
from typing import cast
# BinaryIO/TextIO are exported by ``typing`` itself; the ``typing.io``
# pseudo-module is a deprecated alias that newer Python versions removed.
from typing import BinaryIO, TextIO
from zipimport import ZipImportError


__all__ = [
    'Package',
    'Resource',
    'contents',
    'is_resource',
    'open_binary',
    'open_text',
    'path',
    'read_binary',
    'read_text',
    ]


Package = Union[str, ModuleType]
Resource = Union[str, os.PathLike]


def _get_package(package) -> ModuleType:
    """Take a package name or module object and return the module.

    If a name, the module is imported.  If the passed or imported module
    object is not a package (its spec has no submodule search locations),
    raise TypeError.
    """
    if hasattr(package, '__spec__'):
        if package.__spec__.submodule_search_locations is None:
            raise TypeError('{!r} is not a package'.format(
                package.__spec__.name))
        return package
    module = import_module(package)
    if module.__spec__.submodule_search_locations is None:
        raise TypeError('{!r} is not a package'.format(package))
    return module


def _normalize_path(path) -> str:
    """Return the file-name component of *path*.

    Raise ValueError if *path* has any directory component, i.e. if it is
    anything other than a bare file name.
    """
    parent, file_name = os.path.split(path)
    if parent:
        raise ValueError('{!r} must be only a file name'.format(path))
    return file_name


def _get_resource_reader(
        package: ModuleType) -> Optional[resources_abc.ResourceReader]:
    """Return the resource reader offered by the package's loader, if any.

    Returns None when the loader has no ``get_resource_reader`` attribute.
    """
    # Return the package's loader if it's a ResourceReader.  We can't use
    # an issubclass() check here because apparently abc's
    # __subclasscheck__() hook wants to create a weak reference to the
    # object, but zipimport.zipimporter does not support weak references,
    # resulting in a TypeError.  That seems terrible.
    spec = package.__spec__
    if hasattr(spec.loader, 'get_resource_reader'):
        return cast(resources_abc.ResourceReader,
                    spec.loader.get_resource_reader(spec.name))
    return None


def _check_location(package):
    """Raise FileNotFoundError if the package's spec records no location."""
    if package.__spec__.origin is None or not package.__spec__.has_location:
        raise FileNotFoundError(f'Package has no location {package!r}')


def _read_via_loader(package: ModuleType, resource: str,
                     full_path: str) -> bytes:
    """Return the resource's bytes using the package loader's get_data().

    Used as the fallback when the resource could not be opened directly
    from the file system.  Raise FileNotFoundError if the loader has no
    ``get_data`` attribute or if ``get_data()`` fails with OSError.
    """
    loader = cast(ResourceLoader, package.__spec__.loader)
    data = None
    # Loaders without get_data() are treated like a missing resource; the
    # hasattr() guard means no AttributeError escapes from here.
    if hasattr(package.__spec__.loader, 'get_data'):
        with suppress(OSError):
            data = loader.get_data(full_path)
    if data is None:
        package_name = package.__spec__.name
        message = '{!r} resource not found in {!r}'.format(
            resource, package_name)
        raise FileNotFoundError(message)
    return data


def open_binary(package: Package, resource: Resource) -> BinaryIO:
    """Return a file-like object opened for binary reading of the resource.

    Raises ValueError if *resource* is not a bare file name, TypeError if
    *package* is not a package, and FileNotFoundError if the package has no
    location or the resource cannot be found.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.open_resource(resource)
    _check_location(package)
    absolute_package_path = os.path.abspath(package.__spec__.origin)
    package_path = os.path.dirname(absolute_package_path)
    full_path = os.path.join(package_path, resource)
    try:
        return open(full_path, mode='rb')
    except OSError:
        # Not directly readable from the file system; ask the loader.
        return BytesIO(_read_via_loader(package, resource, full_path))


def open_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> TextIO:
    """Return a file-like object opened for text reading of the resource.

    *encoding* and *errors* are passed to the text layer that decodes the
    resource.  Raises the same exceptions as open_binary().
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return TextIOWrapper(reader.open_resource(resource), encoding, errors)
    _check_location(package)
    absolute_package_path = os.path.abspath(package.__spec__.origin)
    package_path = os.path.dirname(absolute_package_path)
    full_path = os.path.join(package_path, resource)
    try:
        return open(full_path, mode='r', encoding=encoding, errors=errors)
    except OSError:
        # Not directly readable from the file system; ask the loader.
        data = _read_via_loader(package, resource, full_path)
        return TextIOWrapper(BytesIO(data), encoding, errors)


def read_binary(package: Package, resource: Resource) -> bytes:
    """Return the binary contents of the resource."""
    resource = _normalize_path(resource)
    package = _get_package(package)
    with open_binary(package, resource) as fp:
        return fp.read()


def read_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> str:
    """Return the decoded string of the resource.

    The decoding-related arguments have the same semantics as those of
    bytes.decode().
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    with open_text(package, resource, encoding, errors) as fp:
        return fp.read()


@contextmanager
def path(package: Package, resource: Resource) -> Iterator[Path]:
    """A context manager providing a file path object to the resource.

    If the resource does not already exist on its own on the file system,
    a temporary file will be created.  If the file was created, the file
    will be deleted upon exiting the context manager (no exception is
    raised if the file was deleted prior to the context manager
    exiting).
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        # Resolve the path *before* yielding.  Exceptions raised inside the
        # caller's ``with`` body are re-raised at the ``yield``; if the
        # yield sat inside this ``try``, a FileNotFoundError from the
        # caller's own code would be swallowed here and the generator would
        # go on to yield a second time, which contextmanager reports as a
        # RuntimeError.
        try:
            reader_path = Path(reader.resource_path(resource))
        except FileNotFoundError:
            pass
        else:
            yield reader_path
            return
    else:
        _check_location(package)
    # Fall-through for both the lack of a resource reader *and* if
    # resource_path() raises FileNotFoundError.
    origin = package.__spec__.origin
    # A package served by a resource reader may have no origin at all; in
    # that case there is nothing to probe on disk, so go straight to the
    # temporary file.
    if origin is not None:
        file_path = Path(origin).parent / resource
        if file_path.exists():
            yield file_path
            return
    with open_binary(package, resource) as fp:
        data = fp.read()
    # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
    # blocks due to the need to close the temporary file to work on
    # Windows properly.
    fd, raw_path = tempfile.mkstemp()
    try:
        try:
            os.write(fd, data)
        finally:
            # Close even when the write fails so the descriptor is not
            # leaked and the file can be removed below.
            os.close(fd)
        yield Path(raw_path)
    finally:
        with suppress(FileNotFoundError):
            os.remove(raw_path)


def is_resource(package: Package, name: str) -> bool:
    """True if 'name' is a resource inside 'package'.

    Directories are *not* resources.  Raises ValueError if *name* contains
    a directory component.
    """
    package = _get_package(package)
    # Called only for its validation side effect (ValueError on a path
    # with directories); the original *name* is what gets looked up.
    _normalize_path(name)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.is_resource(name)
    try:
        package_contents = set(contents(package))
    except (NotADirectoryError, FileNotFoundError):
        return False
    if name not in package_contents:
        return False
    # Just because the given file_name lives as an entry in the package's
    # contents doesn't necessarily mean it's a resource.  Directories are not
    # resources, so let's try to find out if it's a directory or not.
    path = Path(package.__spec__.origin).parent / name
    return path.is_file()


def contents(package: Package) -> Iterable[str]:
    """Return an iterable of entries in 'package'.

    Note that not all entries are resources.  Specifically, directories are
    not considered resources.  Use `is_resource()` on each entry returned here
    to check if it is a resource or not.
    """
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.contents()
    # Is the package a namespace package?  By definition, namespace packages
    # cannot have resources.  We could use _check_location() and catch the
    # exception, but that's extra work, so just inline the check.
    if package.__spec__.origin is None or not package.__spec__.has_location:
        return ()
    package_directory = Path(package.__spec__.origin).parent
    return os.listdir(package_directory)