annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/importlib/resources.py @ 69:33d812a61356

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 17:55:14 -0400
parents
children
rev   line source
jpayne@69 1 import os
jpayne@69 2 import tempfile
jpayne@69 3
jpayne@69 4 from . import abc as resources_abc
jpayne@69 5 from contextlib import contextmanager, suppress
jpayne@69 6 from importlib import import_module
jpayne@69 7 from importlib.abc import ResourceLoader
jpayne@69 8 from io import BytesIO, TextIOWrapper
jpayne@69 9 from pathlib import Path
jpayne@69 10 from types import ModuleType
jpayne@69 11 from typing import Iterable, Iterator, Optional, Set, Union # noqa: F401
jpayne@69 12 from typing import cast
jpayne@69 13 from typing.io import BinaryIO, TextIO
jpayne@69 14 from zipimport import ZipImportError
jpayne@69 15
jpayne@69 16
# Names exported by ``from importlib.resources import *``.
__all__ = [
    'Package', 'Resource',
    'contents', 'is_resource',
    'open_binary', 'open_text',
    'path',
    'read_binary', 'read_text',
]


# A package is referred to either by its dotted name or by the module object.
Package = Union[str, ModuleType]
# A resource is named by a plain string or by any os.PathLike object.
Resource = Union[str, os.PathLike]
jpayne@69 32
jpayne@69 33
jpayne@69 34 def _get_package(package) -> ModuleType:
jpayne@69 35 """Take a package name or module object and return the module.
jpayne@69 36
jpayne@69 37 If a name, the module is imported. If the passed or imported module
jpayne@69 38 object is not a package, raise an exception.
jpayne@69 39 """
jpayne@69 40 if hasattr(package, '__spec__'):
jpayne@69 41 if package.__spec__.submodule_search_locations is None:
jpayne@69 42 raise TypeError('{!r} is not a package'.format(
jpayne@69 43 package.__spec__.name))
jpayne@69 44 else:
jpayne@69 45 return package
jpayne@69 46 else:
jpayne@69 47 module = import_module(package)
jpayne@69 48 if module.__spec__.submodule_search_locations is None:
jpayne@69 49 raise TypeError('{!r} is not a package'.format(package))
jpayne@69 50 else:
jpayne@69 51 return module
jpayne@69 52
jpayne@69 53
jpayne@69 54 def _normalize_path(path) -> str:
jpayne@69 55 """Normalize a path by ensuring it is a string.
jpayne@69 56
jpayne@69 57 If the resulting string contains path separators, an exception is raised.
jpayne@69 58 """
jpayne@69 59 parent, file_name = os.path.split(path)
jpayne@69 60 if parent:
jpayne@69 61 raise ValueError('{!r} must be only a file name'.format(path))
jpayne@69 62 else:
jpayne@69 63 return file_name
jpayne@69 64
jpayne@69 65
jpayne@69 66 def _get_resource_reader(
jpayne@69 67 package: ModuleType) -> Optional[resources_abc.ResourceReader]:
jpayne@69 68 # Return the package's loader if it's a ResourceReader. We can't use
jpayne@69 69 # a issubclass() check here because apparently abc.'s __subclasscheck__()
jpayne@69 70 # hook wants to create a weak reference to the object, but
jpayne@69 71 # zipimport.zipimporter does not support weak references, resulting in a
jpayne@69 72 # TypeError. That seems terrible.
jpayne@69 73 spec = package.__spec__
jpayne@69 74 if hasattr(spec.loader, 'get_resource_reader'):
jpayne@69 75 return cast(resources_abc.ResourceReader,
jpayne@69 76 spec.loader.get_resource_reader(spec.name))
jpayne@69 77 return None
jpayne@69 78
jpayne@69 79
jpayne@69 80 def _check_location(package):
jpayne@69 81 if package.__spec__.origin is None or not package.__spec__.has_location:
jpayne@69 82 raise FileNotFoundError(f'Package has no location {package!r}')
jpayne@69 83
jpayne@69 84
def open_binary(package: Package, resource: Resource) -> BinaryIO:
    """Open a package resource for binary reading and return the stream.

    The package's resource reader is used when the loader provides one.
    Otherwise the file next to the package's origin is opened directly,
    and if that fails the loader's get_data() is tried as a last resort.
    FileNotFoundError is raised when none of these yields the resource.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.open_resource(resource)
    _check_location(package)
    spec = package.__spec__
    package_dir = os.path.dirname(os.path.abspath(spec.origin))
    full_path = os.path.join(package_dir, resource)
    try:
        return open(full_path, mode='rb')
    except OSError:
        # Just assume the loader is a resource loader; all the relevant
        # importlib.machinery loaders are.  The data is fetched only when
        # the loader actually exposes get_data().
        loader = cast(ResourceLoader, spec.loader)
        payload = None
        if hasattr(spec.loader, 'get_data'):
            try:
                payload = loader.get_data(full_path)
            except OSError:
                # Treated the same as "no data": reported below.
                pass
        if payload is not None:
            return BytesIO(payload)
        raise FileNotFoundError(
            '{!r} resource not found in {!r}'.format(resource, spec.name))
jpayne@69 114
jpayne@69 115
def open_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> TextIO:
    """Open a package resource for text reading and return the stream.

    The package's resource reader is used when the loader provides one.
    Otherwise the file next to the package's origin is opened directly,
    and if that fails the loader's get_data() is tried as a last resort.
    Bytes obtained from a reader or loader are decoded with the given
    encoding and errors settings.  FileNotFoundError is raised when none
    of these yields the resource.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        binary_stream = reader.open_resource(resource)
        return TextIOWrapper(binary_stream, encoding, errors)
    _check_location(package)
    spec = package.__spec__
    package_dir = os.path.dirname(os.path.abspath(spec.origin))
    full_path = os.path.join(package_dir, resource)
    try:
        return open(full_path, mode='r', encoding=encoding, errors=errors)
    except OSError:
        # Just assume the loader is a resource loader; all the relevant
        # importlib.machinery loaders are.  The data is fetched only when
        # the loader actually exposes get_data().
        loader = cast(ResourceLoader, spec.loader)
        payload = None
        if hasattr(spec.loader, 'get_data'):
            try:
                payload = loader.get_data(full_path)
            except OSError:
                # Treated the same as "no data": reported below.
                pass
        if payload is not None:
            return TextIOWrapper(BytesIO(payload), encoding, errors)
        raise FileNotFoundError(
            '{!r} resource not found in {!r}'.format(resource, spec.name))
