comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/importlib/resources.py @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
comparison
equal deleted inserted replaced
67:0e9998148a16 68:5028fdace37b
1 import os
2 import tempfile
3
4 from . import abc as resources_abc
5 from contextlib import contextmanager, suppress
6 from importlib import import_module
7 from importlib.abc import ResourceLoader
8 from io import BytesIO, TextIOWrapper
9 from pathlib import Path
10 from types import ModuleType
11 from typing import Iterable, Iterator, Optional, Set, Union # noqa: F401
12 from typing import cast
13 from typing.io import BinaryIO, TextIO
14 from zipimport import ZipImportError
15
16
# Public API of this module; these are the names ``import *`` exposes.
__all__ = [
    'Package', 'Resource',
    'contents', 'is_resource',
    'open_binary', 'open_text',
    'path',
    'read_binary', 'read_text',
]


# A package may be given by its dotted name or as an imported module object.
Package = Union[str, ModuleType]
# A resource is named by a plain string or by any path-like object.
Resource = Union[str, os.PathLike]
32
33
def _get_package(package) -> ModuleType:
    """Resolve *package* to a module object that is a real package.

    *package* may be a module object (anything exposing ``__spec__``) or a
    dotted name, in which case it is imported first.

    Raises TypeError when the resulting module's spec has no
    ``submodule_search_locations``, i.e. it is a plain module and not a
    package.
    """
    was_given_module = hasattr(package, '__spec__')
    module = package if was_given_module else import_module(package)
    if module.__spec__.submodule_search_locations is not None:
        return module
    # Report the spec's name for module objects, and the caller's own
    # spelling when a name was passed in.
    label = module.__spec__.name if was_given_module else package
    raise TypeError('{!r} is not a package'.format(label))
52
53
def _normalize_path(path) -> str:
    """Return the bare file name contained in *path*.

    The value is split with ``os.path.split()``; if anything precedes the
    final component (that is, *path* contains a directory part), ValueError
    is raised because resources must be addressed by file name only.
    """
    directory, base_name = os.path.split(path)
    if not directory:
        return base_name
    raise ValueError('{!r} must be only a file name'.format(path))
64
65
def _get_resource_reader(
        package: ModuleType) -> Optional[resources_abc.ResourceReader]:
    """Return the resource reader offered by *package*'s loader, if any.

    The loader is duck-typed: when it exposes ``get_resource_reader`` that
    method is called with the spec's name and its result is returned;
    otherwise None is returned.
    """
    # An issubclass() test is deliberately avoided: the ABC machinery's
    # __subclasscheck__() hook tries to create a weak reference to the
    # object, which zipimport.zipimporter does not support, so the check
    # would fail with a TypeError.
    spec = package.__spec__
    loader = spec.loader
    if not hasattr(loader, 'get_resource_reader'):
        return None
    return cast(resources_abc.ResourceReader,
                loader.get_resource_reader(spec.name))
78
79
def _check_location(package):
    """Raise FileNotFoundError unless *package*'s spec has a usable origin.

    The spec must have a non-None ``origin`` and a true ``has_location``.
    """
    spec = package.__spec__
    if not (spec.origin is not None and spec.has_location):
        raise FileNotFoundError(f'Package has no location {package!r}')
83
84
def open_binary(package: Package, resource: Resource) -> BinaryIO:
    """Open *resource* inside *package* and return a binary file object.

    A resource reader supplied by the package's loader takes precedence.
    Without one, the file is opened next to the package's origin; if that
    fails with OSError the loader's ``get_data()`` is tried (when present).

    Raises FileNotFoundError if the package has no location or the resource
    cannot be obtained by any of these means.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.open_resource(resource)
    _check_location(package)
    spec = package.__spec__
    package_dir = os.path.dirname(os.path.abspath(spec.origin))
    full_path = os.path.join(package_dir, resource)
    try:
        return open(full_path, mode='rb')
    except OSError:
        # Not a plain file on disk: ask the loader for the bytes, provided
        # it offers get_data().  An OSError from get_data() is treated the
        # same as "no data".
        loader = cast(ResourceLoader, spec.loader)
        data = None
        if hasattr(loader, 'get_data'):
            with suppress(OSError):
                data = loader.get_data(full_path)
        if data is not None:
            return BytesIO(data)
        message = '{!r} resource not found in {!r}'.format(
            resource, spec.name)
        raise FileNotFoundError(message)
114
115
def open_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> TextIO:
    """Open *resource* inside *package* and return a text file object.

    *encoding* and *errors* are passed to the text layer that decodes the
    resource.  A resource reader supplied by the package's loader takes
    precedence; without one, the file is opened next to the package's
    origin, and if that fails with OSError the loader's ``get_data()`` is
    tried (when present).

    Raises FileNotFoundError if the package has no location or the resource
    cannot be obtained by any of these means.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return TextIOWrapper(reader.open_resource(resource), encoding, errors)
    _check_location(package)
    spec = package.__spec__
    package_dir = os.path.dirname(os.path.abspath(spec.origin))
    full_path = os.path.join(package_dir, resource)
    try:
        return open(full_path, mode='r', encoding=encoding, errors=errors)
    except OSError:
        # Not a plain file on disk: ask the loader for the bytes, provided
        # it offers get_data().  An OSError from get_data() is treated the
        # same as "no data".
        loader = cast(ResourceLoader, spec.loader)
        data = None
        if hasattr(loader, 'get_data'):
            with suppress(OSError):
                data = loader.get_data(full_path)
        if data is not None:
            return TextIOWrapper(BytesIO(data), encoding, errors)
        message = '{!r} resource not found in {!r}'.format(
            resource, spec.name)
        raise FileNotFoundError(message)
