jpayne@69
|
1 """RPC Implementation, originally written for the Python Idle IDE
|
jpayne@69
|
2
|
jpayne@69
|
3 For security reasons, GvR requested that Idle's Python execution server process
|
jpayne@69
|
4 connect to the Idle process, which listens for the connection. Since Idle has
|
jpayne@69
|
5 only one client per server, this was not a limitation.
|
jpayne@69
|
6
|
jpayne@69
|
7 +---------------------------------+ +-------------+
|
jpayne@69
|
8 | socketserver.BaseRequestHandler | | SocketIO |
|
jpayne@69
|
9 +---------------------------------+ +-------------+
|
jpayne@69
|
10 ^ | register() |
|
jpayne@69
|
11 | | unregister()|
|
jpayne@69
|
12 | +-------------+
|
jpayne@69
|
13 | ^ ^
|
jpayne@69
|
14 | | |
|
jpayne@69
|
15 | + -------------------+ |
|
jpayne@69
|
16 | | |
|
jpayne@69
|
17 +-------------------------+ +-----------------+
|
jpayne@69
|
18 | RPCHandler | | RPCClient |
|
jpayne@69
|
19 | [attribute of RPCServer]| | |
|
jpayne@69
|
20 +-------------------------+ +-----------------+
|
jpayne@69
|
21
|
jpayne@69
|
22 The RPCServer handler class is expected to provide register/unregister methods.
|
jpayne@69
|
23 RPCHandler inherits the mix-in class SocketIO, which provides these methods.
|
jpayne@69
|
24
|
jpayne@69
|
25 See the Idle run.main() docstring for further information on how this was
|
jpayne@69
|
26 accomplished in Idle.
|
jpayne@69
|
27
|
jpayne@69
|
28 """
|
jpayne@69
|
29 import builtins
|
jpayne@69
|
30 import copyreg
|
jpayne@69
|
31 import io
|
jpayne@69
|
32 import marshal
|
jpayne@69
|
33 import os
|
jpayne@69
|
34 import pickle
|
jpayne@69
|
35 import queue
|
jpayne@69
|
36 import select
|
jpayne@69
|
37 import socket
|
jpayne@69
|
38 import socketserver
|
jpayne@69
|
39 import struct
|
jpayne@69
|
40 import sys
|
jpayne@69
|
41 import threading
|
jpayne@69
|
42 import traceback
|
jpayne@69
|
43 import types
|
jpayne@69
|
44
|
jpayne@69
|
def unpickle_code(ms):
    """Rebuild a code object from the marshalled byte string *ms*.

    This is the reconstructor that pickle_code() hands to pickle, so it is
    what runs when a pickled code object is loaded again.
    """
    code = marshal.loads(ms)
    assert isinstance(code, types.CodeType)
    return code
|
jpayne@69
|
50
|
jpayne@69
|
def pickle_code(co):
    """Reduce the code object *co* for pickling.

    Returns the (callable, args) pair that pickle expects from a reducer:
    unpickle_code together with a 1-tuple holding the marshalled bytes of
    *co*.
    """
    assert isinstance(co, types.CodeType)
    return unpickle_code, (marshal.dumps(co),)
|
jpayne@69
|
56
|
jpayne@69
|
def dumps(obj, protocol=None):
    """Serialize *obj* with CodePickler and return the resulting bytes.

    protocol is passed to the pickler unchanged; IDLE passes None to select
    pickle.DEFAULT_PROTOCOL.
    """
    stream = io.BytesIO()
    CodePickler(stream, protocol).dump(obj)
    return stream.getvalue()
|
jpayne@69
|
64
|
jpayne@69
|
65
|
jpayne@69
|
class CodePickler(pickle.Pickler):
    """Pickler that can also serialize code objects, through pickle_code."""

    # Begin with the code-object reducer, then layer the global copyreg
    # registrations on top so that they keep precedence on a key clash.
    dispatch_table = {types.CodeType: pickle_code}
    dispatch_table.update(copyreg.dispatch_table)
