jpayne@69
|
1 import os
|
jpayne@69
|
2 import sys
|
jpayne@69
|
3 import threading
|
jpayne@69
|
4
|
jpayne@69
|
5 from . import process
|
jpayne@69
|
6 from . import reduction
|
jpayne@69
|
7
|
jpayne@69
|
8 __all__ = ()
|
jpayne@69
|
9
|
jpayne@69
|
10 #
|
jpayne@69
|
11 # Exceptions
|
jpayne@69
|
12 #
|
jpayne@69
|
13
|
jpayne@69
|
class ProcessError(Exception):
    '''Root of this module's exception hierarchy.

    BufferTooShort, TimeoutError and AuthenticationError all derive from it.
    '''
|
jpayne@69
|
16
|
jpayne@69
|
class BufferTooShort(ProcessError):
    '''ProcessError subclass.

    This module only defines the type and re-exports it as an attribute
    of BaseContext; nothing here raises it.
    '''
|
jpayne@69
|
19
|
jpayne@69
|
class TimeoutError(ProcessError):
    '''ProcessError subclass.

    Within this module the name refers to this class rather than to the
    builtin of the same name.  Nothing in this module raises it.
    '''
|
jpayne@69
|
22
|
jpayne@69
|
class AuthenticationError(ProcessError):
    '''ProcessError subclass.

    This module only defines the type and re-exports it as an attribute
    of BaseContext; nothing here raises it.
    '''
