"""Read resources contained within a package."""

import os
import tempfile

from . import abc as resources_abc
from contextlib import contextmanager, suppress
from importlib import import_module
from importlib.abc import ResourceLoader
from io import BytesIO, TextIOWrapper
from pathlib import Path
from types import ModuleType
from typing import Iterable, Iterator, Optional, Set, Union   # noqa: F401
from typing import cast
# The ``typing.io`` pseudo-namespace was deprecated in 3.8 and removed in
# 3.13; the same names are importable directly from ``typing``.
from typing import BinaryIO, TextIO
from zipimport import ZipImportError


__all__ = [
    'Package',
    'Resource',
    'contents',
    'is_resource',
    'open_binary',
    'open_text',
    'path',
    'read_binary',
    'read_text',
    ]


Package = Union[str, ModuleType]
Resource = Union[str, os.PathLike]


def _get_package(package) -> ModuleType:
    """Take a package name or module object and return the module.

    If a name, the module is imported.  If the passed or imported module
    object is not a package, raise an exception.

    Raises:
        TypeError: the module's spec has no submodule search locations,
            i.e. it is not a package.
    """
    if hasattr(package, '__spec__'):
        if package.__spec__.submodule_search_locations is None:
            raise TypeError('{!r} is not a package'.format(
                package.__spec__.name))
        else:
            return package
    else:
        module = import_module(package)
        if module.__spec__.submodule_search_locations is None:
            raise TypeError('{!r} is not a package'.format(package))
        else:
            return module


def _normalize_path(path) -> str:
    """Normalize a path by ensuring it is a string.

    If the resulting string contains path separators, an exception is raised.

    Raises:
        ValueError: *path* has a non-empty directory component.
    """
    parent, file_name = os.path.split(path)
    if parent:
        raise ValueError('{!r} must be only a file name'.format(path))
    else:
        return file_name


def _get_resource_reader(
        package: ModuleType) -> Optional[resources_abc.ResourceReader]:
    """Return the resource reader for *package*, or None.

    The reader is obtained from the package loader's get_resource_reader()
    method; None is returned when the loader has no such method.
    """
    # Return the package's loader if it's a ResourceReader.  We can't use
    # a issubclass() check here because apparently abc.'s __subclasscheck__()
    # hook wants to create a weak reference to the object, but
    # zipimport.zipimporter does not support weak references, resulting in a
    # TypeError.  That seems terrible.
    spec = package.__spec__
    if hasattr(spec.loader, 'get_resource_reader'):
        return cast(resources_abc.ResourceReader,
                    spec.loader.get_resource_reader(spec.name))
    return None


def _check_location(package):
    """Raise FileNotFoundError if *package* has no usable origin.

    A package qualifies when its spec has a non-None ``origin`` and
    ``has_location`` is true.
    """
    if package.__spec__.origin is None or not package.__spec__.has_location:
        raise FileNotFoundError(f'Package has no location {package!r}')


def open_binary(package: Package, resource: Resource) -> BinaryIO:
    """Return a file-like object opened for binary reading of the resource.

    Raises:
        ValueError: *resource* contains a directory component.
        TypeError: *package* is not a package.
        FileNotFoundError: the package has no location, or the resource
            can be neither opened from disk nor fetched from the loader.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.open_resource(resource)
    _check_location(package)
    absolute_package_path = os.path.abspath(package.__spec__.origin)
    package_path = os.path.dirname(absolute_package_path)
    full_path = os.path.join(package_path, resource)
    try:
        return open(full_path, mode='rb')
    except OSError:
        # The file could not be opened directly.  Fall back to asking the
        # loader for the bytes through get_data(), if it offers that method.
        # An OSError from get_data() is treated like a missing resource.
        loader = cast(ResourceLoader, package.__spec__.loader)
        data = None
        if hasattr(package.__spec__.loader, 'get_data'):
            with suppress(OSError):
                data = loader.get_data(full_path)
        if data is None:
            package_name = package.__spec__.name
            message = '{!r} resource not found in {!r}'.format(
                resource, package_name)
            raise FileNotFoundError(message)
        else:
            return BytesIO(data)


def open_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> TextIO:
    """Return a file-like object opened for text reading of the resource.

    *encoding* and *errors* are forwarded to open() / TextIOWrapper.

    Raises:
        ValueError: *resource* contains a directory component.
        TypeError: *package* is not a package.
        FileNotFoundError: the package has no location, or the resource
            can be neither opened from disk nor fetched from the loader.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return TextIOWrapper(reader.open_resource(resource), encoding, errors)
    _check_location(package)
    absolute_package_path = os.path.abspath(package.__spec__.origin)
    package_path = os.path.dirname(absolute_package_path)
    full_path = os.path.join(package_path, resource)
    try:
        return open(full_path, mode='r', encoding=encoding, errors=errors)
    except OSError:
        # The file could not be opened directly.  Fall back to asking the
        # loader for the bytes through get_data(), if it offers that method.
        # An OSError from get_data() is treated like a missing resource.
        loader = cast(ResourceLoader, package.__spec__.loader)
        data = None
        if hasattr(package.__spec__.loader, 'get_data'):
            with suppress(OSError):
                data = loader.get_data(full_path)
        if data is None:
            package_name = package.__spec__.name
            message = '{!r} resource not found in {!r}'.format(
                resource, package_name)
            raise FileNotFoundError(message)
        else:
            return TextIOWrapper(BytesIO(data), encoding, errors)


