Mercurial > repos > rliterman > csp2
comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/importlib/resources.py @ 68:5028fdace37b
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 16:23:26 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
67:0e9998148a16 | 68:5028fdace37b |
---|---|
1 import os | |
2 import tempfile | |
3 | |
4 from . import abc as resources_abc | |
5 from contextlib import contextmanager, suppress | |
6 from importlib import import_module | |
7 from importlib.abc import ResourceLoader | |
8 from io import BytesIO, TextIOWrapper | |
9 from pathlib import Path | |
10 from types import ModuleType | |
11 from typing import Iterable, Iterator, Optional, Set, Union # noqa: F401 | |
12 from typing import cast | |
13 from typing.io import BinaryIO, TextIO | |
14 from zipimport import ZipImportError | |
15 | |
16 | |
17 __all__ = [ | |
18 'Package', | |
19 'Resource', | |
20 'contents', | |
21 'is_resource', | |
22 'open_binary', | |
23 'open_text', | |
24 'path', | |
25 'read_binary', | |
26 'read_text', | |
27 ] | |
28 | |
29 | |
30 Package = Union[str, ModuleType] | |
31 Resource = Union[str, os.PathLike] | |
32 | |
33 | |
34 def _get_package(package) -> ModuleType: | |
35 """Take a package name or module object and return the module. | |
36 | |
37 If a name, the module is imported. If the passed or imported module | |
38 object is not a package, raise an exception. | |
39 """ | |
40 if hasattr(package, '__spec__'): | |
41 if package.__spec__.submodule_search_locations is None: | |
42 raise TypeError('{!r} is not a package'.format( | |
43 package.__spec__.name)) | |
44 else: | |
45 return package | |
46 else: | |
47 module = import_module(package) | |
48 if module.__spec__.submodule_search_locations is None: | |
49 raise TypeError('{!r} is not a package'.format(package)) | |
50 else: | |
51 return module | |
52 | |
53 | |
54 def _normalize_path(path) -> str: | |
55 """Normalize a path by ensuring it is a string. | |
56 | |
57 If the resulting string contains path separators, an exception is raised. | |
58 """ | |
59 parent, file_name = os.path.split(path) | |
60 if parent: | |
61 raise ValueError('{!r} must be only a file name'.format(path)) | |
62 else: | |
63 return file_name | |
64 | |
65 | |
66 def _get_resource_reader( | |
67 package: ModuleType) -> Optional[resources_abc.ResourceReader]: | |
68 # Return the package's loader if it's a ResourceReader. We can't use | |
69 # a issubclass() check here because apparently abc.'s __subclasscheck__() | |
70 # hook wants to create a weak reference to the object, but | |
71 # zipimport.zipimporter does not support weak references, resulting in a | |
72 # TypeError. That seems terrible. | |
73 spec = package.__spec__ | |
74 if hasattr(spec.loader, 'get_resource_reader'): | |
75 return cast(resources_abc.ResourceReader, | |
76 spec.loader.get_resource_reader(spec.name)) | |
77 return None | |
78 | |
79 | |
80 def _check_location(package): | |
81 if package.__spec__.origin is None or not package.__spec__.has_location: | |
82 raise FileNotFoundError(f'Package has no location {package!r}') | |
83 | |
84 | |
85 def open_binary(package: Package, resource: Resource) -> BinaryIO: | |
86 """Return a file-like object opened for binary reading of the resource.""" | |
87 resource = _normalize_path(resource) | |
88 package = _get_package(package) | |
89 reader = _get_resource_reader(package) | |
90 if reader is not None: | |
91 return reader.open_resource(resource) | |
92 _check_location(package) | |
93 absolute_package_path = os.path.abspath(package.__spec__.origin) | |
94 package_path = os.path.dirname(absolute_package_path) | |
95 full_path = os.path.join(package_path, resource) | |
96 try: | |
97 return open(full_path, mode='rb') | |
98 except OSError: | |
99 # Just assume the loader is a resource loader; all the relevant | |
100 # importlib.machinery loaders are and an AttributeError for | |
101 # get_data() will make it clear what is needed from the loader. | |
102 loader = cast(ResourceLoader, package.__spec__.loader) | |
103 data = None | |
104 if hasattr(package.__spec__.loader, 'get_data'): | |
105 with suppress(OSError): | |
106 data = loader.get_data(full_path) | |
107 if data is None: | |
108 package_name = package.__spec__.name | |
109 message = '{!r} resource not found in {!r}'.format( | |
110 resource, package_name) | |
111 raise FileNotFoundError(message) | |
112 else: | |
113 return BytesIO(data) | |
114 | |
115 | |
116 def open_text(package: Package, | |
117 resource: Resource, | |
118 encoding: str = 'utf-8', | |
119 errors: str = 'strict') -> TextIO: | |
120 """Return a file-like object opened for text reading of the resource.""" | |
121 resource = _normalize_path(resource) | |
122 package = _get_package(package) | |
123 reader = _get_resource_reader(package) | |
124 if reader is not None: | |
125 return TextIOWrapper(reader.open_resource(resource), encoding, errors) | |
126 _check_location(package) | |
127 absolute_package_path = os.path.abspath(package.__spec__.origin) | |
128 package_path = os.path.dirname(absolute_package_path) | |
129 full_path = os.path.join(package_path, resource) | |
130 try: | |
131 return open(full_path, mode='r', encoding=encoding, errors=errors) | |
132 except OSError: | |
133 # Just assume the loader is a resource loader; all the relevant | |
134 # importlib.machinery loaders are and an AttributeError for | |
135 # get_data() will make it clear what is needed from the loader. | |
136 loader = cast(ResourceLoader, package.__spec__.loader) | |
137 data = None | |
138 if hasattr(package.__spec__.loader, 'get_data'): | |
139 with suppress(OSError): | |
140 data = loader.get_data(full_path) | |
141 if data is None: | |
142 package_name = package.__spec__.name | |
143 message = '{!r} resource not found in {!r}'.format( | |
144 resource, package_name) | |
145 raise FileNotFoundError(message) | |
146 else: | |
147 return TextIOWrapper(BytesIO(data), encoding, errors) | |
148 | |
149 | |
150 def read_binary(package: Package, resource: Resource) -> bytes: | |
151 """Return the binary contents of the resource.""" | |
152 resource = _normalize_path(resource) | |
153 package = _get_package(package) | |
154 with open_binary(package, resource) as fp: | |
155 return fp.read() | |
156 | |
157 | |
158 def read_text(package: Package, | |
159 resource: Resource, | |
160 encoding: str = 'utf-8', | |
161 errors: str = 'strict') -> str: | |
162 """Return the decoded string of the resource. | |
163 | |
164 The decoding-related arguments have the same semantics as those of | |
165 bytes.decode(). | |
166 """ | |
167 resource = _normalize_path(resource) | |
168 package = _get_package(package) | |
169 with open_text(package, resource, encoding, errors) as fp: | |
170 return fp.read() | |
171 | |
172 | |
173 @contextmanager | |
174 def path(package: Package, resource: Resource) -> Iterator[Path]: | |
175 """A context manager providing a file path object to the resource. | |
176 | |
177 If the resource does not already exist on its own on the file system, | |
178 a temporary file will be created. If the file was created, the file | |
179 will be deleted upon exiting the context manager (no exception is | |
180 raised if the file was deleted prior to the context manager | |
181 exiting). | |
182 """ | |
183 resource = _normalize_path(resource) | |
184 package = _get_package(package) | |
185 reader = _get_resource_reader(package) | |
186 if reader is not None: | |
187 try: | |
188 yield Path(reader.resource_path(resource)) | |
189 return | |
190 except FileNotFoundError: | |
191 pass | |
192 else: | |
193 _check_location(package) | |
194 # Fall-through for both the lack of resource_path() *and* if | |
195 # resource_path() raises FileNotFoundError. | |
196 package_directory = Path(package.__spec__.origin).parent | |
197 file_path = package_directory / resource | |
198 if file_path.exists(): | |
199 yield file_path | |
200 else: | |
201 with open_binary(package, resource) as fp: | |
202 data = fp.read() | |
203 # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try' | |
204 # blocks due to the need to close the temporary file to work on | |
205 # Windows properly. | |
206 fd, raw_path = tempfile.mkstemp() | |
207 try: | |
208 os.write(fd, data) | |
209 os.close(fd) | |
210 yield Path(raw_path) | |
211 finally: | |
212 try: | |
213 os.remove(raw_path) | |
214 except FileNotFoundError: | |
215 pass | |
216 | |
217 | |
218 def is_resource(package: Package, name: str) -> bool: | |
219 """True if 'name' is a resource inside 'package'. | |
220 | |
221 Directories are *not* resources. | |
222 """ | |
223 package = _get_package(package) | |
224 _normalize_path(name) | |
225 reader = _get_resource_reader(package) | |
226 if reader is not None: | |
227 return reader.is_resource(name) | |
228 try: | |
229 package_contents = set(contents(package)) | |
230 except (NotADirectoryError, FileNotFoundError): | |
231 return False | |
232 if name not in package_contents: | |
233 return False | |
234 # Just because the given file_name lives as an entry in the package's | |
235 # contents doesn't necessarily mean it's a resource. Directories are not | |
236 # resources, so let's try to find out if it's a directory or not. | |
237 path = Path(package.__spec__.origin).parent / name | |
238 return path.is_file() | |
239 | |
240 | |
241 def contents(package: Package) -> Iterable[str]: | |
242 """Return an iterable of entries in 'package'. | |
243 | |
244 Note that not all entries are resources. Specifically, directories are | |
245 not considered resources. Use `is_resource()` on each entry returned here | |
246 to check if it is a resource or not. | |
247 """ | |
248 package = _get_package(package) | |
249 reader = _get_resource_reader(package) | |
250 if reader is not None: | |
251 return reader.contents() | |
252 # Is the package a namespace package? By definition, namespace packages | |
253 # cannot have resources. We could use _check_location() and catch the | |
254 # exception, but that's extra work, so just inline the check. | |
255 elif package.__spec__.origin is None or not package.__spec__.has_location: | |
256 return () | |
257 else: | |
258 package_directory = Path(package.__spec__.origin).parent | |
259 return os.listdir(package_directory) |