annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/importlib/resources.py @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
rev   line source
jpayne@68 1 import os
jpayne@68 2 import tempfile
jpayne@68 3
jpayne@68 4 from . import abc as resources_abc
jpayne@68 5 from contextlib import contextmanager, suppress
jpayne@68 6 from importlib import import_module
jpayne@68 7 from importlib.abc import ResourceLoader
jpayne@68 8 from io import BytesIO, TextIOWrapper
jpayne@68 9 from pathlib import Path
jpayne@68 10 from types import ModuleType
jpayne@68 11 from typing import Iterable, Iterator, Optional, Set, Union # noqa: F401
jpayne@68 12 from typing import cast
jpayne@68 13 from typing.io import BinaryIO, TextIO
jpayne@68 14 from zipimport import ZipImportError
jpayne@68 15
jpayne@68 16
# Names exported by ``from <this module> import *``: the two type aliases
# followed by the public resource-access functions defined below.
__all__ = [
    'Package',
    'Resource',
    'contents',
    'is_resource',
    'open_binary',
    'open_text',
    'path',
    'read_binary',
    'read_text',
]
jpayne@68 28
jpayne@68 29
# A package may be given either as its dotted import name or as the
# already-imported module object.
Package = Union[str, ModuleType]
# A resource is named by a plain string or by any os.PathLike object.
Resource = Union[str, os.PathLike]
jpayne@68 32
jpayne@68 33
def _get_package(package) -> ModuleType:
    """Take a package name or module object and return the module.

    If ``package`` has a ``__spec__`` attribute it is treated as an
    already-imported module; otherwise it is passed to ``import_module()``.

    Raises TypeError if the resulting module is not a package, i.e. its
    spec has no ``submodule_search_locations``.  A module whose
    ``__spec__`` is None is reported the same way instead of failing with
    an AttributeError on the missing spec.
    """
    if hasattr(package, '__spec__'):
        module = package
        spec = module.__spec__
        # Prefer the spec's name in the message; fall back to the module's
        # own name (or the object itself) when there is no spec at all.
        if spec is None:
            label = getattr(module, '__name__', module)
        else:
            label = spec.name
    else:
        module = import_module(package)
        spec = module.__spec__
        label = package
    if spec is None or spec.submodule_search_locations is None:
        raise TypeError('{!r} is not a package'.format(label))
    return module
jpayne@68 52
jpayne@68 53
def _normalize_path(path) -> str:
    """Return the file-name component of ``path``.

    ``path`` is split with ``os.path.split()``.  If the split yields a
    non-empty directory part, ValueError is raised; otherwise the file
    name part is returned.
    """
    directory, base_name = os.path.split(path)
    if not directory:
        return base_name
    raise ValueError('{!r} must be only a file name'.format(path))
jpayne@68 64
jpayne@68 65
def _get_resource_reader(
        package: ModuleType) -> Optional[resources_abc.ResourceReader]:
    """Return the resource reader offered by the package's loader, if any.

    When the loader of ``package.__spec__`` has a ``get_resource_reader``
    attribute, it is called with the spec's name and the result is
    returned.  Otherwise None is returned.
    """
    # The loader is probed with hasattr() rather than an issubclass() check
    # against the ABC: the ABC's __subclasscheck__() hook wants to create a
    # weak reference to the object, and zipimport.zipimporter does not
    # support weak references, so that check would raise TypeError.
    module_spec = package.__spec__
    if not hasattr(module_spec.loader, 'get_resource_reader'):
        return None
    reader = module_spec.loader.get_resource_reader(module_spec.name)
    return cast(resources_abc.ResourceReader, reader)
jpayne@68 78
jpayne@68 79
def _check_location(package):
    """Raise FileNotFoundError unless the package's spec has a location.

    The spec passes the check only when its ``origin`` is not None and its
    ``has_location`` flag is true.
    """
    spec = package.__spec__
    if spec.origin is not None and spec.has_location:
        return
    raise FileNotFoundError(f'Package has no location {package!r}')