148
149
def read_binary(package: Package, resource: Resource) -> bytes:
    """Read *resource* from *package* and return its contents as bytes.

    The underlying stream is obtained from open_binary() and is closed
    before returning.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    with open_binary(package, resource) as stream:
        payload = stream.read()
    return payload
156
157
def read_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> str:
    """Read *resource* from *package* and return it decoded as a string.

    *encoding* and *errors* carry the same meaning as the corresponding
    arguments of bytes.decode().  The underlying stream is obtained from
    open_text() and is closed before returning.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    with open_text(package, resource, encoding, errors) as stream:
        text = stream.read()
    return text
171
172
@contextmanager
def path(package: Package, resource: Resource) -> Iterator[Path]:
    """A context manager providing a file path object to the resource.

    If the resource does not already exist on its own on the file system,
    a temporary file will be created. If the file was created, the file
    will be deleted upon exiting the context manager (no exception is
    raised if the file was deleted prior to the context manager
    exiting).

    Lookup order:

    1. the loader's resource reader, via ``resource_path()``;
    2. a file sitting next to the package's origin;
    3. a temporary copy of the bytes returned by open_binary().

    Exceptions raised inside the caller's ``with`` body, including
    FileNotFoundError, propagate unchanged.

    Raises FileNotFoundError if the package has no resource reader and no
    location, or if the resource cannot be found at all.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        # Only the resource_path() lookup is guarded.  The yield must stay
        # outside the try block: otherwise a FileNotFoundError raised by the
        # caller's ``with`` body would be swallowed here and the generator
        # would go on to yield a second time, which contextlib reports as a
        # RuntimeError.
        try:
            located = Path(reader.resource_path(resource))
        except FileNotFoundError:
            located = None
        if located is not None:
            yield located
            return
    else:
        _check_location(package)
    # Fall-through for both the lack of resource_path() *and* if
    # resource_path() raises FileNotFoundError.
    file_path = Path(package.__spec__.origin).parent / resource
    if file_path.exists():
        yield file_path
        return
    with open_binary(package, resource) as fp:
        data = fp.read()
    # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
    # blocks due to the need to close the temporary file to work on
    # Windows properly.
    fd, raw_path = tempfile.mkstemp()
    try:
        # Always release the descriptor, even if the write fails.
        try:
            os.write(fd, data)
        finally:
            os.close(fd)
        yield Path(raw_path)
    finally:
        # The caller may already have removed the temporary file.
        with suppress(FileNotFoundError):
            os.remove(raw_path)
216
217
def is_resource(package: Package, name: str) -> bool:
    """Return True if *name* is a resource inside *package*.

    Directories are *not* resources.  *name* must be a bare file name;
    anything containing a directory part is rejected with ValueError.
    """
    package = _get_package(package)
    _normalize_path(name)  # Validation only; the result is not needed.
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.is_resource(name)
    try:
        entries = set(contents(package))
    except (NotADirectoryError, FileNotFoundError):
        return False
    if name in entries:
        # Being listed among the package's entries is not enough: the entry
        # could be a directory, and directories are not resources.
        candidate = Path(package.__spec__.origin).parent / name
        return candidate.is_file()
    return False
239
240
def contents(package: Package) -> Iterable[str]:
    """Return an iterable of the entries found in *package*.

    Entries are not necessarily resources: directories show up here too.
    Call `is_resource()` on an entry to find out whether it is a resource.
    """
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.contents()
    spec = package.__spec__
    # A package without an origin or location is treated as a namespace
    # package, which by definition cannot have resources.  The check is
    # inlined rather than calling _check_location() and catching its
    # exception.
    if spec.origin is None or not spec.has_location:
        return ()
    return os.listdir(Path(spec.origin).parent)