|
jpayne@69
|
25
|
jpayne@69
|
26 #
|
jpayne@69
|
27 # Base type for contexts. Bound methods of an instance of this type are included in __all__ of __init__.py
|
jpayne@69
|
28 #
|
jpayne@69
|
29
|
jpayne@69
|
class BaseContext:
    '''Common base type for start-method contexts.

    Every factory method imports its implementation lazily (inside the
    method) and, where the underlying constructor takes one, hands it the
    context returned by get_context().
    '''

    # The module's exception types are reachable as context attributes.
    ProcessError = ProcessError
    BufferTooShort = BufferTooShort
    TimeoutError = TimeoutError
    AuthenticationError = AuthenticationError

    # Thin static aliases for the helpers in the process module.
    current_process = staticmethod(process.current_process)
    parent_process = staticmethod(process.parent_process)
    active_children = staticmethod(process.active_children)

    def cpu_count(self):
        '''Return the value of os.cpu_count().

        Raises NotImplementedError when os.cpu_count() returns None.
        '''
        count = os.cpu_count()
        if count is not None:
            return count
        raise NotImplementedError('cannot determine number of cpus')

    def Manager(self):
        '''Create a SyncManager for this context, start it and return it.

        The manager's methods such as `Lock()`, `Condition()` and `Queue()`
        can be used to create shared objects.
        '''
        from .managers import SyncManager
        manager = SyncManager(ctx=self.get_context())
        manager.start()
        return manager

    def Pipe(self, duplex=True):
        '''Return two connection objects connected by a pipe.'''
        from .connection import Pipe as make_pipe
        return make_pipe(duplex)

    def Lock(self):
        '''Return a non-recursive lock object.'''
        from .synchronize import Lock
        ctx = self.get_context()
        return Lock(ctx=ctx)

    def RLock(self):
        '''Return a recursive lock object.'''
        from .synchronize import RLock
        ctx = self.get_context()
        return RLock(ctx=ctx)

    def Condition(self, lock=None):
        '''Return a condition object built around *lock*.'''
        from .synchronize import Condition
        ctx = self.get_context()
        return Condition(lock, ctx=ctx)

    def Semaphore(self, value=1):
        '''Return a semaphore object created with *value*.'''
        from .synchronize import Semaphore
        ctx = self.get_context()
        return Semaphore(value, ctx=ctx)

    def BoundedSemaphore(self, value=1):
        '''Return a bounded semaphore object created with *value*.'''
        from .synchronize import BoundedSemaphore
        ctx = self.get_context()
        return BoundedSemaphore(value, ctx=ctx)

    def Event(self):
        '''Return an event object.'''
        from .synchronize import Event
        ctx = self.get_context()
        return Event(ctx=ctx)

    def Barrier(self, parties, action=None, timeout=None):
        '''Return a barrier object; all arguments are passed through.'''
        from .synchronize import Barrier
        ctx = self.get_context()
        return Barrier(parties, action, timeout, ctx=ctx)

    def Queue(self, maxsize=0):
        '''Return a queues.Queue created with *maxsize*.'''
        from .queues import Queue
        ctx = self.get_context()
        return Queue(maxsize, ctx=ctx)

    def JoinableQueue(self, maxsize=0):
        '''Return a queues.JoinableQueue created with *maxsize*.'''
        from .queues import JoinableQueue
        ctx = self.get_context()
        return JoinableQueue(maxsize, ctx=ctx)

    def SimpleQueue(self):
        '''Return a queues.SimpleQueue.'''
        from .queues import SimpleQueue
        ctx = self.get_context()
        return SimpleQueue(ctx=ctx)

    def Pool(self, processes=None, initializer=None, initargs=(),
             maxtasksperchild=None):
        '''Return a process pool object; all arguments are passed through.'''
        from .pool import Pool
        ctx = self.get_context()
        return Pool(processes, initializer, initargs, maxtasksperchild,
                    context=ctx)

    def RawValue(self, typecode_or_type, *args):
        '''Return a shared object (sharedctypes.RawValue).'''
        from .sharedctypes import RawValue
        return RawValue(typecode_or_type, *args)

    def RawArray(self, typecode_or_type, size_or_initializer):
        '''Return a shared array (sharedctypes.RawArray).'''
        from .sharedctypes import RawArray
        return RawArray(typecode_or_type, size_or_initializer)

    def Value(self, typecode_or_type, *args, lock=True):
        '''Return a synchronized shared object (sharedctypes.Value).'''
        from .sharedctypes import Value
        ctx = self.get_context()
        return Value(typecode_or_type, *args, lock=lock, ctx=ctx)

    def Array(self, typecode_or_type, size_or_initializer, *, lock=True):
        '''Return a synchronized shared array (sharedctypes.Array).'''
        from .sharedctypes import Array
        ctx = self.get_context()
        return Array(typecode_or_type, size_or_initializer, lock=lock,
                     ctx=ctx)

    def freeze_support(self):
        '''Check whether this is a fake forked process in a frozen executable.
        If so then run code specified by commandline and exit.

        Does nothing unless sys.platform is 'win32' and sys.frozen is truthy.
        '''
        if sys.platform != 'win32' or not getattr(sys, 'frozen', False):
            return
        from .spawn import freeze_support
        freeze_support()

    def get_logger(self):
        '''Return the package logger, creating it if it does not exist yet.'''
        from .util import get_logger as fetch_logger
        return fetch_logger()

    def log_to_stderr(self, level=None):
        '''Turn on logging and add a handler which prints to stderr.'''
        from .util import log_to_stderr as enable_stderr_logging
        return enable_stderr_logging(level)

    def allow_connection_pickling(self):
        '''Install support for sending connections and sockets
        between processes.
        '''
        # This is undocumented.  In previous versions of multiprocessing
        # its only effect was to make socket objects inheritable on Windows.
        # The import below is the method's entire effect; the bound name is
        # intentionally left unused.
        from . import connection

    def set_executable(self, executable):
        '''Set the path to a python.exe or pythonw.exe binary used to run
        child processes instead of sys.executable when using the 'spawn'
        start method.  Useful for people embedding Python.
        '''
        from .spawn import set_executable as apply_executable
        apply_executable(executable)

    def set_forkserver_preload(self, module_names):
        '''Set list of module names to try to load in forkserver process.
        This is really just a hint.
        '''
        from .forkserver import set_forkserver_preload as apply_preload
        apply_preload(module_names)

    def get_context(self, method=None):
        '''Return this context, or the concrete context named *method*.

        With method=None the context itself is returned.  Otherwise the
        name is looked up in _concrete_contexts; an unknown name raises
        ValueError, and the context's _check_available() is called before
        it is returned.
        '''
        if method is None:
            return self
        try:
            found = _concrete_contexts[method]
        except KeyError:
            raise ValueError('cannot find context for %r' % method) from None
        else:
            found._check_available()
            return found

    def get_start_method(self, allow_none=False):
        '''Return this context's _name; *allow_none* is accepted but unused.'''
        return self._name

    def set_start_method(self, method, force=False):
        '''Always raise ValueError: this base implementation refuses to
        change the start method.
        '''
        raise ValueError('cannot set start method of concrete context')

    @property
    def reducer(self):
        '''Controls how objects will be reduced to a form that can be
        shared with other processes.

        Reads the module-level name 'reduction' (None if it is missing).
        '''
        module_namespace = globals()
        return module_namespace.get('reduction')

    @reducer.setter
    def reducer(self, reduction):
        # Rebinds the module-level name, so the change is visible to every
        # context defined in this module, not only to this instance.
        module_namespace = globals()
        module_namespace['reduction'] = reduction

    def _check_available(self):
        '''Availability hook called by get_context(); a no-op here.'''
