jpayne@68
|
1 import os
|
jpayne@68
|
2 import tempfile
|
jpayne@68
|
3
|
jpayne@68
|
4 from . import abc as resources_abc
|
jpayne@68
|
5 from contextlib import contextmanager, suppress
|
jpayne@68
|
6 from importlib import import_module
|
jpayne@68
|
7 from importlib.abc import ResourceLoader
|
jpayne@68
|
8 from io import BytesIO, TextIOWrapper
|
jpayne@68
|
9 from pathlib import Path
|
jpayne@68
|
10 from types import ModuleType
|
jpayne@68
|
11 from typing import Iterable, Iterator, Optional, Set, Union # noqa: F401
|
jpayne@68
|
12 from typing import cast
|
jpayne@68
|
13 from typing.io import BinaryIO, TextIO
|
jpayne@68
|
14 from zipimport import ZipImportError
|
jpayne@68
|
15
|
jpayne@68
|
16
|
jpayne@68
|
# Public names exported by ``from <module> import *``.
__all__ = [
    'Package',
    'Resource',
    'contents',
    'is_resource',
    'open_binary',
    'open_text',
    'path',
    'read_binary',
    'read_text',
]
|
jpayne@68
|
28
|
jpayne@68
|
29
|
jpayne@68
|
# A package is identified either by its import name or by the module object.
Package = Union[str, ModuleType]
# A resource is identified by a string or by any os.PathLike object.
Resource = Union[str, os.PathLike]
|
jpayne@68
|
32
|
jpayne@68
|
33
|
jpayne@68
|
34 def _get_package(package) -> ModuleType:
|
jpayne@68
|
35 """Take a package name or module object and return the module.
|
jpayne@68
|
36
|
jpayne@68
|
37 If a name, the module is imported. If the passed or imported module
|
jpayne@68
|
38 object is not a package, raise an exception.
|
jpayne@68
|
39 """
|
jpayne@68
|
40 if hasattr(package, '__spec__'):
|
jpayne@68
|
41 if package.__spec__.submodule_search_locations is None:
|
jpayne@68
|
42 raise TypeError('{!r} is not a package'.format(
|
jpayne@68
|
43 package.__spec__.name))
|
jpayne@68
|
44 else:
|
jpayne@68
|
45 return package
|
jpayne@68
|
46 else:
|
jpayne@68
|
47 module = import_module(package)
|
jpayne@68
|
48 if module.__spec__.submodule_search_locations is None:
|
jpayne@68
|
49 raise TypeError('{!r} is not a package'.format(package))
|
jpayne@68
|
50 else:
|
jpayne@68
|
51 return module
|
jpayne@68
|
52
|
jpayne@68
|
53
|
jpayne@68
|
54 def _normalize_path(path) -> str:
|
jpayne@68
|
55 """Normalize a path by ensuring it is a string.
|
jpayne@68
|
56
|
jpayne@68
|
57 If the resulting string contains path separators, an exception is raised.
|
jpayne@68
|
58 """
|
jpayne@68
|
59 parent, file_name = os.path.split(path)
|
jpayne@68
|
60 if parent:
|
jpayne@68
|
61 raise ValueError('{!r} must be only a file name'.format(path))
|
jpayne@68
|
62 else:
|
jpayne@68
|
63 return file_name
|
jpayne@68
|
64
|
jpayne@68
|
65
|
jpayne@68
|
66 def _get_resource_reader(
|
jpayne@68
|
67 package: ModuleType) -> Optional[resources_abc.ResourceReader]:
|
jpayne@68
|
68 # Return the package's loader if it's a ResourceReader. We can't use
|
jpayne@68
|
69 # a issubclass() check here because apparently abc.'s __subclasscheck__()
|
jpayne@68
|
70 # hook wants to create a weak reference to the object, but
|
jpayne@68
|
71 # zipimport.zipimporter does not support weak references, resulting in a
|
jpayne@68
|
72 # TypeError. That seems terrible.
|
jpayne@68
|
73 spec = package.__spec__
|
jpayne@68
|
74 if hasattr(spec.loader, 'get_resource_reader'):
|
jpayne@68
|
75 return cast(resources_abc.ResourceReader,
|
jpayne@68
|
76 spec.loader.get_resource_reader(spec.name))
|
jpayne@68
|
77 return None
|
jpayne@68
|
78
|
jpayne@68
|
79
|
jpayne@68
|
80 def _check_location(package):
|
jpayne@68
|
81 if package.__spec__.origin is None or not package.__spec__.has_location:
|
jpayne@68
|
82 raise FileNotFoundError(f'Package has no location {package!r}')
|
jpayne@68
|
83
|
jpayne@68
|
84
|
jpayne@68
|
def open_binary(package: Package, resource: Resource) -> BinaryIO:
    """Return a file-like object opened for binary reading of the resource.

    The package's resource reader is used when the loader provides one.
    Otherwise the resource is opened from the directory containing the
    package's ``__spec__.origin``; if that fails, the loader's
    ``get_data()`` is tried and its bytes are wrapped in a ``BytesIO``.

    Raises:
        ValueError: if *resource* is not a bare file name.
        TypeError: if *package* is not a package.
        FileNotFoundError: if the package has no location or the resource
            cannot be found.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.open_resource(resource)
    _check_location(package)
    absolute_package_path = os.path.abspath(package.__spec__.origin)
    package_path = os.path.dirname(absolute_package_path)
    full_path = os.path.join(package_path, resource)
    try:
        return open(full_path, mode='rb')
    except OSError:
        # The file could not be opened directly, so fall back to the
        # loader's get_data() when it has one.  A loader without
        # get_data(), or one whose get_data() raises OSError, leaves
        # ``data`` as None and is reported as a missing resource.
        loader = cast(ResourceLoader, package.__spec__.loader)
        data = None
        if hasattr(loader, 'get_data'):
            with suppress(OSError):
                data = loader.get_data(full_path)
        if data is None:
            package_name = package.__spec__.name
            message = '{!r} resource not found in {!r}'.format(
                resource, package_name)
            raise FileNotFoundError(message)
        return BytesIO(data)
|
jpayne@68
|
114
|
jpayne@68
|
115
|
jpayne@68
|
def open_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> TextIO:
    """Return a file-like object opened for text reading of the resource.

    *encoding* and *errors* are passed to ``open()`` or ``TextIOWrapper``,
    whichever ends up producing the text stream.

    The package's resource reader is used when the loader provides one.
    Otherwise the resource is opened from the directory containing the
    package's ``__spec__.origin``; if that fails, the loader's
    ``get_data()`` is tried and its bytes are wrapped for decoding.

    Raises:
        ValueError: if *resource* is not a bare file name.
        TypeError: if *package* is not a package.
        FileNotFoundError: if the package has no location or the resource
            cannot be found.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return TextIOWrapper(reader.open_resource(resource), encoding, errors)
    _check_location(package)
    absolute_package_path = os.path.abspath(package.__spec__.origin)
    package_path = os.path.dirname(absolute_package_path)
    full_path = os.path.join(package_path, resource)
    try:
        return open(full_path, mode='r', encoding=encoding, errors=errors)
    except OSError:
        # The file could not be opened directly, so fall back to the
        # loader's get_data() when it has one.  A loader without
        # get_data(), or one whose get_data() raises OSError, leaves
        # ``data`` as None and is reported as a missing resource.
        loader = cast(ResourceLoader, package.__spec__.loader)
        data = None
        if hasattr(loader, 'get_data'):
            with suppress(OSError):
                data = loader.get_data(full_path)
        if data is None:
            package_name = package.__spec__.name
            message = '{!r} resource not found in {!r}'.format(
                resource, package_name)
            raise FileNotFoundError(message)
        return TextIOWrapper(BytesIO(data), encoding, errors)
