diff --git a/requirements.txt b/requirements.txt
index 8fd178ce..0b9750d3 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -43,6 +43,7 @@ automaton>=0.5.0 # Apache-2.0
 # For common utilities
 oslo.utils>=3.2.0 # Apache-2.0
 oslo.serialization>=1.10.0 # Apache-2.0
+retrying>=1.2.3,!=1.3.0 # Apache-2.0
 
 # For lru caches and such
 cachetools>=1.0.0 # MIT License
diff --git a/taskflow/persistence/backends/impl_sqlalchemy.py b/taskflow/persistence/backends/impl_sqlalchemy.py
index e223e02e..8016b38a 100644
--- a/taskflow/persistence/backends/impl_sqlalchemy.py
+++ b/taskflow/persistence/backends/impl_sqlalchemy.py
@@ -23,6 +23,7 @@ import functools
 import time
 
 from oslo_utils import strutils
+import retrying
 import six
 import sqlalchemy as sa
 from sqlalchemy import exc as sa_exc
@@ -35,7 +36,6 @@ from taskflow.persistence.backends.sqlalchemy import migration
 from taskflow.persistence.backends.sqlalchemy import tables
 from taskflow.persistence import base
 from taskflow.persistence import models
-from taskflow.types import failure
 from taskflow.utils import eventlet_utils
 from taskflow.utils import misc
 
@@ -246,6 +246,10 @@ class SQLAlchemyBackend(base.Backend):
             self._engine = self._create_engine(self._conf)
             self._owns_engine = True
         self._validated = False
+        try:
+            self._max_retries = misc.as_int(self._conf.get('max_retries'))
+        except TypeError:
+            self._max_retries = 0
 
     @staticmethod
     def _create_engine(conf):
@@ -326,11 +330,7 @@ class SQLAlchemyBackend(base.Backend):
     def get_connection(self):
         conn = Connection(self)
         if not self._validated:
-            try:
-                max_retries = misc.as_int(self._conf.get('max_retries', None))
-            except TypeError:
-                max_retries = 0
-            conn.validate(max_retries=max_retries)
+            conn.validate(max_retries=self._max_retries)
             self._validated = True
         return conn
 
@@ -356,47 +356,41 @@ class Connection(base.Connection):
         return self._backend
 
     def validate(self, max_retries=0):
+        """Performs basic **connection** validation of a sqlalchemy engine."""
 
-        def verify_connect(failures):
-            try:
-                # See if we can make a connection happen.
-                #
-                # NOTE(harlowja): note that even though we are connecting
-                # once it does not mean that we will be able to connect in
-                # the future, so this is more of a sanity test and is not
-                # complete connection insurance.
-                with contextlib.closing(self._engine.connect()):
-                    pass
-            except sa_exc.OperationalError as ex:
-                if _is_db_connection_error(six.text_type(ex.args[0])):
-                    failures.append(failure.Failure())
-                    return False
-            return True
+        def _retry_on_exception(exc):
+            LOG.warn("Engine connection (validate) failed due to '%s'", exc)
+            if isinstance(exc, sa_exc.OperationalError) and \
+               _is_db_connection_error(six.text_type(exc.args[0])):
+                # We may be able to fix this by retrying...
+                return True
+            if isinstance(exc, (sa_exc.TimeoutError,
+                                sa_exc.ResourceClosedError,
+                                sa_exc.DisconnectionError)):
+                # We may be able to fix this by retrying...
+                return True
+            # Other failures we likely can't fix by retrying...
+            return False
 
-        failures = []
-        if verify_connect(failures):
-            return
+        @retrying.retry(stop_max_attempt_number=max(0, int(max_retries)),
+                        # Ensure that the 2 ** retry number
+                        # is converted into milliseconds (thus why this
+                        # multiplies by 1000.0) because that's what retrying
+                        # lib. uses internally for whatever reason.
+                        wait_exponential_multiplier=1000.0,
+                        wrap_exception=False,
+                        retry_on_exception=_retry_on_exception)
+        def _try_connect(engine):
+            # See if we can make a connection happen.
+            #
+            # NOTE(harlowja): note that even though we are connecting
+            # once it does not mean that we will be able to connect in
+            # the future, so this is more of a sanity test and is not
+            # complete connection insurance.
+            with contextlib.closing(engine.connect()):
+                pass
 
-        # Sorry it didn't work out...
-        if max_retries <= 0:
-            failures[-1].reraise()
-
-        # Go through the exponential backoff loop and see if we can connect
-        # after a given number of backoffs (with a backoff sleeping period
-        # between each attempt)...
-        attempts_left = max_retries
-        for sleepy_secs in misc.ExponentialBackoff(max_retries):
-            LOG.warn("SQL connection failed due to '%s', %s attempts left.",
-                     failures[-1].exc, attempts_left)
-            LOG.info("Attempting to test the connection again in %s seconds.",
-                     sleepy_secs)
-            time.sleep(sleepy_secs)
-            if verify_connect(failures):
-                return
-            attempts_left -= 1
-
-        # Sorry it didn't work out...
-        failures[-1].reraise()
+        _try_connect(self._engine)
 
     def upgrade(self):
         try:
diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py
index eb32abd2..1a516d83 100644
--- a/taskflow/utils/misc.py
+++ b/taskflow/utils/misc.py
@@ -35,7 +35,6 @@ from oslo_utils import importutils
 from oslo_utils import netutils
 from oslo_utils import reflection
 import six
-from six.moves import range as compat_range
 
 from taskflow.types import failure
 from taskflow.types import notifier
@@ -465,29 +464,6 @@ def sequence_minus(seq1, seq2):
     return result
 
 
-class ExponentialBackoff(object):
-    """An iterable object that will yield back an exponential delay sequence.
-
-    This objects provides for a configurable exponent, count of numbers
-    to generate, and a maximum number that will be returned. This object may
-    also be iterated over multiple times (yielding the same sequence each
-    time).
-    """
-    def __init__(self, count, exponent=2, max_backoff=3600):
-        self.count = max(0, int(count))
-        self.exponent = exponent
-        self.max_backoff = max(0, int(max_backoff))
-
-    def __iter__(self):
-        if self.count <= 0:
-            raise StopIteration()
-        for i in compat_range(0, self.count):
-            yield min(self.exponent ** i, self.max_backoff)
-
-    def __str__(self):
-        return "ExponentialBackoff: %s" % ([str(v) for v in self])
-
-
 def as_int(obj, quiet=False):
     """Converts an arbitrary value into a integer."""
     # Try "2" -> 2
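For reference, a minimal standalone sketch (not part of the diff above) of the retry pattern this change switches to: stop_max_attempt_number caps the total attempts, wait_exponential_multiplier=1000.0 scales the 2 ** attempt backoff into whole seconds (the retrying library computes its waits in milliseconds), retry_on_exception decides which exceptions are worth retrying, and wrap_exception=False re-raises the original exception once the attempts are exhausted. The TransientError class and connect_once() function here are hypothetical stand-ins for the engine connect call.

import retrying


class TransientError(Exception):
    """Hypothetical stand-in for a connection failure worth retrying."""


def _retry_on_exception(exc):
    # Only failures that look transient are retried; everything else
    # propagates to the caller immediately.
    return isinstance(exc, TransientError)


@retrying.retry(stop_max_attempt_number=3,
                # retrying measures waits in milliseconds, so a 1000.0
                # multiplier yields roughly 2s, 4s, ... between attempts.
                wait_exponential_multiplier=1000.0,
                wrap_exception=False,
                retry_on_exception=_retry_on_exception)
def connect_once():
    # Stand-in for engine.connect(); raising TransientError here would
    # trigger the exponential backoff and a further attempt.
    pass


connect_once()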