|
jpayne@69
|
68
|
jpayne@69
|
69
|
jpayne@69
|
# Largest slice passed to sock.send() and largest read requested from
# sock.recv() in a single call (see SocketIO.putmessage / pollpacket).
BUFSIZE = 8*1024
# RPCClient.accept() only accepts a peer whose address equals this value.
LOCALHOST = '127.0.0.1'
|
jpayne@69
|
72
|
jpayne@69
|
class RPCServer(socketserver.TCPServer):
    """TCPServer variant that connects out instead of binding and listening.

    The bind phase is a no-op, activation connects the server socket to
    self.server_address, and get_request() hands back that same connected
    socket rather than accepting a new one.
    """

    def __init__(self, addr, handlerclass=None):
        """Create the server for *addr*; handlerclass defaults to RPCHandler."""
        handler = RPCHandler if handlerclass is None else handlerclass
        socketserver.TCPServer.__init__(self, addr, handler)

    def server_bind(self):
        "Override TCPServer method, no bind() phase for connecting entity"
        return None

    def server_activate(self):
        """Override TCPServer method, connect() instead of listen()

        Due to the reversed connection, self.server_address is actually the
        address of the Idle Client to which we are connecting.

        """
        target = self.server_address
        self.socket.connect(target)

    def get_request(self):
        "Override TCPServer method, return already connected socket"
        return (self.socket, self.server_address)

    def handle_error(self, request, client_address):
        """Override TCPServer method

        Error message goes to __stderr__.  No error message if exiting
        normally or socket raised EOF.  Other exceptions not handled in
        server code will cause os._exit.

        """
        try:
            # Re-raise whatever exception is currently being handled so the
            # clauses below can sort it by type.
            raise
        except SystemExit:
            raise
        except:
            err = sys.__stderr__
            rule = '-'*40

            def report(*parts):
                print(*parts, file=err)

            report('\n' + rule)
            report('Unhandled server exception!')
            report('Thread: %s' % threading.current_thread().name)
            report('Client Address: ', client_address)
            report('Request: ', repr(request))
            traceback.print_exc(file=err)
            report('\n*** Unrecoverable, server exiting!')
            report(rule)
            os._exit(0)
