annotate CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/unittest/async_case.py @ 69:33d812a61356

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 17:55:14 -0400
parents
children
rev   line source
jpayne@69 1 import asyncio
jpayne@69 2 import inspect
jpayne@69 3
jpayne@69 4 from .case import TestCase
jpayne@69 5
jpayne@69 6
jpayne@69 7
jpayne@69 8 class IsolatedAsyncioTestCase(TestCase):
jpayne@69 9 # Names intentionally have a long prefix
jpayne@69 10 # to reduce a chance of clashing with user-defined attributes
jpayne@69 11 # from inherited test case
jpayne@69 12 #
jpayne@69 13 # The class doesn't call loop.run_until_complete(self.setUp()) and family
jpayne@69 14 # but uses a different approach:
jpayne@69 15 # 1. create a long-running task that reads self.setUp()
jpayne@69 16 # awaitable from queue along with a future
jpayne@69 17 # 2. await the awaitable object passing in and set the result
jpayne@69 18 # into the future object
jpayne@69 19 # 3. Outer code puts the awaitable and the future object into a queue
jpayne@69 20 # with waiting for the future
jpayne@69 21 # The trick is necessary because every run_until_complete() call
jpayne@69 22 # creates a new task with embedded ContextVar context.
jpayne@69 23 # To share contextvars between setUp(), test and tearDown() we need to execute
jpayne@69 24 # them inside the same task.
jpayne@69 25
jpayne@69 26 # Note: the test case modifies event loop policy if the policy was not instantiated
jpayne@69 27 # yet.
jpayne@69 28 # asyncio.get_event_loop_policy() creates a default policy on demand but never
jpayne@69 29 # returns None
jpayne@69 30 # I believe this is not an issue in user level tests but python itself for testing
jpayne@69 31 # should reset a policy in every test module
jpayne@69 32 # by calling asyncio.set_event_loop_policy(None) in tearDownModule()
jpayne@69 33
jpayne@69 34 def __init__(self, methodName='runTest'):
jpayne@69 35 super().__init__(methodName)
jpayne@69 36 self._asyncioTestLoop = None
jpayne@69 37 self._asyncioCallsQueue = None
jpayne@69 38
jpayne@69 39 async def asyncSetUp(self):
jpayne@69 40 pass
jpayne@69 41
jpayne@69 42 async def asyncTearDown(self):
jpayne@69 43 pass
jpayne@69 44
jpayne@69 45 def addAsyncCleanup(self, func, /, *args, **kwargs):
jpayne@69 46 # A trivial trampoline to addCleanup()
jpayne@69 47 # the function exists because it has a different semantics
jpayne@69 48 # and signature:
jpayne@69 49 # addCleanup() accepts regular functions
jpayne@69 50 # but addAsyncCleanup() accepts coroutines
jpayne@69 51 #
jpayne@69 52 # We intentionally don't add inspect.iscoroutinefunction() check
jpayne@69 53 # for func argument because there is no way
jpayne@69 54 # to check for async function reliably:
jpayne@69 55 # 1. It can be "async def func()" iself
jpayne@69 56 # 2. Class can implement "async def __call__()" method
jpayne@69 57 # 3. Regular "def func()" that returns awaitable object
jpayne@69 58 self.addCleanup(*(func, *args), **kwargs)
jpayne@69 59
jpayne@69 60 def _callSetUp(self):
jpayne@69 61 self.setUp()
jpayne@69 62 self._callAsync(self.asyncSetUp)
jpayne@69 63
jpayne@69 64 def _callTestMethod(self, method):
jpayne@69 65 self._callMaybeAsync(method)
jpayne@69 66
jpayne@69 67 def _callTearDown(self):
jpayne@69 68 self._callAsync(self.asyncTearDown)
jpayne@69 69 self.tearDown()
jpayne@69 70
jpayne@69 71 def _callCleanup(self, function, *args, **kwargs):
jpayne@69 72 self._callMaybeAsync(function, *args, **kwargs)
jpayne@69 73
jpayne@69 74 def _callAsync(self, func, /, *args, **kwargs):
jpayne@69 75 assert self._asyncioTestLoop is not None
jpayne@69 76 ret = func(*args, **kwargs)
jpayne@69 77 assert inspect.isawaitable(ret)
jpayne@69 78 fut = self._asyncioTestLoop.create_future()
jpayne@69 79 self._asyncioCallsQueue.put_nowait((fut, ret))
jpayne@69 80 return self._asyncioTestLoop.run_until_complete(fut)
jpayne@69 81
jpayne@69 82 def _callMaybeAsync(self, func, /, *args, **kwargs):
jpayne@69 83 assert self._asyncioTestLoop is not None
jpayne@69 84 ret = func(*args, **kwargs)
jpayne@69 85 if inspect.isawaitable(ret):
jpayne@69 86 fut = self._asyncioTestLoop.create_future()
jpayne@69 87 self._asyncioCallsQueue.put_nowait((fut, ret))
jpayne@69 88 return self._asyncioTestLoop.run_until_complete(fut)
jpayne@69 89 else:
jpayne@69 90 return ret
jpayne@69 91
jpayne@69 92 async def _asyncioLoopRunner(self, fut):
jpayne@69 93 self._asyncioCallsQueue = queue = asyncio.Queue()
jpayne@69 94 fut.set_result(None)
jpayne@69 95 while True:
jpayne@69 96 query = await queue.get()
jpayne@69 97 queue.task_done()
jpayne@69 98 if query is None:
jpayne@69 99 return
jpayne@69 100 fut, awaitable = query
jpayne@69 101 try:
jpayne@69 102 ret = await awaitable
jpayne@69 103 if not fut.cancelled():
jpayne@69 104 fut.set_result(ret)
jpayne@69 105 except asyncio.CancelledError:
jpayne@69 106 raise
jpayne@69 107 except Exception as ex:
jpayne@69 108 if not fut.cancelled():
jpayne@69 109 fut.set_exception(ex)
jpayne@69 110
jpayne@69 111 def _setupAsyncioLoop(self):
jpayne@69 112 assert self._asyncioTestLoop is None
jpayne@69 113 loop = asyncio.new_event_loop()
jpayne@69 114 asyncio.set_event_loop(loop)
jpayne@69 115 loop.set_debug(True)
jpayne@69 116 self._asyncioTestLoop = loop
jpayne@69 117 fut = loop.create_future()
jpayne@69 118 self._asyncioCallsTask = loop.create_task(self._asyncioLoopRunner(fut))
jpayne@69 119 loop.run_until_complete(fut)
jpayne@69 120
jpayne@69 121 def _tearDownAsyncioLoop(self):
jpayne@69 122 assert self._asyncioTestLoop is not None
jpayne@69 123 loop = self._asyncioTestLoop
jpayne@69 124 self._asyncioTestLoop = None
jpayne@69 125 self._asyncioCallsQueue.put_nowait(None)
jpayne@69 126 loop.run_until_complete(self._asyncioCallsQueue.join())
jpayne@69 127
jpayne@69 128 try:
jpayne@69 129 # cancel all tasks
jpayne@69 130 to_cancel = asyncio.all_tasks(loop)
jpayne@69 131 if not to_cancel:
jpayne@69 132 return
jpayne@69 133
jpayne@69 134 for task in to_cancel:
jpayne@69 135 task.cancel()
jpayne@69 136
jpayne@69 137 loop.run_until_complete(
jpayne@69 138 asyncio.gather(*to_cancel, loop=loop, return_exceptions=True))
jpayne@69 139
jpayne@69 140 for task in to_cancel:
jpayne@69 141 if task.cancelled():
jpayne@69 142 continue
jpayne@69 143 if task.exception() is not None:
jpayne@69 144 loop.call_exception_handler({
jpayne@69 145 'message': 'unhandled exception during test shutdown',
jpayne@69 146 'exception': task.exception(),
jpayne@69 147 'task': task,
jpayne@69 148 })
jpayne@69 149 # shutdown asyncgens
jpayne@69 150 loop.run_until_complete(loop.shutdown_asyncgens())
jpayne@69 151 finally:
jpayne@69 152 asyncio.set_event_loop(None)
jpayne@69 153 loop.close()
jpayne@69 154
jpayne@69 155 def run(self, result=None):
jpayne@69 156 self._setupAsyncioLoop()
jpayne@69 157 try:
jpayne@69 158 return super().run(result)
jpayne@69 159 finally:
jpayne@69 160 self._tearDownAsyncioLoop()