Mercurial > repos > rliterman > csp2
comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/multiprocessing/resource_sharer.py @ 68:5028fdace37b
planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author | jpayne |
---|---|
date | Tue, 18 Mar 2025 16:23:26 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
67:0e9998148a16 | 68:5028fdace37b |
---|---|
1 # | |
2 # We use a background thread for sharing fds on Unix, and for sharing sockets on | |
3 # Windows. | |
4 # | |
5 # A client which wants to pickle a resource registers it with the resource | |
6 # sharer and gets an identifier in return. The unpickling process will connect | |
7 # to the resource sharer, sends the identifier and its pid, and then receives | |
8 # the resource. | |
9 # | |
10 | |
11 import os | |
12 import signal | |
13 import socket | |
14 import sys | |
15 import threading | |
16 | |
17 from . import process | |
18 from .context import reduction | |
19 from . import util | |
20 | |
21 __all__ = ['stop'] | |
22 | |
23 | |
24 if sys.platform == 'win32': | |
25 __all__ += ['DupSocket'] | |
26 | |
27 class DupSocket(object): | |
28 '''Picklable wrapper for a socket.''' | |
29 def __init__(self, sock): | |
30 new_sock = sock.dup() | |
31 def send(conn, pid): | |
32 share = new_sock.share(pid) | |
33 conn.send_bytes(share) | |
34 self._id = _resource_sharer.register(send, new_sock.close) | |
35 | |
36 def detach(self): | |
37 '''Get the socket. This should only be called once.''' | |
38 with _resource_sharer.get_connection(self._id) as conn: | |
39 share = conn.recv_bytes() | |
40 return socket.fromshare(share) | |
41 | |
42 else: | |
43 __all__ += ['DupFd'] | |
44 | |
45 class DupFd(object): | |
46 '''Wrapper for fd which can be used at any time.''' | |
47 def __init__(self, fd): | |
48 new_fd = os.dup(fd) | |
49 def send(conn, pid): | |
50 reduction.send_handle(conn, new_fd, pid) | |
51 def close(): | |
52 os.close(new_fd) | |
53 self._id = _resource_sharer.register(send, close) | |
54 | |
55 def detach(self): | |
56 '''Get the fd. This should only be called once.''' | |
57 with _resource_sharer.get_connection(self._id) as conn: | |
58 return reduction.recv_handle(conn) | |
59 | |
60 | |
61 class _ResourceSharer(object): | |
62 '''Manager for resources using background thread.''' | |
63 def __init__(self): | |
64 self._key = 0 | |
65 self._cache = {} | |
66 self._old_locks = [] | |
67 self._lock = threading.Lock() | |
68 self._listener = None | |
69 self._address = None | |
70 self._thread = None | |
71 util.register_after_fork(self, _ResourceSharer._afterfork) | |
72 | |
73 def register(self, send, close): | |
74 '''Register resource, returning an identifier.''' | |
75 with self._lock: | |
76 if self._address is None: | |
77 self._start() | |
78 self._key += 1 | |
79 self._cache[self._key] = (send, close) | |
80 return (self._address, self._key) | |
81 | |
82 @staticmethod | |
83 def get_connection(ident): | |
84 '''Return connection from which to receive identified resource.''' | |
85 from .connection import Client | |
86 address, key = ident | |
87 c = Client(address, authkey=process.current_process().authkey) | |
88 c.send((key, os.getpid())) | |
89 return c | |
90 | |
91 def stop(self, timeout=None): | |
92 '''Stop the background thread and clear registered resources.''' | |
93 from .connection import Client | |
94 with self._lock: | |
95 if self._address is not None: | |
96 c = Client(self._address, | |
97 authkey=process.current_process().authkey) | |
98 c.send(None) | |
99 c.close() | |
100 self._thread.join(timeout) | |
101 if self._thread.is_alive(): | |
102 util.sub_warning('_ResourceSharer thread did ' | |
103 'not stop when asked') | |
104 self._listener.close() | |
105 self._thread = None | |
106 self._address = None | |
107 self._listener = None | |
108 for key, (send, close) in self._cache.items(): | |
109 close() | |
110 self._cache.clear() | |
111 | |
112 def _afterfork(self): | |
113 for key, (send, close) in self._cache.items(): | |
114 close() | |
115 self._cache.clear() | |
116 # If self._lock was locked at the time of the fork, it may be broken | |
117 # -- see issue 6721. Replace it without letting it be gc'ed. | |
118 self._old_locks.append(self._lock) | |
119 self._lock = threading.Lock() | |
120 if self._listener is not None: | |
121 self._listener.close() | |
122 self._listener = None | |
123 self._address = None | |
124 self._thread = None | |
125 | |
126 def _start(self): | |
127 from .connection import Listener | |
128 assert self._listener is None, "Already have Listener" | |
129 util.debug('starting listener and thread for sending handles') | |
130 self._listener = Listener(authkey=process.current_process().authkey) | |
131 self._address = self._listener.address | |
132 t = threading.Thread(target=self._serve) | |
133 t.daemon = True | |
134 t.start() | |
135 self._thread = t | |
136 | |
137 def _serve(self): | |
138 if hasattr(signal, 'pthread_sigmask'): | |
139 signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals()) | |
140 while 1: | |
141 try: | |
142 with self._listener.accept() as conn: | |
143 msg = conn.recv() | |
144 if msg is None: | |
145 break | |
146 key, destination_pid = msg | |
147 send, close = self._cache.pop(key) | |
148 try: | |
149 send(conn, destination_pid) | |
150 finally: | |
151 close() | |
152 except: | |
153 if not util.is_exiting(): | |
154 sys.excepthook(*sys.exc_info()) | |
155 | |
156 | |
157 _resource_sharer = _ResourceSharer() | |
158 stop = _resource_sharer.stop |