annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/unittest/async_case.py @ 68:5028fdace37b

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 16:23:26 -0400
parents
children
rev   line source
jpayne@68 1 import asyncio
jpayne@68 2 import inspect
jpayne@68 3
jpayne@68 4 from .case import TestCase
jpayne@68 5
jpayne@68 6
jpayne@68 7
jpayne@68 8 class IsolatedAsyncioTestCase(TestCase):
jpayne@68 9 # Names intentionally have a long prefix
jpayne@68 10 # to reduce a chance of clashing with user-defined attributes
jpayne@68 11 # from inherited test case
jpayne@68 12 #
jpayne@68 13 # The class doesn't call loop.run_until_complete(self.setUp()) and family
jpayne@68 14 # but uses a different approach:
jpayne@68 15 # 1. create a long-running task that reads self.setUp()
jpayne@68 16 # awaitable from queue along with a future
jpayne@68 17 # 2. await the awaitable object passing in and set the result
jpayne@68 18 # into the future object
jpayne@68 19 # 3. Outer code puts the awaitable and the future object into a queue
jpayne@68 20 # with waiting for the future
jpayne@68 21 # The trick is necessary because every run_until_complete() call
jpayne@68 22 # creates a new task with embedded ContextVar context.
jpayne@68 23 # To share contextvars between setUp(), test and tearDown() we need to execute
jpayne@68 24 # them inside the same task.
jpayne@68 25
jpayne@68 26 # Note: the test case modifies event loop policy if the policy was not instantiated
jpayne@68 27 # yet.
jpayne@68 28 # asyncio.get_event_loop_policy() creates a default policy on demand but never
jpayne@68 29 # returns None
jpayne@68 30 # I believe this is not an issue in user level tests but python itself for testing
jpayne@68 31 # should reset a policy in every test module
jpayne@68 32 # by calling asyncio.set_event_loop_policy(None) in tearDownModule()
jpayne@68 33
jpayne@68 34 def __init__(self, methodName='runTest'):
jpayne@68 35 super().__init__(methodName)
jpayne@68 36 self._asyncioTestLoop = None
jpayne@68 37 self._asyncioCallsQueue = None
jpayne@68 38
jpayne@68 39 async def asyncSetUp(self):
jpayne@68 40 pass
jpayne@68 41
jpayne@68 42 async def asyncTearDown(self):
jpayne@68 43 pass
jpayne@68 44
jpayne@68 45 def addAsyncCleanup(self, func, /, *args, **kwargs):
jpayne@68 46 # A trivial trampoline to addCleanup()
jpayne@68 47 # the function exists because it has a different semantics
jpayne@68 48 # and signature:
jpayne@68 49 # addCleanup() accepts regular functions
jpayne@68 50 # but addAsyncCleanup() accepts coroutines
jpayne@68 51 #
jpayne@68 52 # We intentionally don't add inspect.iscoroutinefunction() check
jpayne@68 53 # for func argument because there is no way
jpayne@68 54 # to check for async function reliably:
jpayne@68 55 # 1. It can be "async def func()" iself
jpayne@68 56 # 2. Class can implement "async def __call__()" method
jpayne@68 57 # 3. Regular "def func()" that returns awaitable object
jpayne@68 58 self.addCleanup(*(func, *args), **kwargs)
jpayne@68 59
jpayne@68 60 def _callSetUp(self):
jpayne@68 61 self.setUp()
jpayne@68 62 self._callAsync(self.asyncSetUp)
jpayne@68 63
jpayne@68 64 def _callTestMethod(self, method):
jpayne@68 65 self._callMaybeAsync(method)
jpayne@68 66
jpayne@68 67 def _callTearDown(self):
jpayne@68 68 self._callAsync(self.asyncTearDown)
jpayne@68 69 self.tearDown()
jpayne@68 70
jpayne@68 71 def _callCleanup(self, function, *args, **kwargs):
jpayne@68 72 self._callMaybeAsync(function, *args, **kwargs)
jpayne@68 73
jpayne@68 74 def _callAsync(self, func, /, *args, **kwargs):
jpayne@68 75 assert self._asyncioTestLoop is not None
jpayne@68 76 ret = func(*args, **kwargs)
jpayne@68 77 assert inspect.isawaitable(ret)
jpayne@68 78 fut = self._asyncioTestLoop.create_future()
jpayne@68 79 self._asyncioCallsQueue.put_nowait((fut, ret))
jpayne@68 80 return self._asyncioTestLoop.run_until_complete(fut)
jpayne@68 81
jpayne@68 82 def _callMaybeAsync(self, func, /, *args, **kwargs):
jpayne@68 83 assert self._asyncioTestLoop is not None
jpayne@68 84 ret = func(*args, **kwargs)
jpayne@68 85 if inspect.isawaitable(ret):
jpayne@68 86 fut = self._asyncioTestLoop.create_future()
jpayne@68 87 self._asyncioCallsQueue.put_nowait((fut, ret))
jpayne@68 88 return self._asyncioTestLoop.run_until_complete(fut)
jpayne@68 89 else:
jpayne@68 90 return ret
jpayne@68 91
jpayne@68 92 async def _asyncioLoopRunner(self, fut):
jpayne@68 93 self._asyncioCallsQueue = queue = asyncio.Queue()
jpayne@68 94 fut.set_result(None)
jpayne@68 95 while True:
jpayne@68 96 query = await queue.get()
jpayne@68 97 queue.task_done()
jpayne@68 98 if query is None:
jpayne@68 99 return
jpayne@68 100 fut, awaitable = query
jpayne@68 101 try:
jpayne@68 102 ret = await awaitable
jpayne@68 103 if not fut.cancelled():
jpayne@68 104 fut.set_result(ret)
jpayne@68 105 except asyncio.CancelledError:
jpayne@68 106 raise
jpayne@68 107 except Exception as ex:
jpayne@68 108 if not fut.cancelled():
jpayne@68 109 fut.set_exception(ex)
jpayne@68 110
jpayne@68 111 def _setupAsyncioLoop(self):
jpayne@68 112 assert self._asyncioTestLoop is None
jpayne@68 113 loop = asyncio.new_event_loop()
jpayne@68 114 asyncio.set_event_loop(loop)
jpayne@68 115 loop.set_debug(True)
jpayne@68 116 self._asyncioTestLoop = loop
jpayne@68 117 fut = loop.create_future()
jpayne@68 118 self._asyncioCallsTask = loop.create_task(self._asyncioLoopRunner(fut))
jpayne@68 119 loop.run_until_complete(fut)
jpayne@68 120
jpayne@68 121 def _tearDownAsyncioLoop(self):
jpayne@68 122 assert self._asyncioTestLoop is not None
jpayne@68 123 loop = self._asyncioTestLoop
jpayne@68 124 self._asyncioTestLoop = None
jpayne@68 125 self._asyncioCallsQueue.put_nowait(None)
jpayne@68 126 loop.run_until_complete(self._asyncioCallsQueue.join())
jpayne@68 127
jpayne@68 128 try:
jpayne@68 129 # cancel all tasks
jpayne@68 130 to_cancel = asyncio.all_tasks(loop)
jpayne@68 131 if not to_cancel:
jpayne@68 132 return
jpayne@68 133
jpayne@68 134 for task in to_cancel:
jpayne@68 135 task.cancel()
jpayne@68 136
jpayne@68 137 loop.run_until_complete(
jpayne@68 138 asyncio.gather(*to_cancel, loop=loop, return_exceptions=True))
jpayne@68 139
jpayne@68 140 for task in to_cancel:
jpayne@68 141 if task.cancelled():
jpayne@68 142 continue
jpayne@68 143 if task.exception() is not None:
jpayne@68 144 loop.call_exception_handler({
jpayne@68 145 'message': 'unhandled exception during test shutdown',
jpayne@68 146 'exception': task.exception(),
jpayne@68 147 'task': task,
jpayne@68 148 })
jpayne@68 149 # shutdown asyncgens
jpayne@68 150 loop.run_until_complete(loop.shutdown_asyncgens())
jpayne@68 151 finally:
jpayne@68 152 asyncio.set_event_loop(None)
jpayne@68 153 loop.close()
jpayne@68 154
jpayne@68 155 def run(self, result=None):
jpayne@68 156 self._setupAsyncioLoop()
jpayne@68 157 try:
jpayne@68 158 return super().run(result)
jpayne@68 159 finally:
jpayne@68 160 self._tearDownAsyncioLoop()