def read_binary(package: Package, resource: Resource) -> bytes:
    """Return the binary contents of the resource."""
    resource = _normalize_path(resource)
    package = _get_package(package)
    with open_binary(package, resource) as fp:
        return fp.read()


def read_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> str:
    """Return the decoded string of the resource.

    The decoding-related arguments have the same semantics as those of
    bytes.decode().
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    with open_text(package, resource, encoding, errors) as fp:
        return fp.read()


@contextmanager
def path(package: Package, resource: Resource) -> Iterator[Path]:
    """A context manager providing a file path object to the resource.

    If the resource does not already exist on its own on the file system,
    a temporary file will be created. If the file was created, the file
    will be deleted upon exiting the context manager (no exception is
    raised if the file was deleted prior to the context manager
    exiting).
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        # Resolve the path *before* yielding.  Only a FileNotFoundError from
        # resource_path() itself should trigger the fall-through below; one
        # raised inside the caller's ``with`` body must propagate untouched
        # instead of being swallowed here and causing a second yield.
        try:
            resource_path = reader.resource_path(resource)
        except FileNotFoundError:
            pass
        else:
            yield Path(resource_path)
            return
    else:
        _check_location(package)
    # Fall-through for both the lack of resource_path() *and* if
    # resource_path() raises FileNotFoundError.
    package_directory = Path(package.__spec__.origin).parent
    file_path = package_directory / resource
    if file_path.exists():
        yield file_path
    else:
        with open_binary(package, resource) as fp:
            data = fp.read()
        # Not using tempfile.NamedTemporaryFile because the temporary file
        # has to be closed before it is handed out to work on Windows
        # properly.
        fd, raw_path = tempfile.mkstemp()
        try:
            try:
                os.write(fd, data)
            finally:
                # Always release the descriptor, even if the write failed.
                os.close(fd)
            yield Path(raw_path)
        finally:
            try:
                os.remove(raw_path)
            except FileNotFoundError:
                pass


def is_resource(package: Package, name: str) -> bool:
    """True if 'name' is a resource inside 'package'.

    Directories are *not* resources.
    """
    package = _get_package(package)
    # Called only for validation: raises ValueError if 'name' contains a
    # directory component.
    _normalize_path(name)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.is_resource(name)
    try:
        package_contents = set(contents(package))
    except (NotADirectoryError, FileNotFoundError):
        return False
    if name not in package_contents:
        return False
    # Just because the given file_name lives as an entry in the package's
    # contents doesn't necessarily mean it's a resource.  Directories are not
    # resources, so let's try to find out if it's a directory or not.
    path = Path(package.__spec__.origin).parent / name
    return path.is_file()


def contents(package: Package) -> Iterable[str]:
    """Return an iterable of entries in 'package'.

    Note that not all entries are resources.  Specifically, directories are
    not considered resources.  Use `is_resource()` on each entry returned here
    to check if it is a resource or not.
    """
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.contents()
    # Is the package a namespace package?  By definition, namespace packages
    # cannot have resources.  We could use _check_location() and catch the
    # exception, but that's extra work, so just inline the check.
    elif package.__spec__.origin is None or not package.__spec__.has_location:
        return ()
    else:
        package_directory = Path(package.__spec__.origin).parent
        return os.listdir(package_directory)