Merge pull request #670 from zackdever/predictable-future
Consistent error handling in future call/errbacks + better test failures
This commit is contained in:
@@ -5,6 +5,8 @@ log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Future(object):
|
||||
error_on_callbacks = False # and errbacks
|
||||
|
||||
def __init__(self):
|
||||
self.is_done = False
|
||||
self.value = None
|
||||
@@ -28,11 +30,7 @@ class Future(object):
|
||||
assert not self.is_done, 'Future is already complete'
|
||||
self.value = value
|
||||
self.is_done = True
|
||||
for f in self._callbacks:
|
||||
try:
|
||||
f(value)
|
||||
except Exception:
|
||||
log.exception('Error processing callback')
|
||||
self._call_backs('callback', self._callbacks, self.value)
|
||||
return self
|
||||
|
||||
def failure(self, e):
|
||||
@@ -41,18 +39,14 @@ class Future(object):
|
||||
assert isinstance(self.exception, BaseException), (
|
||||
'future failed without an exception')
|
||||
self.is_done = True
|
||||
for f in self._errbacks:
|
||||
try:
|
||||
f(self.exception)
|
||||
except Exception:
|
||||
log.exception('Error processing errback')
|
||||
self._call_backs('errback', self._errbacks, self.exception)
|
||||
return self
|
||||
|
||||
def add_callback(self, f, *args, **kwargs):
|
||||
if args or kwargs:
|
||||
f = functools.partial(f, *args, **kwargs)
|
||||
if self.is_done and not self.exception:
|
||||
f(self.value)
|
||||
self._call_backs('callback', [f], self.value)
|
||||
else:
|
||||
self._callbacks.append(f)
|
||||
return self
|
||||
@@ -61,7 +55,7 @@ class Future(object):
|
||||
if args or kwargs:
|
||||
f = functools.partial(f, *args, **kwargs)
|
||||
if self.is_done and self.exception:
|
||||
f(self.exception)
|
||||
self._call_backs('callback', [f], self.exception)
|
||||
else:
|
||||
self._errbacks.append(f)
|
||||
return self
|
||||
@@ -75,3 +69,12 @@ class Future(object):
|
||||
self.add_callback(future.success)
|
||||
self.add_errback(future.failure)
|
||||
return self
|
||||
|
||||
def _call_backs(self, back_type, backs, value):
|
||||
for f in backs:
|
||||
try:
|
||||
f(value)
|
||||
except Exception as e:
|
||||
log.exception('Error processing %s', back_type)
|
||||
if self.error_on_callbacks:
|
||||
raise e
|
||||
|
||||
@@ -15,3 +15,6 @@ except ImportError:
|
||||
pass
|
||||
|
||||
logging.getLogger(__name__).addHandler(NullHandler())
|
||||
|
||||
from kafka.future import Future
|
||||
Future.error_on_callbacks = True # always fail during testing
|
||||
|
||||
@@ -548,7 +548,7 @@ def test_send_offset_fetch_request_success(patched_coord, partitions):
|
||||
patched_coord._client.send.return_value = _f
|
||||
future = patched_coord._send_offset_fetch_request(partitions)
|
||||
(node, request), _ = patched_coord._client.send.call_args
|
||||
response = OffsetFetchResponse[0]([('foobar', [(0, 0), (1, 0)])])
|
||||
response = OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])])
|
||||
_f.success(response)
|
||||
patched_coord._handle_offset_fetch_response.assert_called_with(
|
||||
future, response)
|
||||
|
||||
Reference in New Issue
Block a user