|
jpayne@68
|
148
|
jpayne@68
|
149
|
jpayne@68
|
def read_binary(package: Package, resource: Resource) -> bytes:
    """Return the binary contents of the resource.

    The resource is opened with ``open_binary()``, read in full, and the
    stream is closed before returning.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    with open_binary(package, resource) as fp:
        return fp.read()
|
jpayne@68
|
156
|
jpayne@68
|
157
|
jpayne@68
|
def read_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> str:
    """Return the decoded string of the resource.

    The decoding-related arguments have the same semantics as those of
    bytes.decode().  The resource is opened with ``open_text()``, read in
    full, and the stream is closed before returning.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    with open_text(package, resource, encoding, errors) as fp:
        return fp.read()
|
jpayne@68
|
171
|
jpayne@68
|
172
|
jpayne@68
|
@contextmanager
def path(package: Package, resource: Resource) -> Iterator[Path]:
    """A context manager providing a file path object to the resource.

    If the resource does not already exist on its own on the file system,
    a temporary file will be created.  If the file was created, the file
    will be deleted upon exiting the context manager (no exception is
    raised if the file was deleted prior to the context manager
    exiting).

    Raises:
        ValueError: if *resource* is not a bare file name.
        TypeError: if *package* is not a package.
        FileNotFoundError: if the package has no resource reader and no
            location, or the resource cannot be found.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        # Guard only the resource_path() lookup.  The yield must stay
        # outside the try block: otherwise a FileNotFoundError raised by
        # the caller's ``with`` body would be swallowed here and the
        # generator would carry on to a second yield.
        try:
            reader_path = Path(reader.resource_path(resource))
        except FileNotFoundError:
            pass
        else:
            yield reader_path
            return
    else:
        _check_location(package)
    # Fall-through for both the lack of resource_path() *and* if
    # resource_path() raises FileNotFoundError.
    package_directory = Path(package.__spec__.origin).parent
    file_path = package_directory / resource
    if file_path.exists():
        yield file_path
        return
    with open_binary(package, resource) as fp:
        data = fp.read()
    # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
    # blocks due to the need to close the temporary file to work on
    # Windows properly.
    fd, raw_path = tempfile.mkstemp()
    try:
        # The file object owns the descriptor, so it is closed even when
        # the write fails, and always before the path is handed out.
        with os.fdopen(fd, 'wb') as tmp_file:
            tmp_file.write(data)
        yield Path(raw_path)
    finally:
        try:
            os.remove(raw_path)
        except FileNotFoundError:
            pass
|
jpayne@68
|
216
|
jpayne@68
|
217
|
jpayne@68
|
def is_resource(package: Package, name: str) -> bool:
    """True if 'name' is a resource inside 'package'.

    Directories are *not* resources.

    Raises:
        ValueError: if *name* is not a bare file name.
        TypeError: if *package* is not a package.
    """
    package = _get_package(package)
    # Called for validation only: it raises ValueError when 'name' has a
    # directory component.  The return value is not needed.
    _normalize_path(name)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.is_resource(name)
    try:
        package_contents = set(contents(package))
    except (NotADirectoryError, FileNotFoundError):
        return False
    if name not in package_contents:
        return False
    # Just because the given name lives as an entry in the package's
    # contents doesn't necessarily mean it's a resource.  Directories are
    # not resources, so let's try to find out if it's a directory or not.
    candidate = Path(package.__spec__.origin).parent / name
    return candidate.is_file()
|
jpayne@68
|
239
|
jpayne@68
|
240
|
jpayne@68
|
def contents(package: Package) -> Iterable[str]:
    """Return an iterable of entries in 'package'.

    Note that not all entries are resources.  Specifically, directories are
    not considered resources.  Use `is_resource()` on each entry returned
    here to check if it is a resource or not.

    Raises:
        TypeError: if *package* is not a package.
    """
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.contents()
    # Is the package a namespace package?  By definition, namespace packages
    # cannot have resources.  We could use _check_location() and catch the
    # exception, but that's extra work, so just inline the check.
    spec = package.__spec__
    if spec.origin is None or not spec.has_location:
        return ()
    package_directory = Path(spec.origin).parent
    return os.listdir(package_directory)
|