annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/multiprocessing/resource_sharer.py @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
rev   line source
jpayne@68 1 #
jpayne@68 2 # We use a background thread for sharing fds on Unix, and for sharing sockets on
jpayne@68 3 # Windows.
jpayne@68 4 #
jpayne@68 5 # A client which wants to pickle a resource registers it with the resource
jpayne@68 6 # sharer and gets an identifier in return. The unpickling process will connect
jpayne@68 7 # to the resource sharer, send the identifier and its pid, and then receive
jpayne@68 8 # the resource.
jpayne@68 9 #
jpayne@68 10
jpayne@68 11 import os
jpayne@68 12 import signal
jpayne@68 13 import socket
jpayne@68 14 import sys
jpayne@68 15 import threading
jpayne@68 16
jpayne@68 17 from . import process
jpayne@68 18 from .context import reduction
jpayne@68 19 from . import util
jpayne@68 20
__all__ = ['stop']


# Exactly one wrapper class is defined, chosen by platform: DupSocket on
# Windows, DupFd everywhere else.  Both hand a duplicate of the resource to
# the module-level resource sharer and keep only the identifier it returns.
if sys.platform == 'win32':
    __all__ += ['DupSocket']

    class DupSocket:
        '''Picklable wrapper for a socket.'''

        def __init__(self, sock):
            # Work on a duplicate: the close callback registered below closes
            # the duplicate, not the socket passed in by the caller.
            duplicate = sock.dup()

            def _send(conn, pid):
                # Share the duplicate with process `pid` and ship the
                # resulting bytes over the connection.
                conn.send_bytes(duplicate.share(pid))

            self._id = _resource_sharer.register(_send, duplicate.close)

        def detach(self):
            '''Get the socket.  This should only be called once.'''
            with _resource_sharer.get_connection(self._id) as conn:
                payload = conn.recv_bytes()
                return socket.fromshare(payload)

else:
    __all__ += ['DupFd']

    class DupFd:
        '''Wrapper for fd which can be used at any time.'''

        def __init__(self, fd):
            # Work on a duplicate: the close callback registered below closes
            # the duplicate, not the descriptor passed in by the caller.
            duplicate = os.dup(fd)

            def _send(conn, pid):
                reduction.send_handle(conn, duplicate, pid)

            def _close():
                os.close(duplicate)

            self._id = _resource_sharer.register(_send, _close)

        def detach(self):
            '''Get the fd.  This should only be called once.'''
            with _resource_sharer.get_connection(self._id) as conn:
                return reduction.recv_handle(conn)
jpayne@68 59
jpayne@68 60
class _ResourceSharer:
    '''Manager for resources using background thread.

    Resources are registered as a pair of callbacks ``(send, close)`` and
    handed out at most once each: the serving thread pops the pair from the
    cache, calls ``send(conn, pid)`` and then always calls ``close()``.
    '''

    def __init__(self):
        self._key = 0                   # last identifier handed out
        self._cache = {}                # key -> (send, close) callbacks
        self._old_locks = []            # locks replaced after a fork
        self._lock = threading.Lock()
        # The listener, its address and the serving thread are created
        # lazily by _start() on the first register() call.
        self._listener = self._address = self._thread = None
        util.register_after_fork(self, _ResourceSharer._afterfork)

    def register(self, send, close):
        '''Register resource, returning an identifier.

        The identifier is the tuple ``(listener address, key)``; it is what
        get_connection() expects as its argument.
        '''
        with self._lock:
            if self._address is None:
                # Nothing is listening yet: bring up listener and thread.
                self._start()
            self._key += 1
            new_key = self._key
            self._cache[new_key] = (send, close)
            return (self._address, new_key)

    @staticmethod
    def get_connection(ident):
        '''Return connection from which to receive identified resource.

        Connects to the address stored in *ident* and announces the wanted
        key together with the pid of the calling process.
        '''
        from .connection import Client
        address, key = ident
        authkey = process.current_process().authkey
        client = Client(address, authkey=authkey)
        client.send((key, os.getpid()))
        return client

    def stop(self, timeout=None):
        '''Stop the background thread and clear registered resources.

        *timeout* is passed to the thread's join(); a warning is emitted if
        the thread is still alive afterwards.
        '''
        from .connection import Client
        with self._lock:
            if self._address is not None:
                # A message of None is the serving loop's signal to exit.
                client = Client(self._address,
                                authkey=process.current_process().authkey)
                client.send(None)
                client.close()
                worker = self._thread
                worker.join(timeout)
                if worker.is_alive():
                    util.sub_warning(
                        '_ResourceSharer thread did not stop when asked')
                self._listener.close()
                self._thread = None
                self._address = None
                self._listener = None
                self._close_cached()

    def _close_cached(self):
        '''Run the close callback of every cached resource, then empty the cache.'''
        for _send, closer in self._cache.values():
            closer()
        self._cache.clear()

    def _afterfork(self):
        '''Reset all state; registered in __init__ via util.register_after_fork.'''
        self._close_cached()
        # If self._lock was locked at the time of the fork, it may be broken
        # -- see issue 6721.  Replace it without letting it be gc'ed.
        self._old_locks.append(self._lock)
        self._lock = threading.Lock()
        stale_listener = self._listener
        if stale_listener is not None:
            stale_listener.close()
        self._listener = None
        self._address = None
        self._thread = None

    def _start(self):
        '''Create the listener and launch the daemon thread that serves it.'''
        from .connection import Listener
        assert self._listener is None, "Already have Listener"
        util.debug('starting listener and thread for sending handles')
        listener = Listener(authkey=process.current_process().authkey)
        self._listener = listener
        self._address = listener.address
        worker = threading.Thread(target=self._serve, daemon=True)
        worker.start()
        self._thread = worker

    def _serve(self):
        '''Thread body: answer requests until a None message arrives.'''
        # Block all valid signals in this thread where the platform has
        # pthread_sigmask.
        if hasattr(signal, 'pthread_sigmask'):
            signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals())
        while True:
            try:
                with self._listener.accept() as conn:
                    request = conn.recv()
                    if request is None:
                        break
                    key, destination_pid = request
                    # pop(): each registered resource is served only once.
                    send, close = self._cache.pop(key)
                    try:
                        send(conn, destination_pid)
                    finally:
                        close()
            except BaseException:
                # Deliberately catches everything so one bad request does not
                # end the loop; the error is reported through sys.excepthook
                # unless the interpreter is exiting.
                if not util.is_exiting():
                    sys.excepthook(*sys.exc_info())
jpayne@68 155
jpayne@68 156
# Single module-wide sharer instance used by the wrapper classes in this
# file.  Its bound stop() method is exposed as the module-level ``stop``
# named in __all__.
_resource_sharer = _ResourceSharer()
stop = _resource_sharer.stop