|
jpayne@69
|
215
|
jpayne@69
|
216 #
|
jpayne@69
|
217 # Type of default context -- underlying context can be set at most once
|
jpayne@69
|
218 #
|
jpayne@69
|
219
|
jpayne@69
|
class Process(process.BaseProcess):
    '''Process type whose launcher is chosen when the process is started.

    _Popen defers to the Process class of whatever context
    _default_context.get_context() returns at call time.
    '''

    _start_method = None

    @staticmethod
    def _Popen(process_obj):
        # Resolve the context on every call rather than caching it.
        active_context = _default_context.get_context()
        return active_context.Process._Popen(process_obj)
|
jpayne@69
|
225
|
jpayne@69
|
class DefaultContext(BaseContext):
    '''Context that delegates to an underlying concrete context.

    The underlying context starts out unset and is bound on first use (or
    through set_start_method()); once bound it can only be replaced with
    force=True.
    '''

    Process = Process

    def __init__(self, context):
        # _default_context is the fallback; _actual_context stays None
        # until something binds it.
        self._actual_context = None
        self._default_context = context

    def get_context(self, method=None):
        '''Return the bound context, or look up *method* by name.

        With method=None the fallback context is bound first if nothing
        has been bound yet.  A named lookup is handled by BaseContext.
        '''
        if method is not None:
            return super().get_context(method)
        if self._actual_context is None:
            self._actual_context = self._default_context
        return self._actual_context

    def set_start_method(self, method, force=False):
        '''Bind the context named *method*.

        Raises RuntimeError if a context is already bound and *force* is
        false.  With force=True and method=None the binding is cleared.
        '''
        if not (force or self._actual_context is None):
            raise RuntimeError('context has already been set')
        if force and method is None:
            self._actual_context = None
            return
        self._actual_context = self.get_context(method)

    def get_start_method(self, allow_none=False):
        '''Return the _name of the bound context.

        When nothing is bound yet: return None if *allow_none* is true,
        otherwise bind the fallback context and return its _name.
        '''
        bound = self._actual_context
        if bound is None:
            if allow_none:
                return None
            bound = self._actual_context = self._default_context
        return bound._name

    def get_all_start_methods(self):
        '''Return a new list of start-method names for this platform.

        ['spawn'] on win32; otherwise 'fork' and 'spawn', followed by
        'forkserver' when reduction.HAVE_SEND_HANDLE is true.
        '''
        if sys.platform == 'win32':
            return ['spawn']
        methods = ['fork', 'spawn']
        if reduction.HAVE_SEND_HANDLE:
            methods.append('forkserver')
        return methods
|
jpayne@69
|
264
|
jpayne@69
|
265 #
|
jpayne@69
|
266 # Context types for fixed start method
|
jpayne@69
|
267 #
|
jpayne@69
|
268
|
jpayne@69
|
if sys.platform == 'win32':

    class SpawnProcess(process.BaseProcess):
        '''Process launched through popen_spawn_win32.Popen.'''

        _start_method = 'spawn'

        @staticmethod
        def _Popen(process_obj):
            from .popen_spawn_win32 import Popen as launcher
            return launcher(process_obj)

    class SpawnContext(BaseContext):
        '''Context for the 'spawn' start method.'''

        _name = 'spawn'
        Process = SpawnProcess

    # 'spawn' is the only start method registered on this platform.
    _concrete_contexts = {
        'spawn': SpawnContext(),
    }
    _default_context = DefaultContext(_concrete_contexts['spawn'])

