comparison CSP2/CSP2_env/env-d9b9114564458d9d-741b3de822f2aaca6c6caa4325c4afce/lib/python3.8/unittest/mock.py @ 69:33d812a61356

planemo upload commit 2e9511a184a1ca667c7be0c6321a36dc4e3d116d
author jpayne
date Tue, 18 Mar 2025 17:55:14 -0400
parents
children
comparison
equal deleted inserted replaced
67:0e9998148a16 69:33d812a61356
1 # mock.py
2 # Test tools for mocking and patching.
3 # Maintained by Michael Foord
4 # Backport for other versions of Python available from
5 # https://pypi.org/project/mock
6
7 __all__ = (
8 'Mock',
9 'MagicMock',
10 'patch',
11 'sentinel',
12 'DEFAULT',
13 'ANY',
14 'call',
15 'create_autospec',
16 'AsyncMock',
17 'FILTER_DIR',
18 'NonCallableMock',
19 'NonCallableMagicMock',
20 'mock_open',
21 'PropertyMock',
22 'seal',
23 )
24
25
26 __version__ = '1.0'
27
28 import asyncio
29 import contextlib
30 import io
31 import inspect
32 import pprint
33 import sys
34 import builtins
35 from types import CodeType, ModuleType, MethodType
36 from unittest.util import safe_repr
37 from functools import wraps, partial
38
39
40 _builtins = {name for name in dir(builtins) if not name.startswith('_')}
41
42 FILTER_DIR = True
43
44 # Workaround for issue #12370
45 # Without this, the __class__ properties wouldn't be set correctly
46 _safe_super = super
47
48 def _is_async_obj(obj):
49 if _is_instance_mock(obj) and not isinstance(obj, AsyncMock):
50 return False
51 return asyncio.iscoroutinefunction(obj) or inspect.isawaitable(obj)
52
53
54 def _is_async_func(func):
55 if getattr(func, '__code__', None):
56 return asyncio.iscoroutinefunction(func)
57 else:
58 return False
59
60
61 def _is_instance_mock(obj):
62 # can't use isinstance on Mock objects because they override __class__
63 # The base class for all mocks is NonCallableMock
64 return issubclass(type(obj), NonCallableMock)
65
66
67 def _is_exception(obj):
68 return (
69 isinstance(obj, BaseException) or
70 isinstance(obj, type) and issubclass(obj, BaseException)
71 )
72
73
74 def _extract_mock(obj):
75 # Autospecced functions will return a FunctionType with "mock" attribute
76 # which is the actual mock object that needs to be used.
77 if isinstance(obj, FunctionTypes) and hasattr(obj, 'mock'):
78 return obj.mock
79 else:
80 return obj
81
82
83 def _get_signature_object(func, as_instance, eat_self):
84 """
85 Given an arbitrary, possibly callable object, try to create a suitable
86 signature object.
87 Return a (reduced func, signature) tuple, or None.
88 """
89 if isinstance(func, type) and not as_instance:
90 # If it's a type and should be modelled as a type, use __init__.
91 func = func.__init__
92 # Skip the `self` argument in __init__
93 eat_self = True
94 elif not isinstance(func, FunctionTypes):
95 # If we really want to model an instance of the passed type,
96 # __call__ should be looked up, not __init__.
97 try:
98 func = func.__call__
99 except AttributeError:
100 return None
101 if eat_self:
102 sig_func = partial(func, None)
103 else:
104 sig_func = func
105 try:
106 return func, inspect.signature(sig_func)
107 except ValueError:
108 # Certain callable types are not supported by inspect.signature()
109 return None
110
111
112 def _check_signature(func, mock, skipfirst, instance=False):
113 sig = _get_signature_object(func, instance, skipfirst)
114 if sig is None:
115 return
116 func, sig = sig
117 def checksig(self, /, *args, **kwargs):
118 sig.bind(*args, **kwargs)
119 _copy_func_details(func, checksig)
120 type(mock)._mock_check_sig = checksig
121 type(mock).__signature__ = sig
122
123
124 def _copy_func_details(func, funcopy):
125 # we explicitly don't copy func.__dict__ into this copy as it would
126 # expose original attributes that should be mocked
127 for attribute in (
128 '__name__', '__doc__', '__text_signature__',
129 '__module__', '__defaults__', '__kwdefaults__',
130 ):
131 try:
132 setattr(funcopy, attribute, getattr(func, attribute))
133 except AttributeError:
134 pass
135
136
137 def _callable(obj):
138 if isinstance(obj, type):
139 return True
140 if isinstance(obj, (staticmethod, classmethod, MethodType)):
141 return _callable(obj.__func__)
142 if getattr(obj, '__call__', None) is not None:
143 return True
144 return False
145
146
147 def _is_list(obj):
148 # checks for list or tuples
149 # XXXX badly named!
150 return type(obj) in (list, tuple)
151
152
153 def _instance_callable(obj):
154 """Given an object, return True if the object is callable.
155 For classes, return True if instances would be callable."""
156 if not isinstance(obj, type):
157 # already an instance
158 return getattr(obj, '__call__', None) is not None
159
160 # *could* be broken by a class overriding __mro__ or __dict__ via
161 # a metaclass
162 for base in (obj,) + obj.__mro__:
163 if base.__dict__.get('__call__') is not None:
164 return True
165 return False
166
167
168 def _set_signature(mock, original, instance=False):
169 # creates a function with signature (*args, **kwargs) that delegates to a
170 # mock. It still does signature checking by calling a lambda with the same
171 # signature as the original.
172
173 skipfirst = isinstance(original, type)
174 result = _get_signature_object(original, instance, skipfirst)
175 if result is None:
176 return mock
177 func, sig = result
178 def checksig(*args, **kwargs):
179 sig.bind(*args, **kwargs)
180 _copy_func_details(func, checksig)
181
182 name = original.__name__
183 if not name.isidentifier():
184 name = 'funcopy'
185 context = {'_checksig_': checksig, 'mock': mock}
186 src = """def %s(*args, **kwargs):
187 _checksig_(*args, **kwargs)
188 return mock(*args, **kwargs)""" % name
189 exec (src, context)
190 funcopy = context[name]
191 _setup_func(funcopy, mock, sig)
192 return funcopy
193
194
195 def _setup_func(funcopy, mock, sig):
196 funcopy.mock = mock
197
198 def assert_called_with(*args, **kwargs):
199 return mock.assert_called_with(*args, **kwargs)
200 def assert_called(*args, **kwargs):
201 return mock.assert_called(*args, **kwargs)
202 def assert_not_called(*args, **kwargs):
203 return mock.assert_not_called(*args, **kwargs)
204 def assert_called_once(*args, **kwargs):
205 return mock.assert_called_once(*args, **kwargs)
206 def assert_called_once_with(*args, **kwargs):
207 return mock.assert_called_once_with(*args, **kwargs)
208 def assert_has_calls(*args, **kwargs):
209 return mock.assert_has_calls(*args, **kwargs)
210 def assert_any_call(*args, **kwargs):
211 return mock.assert_any_call(*args, **kwargs)
212 def reset_mock():
213 funcopy.method_calls = _CallList()
214 funcopy.mock_calls = _CallList()
215 mock.reset_mock()
216 ret = funcopy.return_value
217 if _is_instance_mock(ret) and not ret is mock:
218 ret.reset_mock()
219
220 funcopy.called = False
221 funcopy.call_count = 0
222 funcopy.call_args = None
223 funcopy.call_args_list = _CallList()
224 funcopy.method_calls = _CallList()
225 funcopy.mock_calls = _CallList()
226
227 funcopy.return_value = mock.return_value
228 funcopy.side_effect = mock.side_effect
229 funcopy._mock_children = mock._mock_children
230
231 funcopy.assert_called_with = assert_called_with
232 funcopy.assert_called_once_with = assert_called_once_with
233 funcopy.assert_has_calls = assert_has_calls
234 funcopy.assert_any_call = assert_any_call
235 funcopy.reset_mock = reset_mock
236 funcopy.assert_called = assert_called
237 funcopy.assert_not_called = assert_not_called
238 funcopy.assert_called_once = assert_called_once
239 funcopy.__signature__ = sig
240
241 mock._mock_delegate = funcopy
242
243
244 def _setup_async_mock(mock):
245 mock._is_coroutine = asyncio.coroutines._is_coroutine
246 mock.await_count = 0
247 mock.await_args = None
248 mock.await_args_list = _CallList()
249
250 # Mock is not configured yet so the attributes are set
251 # to a function and then the corresponding mock helper function
252 # is called when the helper is accessed similar to _setup_func.
253 def wrapper(attr, /, *args, **kwargs):
254 return getattr(mock.mock, attr)(*args, **kwargs)
255
256 for attribute in ('assert_awaited',
257 'assert_awaited_once',
258 'assert_awaited_with',
259 'assert_awaited_once_with',
260 'assert_any_await',
261 'assert_has_awaits',
262 'assert_not_awaited'):
263
264 # setattr(mock, attribute, wrapper) causes late binding
265 # hence attribute will always be the last value in the loop
266 # Use partial(wrapper, attribute) to ensure the attribute is bound
267 # correctly.
268 setattr(mock, attribute, partial(wrapper, attribute))
269
270
271 def _is_magic(name):
272 return '__%s__' % name[2:-2] == name
273
274
275 class _SentinelObject(object):
276 "A unique, named, sentinel object."
277 def __init__(self, name):
278 self.name = name
279
280 def __repr__(self):
281 return 'sentinel.%s' % self.name
282
283 def __reduce__(self):
284 return 'sentinel.%s' % self.name
285
286
287 class _Sentinel(object):
288 """Access attributes to return a named object, usable as a sentinel."""
289 def __init__(self):
290 self._sentinels = {}
291
292 def __getattr__(self, name):
293 if name == '__bases__':
294 # Without this help(unittest.mock) raises an exception
295 raise AttributeError
296 return self._sentinels.setdefault(name, _SentinelObject(name))
297
298 def __reduce__(self):
299 return 'sentinel'
300
301
302 sentinel = _Sentinel()
303
304 DEFAULT = sentinel.DEFAULT
305 _missing = sentinel.MISSING
306 _deleted = sentinel.DELETED
307
308
309 _allowed_names = {
310 'return_value', '_mock_return_value', 'side_effect',
311 '_mock_side_effect', '_mock_parent', '_mock_new_parent',
312 '_mock_name', '_mock_new_name'
313 }
314
315
316 def _delegating_property(name):
317 _allowed_names.add(name)
318 _the_name = '_mock_' + name
319 def _get(self, name=name, _the_name=_the_name):
320 sig = self._mock_delegate
321 if sig is None:
322 return getattr(self, _the_name)
323 return getattr(sig, name)
324 def _set(self, value, name=name, _the_name=_the_name):
325 sig = self._mock_delegate
326 if sig is None:
327 self.__dict__[_the_name] = value
328 else:
329 setattr(sig, name, value)
330
331 return property(_get, _set)
332
333
334
335 class _CallList(list):
336
337 def __contains__(self, value):
338 if not isinstance(value, list):
339 return list.__contains__(self, value)
340 len_value = len(value)
341 len_self = len(self)
342 if len_value > len_self:
343 return False
344
345 for i in range(0, len_self - len_value + 1):
346 sub_list = self[i:i+len_value]
347 if sub_list == value:
348 return True
349 return False
350
351 def __repr__(self):
352 return pprint.pformat(list(self))
353
354
355 def _check_and_set_parent(parent, value, name, new_name):
356 value = _extract_mock(value)
357
358 if not _is_instance_mock(value):
359 return False
360 if ((value._mock_name or value._mock_new_name) or
361 (value._mock_parent is not None) or
362 (value._mock_new_parent is not None)):
363 return False
364
365 _parent = parent
366 while _parent is not None:
367 # setting a mock (value) as a child or return value of itself
368 # should not modify the mock
369 if _parent is value:
370 return False
371 _parent = _parent._mock_new_parent
372
373 if new_name:
374 value._mock_new_parent = parent
375 value._mock_new_name = new_name
376 if name:
377 value._mock_parent = parent
378 value._mock_name = name
379 return True
380
381 # Internal class to identify if we wrapped an iterator object or not.
382 class _MockIter(object):
383 def __init__(self, obj):
384 self.obj = iter(obj)
385 def __next__(self):
386 return next(self.obj)
387
388 class Base(object):
389 _mock_return_value = DEFAULT
390 _mock_side_effect = None
391 def __init__(self, /, *args, **kwargs):
392 pass
393
394
395
396 class NonCallableMock(Base):
397 """A non-callable version of `Mock`"""
398
399 def __new__(cls, /, *args, **kw):
400 # every instance has its own class
401 # so we can create magic methods on the
402 # class without stomping on other mocks
403 bases = (cls,)
404 if not issubclass(cls, AsyncMock):
405 # Check if spec is an async object or function
406 sig = inspect.signature(NonCallableMock.__init__)
407 bound_args = sig.bind_partial(cls, *args, **kw).arguments
408 spec_arg = [
409 arg for arg in bound_args.keys()
410 if arg.startswith('spec')
411 ]
412 if spec_arg:
413 # what if spec_set is different than spec?
414 if _is_async_obj(bound_args[spec_arg[0]]):
415 bases = (AsyncMockMixin, cls,)
416 new = type(cls.__name__, bases, {'__doc__': cls.__doc__})
417 instance = _safe_super(NonCallableMock, cls).__new__(new)
418 return instance
419
420
421 def __init__(
422 self, spec=None, wraps=None, name=None, spec_set=None,
423 parent=None, _spec_state=None, _new_name='', _new_parent=None,
424 _spec_as_instance=False, _eat_self=None, unsafe=False, **kwargs
425 ):
426 if _new_parent is None:
427 _new_parent = parent
428
429 __dict__ = self.__dict__
430 __dict__['_mock_parent'] = parent
431 __dict__['_mock_name'] = name
432 __dict__['_mock_new_name'] = _new_name
433 __dict__['_mock_new_parent'] = _new_parent
434 __dict__['_mock_sealed'] = False
435
436 if spec_set is not None:
437 spec = spec_set
438 spec_set = True
439 if _eat_self is None:
440 _eat_self = parent is not None
441
442 self._mock_add_spec(spec, spec_set, _spec_as_instance, _eat_self)
443
444 __dict__['_mock_children'] = {}
445 __dict__['_mock_wraps'] = wraps
446 __dict__['_mock_delegate'] = None
447
448 __dict__['_mock_called'] = False
449 __dict__['_mock_call_args'] = None
450 __dict__['_mock_call_count'] = 0
451 __dict__['_mock_call_args_list'] = _CallList()
452 __dict__['_mock_mock_calls'] = _CallList()
453
454 __dict__['method_calls'] = _CallList()
455 __dict__['_mock_unsafe'] = unsafe
456
457 if kwargs:
458 self.configure_mock(**kwargs)
459
460 _safe_super(NonCallableMock, self).__init__(
461 spec, wraps, name, spec_set, parent,
462 _spec_state
463 )
464
465
466 def attach_mock(self, mock, attribute):
467 """
468 Attach a mock as an attribute of this one, replacing its name and
469 parent. Calls to the attached mock will be recorded in the
470 `method_calls` and `mock_calls` attributes of this one."""
471 inner_mock = _extract_mock(mock)
472
473 inner_mock._mock_parent = None
474 inner_mock._mock_new_parent = None
475 inner_mock._mock_name = ''
476 inner_mock._mock_new_name = None
477
478 setattr(self, attribute, mock)
479
480
481 def mock_add_spec(self, spec, spec_set=False):
482 """Add a spec to a mock. `spec` can either be an object or a
483 list of strings. Only attributes on the `spec` can be fetched as
484 attributes from the mock.
485
486 If `spec_set` is True then only attributes on the spec can be set."""
487 self._mock_add_spec(spec, spec_set)
488
489
490 def _mock_add_spec(self, spec, spec_set, _spec_as_instance=False,
491 _eat_self=False):
492 _spec_class = None
493 _spec_signature = None
494 _spec_asyncs = []
495
496 for attr in dir(spec):
497 if asyncio.iscoroutinefunction(getattr(spec, attr, None)):
498 _spec_asyncs.append(attr)
499
500 if spec is not None and not _is_list(spec):
501 if isinstance(spec, type):
502 _spec_class = spec
503 else:
504 _spec_class = type(spec)
505 res = _get_signature_object(spec,
506 _spec_as_instance, _eat_self)
507 _spec_signature = res and res[1]
508
509 spec = dir(spec)
510
511 __dict__ = self.__dict__
512 __dict__['_spec_class'] = _spec_class
513 __dict__['_spec_set'] = spec_set
514 __dict__['_spec_signature'] = _spec_signature
515 __dict__['_mock_methods'] = spec
516 __dict__['_spec_asyncs'] = _spec_asyncs
517
518 def __get_return_value(self):
519 ret = self._mock_return_value
520 if self._mock_delegate is not None:
521 ret = self._mock_delegate.return_value
522
523 if ret is DEFAULT:
524 ret = self._get_child_mock(
525 _new_parent=self, _new_name='()'
526 )
527 self.return_value = ret
528 return ret
529
530
531 def __set_return_value(self, value):
532 if self._mock_delegate is not None:
533 self._mock_delegate.return_value = value
534 else:
535 self._mock_return_value = value
536 _check_and_set_parent(self, value, None, '()')
537
538 __return_value_doc = "The value to be returned when the mock is called."
539 return_value = property(__get_return_value, __set_return_value,
540 __return_value_doc)
541
542
543 @property
544 def __class__(self):
545 if self._spec_class is None:
546 return type(self)
547 return self._spec_class
548
549 called = _delegating_property('called')
550 call_count = _delegating_property('call_count')
551 call_args = _delegating_property('call_args')
552 call_args_list = _delegating_property('call_args_list')
553 mock_calls = _delegating_property('mock_calls')
554
555
556 def __get_side_effect(self):
557 delegated = self._mock_delegate
558 if delegated is None:
559 return self._mock_side_effect
560 sf = delegated.side_effect
561 if (sf is not None and not callable(sf)
562 and not isinstance(sf, _MockIter) and not _is_exception(sf)):
563 sf = _MockIter(sf)
564 delegated.side_effect = sf
565 return sf
566
567 def __set_side_effect(self, value):
568 value = _try_iter(value)
569 delegated = self._mock_delegate
570 if delegated is None:
571 self._mock_side_effect = value
572 else:
573 delegated.side_effect = value
574
575 side_effect = property(__get_side_effect, __set_side_effect)
576
577
578 def reset_mock(self, visited=None,*, return_value=False, side_effect=False):
579 "Restore the mock object to its initial state."
580 if visited is None:
581 visited = []
582 if id(self) in visited:
583 return
584 visited.append(id(self))
585
586 self.called = False
587 self.call_args = None
588 self.call_count = 0
589 self.mock_calls = _CallList()
590 self.call_args_list = _CallList()
591 self.method_calls = _CallList()
592
593 if return_value:
594 self._mock_return_value = DEFAULT
595 if side_effect:
596 self._mock_side_effect = None
597
598 for child in self._mock_children.values():
599 if isinstance(child, _SpecState) or child is _deleted:
600 continue
601 child.reset_mock(visited)
602
603 ret = self._mock_return_value
604 if _is_instance_mock(ret) and ret is not self:
605 ret.reset_mock(visited)
606
607
608 def configure_mock(self, /, **kwargs):
609 """Set attributes on the mock through keyword arguments.
610
611 Attributes plus return values and side effects can be set on child
612 mocks using standard dot notation and unpacking a dictionary in the
613 method call:
614
615 >>> attrs = {'method.return_value': 3, 'other.side_effect': KeyError}
616 >>> mock.configure_mock(**attrs)"""
617 for arg, val in sorted(kwargs.items(),
618 # we sort on the number of dots so that
619 # attributes are set before we set attributes on
620 # attributes
621 key=lambda entry: entry[0].count('.')):
622 args = arg.split('.')
623 final = args.pop()
624 obj = self
625 for entry in args:
626 obj = getattr(obj, entry)
627 setattr(obj, final, val)
628
629
630 def __getattr__(self, name):
631 if name in {'_mock_methods', '_mock_unsafe'}:
632 raise AttributeError(name)
633 elif self._mock_methods is not None:
634 if name not in self._mock_methods or name in _all_magics:
635 raise AttributeError("Mock object has no attribute %r" % name)
636 elif _is_magic(name):
637 raise AttributeError(name)
638 if not self._mock_unsafe:
639 if name.startswith(('assert', 'assret')):
640 raise AttributeError("Attributes cannot start with 'assert' "
641 "or 'assret'")
642
643 result = self._mock_children.get(name)
644 if result is _deleted:
645 raise AttributeError(name)
646 elif result is None:
647 wraps = None
648 if self._mock_wraps is not None:
649 # XXXX should we get the attribute without triggering code
650 # execution?
651 wraps = getattr(self._mock_wraps, name)
652
653 result = self._get_child_mock(
654 parent=self, name=name, wraps=wraps, _new_name=name,
655 _new_parent=self
656 )
657 self._mock_children[name] = result
658
659 elif isinstance(result, _SpecState):
660 result = create_autospec(
661 result.spec, result.spec_set, result.instance,
662 result.parent, result.name
663 )
664 self._mock_children[name] = result
665
666 return result
667
668
669 def _extract_mock_name(self):
670 _name_list = [self._mock_new_name]
671 _parent = self._mock_new_parent
672 last = self
673
674 dot = '.'
675 if _name_list == ['()']:
676 dot = ''
677
678 while _parent is not None:
679 last = _parent
680
681 _name_list.append(_parent._mock_new_name + dot)
682 dot = '.'
683 if _parent._mock_new_name == '()':
684 dot = ''
685
686 _parent = _parent._mock_new_parent
687
688 _name_list = list(reversed(_name_list))
689 _first = last._mock_name or 'mock'
690 if len(_name_list) > 1:
691 if _name_list[1] not in ('()', '().'):
692 _first += '.'
693 _name_list[0] = _first
694 return ''.join(_name_list)
695
696 def __repr__(self):
697 name = self._extract_mock_name()
698
699 name_string = ''
700 if name not in ('mock', 'mock.'):
701 name_string = ' name=%r' % name
702
703 spec_string = ''
704 if self._spec_class is not None:
705 spec_string = ' spec=%r'
706 if self._spec_set:
707 spec_string = ' spec_set=%r'
708 spec_string = spec_string % self._spec_class.__name__
709 return "<%s%s%s id='%s'>" % (
710 type(self).__name__,
711 name_string,
712 spec_string,
713 id(self)
714 )
715
716
717 def __dir__(self):
718 """Filter the output of `dir(mock)` to only useful members."""
719 if not FILTER_DIR:
720 return object.__dir__(self)
721
722 extras = self._mock_methods or []
723 from_type = dir(type(self))
724 from_dict = list(self.__dict__)
725 from_child_mocks = [
726 m_name for m_name, m_value in self._mock_children.items()
727 if m_value is not _deleted]
728
729 from_type = [e for e in from_type if not e.startswith('_')]
730 from_dict = [e for e in from_dict if not e.startswith('_') or
731 _is_magic(e)]
732 return sorted(set(extras + from_type + from_dict + from_child_mocks))
733
734
735 def __setattr__(self, name, value):
736 if name in _allowed_names:
737 # property setters go through here
738 return object.__setattr__(self, name, value)
739 elif (self._spec_set and self._mock_methods is not None and
740 name not in self._mock_methods and
741 name not in self.__dict__):
742 raise AttributeError("Mock object has no attribute '%s'" % name)
743 elif name in _unsupported_magics:
744 msg = 'Attempting to set unsupported magic method %r.' % name
745 raise AttributeError(msg)
746 elif name in _all_magics:
747 if self._mock_methods is not None and name not in self._mock_methods:
748 raise AttributeError("Mock object has no attribute '%s'" % name)
749
750 if not _is_instance_mock(value):
751 setattr(type(self), name, _get_method(name, value))
752 original = value
753 value = lambda *args, **kw: original(self, *args, **kw)
754 else:
755 # only set _new_name and not name so that mock_calls is tracked
756 # but not method calls
757 _check_and_set_parent(self, value, None, name)
758 setattr(type(self), name, value)
759 self._mock_children[name] = value
760 elif name == '__class__':
761 self._spec_class = value
762 return
763 else:
764 if _check_and_set_parent(self, value, name, name):
765 self._mock_children[name] = value
766
767 if self._mock_sealed and not hasattr(self, name):
768 mock_name = f'{self._extract_mock_name()}.{name}'
769 raise AttributeError(f'Cannot set {mock_name}')
770
771 return object.__setattr__(self, name, value)
772
773
774 def __delattr__(self, name):
775 if name in _all_magics and name in type(self).__dict__:
776 delattr(type(self), name)
777 if name not in self.__dict__:
778 # for magic methods that are still MagicProxy objects and
779 # not set on the instance itself
780 return
781
782 obj = self._mock_children.get(name, _missing)
783 if name in self.__dict__:
784 _safe_super(NonCallableMock, self).__delattr__(name)
785 elif obj is _deleted:
786 raise AttributeError(name)
787 if obj is not _missing:
788 del self._mock_children[name]
789 self._mock_children[name] = _deleted
790
791
792 def _format_mock_call_signature(self, args, kwargs):
793 name = self._mock_name or 'mock'
794 return _format_call_signature(name, args, kwargs)
795
796
797 def _format_mock_failure_message(self, args, kwargs, action='call'):
798 message = 'expected %s not found.\nExpected: %s\nActual: %s'
799 expected_string = self._format_mock_call_signature(args, kwargs)
800 call_args = self.call_args
801 actual_string = self._format_mock_call_signature(*call_args)
802 return message % (action, expected_string, actual_string)
803
804
805 def _get_call_signature_from_name(self, name):
806 """
807 * If call objects are asserted against a method/function like obj.meth1
808 then there could be no name for the call object to lookup. Hence just
809 return the spec_signature of the method/function being asserted against.
810 * If the name is not empty then remove () and split by '.' to get
811 list of names to iterate through the children until a potential
812 match is found. A child mock is created only during attribute access
813 so if we get a _SpecState then no attributes of the spec were accessed
814 and can be safely exited.
815 """
816 if not name:
817 return self._spec_signature
818
819 sig = None
820 names = name.replace('()', '').split('.')
821 children = self._mock_children
822
823 for name in names:
824 child = children.get(name)
825 if child is None or isinstance(child, _SpecState):
826 break
827 else:
828 children = child._mock_children
829 sig = child._spec_signature
830
831 return sig
832
833
834 def _call_matcher(self, _call):
835 """
836 Given a call (or simply an (args, kwargs) tuple), return a
837 comparison key suitable for matching with other calls.
838 This is a best effort method which relies on the spec's signature,
839 if available, or falls back on the arguments themselves.
840 """
841
842 if isinstance(_call, tuple) and len(_call) > 2:
843 sig = self._get_call_signature_from_name(_call[0])
844 else:
845 sig = self._spec_signature
846
847 if sig is not None:
848 if len(_call) == 2:
849 name = ''
850 args, kwargs = _call
851 else:
852 name, args, kwargs = _call
853 try:
854 return name, sig.bind(*args, **kwargs)
855 except TypeError as e:
856 return e.with_traceback(None)
857 else:
858 return _call
859
860 def assert_not_called(self):
861 """assert that the mock was never called.
862 """
863 if self.call_count != 0:
864 msg = ("Expected '%s' to not have been called. Called %s times.%s"
865 % (self._mock_name or 'mock',
866 self.call_count,
867 self._calls_repr()))
868 raise AssertionError(msg)
869
870 def assert_called(self):
871 """assert that the mock was called at least once
872 """
873 if self.call_count == 0:
874 msg = ("Expected '%s' to have been called." %
875 (self._mock_name or 'mock'))
876 raise AssertionError(msg)
877
878 def assert_called_once(self):
879 """assert that the mock was called only once.
880 """
881 if not self.call_count == 1:
882 msg = ("Expected '%s' to have been called once. Called %s times.%s"
883 % (self._mock_name or 'mock',
884 self.call_count,
885 self._calls_repr()))
886 raise AssertionError(msg)
887
888 def assert_called_with(self, /, *args, **kwargs):
889 """assert that the last call was made with the specified arguments.
890
891 Raises an AssertionError if the args and keyword args passed in are
892 different to the last call to the mock."""
893 if self.call_args is None:
894 expected = self._format_mock_call_signature(args, kwargs)
895 actual = 'not called.'
896 error_message = ('expected call not found.\nExpected: %s\nActual: %s'
897 % (expected, actual))
898 raise AssertionError(error_message)
899
900 def _error_message():
901 msg = self._format_mock_failure_message(args, kwargs)
902 return msg
903 expected = self._call_matcher((args, kwargs))
904 actual = self._call_matcher(self.call_args)
905 if expected != actual:
906 cause = expected if isinstance(expected, Exception) else None
907 raise AssertionError(_error_message()) from cause
908
909
910 def assert_called_once_with(self, /, *args, **kwargs):
911 """assert that the mock was called exactly once and that that call was
912 with the specified arguments."""
913 if not self.call_count == 1:
914 msg = ("Expected '%s' to be called once. Called %s times.%s"
915 % (self._mock_name or 'mock',
916 self.call_count,
917 self._calls_repr()))
918 raise AssertionError(msg)
919 return self.assert_called_with(*args, **kwargs)
920
921
922 def assert_has_calls(self, calls, any_order=False):
923 """assert the mock has been called with the specified calls.
924 The `mock_calls` list is checked for the calls.
925
926 If `any_order` is False (the default) then the calls must be
927 sequential. There can be extra calls before or after the
928 specified calls.
929
930 If `any_order` is True then the calls can be in any order, but
931 they must all appear in `mock_calls`."""
932 expected = [self._call_matcher(c) for c in calls]
933 cause = next((e for e in expected if isinstance(e, Exception)), None)
934 all_calls = _CallList(self._call_matcher(c) for c in self.mock_calls)
935 if not any_order:
936 if expected not in all_calls:
937 if cause is None:
938 problem = 'Calls not found.'
939 else:
940 problem = ('Error processing expected calls.\n'
941 'Errors: {}').format(
942 [e if isinstance(e, Exception) else None
943 for e in expected])
944 raise AssertionError(
945 f'{problem}\n'
946 f'Expected: {_CallList(calls)}'
947 f'{self._calls_repr(prefix="Actual").rstrip(".")}'
948 ) from cause
949 return
950
951 all_calls = list(all_calls)
952
953 not_found = []
954 for kall in expected:
955 try:
956 all_calls.remove(kall)
957 except ValueError:
958 not_found.append(kall)
959 if not_found:
960 raise AssertionError(
961 '%r does not contain all of %r in its call list, '
962 'found %r instead' % (self._mock_name or 'mock',
963 tuple(not_found), all_calls)
964 ) from cause
965
966
967 def assert_any_call(self, /, *args, **kwargs):
968 """assert the mock has been called with the specified arguments.
969
970 The assert passes if the mock has *ever* been called, unlike
971 `assert_called_with` and `assert_called_once_with` that only pass if
972 the call is the most recent one."""
973 expected = self._call_matcher((args, kwargs))
974 actual = [self._call_matcher(c) for c in self.call_args_list]
975 if expected not in actual:
976 cause = expected if isinstance(expected, Exception) else None
977 expected_string = self._format_mock_call_signature(args, kwargs)
978 raise AssertionError(
979 '%s call not found' % expected_string
980 ) from cause
981
982
983 def _get_child_mock(self, /, **kw):
984 """Create the child mocks for attributes and return value.
985 By default child mocks will be the same type as the parent.
986 Subclasses of Mock may want to override this to customize the way
987 child mocks are made.
988
989 For non-callable mocks the callable variant will be used (rather than
990 any custom subclass)."""
991 _new_name = kw.get("_new_name")
992 if _new_name in self.__dict__['_spec_asyncs']:
993 return AsyncMock(**kw)
994
995 _type = type(self)
996 if issubclass(_type, MagicMock) and _new_name in _async_method_magics:
997 # Any asynchronous magic becomes an AsyncMock
998 klass = AsyncMock
999 elif issubclass(_type, AsyncMockMixin):
1000 if (_new_name in _all_sync_magics or
1001 self._mock_methods and _new_name in self._mock_methods):
1002 # Any synchronous method on AsyncMock becomes a MagicMock
1003 klass = MagicMock
1004 else:
1005 klass = AsyncMock
1006 elif not issubclass(_type, CallableMixin):
1007 if issubclass(_type, NonCallableMagicMock):
1008 klass = MagicMock
1009 elif issubclass(_type, NonCallableMock):
1010 klass = Mock
1011 else:
1012 klass = _type.__mro__[1]
1013
1014 if self._mock_sealed:
1015 attribute = "." + kw["name"] if "name" in kw else "()"
1016 mock_name = self._extract_mock_name() + attribute
1017 raise AttributeError(mock_name)
1018
1019 return klass(**kw)
1020
1021
1022 def _calls_repr(self, prefix="Calls"):
1023 """Renders self.mock_calls as a string.
1024
1025 Example: "\nCalls: [call(1), call(2)]."
1026
1027 If self.mock_calls is empty, an empty string is returned. The
1028 output will be truncated if very long.
1029 """
1030 if not self.mock_calls:
1031 return ""
1032 return f"\n{prefix}: {safe_repr(self.mock_calls)}."
1033
1034
1035
1036 def _try_iter(obj):
1037 if obj is None:
1038 return obj
1039 if _is_exception(obj):
1040 return obj
1041 if _callable(obj):
1042 return obj
1043 try:
1044 return iter(obj)
1045 except TypeError:
1046 # XXXX backwards compatibility
1047 # but this will blow up on first call - so maybe we should fail early?
1048 return obj
1049
1050
1051 class CallableMixin(Base):
1052
1053 def __init__(self, spec=None, side_effect=None, return_value=DEFAULT,
1054 wraps=None, name=None, spec_set=None, parent=None,
1055 _spec_state=None, _new_name='', _new_parent=None, **kwargs):
1056 self.__dict__['_mock_return_value'] = return_value
1057 _safe_super(CallableMixin, self).__init__(
1058 spec, wraps, name, spec_set, parent,
1059 _spec_state, _new_name, _new_parent, **kwargs
1060 )
1061
1062 self.side_effect = side_effect
1063
1064
1065 def _mock_check_sig(self, /, *args, **kwargs):
1066 # stub method that can be replaced with one with a specific signature
1067 pass
1068
1069
1070 def __call__(self, /, *args, **kwargs):
1071 # can't use self in-case a function / method we are mocking uses self
1072 # in the signature
1073 self._mock_check_sig(*args, **kwargs)
1074 self._increment_mock_call(*args, **kwargs)
1075 return self._mock_call(*args, **kwargs)
1076
1077
1078 def _mock_call(self, /, *args, **kwargs):
1079 return self._execute_mock_call(*args, **kwargs)
1080
1081 def _increment_mock_call(self, /, *args, **kwargs):
1082 self.called = True
1083 self.call_count += 1
1084
1085 # handle call_args
1086 # needs to be set here so assertions on call arguments pass before
1087 # execution in the case of awaited calls
1088 _call = _Call((args, kwargs), two=True)
1089 self.call_args = _call
1090 self.call_args_list.append(_call)
1091
1092 # initial stuff for method_calls:
1093 do_method_calls = self._mock_parent is not None
1094 method_call_name = self._mock_name
1095
1096 # initial stuff for mock_calls:
1097 mock_call_name = self._mock_new_name
1098 is_a_call = mock_call_name == '()'
1099 self.mock_calls.append(_Call(('', args, kwargs)))
1100
1101 # follow up the chain of mocks:
1102 _new_parent = self._mock_new_parent
1103 while _new_parent is not None:
1104
1105 # handle method_calls:
1106 if do_method_calls:
1107 _new_parent.method_calls.append(_Call((method_call_name, args, kwargs)))
1108 do_method_calls = _new_parent._mock_parent is not None
1109 if do_method_calls:
1110 method_call_name = _new_parent._mock_name + '.' + method_call_name
1111
1112 # handle mock_calls:
1113 this_mock_call = _Call((mock_call_name, args, kwargs))
1114 _new_parent.mock_calls.append(this_mock_call)
1115
1116 if _new_parent._mock_new_name:
1117 if is_a_call:
1118 dot = ''
1119 else:
1120 dot = '.'
1121 is_a_call = _new_parent._mock_new_name == '()'
1122 mock_call_name = _new_parent._mock_new_name + dot + mock_call_name
1123
1124 # follow the parental chain:
1125 _new_parent = _new_parent._mock_new_parent
1126
1127 def _execute_mock_call(self, /, *args, **kwargs):
1128 # separate from _increment_mock_call so that awaited functions are
1129 # executed separately from their call, also AsyncMock overrides this method
1130
1131 effect = self.side_effect
1132 if effect is not None:
1133 if _is_exception(effect):
1134 raise effect
1135 elif not _callable(effect):
1136 result = next(effect)
1137 if _is_exception(result):
1138 raise result
1139 else:
1140 result = effect(*args, **kwargs)
1141
1142 if result is not DEFAULT:
1143 return result
1144
1145 if self._mock_return_value is not DEFAULT:
1146 return self.return_value
1147
1148 if self._mock_wraps is not None:
1149 return self._mock_wraps(*args, **kwargs)
1150
1151 return self.return_value
1152
1153
1154
1155 class Mock(CallableMixin, NonCallableMock):
1156 """
1157 Create a new `Mock` object. `Mock` takes several optional arguments
1158 that specify the behaviour of the Mock object:
1159
1160 * `spec`: This can be either a list of strings or an existing object (a
1161 class or instance) that acts as the specification for the mock object. If
1162 you pass in an object then a list of strings is formed by calling dir on
1163 the object (excluding unsupported magic attributes and methods). Accessing
1164 any attribute not in this list will raise an `AttributeError`.
1165
1166 If `spec` is an object (rather than a list of strings) then
1167 `mock.__class__` returns the class of the spec object. This allows mocks
1168 to pass `isinstance` tests.
1169
1170 * `spec_set`: A stricter variant of `spec`. If used, attempting to *set*
1171 or get an attribute on the mock that isn't on the object passed as
1172 `spec_set` will raise an `AttributeError`.
1173
1174 * `side_effect`: A function to be called whenever the Mock is called. See
1175 the `side_effect` attribute. Useful for raising exceptions or
1176 dynamically changing return values. The function is called with the same
1177 arguments as the mock, and unless it returns `DEFAULT`, the return
1178 value of this function is used as the return value.
1179
1180 If `side_effect` is an iterable then each call to the mock will return
1181 the next value from the iterable. If any of the members of the iterable
1182 are exceptions they will be raised instead of returned.
1183
1184 * `return_value`: The value returned when the mock is called. By default
1185 this is a new Mock (created on first access). See the
1186 `return_value` attribute.
1187
1188 * `wraps`: Item for the mock object to wrap. If `wraps` is not None then
1189 calling the Mock will pass the call through to the wrapped object
1190 (returning the real result). Attribute access on the mock will return a
1191 Mock object that wraps the corresponding attribute of the wrapped object
1192 (so attempting to access an attribute that doesn't exist will raise an
1193 `AttributeError`).
1194
1195 If the mock has an explicit `return_value` set then calls are not passed
1196 to the wrapped object and the `return_value` is returned instead.
1197
1198 * `name`: If the mock has a name then it will be used in the repr of the
1199 mock. This can be useful for debugging. The name is propagated to child
1200 mocks.
1201
1202 Mocks can also be called with arbitrary keyword arguments. These will be
1203 used to set attributes on the mock after it is created.
1204 """
1205
1206
1207 def _dot_lookup(thing, comp, import_path):
1208 try:
1209 return getattr(thing, comp)
1210 except AttributeError:
1211 __import__(import_path)
1212 return getattr(thing, comp)
1213
1214
1215 def _importer(target):
1216 components = target.split('.')
1217 import_path = components.pop(0)
1218 thing = __import__(import_path)
1219
1220 for comp in components:
1221 import_path += ".%s" % comp
1222 thing = _dot_lookup(thing, comp, import_path)
1223 return thing
1224
1225
1226 def _is_started(patcher):
1227 # XXXX horrible
1228 return hasattr(patcher, 'is_local')
1229
1230
1231 class _patch(object):
1232
1233 attribute_name = None
1234 _active_patches = []
1235
1236 def __init__(
1237 self, getter, attribute, new, spec, create,
1238 spec_set, autospec, new_callable, kwargs
1239 ):
1240 if new_callable is not None:
1241 if new is not DEFAULT:
1242 raise ValueError(
1243 "Cannot use 'new' and 'new_callable' together"
1244 )
1245 if autospec is not None:
1246 raise ValueError(
1247 "Cannot use 'autospec' and 'new_callable' together"
1248 )
1249
1250 self.getter = getter
1251 self.attribute = attribute
1252 self.new = new
1253 self.new_callable = new_callable
1254 self.spec = spec
1255 self.create = create
1256 self.has_local = False
1257 self.spec_set = spec_set
1258 self.autospec = autospec
1259 self.kwargs = kwargs
1260 self.additional_patchers = []
1261
1262
1263 def copy(self):
1264 patcher = _patch(
1265 self.getter, self.attribute, self.new, self.spec,
1266 self.create, self.spec_set,
1267 self.autospec, self.new_callable, self.kwargs
1268 )
1269 patcher.attribute_name = self.attribute_name
1270 patcher.additional_patchers = [
1271 p.copy() for p in self.additional_patchers
1272 ]
1273 return patcher
1274
1275
1276 def __call__(self, func):
1277 if isinstance(func, type):
1278 return self.decorate_class(func)
1279 if inspect.iscoroutinefunction(func):
1280 return self.decorate_async_callable(func)
1281 return self.decorate_callable(func)
1282
1283
1284 def decorate_class(self, klass):
1285 for attr in dir(klass):
1286 if not attr.startswith(patch.TEST_PREFIX):
1287 continue
1288
1289 attr_value = getattr(klass, attr)
1290 if not hasattr(attr_value, "__call__"):
1291 continue
1292
1293 patcher = self.copy()
1294 setattr(klass, attr, patcher(attr_value))
1295 return klass
1296
1297
1298 @contextlib.contextmanager
1299 def decoration_helper(self, patched, args, keywargs):
1300 extra_args = []
1301 entered_patchers = []
1302 patching = None
1303
1304 exc_info = tuple()
1305 try:
1306 for patching in patched.patchings:
1307 arg = patching.__enter__()
1308 entered_patchers.append(patching)
1309 if patching.attribute_name is not None:
1310 keywargs.update(arg)
1311 elif patching.new is DEFAULT:
1312 extra_args.append(arg)
1313
1314 args += tuple(extra_args)
1315 yield (args, keywargs)
1316 except:
1317 if (patching not in entered_patchers and
1318 _is_started(patching)):
1319 # the patcher may have been started, but an exception
1320 # raised whilst entering one of its additional_patchers
1321 entered_patchers.append(patching)
1322 # Pass the exception to __exit__
1323 exc_info = sys.exc_info()
1324 # re-raise the exception
1325 raise
1326 finally:
1327 for patching in reversed(entered_patchers):
1328 patching.__exit__(*exc_info)
1329
1330
1331 def decorate_callable(self, func):
1332 # NB. Keep the method in sync with decorate_async_callable()
1333 if hasattr(func, 'patchings'):
1334 func.patchings.append(self)
1335 return func
1336
1337 @wraps(func)
1338 def patched(*args, **keywargs):
1339 with self.decoration_helper(patched,
1340 args,
1341 keywargs) as (newargs, newkeywargs):
1342 return func(*newargs, **newkeywargs)
1343
1344 patched.patchings = [self]
1345 return patched
1346
1347
1348 def decorate_async_callable(self, func):
1349 # NB. Keep the method in sync with decorate_callable()
1350 if hasattr(func, 'patchings'):
1351 func.patchings.append(self)
1352 return func
1353
1354 @wraps(func)
1355 async def patched(*args, **keywargs):
1356 with self.decoration_helper(patched,
1357 args,
1358 keywargs) as (newargs, newkeywargs):
1359 return await func(*newargs, **newkeywargs)
1360
1361 patched.patchings = [self]
1362 return patched
1363
1364
1365 def get_original(self):
1366 target = self.getter()
1367 name = self.attribute
1368
1369 original = DEFAULT
1370 local = False
1371
1372 try:
1373 original = target.__dict__[name]
1374 except (AttributeError, KeyError):
1375 original = getattr(target, name, DEFAULT)
1376 else:
1377 local = True
1378
1379 if name in _builtins and isinstance(target, ModuleType):
1380 self.create = True
1381
1382 if not self.create and original is DEFAULT:
1383 raise AttributeError(
1384 "%s does not have the attribute %r" % (target, name)
1385 )
1386 return original, local
1387
1388
1389 def __enter__(self):
1390 """Perform the patch."""
1391 new, spec, spec_set = self.new, self.spec, self.spec_set
1392 autospec, kwargs = self.autospec, self.kwargs
1393 new_callable = self.new_callable
1394 self.target = self.getter()
1395
1396 # normalise False to None
1397 if spec is False:
1398 spec = None
1399 if spec_set is False:
1400 spec_set = None
1401 if autospec is False:
1402 autospec = None
1403
1404 if spec is not None and autospec is not None:
1405 raise TypeError("Can't specify spec and autospec")
1406 if ((spec is not None or autospec is not None) and
1407 spec_set not in (True, None)):
1408 raise TypeError("Can't provide explicit spec_set *and* spec or autospec")
1409
1410 original, local = self.get_original()
1411
1412 if new is DEFAULT and autospec is None:
1413 inherit = False
1414 if spec is True:
1415 # set spec to the object we are replacing
1416 spec = original
1417 if spec_set is True:
1418 spec_set = original
1419 spec = None
1420 elif spec is not None:
1421 if spec_set is True:
1422 spec_set = spec
1423 spec = None
1424 elif spec_set is True:
1425 spec_set = original
1426
1427 if spec is not None or spec_set is not None:
1428 if original is DEFAULT:
1429 raise TypeError("Can't use 'spec' with create=True")
1430 if isinstance(original, type):
1431 # If we're patching out a class and there is a spec
1432 inherit = True
1433 if spec is None and _is_async_obj(original):
1434 Klass = AsyncMock
1435 else:
1436 Klass = MagicMock
1437 _kwargs = {}
1438 if new_callable is not None:
1439 Klass = new_callable
1440 elif spec is not None or spec_set is not None:
1441 this_spec = spec
1442 if spec_set is not None:
1443 this_spec = spec_set
1444 if _is_list(this_spec):
1445 not_callable = '__call__' not in this_spec
1446 else:
1447 not_callable = not callable(this_spec)
1448 if _is_async_obj(this_spec):
1449 Klass = AsyncMock
1450 elif not_callable:
1451 Klass = NonCallableMagicMock
1452
1453 if spec is not None:
1454 _kwargs['spec'] = spec
1455 if spec_set is not None:
1456 _kwargs['spec_set'] = spec_set
1457
1458 # add a name to mocks
1459 if (isinstance(Klass, type) and
1460 issubclass(Klass, NonCallableMock) and self.attribute):
1461 _kwargs['name'] = self.attribute
1462
1463 _kwargs.update(kwargs)
1464 new = Klass(**_kwargs)
1465
1466 if inherit and _is_instance_mock(new):
1467 # we can only tell if the instance should be callable if the
1468 # spec is not a list
1469 this_spec = spec
1470 if spec_set is not None:
1471 this_spec = spec_set
1472 if (not _is_list(this_spec) and not
1473 _instance_callable(this_spec)):
1474 Klass = NonCallableMagicMock
1475
1476 _kwargs.pop('name')
1477 new.return_value = Klass(_new_parent=new, _new_name='()',
1478 **_kwargs)
1479 elif autospec is not None:
1480 # spec is ignored, new *must* be default, spec_set is treated
1481 # as a boolean. Should we check spec is not None and that spec_set
1482 # is a bool?
1483 if new is not DEFAULT:
1484 raise TypeError(
1485 "autospec creates the mock for you. Can't specify "
1486 "autospec and new."
1487 )
1488 if original is DEFAULT:
1489 raise TypeError("Can't use 'autospec' with create=True")
1490 spec_set = bool(spec_set)
1491 if autospec is True:
1492 autospec = original
1493
1494 new = create_autospec(autospec, spec_set=spec_set,
1495 _name=self.attribute, **kwargs)
1496 elif kwargs:
1497 # can't set keyword args when we aren't creating the mock
1498 # XXXX If new is a Mock we could call new.configure_mock(**kwargs)
1499 raise TypeError("Can't pass kwargs to a mock we aren't creating")
1500
1501 new_attr = new
1502
1503 self.temp_original = original
1504 self.is_local = local
1505 setattr(self.target, self.attribute, new_attr)
1506 if self.attribute_name is not None:
1507 extra_args = {}
1508 if self.new is DEFAULT:
1509 extra_args[self.attribute_name] = new
1510 for patching in self.additional_patchers:
1511 arg = patching.__enter__()
1512 if patching.new is DEFAULT:
1513 extra_args.update(arg)
1514 return extra_args
1515
1516 return new
1517
1518
1519 def __exit__(self, *exc_info):
1520 """Undo the patch."""
1521 if not _is_started(self):
1522 return
1523
1524 if self.is_local and self.temp_original is not DEFAULT:
1525 setattr(self.target, self.attribute, self.temp_original)
1526 else:
1527 delattr(self.target, self.attribute)
1528 if not self.create and (not hasattr(self.target, self.attribute) or
1529 self.attribute in ('__doc__', '__module__',
1530 '__defaults__', '__annotations__',
1531 '__kwdefaults__')):
1532 # needed for proxy objects like django settings
1533 setattr(self.target, self.attribute, self.temp_original)
1534
1535 del self.temp_original
1536 del self.is_local
1537 del self.target
1538 for patcher in reversed(self.additional_patchers):
1539 if _is_started(patcher):
1540 patcher.__exit__(*exc_info)
1541
1542
1543 def start(self):
1544 """Activate a patch, returning any created mock."""
1545 result = self.__enter__()
1546 self._active_patches.append(self)
1547 return result
1548
1549
1550 def stop(self):
1551 """Stop an active patch."""
1552 try:
1553 self._active_patches.remove(self)
1554 except ValueError:
1555 # If the patch hasn't been started this will fail
1556 pass
1557
1558 return self.__exit__()
1559
1560
1561
1562 def _get_target(target):
1563 try:
1564 target, attribute = target.rsplit('.', 1)
1565 except (TypeError, ValueError):
1566 raise TypeError("Need a valid target to patch. You supplied: %r" %
1567 (target,))
1568 getter = lambda: _importer(target)
1569 return getter, attribute
1570
1571
1572 def _patch_object(
1573 target, attribute, new=DEFAULT, spec=None,
1574 create=False, spec_set=None, autospec=None,
1575 new_callable=None, **kwargs
1576 ):
1577 """
1578 patch the named member (`attribute`) on an object (`target`) with a mock
1579 object.
1580
1581 `patch.object` can be used as a decorator, class decorator or a context
1582 manager. Arguments `new`, `spec`, `create`, `spec_set`,
1583 `autospec` and `new_callable` have the same meaning as for `patch`. Like
1584 `patch`, `patch.object` takes arbitrary keyword arguments for configuring
1585 the mock object it creates.
1586
1587 When used as a class decorator `patch.object` honours `patch.TEST_PREFIX`
1588 for choosing which methods to wrap.
1589 """
1590 if type(target) is str:
1591 raise TypeError(
1592 f"{target!r} must be the actual object to be patched, not a str"
1593 )
1594 getter = lambda: target
1595 return _patch(
1596 getter, attribute, new, spec, create,
1597 spec_set, autospec, new_callable, kwargs
1598 )
1599
1600
1601 def _patch_multiple(target, spec=None, create=False, spec_set=None,
1602 autospec=None, new_callable=None, **kwargs):
1603 """Perform multiple patches in a single call. It takes the object to be
1604 patched (either as an object or a string to fetch the object by importing)
1605 and keyword arguments for the patches::
1606
1607 with patch.multiple(settings, FIRST_PATCH='one', SECOND_PATCH='two'):
1608 ...
1609
1610 Use `DEFAULT` as the value if you want `patch.multiple` to create
1611 mocks for you. In this case the created mocks are passed into a decorated
1612 function by keyword, and a dictionary is returned when `patch.multiple` is
1613 used as a context manager.
1614
1615 `patch.multiple` can be used as a decorator, class decorator or a context
1616 manager. The arguments `spec`, `spec_set`, `create`,
1617 `autospec` and `new_callable` have the same meaning as for `patch`. These
1618 arguments will be applied to *all* patches done by `patch.multiple`.
1619
1620 When used as a class decorator `patch.multiple` honours `patch.TEST_PREFIX`
1621 for choosing which methods to wrap.
1622 """
1623 if type(target) is str:
1624 getter = lambda: _importer(target)
1625 else:
1626 getter = lambda: target
1627
1628 if not kwargs:
1629 raise ValueError(
1630 'Must supply at least one keyword argument with patch.multiple'
1631 )
1632 # need to wrap in a list for python 3, where items is a view
1633 items = list(kwargs.items())
1634 attribute, new = items[0]
1635 patcher = _patch(
1636 getter, attribute, new, spec, create, spec_set,
1637 autospec, new_callable, {}
1638 )
1639 patcher.attribute_name = attribute
1640 for attribute, new in items[1:]:
1641 this_patcher = _patch(
1642 getter, attribute, new, spec, create, spec_set,
1643 autospec, new_callable, {}
1644 )
1645 this_patcher.attribute_name = attribute
1646 patcher.additional_patchers.append(this_patcher)
1647 return patcher
1648
1649
1650 def patch(
1651 target, new=DEFAULT, spec=None, create=False,
1652 spec_set=None, autospec=None, new_callable=None, **kwargs
1653 ):
1654 """
1655 `patch` acts as a function decorator, class decorator or a context
1656 manager. Inside the body of the function or with statement, the `target`
1657 is patched with a `new` object. When the function/with statement exits
1658 the patch is undone.
1659
1660 If `new` is omitted, then the target is replaced with an
1661 `AsyncMock if the patched object is an async function or a
1662 `MagicMock` otherwise. If `patch` is used as a decorator and `new` is
1663 omitted, the created mock is passed in as an extra argument to the
1664 decorated function. If `patch` is used as a context manager the created
1665 mock is returned by the context manager.
1666
1667 `target` should be a string in the form `'package.module.ClassName'`. The
1668 `target` is imported and the specified object replaced with the `new`
1669 object, so the `target` must be importable from the environment you are
1670 calling `patch` from. The target is imported when the decorated function
1671 is executed, not at decoration time.
1672
1673 The `spec` and `spec_set` keyword arguments are passed to the `MagicMock`
1674 if patch is creating one for you.
1675
1676 In addition you can pass `spec=True` or `spec_set=True`, which causes
1677 patch to pass in the object being mocked as the spec/spec_set object.
1678
1679 `new_callable` allows you to specify a different class, or callable object,
1680 that will be called to create the `new` object. By default `AsyncMock` is
1681 used for async functions and `MagicMock` for the rest.
1682
1683 A more powerful form of `spec` is `autospec`. If you set `autospec=True`
1684 then the mock will be created with a spec from the object being replaced.
1685 All attributes of the mock will also have the spec of the corresponding
1686 attribute of the object being replaced. Methods and functions being
1687 mocked will have their arguments checked and will raise a `TypeError` if
1688 they are called with the wrong signature. For mocks replacing a class,
1689 their return value (the 'instance') will have the same spec as the class.
1690
1691 Instead of `autospec=True` you can pass `autospec=some_object` to use an
1692 arbitrary object as the spec instead of the one being replaced.
1693
1694 By default `patch` will fail to replace attributes that don't exist. If
1695 you pass in `create=True`, and the attribute doesn't exist, patch will
1696 create the attribute for you when the patched function is called, and
1697 delete it again afterwards. This is useful for writing tests against
1698 attributes that your production code creates at runtime. It is off by
1699 default because it can be dangerous. With it switched on you can write
1700 passing tests against APIs that don't actually exist!
1701
1702 Patch can be used as a `TestCase` class decorator. It works by
1703 decorating each test method in the class. This reduces the boilerplate
1704 code when your test methods share a common patchings set. `patch` finds
1705 tests by looking for method names that start with `patch.TEST_PREFIX`.
1706 By default this is `test`, which matches the way `unittest` finds tests.
1707 You can specify an alternative prefix by setting `patch.TEST_PREFIX`.
1708
1709 Patch can be used as a context manager, with the with statement. Here the
1710 patching applies to the indented block after the with statement. If you
1711 use "as" then the patched object will be bound to the name after the
1712 "as"; very useful if `patch` is creating a mock object for you.
1713
1714 `patch` takes arbitrary keyword arguments. These will be passed to
1715 the `Mock` (or `new_callable`) on construction.
1716
1717 `patch.dict(...)`, `patch.multiple(...)` and `patch.object(...)` are
1718 available for alternate use-cases.
1719 """
1720 getter, attribute = _get_target(target)
1721 return _patch(
1722 getter, attribute, new, spec, create,
1723 spec_set, autospec, new_callable, kwargs
1724 )
1725
1726
1727 class _patch_dict(object):
1728 """
1729 Patch a dictionary, or dictionary like object, and restore the dictionary
1730 to its original state after the test.
1731
1732 `in_dict` can be a dictionary or a mapping like container. If it is a
1733 mapping then it must at least support getting, setting and deleting items
1734 plus iterating over keys.
1735
1736 `in_dict` can also be a string specifying the name of the dictionary, which
1737 will then be fetched by importing it.
1738
1739 `values` can be a dictionary of values to set in the dictionary. `values`
1740 can also be an iterable of `(key, value)` pairs.
1741
1742 If `clear` is True then the dictionary will be cleared before the new
1743 values are set.
1744
1745 `patch.dict` can also be called with arbitrary keyword arguments to set
1746 values in the dictionary::
1747
1748 with patch.dict('sys.modules', mymodule=Mock(), other_module=Mock()):
1749 ...
1750
1751 `patch.dict` can be used as a context manager, decorator or class
1752 decorator. When used as a class decorator `patch.dict` honours
1753 `patch.TEST_PREFIX` for choosing which methods to wrap.
1754 """
1755
1756 def __init__(self, in_dict, values=(), clear=False, **kwargs):
1757 self.in_dict = in_dict
1758 # support any argument supported by dict(...) constructor
1759 self.values = dict(values)
1760 self.values.update(kwargs)
1761 self.clear = clear
1762 self._original = None
1763
1764
1765 def __call__(self, f):
1766 if isinstance(f, type):
1767 return self.decorate_class(f)
1768 @wraps(f)
1769 def _inner(*args, **kw):
1770 self._patch_dict()
1771 try:
1772 return f(*args, **kw)
1773 finally:
1774 self._unpatch_dict()
1775
1776 return _inner
1777
1778
1779 def decorate_class(self, klass):
1780 for attr in dir(klass):
1781 attr_value = getattr(klass, attr)
1782 if (attr.startswith(patch.TEST_PREFIX) and
1783 hasattr(attr_value, "__call__")):
1784 decorator = _patch_dict(self.in_dict, self.values, self.clear)
1785 decorated = decorator(attr_value)
1786 setattr(klass, attr, decorated)
1787 return klass
1788
1789
1790 def __enter__(self):
1791 """Patch the dict."""
1792 self._patch_dict()
1793 return self.in_dict
1794
1795
1796 def _patch_dict(self):
1797 values = self.values
1798 if isinstance(self.in_dict, str):
1799 self.in_dict = _importer(self.in_dict)
1800 in_dict = self.in_dict
1801 clear = self.clear
1802
1803 try:
1804 original = in_dict.copy()
1805 except AttributeError:
1806 # dict like object with no copy method
1807 # must support iteration over keys
1808 original = {}
1809 for key in in_dict:
1810 original[key] = in_dict[key]
1811 self._original = original
1812
1813 if clear:
1814 _clear_dict(in_dict)
1815
1816 try:
1817 in_dict.update(values)
1818 except AttributeError:
1819 # dict like object with no update method
1820 for key in values:
1821 in_dict[key] = values[key]
1822
1823
1824 def _unpatch_dict(self):
1825 in_dict = self.in_dict
1826 original = self._original
1827
1828 _clear_dict(in_dict)
1829
1830 try:
1831 in_dict.update(original)
1832 except AttributeError:
1833 for key in original:
1834 in_dict[key] = original[key]
1835
1836
1837 def __exit__(self, *args):
1838 """Unpatch the dict."""
1839 self._unpatch_dict()
1840 return False
1841
1842 start = __enter__
1843 stop = __exit__
1844
1845
1846 def _clear_dict(in_dict):
1847 try:
1848 in_dict.clear()
1849 except AttributeError:
1850 keys = list(in_dict)
1851 for key in keys:
1852 del in_dict[key]
1853
1854
1855 def _patch_stopall():
1856 """Stop all active patches. LIFO to unroll nested patches."""
1857 for patch in reversed(_patch._active_patches):
1858 patch.stop()
1859
1860
1861 patch.object = _patch_object
1862 patch.dict = _patch_dict
1863 patch.multiple = _patch_multiple
1864 patch.stopall = _patch_stopall
1865 patch.TEST_PREFIX = 'test'
1866
1867 magic_methods = (
1868 "lt le gt ge eq ne "
1869 "getitem setitem delitem "
1870 "len contains iter "
1871 "hash str sizeof "
1872 "enter exit "
1873 # we added divmod and rdivmod here instead of numerics
1874 # because there is no idivmod
1875 "divmod rdivmod neg pos abs invert "
1876 "complex int float index "
1877 "round trunc floor ceil "
1878 "bool next "
1879 "fspath "
1880 "aiter "
1881 )
1882
1883 numerics = (
1884 "add sub mul matmul div floordiv mod lshift rshift and xor or pow truediv"
1885 )
1886 inplace = ' '.join('i%s' % n for n in numerics.split())
1887 right = ' '.join('r%s' % n for n in numerics.split())
1888
1889 # not including __prepare__, __instancecheck__, __subclasscheck__
1890 # (as they are metaclass methods)
1891 # __del__ is not supported at all as it causes problems if it exists
1892
1893 _non_defaults = {
1894 '__get__', '__set__', '__delete__', '__reversed__', '__missing__',
1895 '__reduce__', '__reduce_ex__', '__getinitargs__', '__getnewargs__',
1896 '__getstate__', '__setstate__', '__getformat__', '__setformat__',
1897 '__repr__', '__dir__', '__subclasses__', '__format__',
1898 '__getnewargs_ex__',
1899 }
1900
1901
1902 def _get_method(name, func):
1903 "Turns a callable object (like a mock) into a real function"
1904 def method(self, /, *args, **kw):
1905 return func(self, *args, **kw)
1906 method.__name__ = name
1907 return method
1908
1909
1910 _magics = {
1911 '__%s__' % method for method in
1912 ' '.join([magic_methods, numerics, inplace, right]).split()
1913 }
1914
1915 # Magic methods used for async `with` statements
1916 _async_method_magics = {"__aenter__", "__aexit__", "__anext__"}
1917 # Magic methods that are only used with async calls but are synchronous functions themselves
1918 _sync_async_magics = {"__aiter__"}
1919 _async_magics = _async_method_magics | _sync_async_magics
1920
1921 _all_sync_magics = _magics | _non_defaults
1922 _all_magics = _all_sync_magics | _async_magics
1923
1924 _unsupported_magics = {
1925 '__getattr__', '__setattr__',
1926 '__init__', '__new__', '__prepare__',
1927 '__instancecheck__', '__subclasscheck__',
1928 '__del__'
1929 }
1930
1931 _calculate_return_value = {
1932 '__hash__': lambda self: object.__hash__(self),
1933 '__str__': lambda self: object.__str__(self),
1934 '__sizeof__': lambda self: object.__sizeof__(self),
1935 '__fspath__': lambda self: f"{type(self).__name__}/{self._extract_mock_name()}/{id(self)}",
1936 }
1937
1938 _return_values = {
1939 '__lt__': NotImplemented,
1940 '__gt__': NotImplemented,
1941 '__le__': NotImplemented,
1942 '__ge__': NotImplemented,
1943 '__int__': 1,
1944 '__contains__': False,
1945 '__len__': 0,
1946 '__exit__': False,
1947 '__complex__': 1j,
1948 '__float__': 1.0,
1949 '__bool__': True,
1950 '__index__': 1,
1951 '__aexit__': False,
1952 }
1953
1954
1955 def _get_eq(self):
1956 def __eq__(other):
1957 ret_val = self.__eq__._mock_return_value
1958 if ret_val is not DEFAULT:
1959 return ret_val
1960 if self is other:
1961 return True
1962 return NotImplemented
1963 return __eq__
1964
1965 def _get_ne(self):
1966 def __ne__(other):
1967 if self.__ne__._mock_return_value is not DEFAULT:
1968 return DEFAULT
1969 if self is other:
1970 return False
1971 return NotImplemented
1972 return __ne__
1973
1974 def _get_iter(self):
1975 def __iter__():
1976 ret_val = self.__iter__._mock_return_value
1977 if ret_val is DEFAULT:
1978 return iter([])
1979 # if ret_val was already an iterator, then calling iter on it should
1980 # return the iterator unchanged
1981 return iter(ret_val)
1982 return __iter__
1983
1984 def _get_async_iter(self):
1985 def __aiter__():
1986 ret_val = self.__aiter__._mock_return_value
1987 if ret_val is DEFAULT:
1988 return _AsyncIterator(iter([]))
1989 return _AsyncIterator(iter(ret_val))
1990 return __aiter__
1991
1992 _side_effect_methods = {
1993 '__eq__': _get_eq,
1994 '__ne__': _get_ne,
1995 '__iter__': _get_iter,
1996 '__aiter__': _get_async_iter
1997 }
1998
1999
2000
2001 def _set_return_value(mock, method, name):
2002 fixed = _return_values.get(name, DEFAULT)
2003 if fixed is not DEFAULT:
2004 method.return_value = fixed
2005 return
2006
2007 return_calculator = _calculate_return_value.get(name)
2008 if return_calculator is not None:
2009 return_value = return_calculator(mock)
2010 method.return_value = return_value
2011 return
2012
2013 side_effector = _side_effect_methods.get(name)
2014 if side_effector is not None:
2015 method.side_effect = side_effector(mock)
2016
2017
2018
2019 class MagicMixin(Base):
2020 def __init__(self, /, *args, **kw):
2021 self._mock_set_magics() # make magic work for kwargs in init
2022 _safe_super(MagicMixin, self).__init__(*args, **kw)
2023 self._mock_set_magics() # fix magic broken by upper level init
2024
2025
2026 def _mock_set_magics(self):
2027 orig_magics = _magics | _async_method_magics
2028 these_magics = orig_magics
2029
2030 if getattr(self, "_mock_methods", None) is not None:
2031 these_magics = orig_magics.intersection(self._mock_methods)
2032
2033 remove_magics = set()
2034 remove_magics = orig_magics - these_magics
2035
2036 for entry in remove_magics:
2037 if entry in type(self).__dict__:
2038 # remove unneeded magic methods
2039 delattr(self, entry)
2040
2041 # don't overwrite existing attributes if called a second time
2042 these_magics = these_magics - set(type(self).__dict__)
2043
2044 _type = type(self)
2045 for entry in these_magics:
2046 setattr(_type, entry, MagicProxy(entry, self))
2047
2048
2049
2050 class NonCallableMagicMock(MagicMixin, NonCallableMock):
2051 """A version of `MagicMock` that isn't callable."""
2052 def mock_add_spec(self, spec, spec_set=False):
2053 """Add a spec to a mock. `spec` can either be an object or a
2054 list of strings. Only attributes on the `spec` can be fetched as
2055 attributes from the mock.
2056
2057 If `spec_set` is True then only attributes on the spec can be set."""
2058 self._mock_add_spec(spec, spec_set)
2059 self._mock_set_magics()
2060
2061
2062 class AsyncMagicMixin(MagicMixin):
2063 def __init__(self, /, *args, **kw):
2064 self._mock_set_magics() # make magic work for kwargs in init
2065 _safe_super(AsyncMagicMixin, self).__init__(*args, **kw)
2066 self._mock_set_magics() # fix magic broken by upper level init
2067
2068 class MagicMock(MagicMixin, Mock):
2069 """
2070 MagicMock is a subclass of Mock with default implementations
2071 of most of the magic methods. You can use MagicMock without having to
2072 configure the magic methods yourself.
2073
2074 If you use the `spec` or `spec_set` arguments then *only* magic
2075 methods that exist in the spec will be created.
2076
2077 Attributes and the return value of a `MagicMock` will also be `MagicMocks`.
2078 """
2079 def mock_add_spec(self, spec, spec_set=False):
2080 """Add a spec to a mock. `spec` can either be an object or a
2081 list of strings. Only attributes on the `spec` can be fetched as
2082 attributes from the mock.
2083
2084 If `spec_set` is True then only attributes on the spec can be set."""
2085 self._mock_add_spec(spec, spec_set)
2086 self._mock_set_magics()
2087
2088
2089
2090 class MagicProxy(Base):
2091 def __init__(self, name, parent):
2092 self.name = name
2093 self.parent = parent
2094
2095 def create_mock(self):
2096 entry = self.name
2097 parent = self.parent
2098 m = parent._get_child_mock(name=entry, _new_name=entry,
2099 _new_parent=parent)
2100 setattr(parent, entry, m)
2101 _set_return_value(parent, m, entry)
2102 return m
2103
2104 def __get__(self, obj, _type=None):
2105 return self.create_mock()
2106
2107
2108 class AsyncMockMixin(Base):
2109 await_count = _delegating_property('await_count')
2110 await_args = _delegating_property('await_args')
2111 await_args_list = _delegating_property('await_args_list')
2112
2113 def __init__(self, /, *args, **kwargs):
2114 super().__init__(*args, **kwargs)
2115 # asyncio.iscoroutinefunction() checks _is_coroutine property to say if an
2116 # object is a coroutine. Without this check it looks to see if it is a
2117 # function/method, which in this case it is not (since it is an
2118 # AsyncMock).
2119 # It is set through __dict__ because when spec_set is True, this
2120 # attribute is likely undefined.
2121 self.__dict__['_is_coroutine'] = asyncio.coroutines._is_coroutine
2122 self.__dict__['_mock_await_count'] = 0
2123 self.__dict__['_mock_await_args'] = None
2124 self.__dict__['_mock_await_args_list'] = _CallList()
2125 code_mock = NonCallableMock(spec_set=CodeType)
2126 code_mock.co_flags = inspect.CO_COROUTINE
2127 self.__dict__['__code__'] = code_mock
2128
2129 async def _execute_mock_call(self, /, *args, **kwargs):
2130 # This is nearly just like super(), except for sepcial handling
2131 # of coroutines
2132
2133 _call = self.call_args
2134 self.await_count += 1
2135 self.await_args = _call
2136 self.await_args_list.append(_call)
2137
2138 effect = self.side_effect
2139 if effect is not None:
2140 if _is_exception(effect):
2141 raise effect
2142 elif not _callable(effect):
2143 try:
2144 result = next(effect)
2145 except StopIteration:
2146 # It is impossible to propogate a StopIteration
2147 # through coroutines because of PEP 479
2148 raise StopAsyncIteration
2149 if _is_exception(result):
2150 raise result
2151 elif asyncio.iscoroutinefunction(effect):
2152 result = await effect(*args, **kwargs)
2153 else:
2154 result = effect(*args, **kwargs)
2155
2156 if result is not DEFAULT:
2157 return result
2158
2159 if self._mock_return_value is not DEFAULT:
2160 return self.return_value
2161
2162 if self._mock_wraps is not None:
2163 if asyncio.iscoroutinefunction(self._mock_wraps):
2164 return await self._mock_wraps(*args, **kwargs)
2165 return self._mock_wraps(*args, **kwargs)
2166
2167 return self.return_value
2168
2169 def assert_awaited(self):
2170 """
2171 Assert that the mock was awaited at least once.
2172 """
2173 if self.await_count == 0:
2174 msg = f"Expected {self._mock_name or 'mock'} to have been awaited."
2175 raise AssertionError(msg)
2176
2177 def assert_awaited_once(self):
2178 """
2179 Assert that the mock was awaited exactly once.
2180 """
2181 if not self.await_count == 1:
2182 msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once."
2183 f" Awaited {self.await_count} times.")
2184 raise AssertionError(msg)
2185
2186 def assert_awaited_with(self, /, *args, **kwargs):
2187 """
2188 Assert that the last await was with the specified arguments.
2189 """
2190 if self.await_args is None:
2191 expected = self._format_mock_call_signature(args, kwargs)
2192 raise AssertionError(f'Expected await: {expected}\nNot awaited')
2193
2194 def _error_message():
2195 msg = self._format_mock_failure_message(args, kwargs, action='await')
2196 return msg
2197
2198 expected = self._call_matcher((args, kwargs))
2199 actual = self._call_matcher(self.await_args)
2200 if expected != actual:
2201 cause = expected if isinstance(expected, Exception) else None
2202 raise AssertionError(_error_message()) from cause
2203
2204 def assert_awaited_once_with(self, /, *args, **kwargs):
2205 """
2206 Assert that the mock was awaited exactly once and with the specified
2207 arguments.
2208 """
2209 if not self.await_count == 1:
2210 msg = (f"Expected {self._mock_name or 'mock'} to have been awaited once."
2211 f" Awaited {self.await_count} times.")
2212 raise AssertionError(msg)
2213 return self.assert_awaited_with(*args, **kwargs)
2214
2215 def assert_any_await(self, /, *args, **kwargs):
2216 """
2217 Assert the mock has ever been awaited with the specified arguments.
2218 """
2219 expected = self._call_matcher((args, kwargs))
2220 actual = [self._call_matcher(c) for c in self.await_args_list]
2221 if expected not in actual:
2222 cause = expected if isinstance(expected, Exception) else None
2223 expected_string = self._format_mock_call_signature(args, kwargs)
2224 raise AssertionError(
2225 '%s await not found' % expected_string
2226 ) from cause
2227
2228 def assert_has_awaits(self, calls, any_order=False):
2229 """
2230 Assert the mock has been awaited with the specified calls.
2231 The :attr:`await_args_list` list is checked for the awaits.
2232
2233 If `any_order` is False (the default) then the awaits must be
2234 sequential. There can be extra calls before or after the
2235 specified awaits.
2236
2237 If `any_order` is True then the awaits can be in any order, but
2238 they must all appear in :attr:`await_args_list`.
2239 """
2240 expected = [self._call_matcher(c) for c in calls]
2241 cause = next((e for e in expected if isinstance(e, Exception)), None)
2242 all_awaits = _CallList(self._call_matcher(c) for c in self.await_args_list)
2243 if not any_order:
2244 if expected not in all_awaits:
2245 if cause is None:
2246 problem = 'Awaits not found.'
2247 else:
2248 problem = ('Error processing expected awaits.\n'
2249 'Errors: {}').format(
2250 [e if isinstance(e, Exception) else None
2251 for e in expected])
2252 raise AssertionError(
2253 f'{problem}\n'
2254 f'Expected: {_CallList(calls)}\n'
2255 f'Actual: {self.await_args_list}'
2256 ) from cause
2257 return
2258
2259 all_awaits = list(all_awaits)
2260
2261 not_found = []
2262 for kall in expected:
2263 try:
2264 all_awaits.remove(kall)
2265 except ValueError:
2266 not_found.append(kall)
2267 if not_found:
2268 raise AssertionError(
2269 '%r not all found in await list' % (tuple(not_found),)
2270 ) from cause
2271
2272 def assert_not_awaited(self):
2273 """
2274 Assert that the mock was never awaited.
2275 """
2276 if self.await_count != 0:
2277 msg = (f"Expected {self._mock_name or 'mock'} to not have been awaited."
2278 f" Awaited {self.await_count} times.")
2279 raise AssertionError(msg)
2280
2281 def reset_mock(self, /, *args, **kwargs):
2282 """
2283 See :func:`.Mock.reset_mock()`
2284 """
2285 super().reset_mock(*args, **kwargs)
2286 self.await_count = 0
2287 self.await_args = None
2288 self.await_args_list = _CallList()
2289
2290
2291 class AsyncMock(AsyncMockMixin, AsyncMagicMixin, Mock):
2292 """
2293 Enhance :class:`Mock` with features allowing to mock
2294 an async function.
2295
2296 The :class:`AsyncMock` object will behave so the object is
2297 recognized as an async function, and the result of a call is an awaitable:
2298
2299 >>> mock = AsyncMock()
2300 >>> asyncio.iscoroutinefunction(mock)
2301 True
2302 >>> inspect.isawaitable(mock())
2303 True
2304
2305
2306 The result of ``mock()`` is an async function which will have the outcome
2307 of ``side_effect`` or ``return_value``:
2308
2309 - if ``side_effect`` is a function, the async function will return the
2310 result of that function,
2311 - if ``side_effect`` is an exception, the async function will raise the
2312 exception,
2313 - if ``side_effect`` is an iterable, the async function will return the
2314 next value of the iterable, however, if the sequence of result is
2315 exhausted, ``StopIteration`` is raised immediately,
2316 - if ``side_effect`` is not defined, the async function will return the
2317 value defined by ``return_value``, hence, by default, the async function
2318 returns a new :class:`AsyncMock` object.
2319
2320 If the outcome of ``side_effect`` or ``return_value`` is an async function,
2321 the mock async function obtained when the mock object is called will be this
2322 async function itself (and not an async function returning an async
2323 function).
2324
2325 The test author can also specify a wrapped object with ``wraps``. In this
2326 case, the :class:`Mock` object behavior is the same as with an
2327 :class:`.Mock` object: the wrapped object may have methods
2328 defined as async function functions.
2329
2330 Based on Martin Richard's asynctest project.
2331 """
2332
2333
2334 class _ANY(object):
2335 "A helper object that compares equal to everything."
2336
2337 def __eq__(self, other):
2338 return True
2339
2340 def __ne__(self, other):
2341 return False
2342
2343 def __repr__(self):
2344 return '<ANY>'
2345
2346 ANY = _ANY()
2347
2348
2349
2350 def _format_call_signature(name, args, kwargs):
2351 message = '%s(%%s)' % name
2352 formatted_args = ''
2353 args_string = ', '.join([repr(arg) for arg in args])
2354 kwargs_string = ', '.join([
2355 '%s=%r' % (key, value) for key, value in kwargs.items()
2356 ])
2357 if args_string:
2358 formatted_args = args_string
2359 if kwargs_string:
2360 if formatted_args:
2361 formatted_args += ', '
2362 formatted_args += kwargs_string
2363
2364 return message % formatted_args
2365
2366
2367
2368 class _Call(tuple):
2369 """
2370 A tuple for holding the results of a call to a mock, either in the form
2371 `(args, kwargs)` or `(name, args, kwargs)`.
2372
2373 If args or kwargs are empty then a call tuple will compare equal to
2374 a tuple without those values. This makes comparisons less verbose::
2375
2376 _Call(('name', (), {})) == ('name',)
2377 _Call(('name', (1,), {})) == ('name', (1,))
2378 _Call(((), {'a': 'b'})) == ({'a': 'b'},)
2379
2380 The `_Call` object provides a useful shortcut for comparing with call::
2381
2382 _Call(((1, 2), {'a': 3})) == call(1, 2, a=3)
2383 _Call(('foo', (1, 2), {'a': 3})) == call.foo(1, 2, a=3)
2384
2385 If the _Call has no name then it will match any name.
2386 """
2387 def __new__(cls, value=(), name='', parent=None, two=False,
2388 from_kall=True):
2389 args = ()
2390 kwargs = {}
2391 _len = len(value)
2392 if _len == 3:
2393 name, args, kwargs = value
2394 elif _len == 2:
2395 first, second = value
2396 if isinstance(first, str):
2397 name = first
2398 if isinstance(second, tuple):
2399 args = second
2400 else:
2401 kwargs = second
2402 else:
2403 args, kwargs = first, second
2404 elif _len == 1:
2405 value, = value
2406 if isinstance(value, str):
2407 name = value
2408 elif isinstance(value, tuple):
2409 args = value
2410 else:
2411 kwargs = value
2412
2413 if two:
2414 return tuple.__new__(cls, (args, kwargs))
2415
2416 return tuple.__new__(cls, (name, args, kwargs))
2417
2418
2419 def __init__(self, value=(), name=None, parent=None, two=False,
2420 from_kall=True):
2421 self._mock_name = name
2422 self._mock_parent = parent
2423 self._mock_from_kall = from_kall
2424
2425
2426 def __eq__(self, other):
2427 if other is ANY:
2428 return True
2429 try:
2430 len_other = len(other)
2431 except TypeError:
2432 return False
2433
2434 self_name = ''
2435 if len(self) == 2:
2436 self_args, self_kwargs = self
2437 else:
2438 self_name, self_args, self_kwargs = self
2439
2440 if (getattr(self, '_mock_parent', None) and getattr(other, '_mock_parent', None)
2441 and self._mock_parent != other._mock_parent):
2442 return False
2443
2444 other_name = ''
2445 if len_other == 0:
2446 other_args, other_kwargs = (), {}
2447 elif len_other == 3:
2448 other_name, other_args, other_kwargs = other
2449 elif len_other == 1:
2450 value, = other
2451 if isinstance(value, tuple):
2452 other_args = value
2453 other_kwargs = {}
2454 elif isinstance(value, str):
2455 other_name = value
2456 other_args, other_kwargs = (), {}
2457 else:
2458 other_args = ()
2459 other_kwargs = value
2460 elif len_other == 2:
2461 # could be (name, args) or (name, kwargs) or (args, kwargs)
2462 first, second = other
2463 if isinstance(first, str):
2464 other_name = first
2465 if isinstance(second, tuple):
2466 other_args, other_kwargs = second, {}
2467 else:
2468 other_args, other_kwargs = (), second
2469 else:
2470 other_args, other_kwargs = first, second
2471 else:
2472 return False
2473
2474 if self_name and other_name != self_name:
2475 return False
2476
2477 # this order is important for ANY to work!
2478 return (other_args, other_kwargs) == (self_args, self_kwargs)
2479
2480
2481 __ne__ = object.__ne__
2482
2483
2484 def __call__(self, /, *args, **kwargs):
2485 if self._mock_name is None:
2486 return _Call(('', args, kwargs), name='()')
2487
2488 name = self._mock_name + '()'
2489 return _Call((self._mock_name, args, kwargs), name=name, parent=self)
2490
2491
2492 def __getattr__(self, attr):
2493 if self._mock_name is None:
2494 return _Call(name=attr, from_kall=False)
2495 name = '%s.%s' % (self._mock_name, attr)
2496 return _Call(name=name, parent=self, from_kall=False)
2497
2498
2499 def __getattribute__(self, attr):
2500 if attr in tuple.__dict__:
2501 raise AttributeError
2502 return tuple.__getattribute__(self, attr)
2503
2504
2505 def count(self, /, *args, **kwargs):
2506 return self.__getattr__('count')(*args, **kwargs)
2507
2508 def index(self, /, *args, **kwargs):
2509 return self.__getattr__('index')(*args, **kwargs)
2510
2511 def _get_call_arguments(self):
2512 if len(self) == 2:
2513 args, kwargs = self
2514 else:
2515 name, args, kwargs = self
2516
2517 return args, kwargs
2518
2519 @property
2520 def args(self):
2521 return self._get_call_arguments()[0]
2522
2523 @property
2524 def kwargs(self):
2525 return self._get_call_arguments()[1]
2526
2527 def __repr__(self):
2528 if not self._mock_from_kall:
2529 name = self._mock_name or 'call'
2530 if name.startswith('()'):
2531 name = 'call%s' % name
2532 return name
2533
2534 if len(self) == 2:
2535 name = 'call'
2536 args, kwargs = self
2537 else:
2538 name, args, kwargs = self
2539 if not name:
2540 name = 'call'
2541 elif not name.startswith('()'):
2542 name = 'call.%s' % name
2543 else:
2544 name = 'call%s' % name
2545 return _format_call_signature(name, args, kwargs)
2546
2547
2548 def call_list(self):
2549 """For a call object that represents multiple calls, `call_list`
2550 returns a list of all the intermediate calls as well as the
2551 final call."""
2552 vals = []
2553 thing = self
2554 while thing is not None:
2555 if thing._mock_from_kall:
2556 vals.append(thing)
2557 thing = thing._mock_parent
2558 return _CallList(reversed(vals))
2559
2560
2561 call = _Call(from_kall=False)
2562
2563
2564 def create_autospec(spec, spec_set=False, instance=False, _parent=None,
2565 _name=None, **kwargs):
2566 """Create a mock object using another object as a spec. Attributes on the
2567 mock will use the corresponding attribute on the `spec` object as their
2568 spec.
2569
2570 Functions or methods being mocked will have their arguments checked
2571 to check that they are called with the correct signature.
2572
2573 If `spec_set` is True then attempting to set attributes that don't exist
2574 on the spec object will raise an `AttributeError`.
2575
2576 If a class is used as a spec then the return value of the mock (the
2577 instance of the class) will have the same spec. You can use a class as the
2578 spec for an instance object by passing `instance=True`. The returned mock
2579 will only be callable if instances of the mock are callable.
2580
2581 `create_autospec` also takes arbitrary keyword arguments that are passed to
2582 the constructor of the created mock."""
2583 if _is_list(spec):
2584 # can't pass a list instance to the mock constructor as it will be
2585 # interpreted as a list of strings
2586 spec = type(spec)
2587
2588 is_type = isinstance(spec, type)
2589 is_async_func = _is_async_func(spec)
2590 _kwargs = {'spec': spec}
2591 if spec_set:
2592 _kwargs = {'spec_set': spec}
2593 elif spec is None:
2594 # None we mock with a normal mock without a spec
2595 _kwargs = {}
2596 if _kwargs and instance:
2597 _kwargs['_spec_as_instance'] = True
2598
2599 _kwargs.update(kwargs)
2600
2601 Klass = MagicMock
2602 if inspect.isdatadescriptor(spec):
2603 # descriptors don't have a spec
2604 # because we don't know what type they return
2605 _kwargs = {}
2606 elif is_async_func:
2607 if instance:
2608 raise RuntimeError("Instance can not be True when create_autospec "
2609 "is mocking an async function")
2610 Klass = AsyncMock
2611 elif not _callable(spec):
2612 Klass = NonCallableMagicMock
2613 elif is_type and instance and not _instance_callable(spec):
2614 Klass = NonCallableMagicMock
2615
2616 _name = _kwargs.pop('name', _name)
2617
2618 _new_name = _name
2619 if _parent is None:
2620 # for a top level object no _new_name should be set
2621 _new_name = ''
2622
2623 mock = Klass(parent=_parent, _new_parent=_parent, _new_name=_new_name,
2624 name=_name, **_kwargs)
2625
2626 if isinstance(spec, FunctionTypes):
2627 # should only happen at the top level because we don't
2628 # recurse for functions
2629 mock = _set_signature(mock, spec)
2630 if is_async_func:
2631 _setup_async_mock(mock)
2632 else:
2633 _check_signature(spec, mock, is_type, instance)
2634
2635 if _parent is not None and not instance:
2636 _parent._mock_children[_name] = mock
2637
2638 if is_type and not instance and 'return_value' not in kwargs:
2639 mock.return_value = create_autospec(spec, spec_set, instance=True,
2640 _name='()', _parent=mock)
2641
2642 for entry in dir(spec):
2643 if _is_magic(entry):
2644 # MagicMock already does the useful magic methods for us
2645 continue
2646
2647 # XXXX do we need a better way of getting attributes without
2648 # triggering code execution (?) Probably not - we need the actual
2649 # object to mock it so we would rather trigger a property than mock
2650 # the property descriptor. Likewise we want to mock out dynamically
2651 # provided attributes.
2652 # XXXX what about attributes that raise exceptions other than
2653 # AttributeError on being fetched?
2654 # we could be resilient against it, or catch and propagate the
2655 # exception when the attribute is fetched from the mock
2656 try:
2657 original = getattr(spec, entry)
2658 except AttributeError:
2659 continue
2660
2661 kwargs = {'spec': original}
2662 if spec_set:
2663 kwargs = {'spec_set': original}
2664
2665 if not isinstance(original, FunctionTypes):
2666 new = _SpecState(original, spec_set, mock, entry, instance)
2667 mock._mock_children[entry] = new
2668 else:
2669 parent = mock
2670 if isinstance(spec, FunctionTypes):
2671 parent = mock.mock
2672
2673 skipfirst = _must_skip(spec, entry, is_type)
2674 kwargs['_eat_self'] = skipfirst
2675 if asyncio.iscoroutinefunction(original):
2676 child_klass = AsyncMock
2677 else:
2678 child_klass = MagicMock
2679 new = child_klass(parent=parent, name=entry, _new_name=entry,
2680 _new_parent=parent,
2681 **kwargs)
2682 mock._mock_children[entry] = new
2683 _check_signature(original, new, skipfirst=skipfirst)
2684
2685 # so functions created with _set_signature become instance attributes,
2686 # *plus* their underlying mock exists in _mock_children of the parent
2687 # mock. Adding to _mock_children may be unnecessary where we are also
2688 # setting as an instance attribute?
2689 if isinstance(new, FunctionTypes):
2690 setattr(mock, entry, new)
2691
2692 return mock
2693
2694
2695 def _must_skip(spec, entry, is_type):
2696 """
2697 Return whether we should skip the first argument on spec's `entry`
2698 attribute.
2699 """
2700 if not isinstance(spec, type):
2701 if entry in getattr(spec, '__dict__', {}):
2702 # instance attribute - shouldn't skip
2703 return False
2704 spec = spec.__class__
2705
2706 for klass in spec.__mro__:
2707 result = klass.__dict__.get(entry, DEFAULT)
2708 if result is DEFAULT:
2709 continue
2710 if isinstance(result, (staticmethod, classmethod)):
2711 return False
2712 elif isinstance(getattr(result, '__get__', None), MethodWrapperTypes):
2713 # Normal method => skip if looked up on type
2714 # (if looked up on instance, self is already skipped)
2715 return is_type
2716 else:
2717 return False
2718
2719 # function is a dynamically provided attribute
2720 return is_type
2721
2722
2723 class _SpecState(object):
2724
2725 def __init__(self, spec, spec_set=False, parent=None,
2726 name=None, ids=None, instance=False):
2727 self.spec = spec
2728 self.ids = ids
2729 self.spec_set = spec_set
2730 self.parent = parent
2731 self.instance = instance
2732 self.name = name
2733
2734
2735 FunctionTypes = (
2736 # python function
2737 type(create_autospec),
2738 # instance method
2739 type(ANY.__eq__),
2740 )
2741
2742 MethodWrapperTypes = (
2743 type(ANY.__eq__.__get__),
2744 )
2745
2746
2747 file_spec = None
2748
2749
2750 def _to_stream(read_data):
2751 if isinstance(read_data, bytes):
2752 return io.BytesIO(read_data)
2753 else:
2754 return io.StringIO(read_data)
2755
2756
2757 def mock_open(mock=None, read_data=''):
2758 """
2759 A helper function to create a mock to replace the use of `open`. It works
2760 for `open` called directly or used as a context manager.
2761
2762 The `mock` argument is the mock object to configure. If `None` (the
2763 default) then a `MagicMock` will be created for you, with the API limited
2764 to methods or attributes available on standard file handles.
2765
2766 `read_data` is a string for the `read`, `readline` and `readlines` of the
2767 file handle to return. This is an empty string by default.
2768 """
2769 _read_data = _to_stream(read_data)
2770 _state = [_read_data, None]
2771
2772 def _readlines_side_effect(*args, **kwargs):
2773 if handle.readlines.return_value is not None:
2774 return handle.readlines.return_value
2775 return _state[0].readlines(*args, **kwargs)
2776
2777 def _read_side_effect(*args, **kwargs):
2778 if handle.read.return_value is not None:
2779 return handle.read.return_value
2780 return _state[0].read(*args, **kwargs)
2781
2782 def _readline_side_effect(*args, **kwargs):
2783 yield from _iter_side_effect()
2784 while True:
2785 yield _state[0].readline(*args, **kwargs)
2786
2787 def _iter_side_effect():
2788 if handle.readline.return_value is not None:
2789 while True:
2790 yield handle.readline.return_value
2791 for line in _state[0]:
2792 yield line
2793
2794 def _next_side_effect():
2795 if handle.readline.return_value is not None:
2796 return handle.readline.return_value
2797 return next(_state[0])
2798
2799 global file_spec
2800 if file_spec is None:
2801 import _io
2802 file_spec = list(set(dir(_io.TextIOWrapper)).union(set(dir(_io.BytesIO))))
2803
2804 if mock is None:
2805 mock = MagicMock(name='open', spec=open)
2806
2807 handle = MagicMock(spec=file_spec)
2808 handle.__enter__.return_value = handle
2809
2810 handle.write.return_value = None
2811 handle.read.return_value = None
2812 handle.readline.return_value = None
2813 handle.readlines.return_value = None
2814
2815 handle.read.side_effect = _read_side_effect
2816 _state[1] = _readline_side_effect()
2817 handle.readline.side_effect = _state[1]
2818 handle.readlines.side_effect = _readlines_side_effect
2819 handle.__iter__.side_effect = _iter_side_effect
2820 handle.__next__.side_effect = _next_side_effect
2821
2822 def reset_data(*args, **kwargs):
2823 _state[0] = _to_stream(read_data)
2824 if handle.readline.side_effect == _state[1]:
2825 # Only reset the side effect if the user hasn't overridden it.
2826 _state[1] = _readline_side_effect()
2827 handle.readline.side_effect = _state[1]
2828 return DEFAULT
2829
2830 mock.side_effect = reset_data
2831 mock.return_value = handle
2832 return mock
2833
2834
2835 class PropertyMock(Mock):
2836 """
2837 A mock intended to be used as a property, or other descriptor, on a class.
2838 `PropertyMock` provides `__get__` and `__set__` methods so you can specify
2839 a return value when it is fetched.
2840
2841 Fetching a `PropertyMock` instance from an object calls the mock, with
2842 no args. Setting it calls the mock with the value being set.
2843 """
2844 def _get_child_mock(self, /, **kwargs):
2845 return MagicMock(**kwargs)
2846
2847 def __get__(self, obj, obj_type=None):
2848 return self()
2849 def __set__(self, obj, val):
2850 self(val)
2851
2852
2853 def seal(mock):
2854 """Disable the automatic generation of child mocks.
2855
2856 Given an input Mock, seals it to ensure no further mocks will be generated
2857 when accessing an attribute that was not already defined.
2858
2859 The operation recursively seals the mock passed in, meaning that
2860 the mock itself, any mocks generated by accessing one of its attributes,
2861 and all assigned mocks without a name or spec will be sealed.
2862 """
2863 mock._mock_sealed = True
2864 for attr in dir(mock):
2865 try:
2866 m = getattr(mock, attr)
2867 except AttributeError:
2868 continue
2869 if not isinstance(m, NonCallableMock):
2870 continue
2871 if m._mock_new_parent is mock:
2872 seal(m)
2873
2874
2875 class _AsyncIterator:
2876 """
2877 Wraps an iterator in an asynchronous iterator.
2878 """
2879 def __init__(self, iterator):
2880 self.iterator = iterator
2881 code_mock = NonCallableMock(spec_set=CodeType)
2882 code_mock.co_flags = inspect.CO_ITERABLE_COROUTINE
2883 self.__dict__['__code__'] = code_mock
2884
2885 def __aiter__(self):
2886 return self
2887
2888 async def __anext__(self):
2889 try:
2890 return next(self.iterator)
2891 except StopIteration:
2892 pass
2893 raise StopAsyncIteration