|
jpayne@69
|
120
|
jpayne@69
|
121 #----------------- end class RPCServer --------------------
|
jpayne@69
|
122
|
jpayne@69
|
# Default oid -> object registry, used by every SocketIO created without its
# own objtable; remoteref() also registers objects here.
objecttable = {}
# (seq, (method, args, kwargs)) tuples put here by SocketIO.localcall() for
# 'QUEUE' requests.
request_queue = queue.Queue(0)
# (seq, response) tuples; SocketIO.pollresponse() sends each one back across
# the link as an 'OK' reply.
response_queue = queue.Queue(0)
|
jpayne@69
|
126
|
jpayne@69
|
127
|
jpayne@69
|
class SocketIO(object):
    """Request/response messaging over one connected socket.

    Every message is a pickled ``(seq, payload)`` tuple preceded by a 4-byte
    little-endian length.  A payload is either a request
    ``(how, (oid, methodname, args, kwargs))`` with how 'CALL' or 'QUEUE',
    or a response ``(how, what)``.

    The thread that constructs the instance (self.sockthread) is the one
    that reads the socket.  Any other thread issuing a request blocks on a
    per-request Condition stored in self.cvars until the socket thread puts
    its response into self.responses.

    debug() reads self.debugging and self.location.  This class defines no
    default for either, so subclasses are expected to provide them.
    """

    # Sequence counter; newseq() advances it by 2.  RPCClient overrides the
    # start value with 1, so it hands out odd numbers while the default
    # here yields even ones.
    nextseq = 0

    def __init__(self, sock, objtable=None, debugging=None):
        """Wrap the connected socket *sock*.

        objtable maps object ids to the objects requests may address; it
        defaults to the module-level objecttable.  debugging, when not None,
        sets the debugging flag on this instance.
        """
        self.sockthread = threading.current_thread()
        if debugging is not None:
            self.debugging = debugging
        self.sock = sock
        if objtable is None:
            objtable = objecttable
        self.objtable = objtable
        self.responses = {}  # seq -> response parked for a waiting thread
        self.cvars = {}      # seq -> Condition that waiting thread blocks on

    def close(self):
        """Close the socket if there still is one, and forget it."""
        sock = self.sock
        self.sock = None
        if sock is not None:
            sock.close()

    def exithook(self):
        "override for specific exit action"
        os._exit(0)

    def debug(self, *args):
        """Print location, thread name and *args* to sys.__stderr__.

        Does nothing unless self.debugging is true.
        """
        if not self.debugging:
            return
        parts = [self.location, str(threading.current_thread().name)]
        parts.extend(str(a) for a in args)
        print(" ".join(parts), file=sys.__stderr__)

    def register(self, oid, object):
        """Make *object* addressable by requests under the id *oid*."""
        self.objtable[oid] = object

    def unregister(self, oid):
        """Remove *oid* from the object table; unknown ids are ignored."""
        try:
            del self.objtable[oid]
        except KeyError:
            pass

    def localcall(self, seq, request):
        """Execute or queue an incoming request and return its response.

        request is ``(how, (oid, methodname, args, kwargs))``.  The return
        value is a ``(status, value)`` tuple whose status is one of 'OK',
        'QUEUED', 'ERROR', 'CALLEXC' or 'EXCEPTION'.  SystemExit,
        KeyboardInterrupt and OSError raised while handling the call
        propagate to the caller.
        """
        self.debug("localcall:", request)
        try:
            how, (oid, methodname, args, kwargs) = request
        except (TypeError, ValueError):
            # TypeError: something here is not unpackable at all.
            # ValueError: a sequence with the wrong number of items.
            # Both mean the peer sent a malformed request.
            return ("ERROR", "Bad request format")
        if oid not in self.objtable:
            return ("ERROR", "Unknown object id: %r" % (oid,))
        obj = self.objtable[oid]
        # Two pseudo-methods let the peer discover what obj offers.
        if methodname == "__methods__":
            methods = {}
            _getmethods(obj, methods)
            return ("OK", methods)
        if methodname == "__attributes__":
            attributes = {}
            _getattributes(obj, attributes)
            return ("OK", attributes)
        if not hasattr(obj, methodname):
            return ("ERROR", "Unsupported method name: %r" % (methodname,))
        method = getattr(obj, methodname)
        try:
            if how == 'CALL':
                ret = method(*args, **kwargs)
                if isinstance(ret, RemoteObject):
                    # Send a reference instead of the object itself.
                    ret = remoteref(ret)
                return ("OK", ret)
            elif how == 'QUEUE':
                request_queue.put((seq, (method, args, kwargs)))
                return("QUEUED", None)
            else:
                return ("ERROR", "Unsupported message type: %s" % how)
        except SystemExit:
            raise
        except KeyboardInterrupt:
            raise
        except OSError:
            raise
        except Exception as ex:
            # Ordinary failure of the called method: ship the exception
            # object back so the peer can re-raise it.
            return ("CALLEXC", ex)
        except BaseException:
            # Anything else that is not an Exception subclass.
            msg = "*** Internal Error: rpc.py:SocketIO.localcall()\n\n"\
                  " Object: %s \n Method: %s \n Args: %s\n"
            print(msg % (oid, method, args), file=sys.__stderr__)
            traceback.print_exc(file=sys.__stderr__)
            return ("EXCEPTION", None)

    def remotecall(self, oid, methodname, args, kwargs):
        """Send a 'CALL' request and return the decoded response."""
        self.debug("remotecall:asynccall: ", oid, methodname)
        seq = self.asynccall(oid, methodname, args, kwargs)
        return self.asyncreturn(seq)

    def remotequeue(self, oid, methodname, args, kwargs):
        """Send a 'QUEUE' request and return the decoded response."""
        self.debug("remotequeue:asyncqueue: ", oid, methodname)
        seq = self.asyncqueue(oid, methodname, args, kwargs)
        return self.asyncreturn(seq)

    def _send_request(self, how, label, oid, methodname, args, kwargs):
        """Send one request of kind *how* and return its sequence number.

        label is only used to tag the debug output.
        """
        request = (how, (oid, methodname, args, kwargs))
        seq = self.newseq()
        if threading.current_thread() != self.sockthread:
            # This thread cannot read the socket itself; give it a
            # Condition to wait on until the socket thread has the reply.
            self.cvars[seq] = threading.Condition()
        self.debug(("%s:%d:" % (label, seq)), oid, methodname, args, kwargs)
        self.putmessage((seq, request))
        return seq

    def asynccall(self, oid, methodname, args, kwargs):
        """Send a 'CALL' request without waiting; return its sequence number."""
        return self._send_request("CALL", "asynccall",
                                  oid, methodname, args, kwargs)

    def asyncqueue(self, oid, methodname, args, kwargs):
        """Send a 'QUEUE' request without waiting; return its sequence number."""
        return self._send_request("QUEUE", "asyncqueue",
                                  oid, methodname, args, kwargs)

    def asyncreturn(self, seq):
        """Wait for the response to request *seq* and decode it."""
        self.debug("asyncreturn:%d:call getresponse(): " % seq)
        response = self.getresponse(seq, wait=0.05)
        self.debug(("asyncreturn:%d:response: " % seq), response)
        return self.decoderesponse(response)

    def decoderesponse(self, response):
        """Turn a ``(how, what)`` response into a return value or exception.

        'OK' returns what; 'QUEUED' and 'EXCEPTION' return None; 'EOF' calls
        decode_interrupthook() and then returns None; 'ERROR' raises
        RuntimeError(what); 'CALLEXC' re-raises what; any other status
        raises SystemError.
        """
        how, what = response
        if how == "OK":
            return what
        if how == "QUEUED":
            return None
        if how == "EXCEPTION":
            self.debug("decoderesponse: EXCEPTION")
            return None
        if how == "EOF":
            self.debug("decoderesponse: EOF")
            self.decode_interrupthook()
            return None
        if how == "ERROR":
            self.debug("decoderesponse: Internal ERROR:", what)
            raise RuntimeError(what)
        if how == "CALLEXC":
            self.debug("decoderesponse: Call Exception:", what)
            raise what
        raise SystemError(how, what)

    def decode_interrupthook(self):
        "Called by decoderesponse() on an 'EOF' response; raises EOFError."
        raise EOFError

    def mainloop(self):
        """Listen on socket until I/O not ready or EOF

        pollresponse() will loop looking for seq number None, which
        never comes, and exit on EOFError.

        """
        try:
            self.getresponse(myseq=None, wait=0.05)
        except EOFError:
            self.debug("mainloop:return")
            return

    def getresponse(self, myseq, wait):
        """Return the response for *myseq*, proxifying the value of an 'OK'."""
        response = self._getresponse(myseq, wait)
        if response is not None:
            how, what = response
            if how == "OK":
                response = how, self._proxify(what)
        return response

    def _proxify(self, obj):
        """Replace RemoteProxy markers (also inside lists) by RPCProxy objects."""
        if isinstance(obj, RemoteProxy):
            return RPCProxy(self, obj.oid)
        if isinstance(obj, list):
            return list(map(self._proxify, obj))
        # XXX Check for other types -- not currently needed
        return obj

    def _getresponse(self, myseq, wait):
        """Block until the response for *myseq* is available and return it.

        On the socket thread this polls the socket itself; on any other
        thread it waits on the Condition registered for myseq.
        """
        self.debug("_getresponse:myseq:", myseq)
        if threading.current_thread() is self.sockthread:
            # this thread does all reading of requests or responses
            while 1:
                response = self.pollresponse(myseq, wait)
                if response is not None:
                    return response
        else:
            # wait for notification from socket handling thread
            cvar = self.cvars[myseq]
            # 'with' releases the lock even if the wait is interrupted.
            with cvar:
                while myseq not in self.responses:
                    cvar.wait()
                response = self.responses[myseq]
                self.debug("_getresponse:%s: thread woke up: response: %s" %
                           (myseq, response))
                del self.responses[myseq]
                del self.cvars[myseq]
            return response

    def newseq(self):
        """Advance the sequence counter by 2 and return the new value."""
        self.nextseq = seq = self.nextseq + 2
        return seq

    def putmessage(self, message):
        """Pickle *message* and write it, length-prefixed, to the socket.

        Raises OSError("socket no longer exists") when self.sock is unusable
        (for instance None after close()); pickle.PicklingError is reported
        on sys.__stderr__ and re-raised.
        """
        self.debug("putmessage:%d:" % message[0])
        try:
            s = dumps(message)
        except pickle.PicklingError:
            print("Cannot pickle:", repr(message), file=sys.__stderr__)
            raise
        s = struct.pack("<i", len(s)) + s
        while len(s) > 0:
            try:
                # Wait until the socket is writable, then send one slice.
                select.select([], [self.sock], [])
                n = self.sock.send(s[:BUFSIZE])
            except (AttributeError, TypeError):
                raise OSError("socket no longer exists")
            s = s[n:]

    # Receive-side framing state.
    buff = b''      # bytes received but not yet consumed
    bufneed = 4     # number of bytes needed to finish the current stage
    bufstate = 0    # meaning: 0 => reading count; 1 => reading data

    def pollpacket(self, wait):
        """Return the next complete packet body, or None if not yet complete.

        Waits at most *wait* seconds for the socket to become readable.
        Raises EOFError when recv() fails or returns no data.
        """
        self._stage0()
        if len(self.buff) < self.bufneed:
            readable, _, _ = select.select([self.sock.fileno()], [], [], wait)
            if len(readable) == 0:
                return None
            try:
                s = self.sock.recv(BUFSIZE)
            except OSError:
                raise EOFError
            if len(s) == 0:
                raise EOFError
            self.buff += s
            self._stage0()
        return self._stage1()

    def _stage0(self):
        """Consume the 4-byte length prefix once it has fully arrived."""
        if self.bufstate == 0 and len(self.buff) >= 4:
            s = self.buff[:4]
            self.buff = self.buff[4:]
            self.bufneed = struct.unpack("<i", s)[0]
            self.bufstate = 1

    def _stage1(self):
        """Return the packet body once fully buffered (implicitly None before)."""
        if self.bufstate == 1 and len(self.buff) >= self.bufneed:
            packet = self.buff[:self.bufneed]
            self.buff = self.buff[self.bufneed:]
            self.bufneed = 4
            self.bufstate = 0
            return packet

    def pollmessage(self, wait):
        """Return the next unpickled message, or None if none is complete."""
        packet = self.pollpacket(wait)
        if packet is None:
            return None
        try:
            # NOTE(review): pickle.loads executes whatever the peer sent.
            # That is only acceptable because the peer is trusted; confirm
            # nothing else can ever reach this socket.
            message = pickle.loads(packet)
        except pickle.UnpicklingError:
            print("-----------------------", file=sys.__stderr__)
            print("cannot unpickle packet:", repr(packet), file=sys.__stderr__)
            traceback.print_stack(file=sys.__stderr__)
            print("-----------------------", file=sys.__stderr__)
            raise
        return message

    def pollresponse(self, myseq, wait):
        """Handle messages received on the socket.

        Some messages received may be asynchronous 'call' or 'queue' requests,
        and some may be responses for other threads.

        'call' requests are passed to self.localcall() with the expectation of
        immediate execution, during which time the socket is not serviced.

        'queue' requests are used for tasks (which may block or hang) to be
        processed in a different thread.  These requests are fed into
        request_queue by self.localcall().  Responses to queued requests are
        taken from response_queue and sent across the link with the associated
        sequence numbers.  Messages in the queues are (sequence_number,
        request/response) tuples and code using this module removing messages
        from the request_queue is responsible for returning the correct
        sequence number in the response_queue.

        pollresponse() will loop until a response message with the myseq
        sequence number is received, and will save other responses in
        self.responses and notify the owning thread.

        """
        while 1:
            # send queued response if there is one available
            try:
                qmsg = response_queue.get(0)
            except queue.Empty:
                pass
            else:
                seq, response = qmsg
                message = (seq, ('OK', response))
                self.putmessage(message)
            # poll for message on link
            try:
                message = self.pollmessage(wait)
                if message is None:  # socket not ready
                    return None
            except EOFError:
                self.handle_EOF()
                return None
            except AttributeError:
                # e.g. self.sock is None after close()
                return None
            seq, resq = message
            how = resq[0]
            self.debug("pollresponse:%d:myseq:%s" % (seq, myseq))
            # process or queue a request
            if how in ("CALL", "QUEUE"):
                self.debug("pollresponse:%d:localcall:call:" % seq)
                response = self.localcall(seq, resq)
                self.debug("pollresponse:%d:localcall:response:%s"
                           % (seq, response))
                if how == "CALL":
                    self.putmessage((seq, response))
                elif how == "QUEUE":
                    # don't acknowledge the 'queue' request!
                    pass
                continue
            # return if completed message transaction
            elif seq == myseq:
                return resq
            # must be a response for a different thread:
            else:
                cv = self.cvars.get(seq, None)
                # response involving unknown sequence number is discarded,
                # probably intended for prior incarnation of server
                if cv is not None:
                    with cv:
                        self.responses[seq] = resq
                        cv.notify()
                continue

    def handle_EOF(self):
        "action taken upon link being closed by peer"
        self.EOFhook()
        self.debug("handle_EOF")
        # Iterate over a snapshot: a woken waiter deletes its own entry from
        # self.cvars, which must not disturb this loop.
        for key, cv in list(self.cvars.items()):
            with cv:
                self.responses[key] = ('EOF', None)
                cv.notify()
        # call our (possibly overridden) exit function
        self.exithook()

    def EOFhook(self):
        "Classes using rpc client/server can override to augment EOF action"
        pass