else:

    class ForkProcess(process.BaseProcess):
        '''Process launched through popen_fork.Popen.'''

        _start_method = 'fork'

        @staticmethod
        def _Popen(process_obj):
            from .popen_fork import Popen as launcher
            return launcher(process_obj)

    class SpawnProcess(process.BaseProcess):
        '''Process launched through popen_spawn_posix.Popen.'''

        _start_method = 'spawn'

        @staticmethod
        def _Popen(process_obj):
            from .popen_spawn_posix import Popen as launcher
            return launcher(process_obj)

    class ForkServerProcess(process.BaseProcess):
        '''Process launched through popen_forkserver.Popen.'''

        _start_method = 'forkserver'

        @staticmethod
        def _Popen(process_obj):
            from .popen_forkserver import Popen as launcher
            return launcher(process_obj)

    class ForkContext(BaseContext):
        '''Context for the 'fork' start method.'''

        _name = 'fork'
        Process = ForkProcess

    class SpawnContext(BaseContext):
        '''Context for the 'spawn' start method.'''

        _name = 'spawn'
        Process = SpawnProcess

    class ForkServerContext(BaseContext):
        '''Context for the 'forkserver' start method.'''

        _name = 'forkserver'
        Process = ForkServerProcess

        def _check_available(self):
            '''Raise ValueError unless reduction.HAVE_SEND_HANDLE is true.'''
            if reduction.HAVE_SEND_HANDLE:
                return
            raise ValueError('forkserver start method not available')

    # One shared instance per start-method name.
    _concrete_contexts = {
        'fork': ForkContext(),
        'spawn': SpawnContext(),
        'forkserver': ForkServerContext(),
    }

    # bpo-33725: running arbitrary code after fork() is no longer reliable
    # on macOS since macOS 10.14 (Mojave).  Use spawn by default instead.
    _default_context = DefaultContext(
        _concrete_contexts['spawn' if sys.platform == 'darwin' else 'fork']
    )
|
jpayne@69
|
336
|
jpayne@69
|
337 #
|
jpayne@69
|
338 # Force the start method
|
jpayne@69
|
339 #
|
jpayne@69
|
340
|
jpayne@69
|
def _force_start_method(method):
    '''Bind the default context to the concrete context named *method*.

    Writes _default_context._actual_context directly, so none of the
    checks in set_start_method()/get_context() run.  An unknown name
    raises KeyError from the _concrete_contexts lookup.
    '''
    forced = _concrete_contexts[method]
    _default_context._actual_context = forced
|
jpayne@69
|
343
|
jpayne@69
|
344 #
|
jpayne@69
|
345 # Check that the current thread is spawning a child process
|
jpayne@69
|
346 #
|
jpayne@69
|
347
|
jpayne@69
|
# Per-thread storage for the 'spawning_popen' attribute used below.
_tls = threading.local()


def get_spawning_popen():
    '''Return the current thread's 'spawning_popen' value.

    Returns None when the attribute has not been set in this thread.
    '''
    try:
        return _tls.spawning_popen
    except AttributeError:
        return None
|
jpayne@69
|
352
|
jpayne@69
|
def set_spawning_popen(popen):
    '''Store *popen* as the current thread's 'spawning_popen' value.'''
    setattr(_tls, 'spawning_popen', popen)
|
jpayne@69
|
355
|
jpayne@69
|
def assert_spawning(obj):
    '''Raise RuntimeError unless get_spawning_popen() returns a value.

    *obj* is used only to put its type name into the error message.
    '''
    if get_spawning_popen() is not None:
        return
    raise RuntimeError(
        '%s objects should only be shared between processes'
        ' through inheritance' % type(obj).__name__
        )
|