jpayne@69 148
jpayne@69 149
def read_binary(package: Package, resource: Resource) -> bytes:
    """Read the whole resource and return its contents as bytes."""
    resource = _normalize_path(resource)
    package = _get_package(package)
    with open_binary(package, resource) as stream:
        payload = stream.read()
    return payload
jpayne@69 156
jpayne@69 157
def read_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> str:
    """Read the whole resource and return it as a decoded string.

    'encoding' and 'errors' have the same semantics as the corresponding
    arguments of bytes.decode().
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    with open_text(package, resource, encoding, errors) as stream:
        text = stream.read()
    return text
jpayne@69 171
jpayne@69 172
@contextmanager
def path(package: Package, resource: Resource) -> Iterator[Path]:
    """A context manager providing a file path object to the resource.

    If the resource does not already exist on its own on the file system,
    a temporary file will be created. If the file was created, the file
    will be deleted upon exiting the context manager (no exception is
    raised if the file was deleted prior to the context manager
    exiting).

    Exceptions raised inside the caller's ``with`` block, including
    FileNotFoundError, propagate to the caller unchanged.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        # Guard only the lookup itself.  The yield must stay outside this
        # try block: otherwise a FileNotFoundError raised in the caller's
        # with-body would be swallowed here and the generator would go on
        # to yield a second time, which contextlib reports as
        # "generator didn't stop after throw()".
        try:
            resource_file = Path(reader.resource_path(resource))
        except FileNotFoundError:
            resource_file = None
        if resource_file is not None:
            yield resource_file
            return
    else:
        _check_location(package)
    # Fall-through for both the lack of a resource reader *and* if
    # resource_path() raises FileNotFoundError.
    package_directory = Path(package.__spec__.origin).parent
    file_path = package_directory / resource
    if file_path.exists():
        yield file_path
        return
    with open_binary(package, resource) as fp:
        data = fp.read()
    # Not using tempfile.NamedTemporaryFile as the temporary file has to be
    # closed before it is handed out in order to work on Windows properly.
    fd, raw_path = tempfile.mkstemp()
    try:
        # The file object writes all of the data (a bare os.write() may
        # write only part of it) and closes the descriptor when the
        # with-block ends, even if writing fails.
        with os.fdopen(fd, 'wb') as tmp:
            tmp.write(data)
        yield Path(raw_path)
    finally:
        # The caller may already have removed the file; that is fine.
        with suppress(FileNotFoundError):
            os.remove(raw_path)
jpayne@69 216
jpayne@69 217
def is_resource(package: Package, name: str) -> bool:
    """Return True if 'name' is a resource inside 'package'.

    Directories are *not* resources.
    """
    package = _get_package(package)
    # Called only for its validation: a name with a directory part raises.
    _normalize_path(name)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.is_resource(name)
    try:
        entries = set(contents(package))
    except (NotADirectoryError, FileNotFoundError):
        return False
    if name in entries:
        # Being listed in the package's contents is not enough to be a
        # resource: directories are listed too, so check that the entry
        # is a regular file.
        candidate = Path(package.__spec__.origin).parent / name
        return candidate.is_file()
    return False
jpayne@69 239
jpayne@69 240
def contents(package: Package) -> Iterable[str]:
    """Return an iterable of the entries found in 'package'.

    Entries are not necessarily resources; directories in particular are
    listed but are not resources.  Use `is_resource()` on each entry to
    find out whether it is one.
    """
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.contents()
    spec = package.__spec__
    # Is the package a namespace package?  By definition, namespace packages
    # cannot have resources.  The location check is inlined here rather
    # than calling _check_location() and catching its exception.
    if spec.origin is None or not spec.has_location:
        return ()
    return os.listdir(Path(spec.origin).parent)