|
jpayne@69
|
486
|
jpayne@69
|
487 #----------------- end class SocketIO --------------------
|
jpayne@69
|
488
|
jpayne@69
|
class RemoteObject(object):
    """Marker base class for return values that are passed by reference.

    SocketIO.localcall() replaces a returned instance of this class with
    remoteref(instance) instead of sending the object itself.
    """
    # Token mix-in class
    pass
|
jpayne@69
|
492
|
jpayne@69
|
493
|
jpayne@69
|
def remoteref(obj):
    """Register *obj* in objecttable under id(obj); return a RemoteProxy for it."""
    key = id(obj)
    objecttable[key] = obj
    return RemoteProxy(key)
|
jpayne@69
|
498
|
jpayne@69
|
499
|
jpayne@69
|
class RemoteProxy:
    """Stand-in that carries only the object id of a registered object.

    remoteref() creates these; SocketIO._proxify() turns one found in an
    'OK' response into an RPCProxy for the same oid.
    """

    def __init__(self, oid):
        """Remember the object id *oid*."""
        self.oid = oid
|
jpayne@69
|
504
|
jpayne@69
|
505
|
jpayne@69
|
class RPCHandler(socketserver.BaseRequestHandler, SocketIO):
    """socketserver request handler that runs the SocketIO message loop."""

    debugging = False
    location = "#S"  # Server

    def __init__(self, sock, addr, svr):
        """Attach to *svr* and serve the connection on *sock*."""
        # Publish the handler on the server object before anything runs.
        svr.current_handler = self ## cgt xxx
        # SocketIO state has to exist first: per the socketserver docs,
        # BaseRequestHandler.__init__ itself calls handle(), which enters
        # mainloop() below.
        SocketIO.__init__(self, sock)
        socketserver.BaseRequestHandler.__init__(self, sock, addr, svr)

    def handle(self):
        "handle() method required by socketserver"
        self.mainloop()

    def get_remote_proxy(self, oid):
        """Return an RPCProxy for the peer's object registered as *oid*."""
        return RPCProxy(self, oid)
