jpayne@69
|
1 import asyncio
|
jpayne@69
|
2 import inspect
|
jpayne@69
|
3
|
jpayne@69
|
4 from .case import TestCase
|
jpayne@69
|
5
|
jpayne@69
|
6
|
jpayne@69
|
7
|
jpayne@69
|
class IsolatedAsyncioTestCase(TestCase):
    """TestCase whose fixtures, tests and cleanups may be coroutines.

    ``run()`` creates a fresh event loop (with debug mode enabled) for the
    test, executes ``setUp``/``asyncSetUp``, the test method,
    ``asyncTearDown``/``tearDown`` and the registered cleanups, and finally
    cancels leftover tasks, shuts down async generators and closes the loop.
    """
    # Names intentionally have a long prefix
    # to reduce a chance of clashing with user-defined attributes
    # from inherited test case
    #
    # The class doesn't call loop.run_until_complete(self.setUp()) and family
    # but uses a different approach:
    # 1. create a long-running task that reads self.setUp()
    #    awaitable from queue along with a future
    # 2. await the awaitable object passing in and set the result
    #    into the future object
    # 3. Outer code puts the awaitable and the future object into a queue
    #    with waiting for the future
    # The trick is necessary because every run_until_complete() call
    # creates a new task with embedded ContextVar context.
    # To share contextvars between setUp(), test and tearDown() we need to
    # execute them inside the same task.

    # Note: the test case modifies event loop policy if the policy was not
    # instantiated yet.
    # asyncio.get_event_loop_policy() creates a default policy on demand but
    # never returns None
    # I believe this is not an issue in user level tests but python itself
    # for testing should reset a policy in every test module
    # by calling asyncio.set_event_loop_policy(None) in tearDownModule()

    def __init__(self, methodName='runTest'):
        """Initialise the test case; the event loop is created in run()."""
        super().__init__(methodName)
        self._asyncioTestLoop = None
        self._asyncioCallsQueue = None
        self._asyncioCallsTask = None

    async def asyncSetUp(self):
        """Coroutine hook awaited after setUp(); does nothing by default."""
        pass

    async def asyncTearDown(self):
        """Coroutine hook awaited before tearDown(); does nothing by default."""
        pass

    def addAsyncCleanup(self, func, /, *args, **kwargs):
        """Register *func* (called with *args*/*kwargs*) as a cleanup.

        The value returned by the call is awaited when it is awaitable.
        """
        # A trivial trampoline to addCleanup()
        # the function exists because it has a different semantics
        # and signature:
        # addCleanup() accepts regular functions
        # but addAsyncCleanup() accepts coroutines
        #
        # We intentionally don't add inspect.iscoroutinefunction() check
        # for func argument because there is no way
        # to check for async function reliably:
        # 1. It can be "async def func()" itself
        # 2. Class can implement "async def __call__()" method
        # 3. Regular "def func()" that returns awaitable object
        self.addCleanup(func, *args, **kwargs)

    def _callSetUp(self):
        """Run the synchronous setUp() and then await asyncSetUp()."""
        self.setUp()
        self._callAsync(self.asyncSetUp)

    def _callTestMethod(self, method):
        """Run the test method, awaiting its result if it is awaitable."""
        self._callMaybeAsync(method)

    def _callTearDown(self):
        """Await asyncTearDown() and then run the synchronous tearDown()."""
        self._callAsync(self.asyncTearDown)
        self.tearDown()

    def _callCleanup(self, function, *args, **kwargs):
        """Run one cleanup, awaiting its result if it is awaitable."""
        self._callMaybeAsync(function, *args, **kwargs)

    def _asyncioSubmit(self, awaitable):
        """Hand *awaitable* to the runner task and block until it finishes.

        Returns the awaitable's result or re-raises the exception it raised.
        """
        fut = self._asyncioTestLoop.create_future()
        self._asyncioCallsQueue.put_nowait((fut, awaitable))
        return self._asyncioTestLoop.run_until_complete(fut)

    def _callAsync(self, func, /, *args, **kwargs):
        """Call *func*, which must return an awaitable, and wait for it."""
        assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized'
        ret = func(*args, **kwargs)
        assert inspect.isawaitable(ret), f'{func!r} returned non-awaitable'
        return self._asyncioSubmit(ret)

    def _callMaybeAsync(self, func, /, *args, **kwargs):
        """Call *func*; wait for the result if awaitable, else return it."""
        assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized'
        ret = func(*args, **kwargs)
        if inspect.isawaitable(ret):
            return self._asyncioSubmit(ret)
        return ret

    async def _asyncioLoopRunner(self, fut):
        """Long-running task that awaits every awaitable put on the queue.

        *fut* is resolved once the queue exists.  Each queue item is either
        ``None`` (stop) or a ``(future, awaitable)`` pair; the outcome of the
        awaitable is transferred to the future.
        """
        self._asyncioCallsQueue = queue = asyncio.Queue()
        fut.set_result(None)
        while True:
            query = await queue.get()
            queue.task_done()
            if query is None:
                return
            fut, awaitable = query
            try:
                ret = await awaitable
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as ex:
                # Includes asyncio.CancelledError: the waiting side blocks in
                # run_until_complete(fut), so the future must always be
                # resolved or the test run would hang forever.
                if not fut.cancelled():
                    fut.set_exception(ex)
            else:
                if not fut.cancelled():
                    fut.set_result(ret)

    def _setupAsyncioLoop(self):
        """Create the test's event loop and start the runner task on it."""
        assert self._asyncioTestLoop is None, 'asyncio test loop already initialized'
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.set_debug(True)
        self._asyncioTestLoop = loop
        fut = loop.create_future()
        self._asyncioCallsTask = loop.create_task(self._asyncioLoopRunner(fut))
        # Wait until the runner has created the calls queue.
        loop.run_until_complete(fut)

    def _asyncioCancelPendingTasks(self, loop):
        """Cancel tasks still pending on *loop* and report their failures."""
        to_cancel = asyncio.all_tasks(loop)
        if not to_cancel:
            return

        for task in to_cancel:
            task.cancel()

        # gather() takes the loop from the tasks themselves; the explicit
        # ``loop`` argument was removed in Python 3.10.
        loop.run_until_complete(
            asyncio.gather(*to_cancel, return_exceptions=True))

        for task in to_cancel:
            if task.cancelled():
                continue
            if task.exception() is not None:
                loop.call_exception_handler({
                    'message': 'unhandled exception during test shutdown',
                    'exception': task.exception(),
                    'task': task,
                })

    def _tearDownAsyncioLoop(self):
        """Stop the runner task, clean up the event loop and close it."""
        assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized'
        loop = self._asyncioTestLoop
        self._asyncioTestLoop = None

        try:
            # If the runner already died (e.g. on KeyboardInterrupt) nothing
            # would consume the sentinel and join() would never return.
            if not self._asyncioCallsTask.done():
                self._asyncioCallsQueue.put_nowait(None)
                loop.run_until_complete(self._asyncioCallsQueue.join())

            # cancel all tasks
            self._asyncioCancelPendingTasks(loop)
            # shutdown asyncgens (also when there were no tasks to cancel)
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            asyncio.set_event_loop(None)
            loop.close()

    def run(self, result=None):
        """Run the test on a fresh event loop that is closed afterwards."""
        self._setupAsyncioLoop()
        try:
            return super().run(result)
        finally:
            self._tearDownAsyncioLoop()