limit recursion when failing to execute in conncurrent
PYTHON-585
This commit is contained in:
@@ -94,6 +94,8 @@ def execute_concurrent(session, statements_and_parameters, concurrency=100, rais
|
|||||||
|
|
||||||
class _ConcurrentExecutor(object):
|
class _ConcurrentExecutor(object):
|
||||||
|
|
||||||
|
max_error_recursion = 100
|
||||||
|
|
||||||
def __init__(self, session, statements_and_params):
|
def __init__(self, session, statements_and_params):
|
||||||
self.session = session
|
self.session = session
|
||||||
self._enum_statements = enumerate(iter(statements_and_params))
|
self._enum_statements = enumerate(iter(statements_and_params))
|
||||||
@@ -102,6 +104,7 @@ class _ConcurrentExecutor(object):
|
|||||||
self._results_queue = []
|
self._results_queue = []
|
||||||
self._current = 0
|
self._current = 0
|
||||||
self._exec_count = 0
|
self._exec_count = 0
|
||||||
|
self._exec_depth = 0
|
||||||
|
|
||||||
def execute(self, concurrency, fail_fast):
|
def execute(self, concurrency, fail_fast):
|
||||||
self._fail_fast = fail_fast
|
self._fail_fast = fail_fast
|
||||||
@@ -125,6 +128,7 @@ class _ConcurrentExecutor(object):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def _execute(self, idx, statement, params):
|
def _execute(self, idx, statement, params):
|
||||||
|
self._exec_depth += 1
|
||||||
try:
|
try:
|
||||||
future = self.session.execute_async(statement, params, timeout=None)
|
future = self.session.execute_async(statement, params, timeout=None)
|
||||||
args = (future, idx)
|
args = (future, idx)
|
||||||
@@ -135,7 +139,15 @@ class _ConcurrentExecutor(object):
|
|||||||
# exc_info with fail_fast to preserve stack trace info when raising on the client thread
|
# exc_info with fail_fast to preserve stack trace info when raising on the client thread
|
||||||
# (matches previous behavior -- not sure why we wouldn't want stack trace in the other case)
|
# (matches previous behavior -- not sure why we wouldn't want stack trace in the other case)
|
||||||
e = sys.exc_info() if self._fail_fast and six.PY2 else exc
|
e = sys.exc_info() if self._fail_fast and six.PY2 else exc
|
||||||
self._put_result(e, idx, False)
|
|
||||||
|
# If we're not failing fast and all executions are raising, there is a chance of recursing
|
||||||
|
# here as subsequent requests are attempted. If we hit this threshold, schedule this result/retry
|
||||||
|
# and let the event loop thread return.
|
||||||
|
if self._exec_depth < self.max_error_recursion:
|
||||||
|
self._put_result(e, idx, False)
|
||||||
|
else:
|
||||||
|
self.session.submit(self._put_result, e, idx, False)
|
||||||
|
self._exec_depth -= 1
|
||||||
|
|
||||||
def _on_success(self, result, future, idx):
|
def _on_success(self, result, future, idx):
|
||||||
future.clear_callbacks()
|
future.clear_callbacks()
|
||||||
|
Reference in New Issue
Block a user