|
jpayne@69
|
522
|
jpayne@69
|
523
|
jpayne@69
|
class RPCClient(SocketIO):
    """Listening end of the link: bind, accept one local peer, then do RPC.

    SocketIO.__init__ is not called by the constructor; it runs in accept()
    once a peer from LOCALHOST has connected.
    """

    debugging = False
    location = "#C"  # Client

    nextseq = 1  # Requests coming from the client are odd numbered

    def __init__(self, address, family=socket.AF_INET, type=socket.SOCK_STREAM):
        """Create the listening socket, bind it to *address* and listen."""
        self.listening_sock = socket.socket(family, type)
        self.listening_sock.bind(address)
        self.listening_sock.listen(1)

    def accept(self):
        """Accept one connection and start using it for RPC.

        Raises OSError if the peer's address is not LOCALHOST; in that case
        the rejected connection is closed before raising.
        """
        working_sock, address = self.listening_sock.accept()
        if self.debugging:
            print("****** Connection request from ", address, file=sys.__stderr__)
        if address[0] == LOCALHOST:
            SocketIO.__init__(self, working_sock)
        else:
            print("** Invalid host: ", address, file=sys.__stderr__)
            # Do not leave the rejected connection open behind the error.
            working_sock.close()
            raise OSError("connection from invalid host: %r" % (address,))

    def get_remote_proxy(self, oid):
        """Return an RPCProxy for the peer's object registered as *oid*."""
        return RPCProxy(self, oid)