jpayne@68 83
jpayne@68 84
def open_binary(package: Package, resource: Resource) -> BinaryIO:
    """Return a file-like object opened for binary reading of the resource.

    The package's resource reader is used when its loader provides one.
    Otherwise the resource is opened from the directory containing the
    package's origin file, falling back to the loader's ``get_data()``.
    Raises FileNotFoundError when the resource cannot be found that way.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.open_resource(resource)
    _check_location(package)
    spec = package.__spec__
    package_dir = os.path.dirname(os.path.abspath(spec.origin))
    full_path = os.path.join(package_dir, resource)
    try:
        return open(full_path, mode='rb')
    except OSError:
        # The file could not be opened directly, so ask the loader for the
        # bytes.  A loader without get_data(), or one whose get_data()
        # raises OSError, is treated as not having the resource.
        loader = cast(ResourceLoader, spec.loader)
        data = None
        if hasattr(loader, 'get_data'):
            with suppress(OSError):
                data = loader.get_data(full_path)
        if data is not None:
            return BytesIO(data)
        message = '{!r} resource not found in {!r}'.format(
            resource, spec.name)
        raise FileNotFoundError(message)
jpayne@68 114
jpayne@68 115
def open_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> TextIO:
    """Return a file-like object opened for text reading of the resource.

    ``encoding`` and ``errors`` are passed on to ``open()`` or to the
    ``TextIOWrapper`` that decodes the resource.  The lookup order is the
    package's resource reader, then the directory containing the package's
    origin file, then the loader's ``get_data()``.  Raises
    FileNotFoundError when the resource cannot be found that way.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return TextIOWrapper(reader.open_resource(resource), encoding, errors)
    _check_location(package)
    spec = package.__spec__
    package_dir = os.path.dirname(os.path.abspath(spec.origin))
    full_path = os.path.join(package_dir, resource)
    try:
        return open(full_path, mode='r', encoding=encoding, errors=errors)
    except OSError:
        # The file could not be opened directly, so ask the loader for the
        # bytes.  A loader without get_data(), or one whose get_data()
        # raises OSError, is treated as not having the resource.
        loader = cast(ResourceLoader, spec.loader)
        data = None
        if hasattr(loader, 'get_data'):
            with suppress(OSError):
                data = loader.get_data(full_path)
        if data is not None:
            return TextIOWrapper(BytesIO(data), encoding, errors)
        message = '{!r} resource not found in {!r}'.format(
            resource, spec.name)
        raise FileNotFoundError(message)
jpayne@68 148
jpayne@68 149
def read_binary(package: Package, resource: Resource) -> bytes:
    """Return the full contents of the resource as bytes.

    The resource is opened with ``open_binary()``, read to the end, and
    closed before returning.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    with open_binary(package, resource) as stream:
        payload = stream.read()
    return payload
jpayne@68 156
jpayne@68 157
def read_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> str:
    """Return the full contents of the resource decoded to a string.

    ``encoding`` and ``errors`` are forwarded to ``open_text()``; they
    have the same semantics as the arguments of bytes.decode().
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    with open_text(package, resource, encoding, errors) as stream:
        text = stream.read()
    return text
jpayne@68 171
jpayne@68 172
@contextmanager
def path(package: Package, resource: Resource) -> Iterator[Path]:
    """A context manager providing a file path object to the resource.

    If the resource does not already exist on its own on the file system,
    a temporary file will be created. If the file was created, the file
    will be deleted upon exiting the context manager (no exception is
    raised if the file was deleted prior to the context manager
    exiting).

    Exceptions raised inside the ``with`` body propagate to the caller
    unchanged; in particular a FileNotFoundError raised by the body is not
    mistaken for a failed resource lookup.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        # Only the lookup itself is guarded.  Yielding inside the ``try``
        # would also catch a FileNotFoundError thrown in from the caller's
        # ``with`` body and make this generator yield a second time.
        try:
            resource_location = reader.resource_path(resource)
        except FileNotFoundError:
            pass
        else:
            yield Path(resource_location)
            return
    else:
        _check_location(package)
    # Fall-through for both the lack of resource_path() *and* if
    # resource_path() raises FileNotFoundError.
    package_directory = Path(package.__spec__.origin).parent
    file_path = package_directory / resource
    if file_path.exists():
        yield file_path
        return
    with open_binary(package, resource) as fp:
        data = fp.read()
    # Not using tempfile.NamedTemporaryFile: the temporary file has to be
    # closed before it is handed out for this to work on Windows properly.
    fd, raw_path = tempfile.mkstemp()
    try:
        # The file object takes ownership of the descriptor, so it is
        # closed even when writing fails, and the buffered write() puts
        # out all of the data rather than whatever a single os.write()
        # call happens to accept.
        with os.fdopen(fd, 'wb') as tmp:
            tmp.write(data)
        yield Path(raw_path)
    finally:
        try:
            os.remove(raw_path)
        except FileNotFoundError:
            pass
jpayne@68 216
jpayne@68 217
def is_resource(package: Package, name: str) -> bool:
    """True if 'name' is a resource inside 'package'.

    Directories are *not* resources.  When the package's loader provides
    a resource reader, the answer comes from that reader; otherwise the
    package directory on the file system is inspected.
    """
    package = _get_package(package)
    # Called only for its validation: raises ValueError if 'name' has a
    # directory component.
    _normalize_path(name)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.is_resource(name)
    try:
        entries = set(contents(package))
    except (NotADirectoryError, FileNotFoundError):
        return False
    if name in entries:
        # Being listed in the package's contents is not enough: the entry
        # may be a directory, and directories are not resources.
        candidate = Path(package.__spec__.origin).parent / name
        return candidate.is_file()
    return False
jpayne@68 239
jpayne@68 240
def contents(package: Package) -> Iterable[str]:
    """Return an iterable of entries in 'package'.

    Note that not all entries are resources. Specifically, directories are
    not considered resources. Use `is_resource()` on each entry returned here
    to check if it is a resource or not.
    """
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.contents()
    spec = package.__spec__
    # A namespace package cannot have resources by definition.  This is
    # the same test _check_location() performs, inlined so that no
    # exception has to be raised and caught.
    if spec.origin is None or not spec.has_location:
        return ()
    return os.listdir(Path(spec.origin).parent)