|
jpayne@69
|
548
|
jpayne@69
|
549
|
jpayne@69
|
class RPCProxy(object):
    """Local stand-in for an object registered on the other side of the link.

    Attribute access is resolved lazily: the remote object's method names
    and attribute names are each fetched once with a remote call and then
    kept on the proxy.
    """

    # Name tables; None until fetched from the peer.
    __methods = None
    __attributes = None

    def __init__(self, sockio, oid):
        """Bind the proxy to the link *sockio* and the remote id *oid*."""
        self.sockio = sockio
        self.oid = oid

    def __getattr__(self, name):
        """Resolve *name* against the remote object.

        A remote method yields a MethodProxy, a remote attribute yields its
        current value (fetched with a remote '__getattribute__' call), and
        any other name raises AttributeError.
        """
        if self.__methods is None:
            self.__getmethods()
        if self.__methods.get(name):
            return MethodProxy(self.sockio, self.oid, name)
        if self.__attributes is None:
            self.__getattributes()
        if name not in self.__attributes:
            raise AttributeError(name)
        return self.sockio.remotecall(self.oid, '__getattribute__',
                                      (name,), {})

    def __getattributes(self):
        """Fetch and store the remote object's attribute-name table."""
        table = self.sockio.remotecall(self.oid, "__attributes__", (), {})
        self.__attributes = table

    def __getmethods(self):
        """Fetch and store the remote object's method-name table."""
        table = self.sockio.remotecall(self.oid, "__methods__", (), {})
        self.__methods = table
|
jpayne@69
|
580
|
jpayne@69
|
581 def _getmethods(obj, methods):
|
jpayne@69
|
582 # Helper to get a list of methods from an object
|
jpayne@69
|
583 # Adds names to dictionary argument 'methods'
|
jpayne@69
|
584 for name in dir(obj):
|
jpayne@69
|
585 attr = getattr(obj, name)
|
jpayne@69
|
586 if callable(attr):
|
jpayne@69
|
587 methods[name] = 1
|
jpayne@69
|
588 if isinstance(obj, type):
|
jpayne@69
|
589 for super in obj.__bases__:
|
jpayne@69
|
590 _getmethods(super, methods)
|
jpayne@69
|
591
|
jpayne@69
|
592 def _getattributes(obj, attributes):
|
jpayne@69
|
593 for name in dir(obj):
|
jpayne@69
|
594 attr = getattr(obj, name)
|
jpayne@69
|
595 if not callable(attr):
|
jpayne@69
|
596 attributes[name] = 1
|
jpayne@69
|
597
|
jpayne@69
|
598
|
jpayne@69
|
class MethodProxy(object):
    """Callable that forwards a call to one named method of a remote object."""

    def __init__(self, sockio, oid, name):
        """Remember the link, the remote object id and the method name."""
        self.sockio = sockio
        self.oid = oid
        self.name = name

    def __call__(self, /, *args, **kwargs):
        """Invoke the remote method with *args*/*kwargs*; return its result."""
        return self.sockio.remotecall(self.oid, self.name, args, kwargs)
|
jpayne@69
|
609
|
jpayne@69
|
610
|
jpayne@69
|
611 # XXX KBK 09Sep03 We need a proper unit test for this module. Previously
|
jpayne@69
|
612 # existing test code was removed at Rev 1.27 (r34098).
|
jpayne@69
|
613
|
jpayne@69
|
def displayhook(value):
    """Override standard display hook to use non-locale encoding"""
    if value is None:
        return
    # Set '_' to None to avoid recursion
    builtins._ = None
    shown = repr(value)
    try:
        sys.stdout.write(shown)
    except UnicodeEncodeError:
        # let's use ascii while utf8-bmp codec doesn't present
        fallback = shown.encode('ascii', 'backslashreplace')
        sys.stdout.write(fallback.decode('ascii', 'strict'))
    sys.stdout.write("\n")
    builtins._ = value
|
jpayne@69
|
631
|
jpayne@69
|
632
|
jpayne@69
|
# Running this file as a script runs the module's unit tests.
if __name__ == '__main__':
    from unittest import main
    main('idlelib.idle_test.test_rpc', verbosity=2,)
|