Merge "Use the retrying lib. to do basic sqlalchemy engine validation"

This commit is contained in:
Jenkins 2016-01-09 01:27:54 +00:00 committed by Gerrit Code Review
commit 8bd70574db
3 changed files with 39 additions and 68 deletions

View File

@ -43,6 +43,7 @@ automaton>=0.5.0 # Apache-2.0
# For common utilities
oslo.utils>=3.2.0 # Apache-2.0
oslo.serialization>=1.10.0 # Apache-2.0
retrying>=1.2.3,!=1.3.0 # Apache-2.0
# For lru caches and such
cachetools>=1.0.0 # MIT License

View File

@ -23,6 +23,7 @@ import functools
import time
from oslo_utils import strutils
import retrying
import six
import sqlalchemy as sa
from sqlalchemy import exc as sa_exc
@ -35,7 +36,6 @@ from taskflow.persistence.backends.sqlalchemy import migration
from taskflow.persistence.backends.sqlalchemy import tables
from taskflow.persistence import base
from taskflow.persistence import models
from taskflow.types import failure
from taskflow.utils import eventlet_utils
from taskflow.utils import misc
@ -246,6 +246,10 @@ class SQLAlchemyBackend(base.Backend):
self._engine = self._create_engine(self._conf)
self._owns_engine = True
self._validated = False
try:
self._max_retries = misc.as_int(self._conf.get('max_retries'))
except TypeError:
self._max_retries = 0
@staticmethod
def _create_engine(conf):
@ -326,11 +330,7 @@ class SQLAlchemyBackend(base.Backend):
def get_connection(self):
conn = Connection(self)
if not self._validated:
try:
max_retries = misc.as_int(self._conf.get('max_retries', None))
except TypeError:
max_retries = 0
conn.validate(max_retries=max_retries)
conn.validate(max_retries=self._max_retries)
self._validated = True
return conn
@ -356,47 +356,41 @@ class Connection(base.Connection):
return self._backend
def validate(self, max_retries=0):
"""Performs basic **connection** validation of a sqlalchemy engine."""
def verify_connect(failures):
try:
# See if we can make a connection happen.
#
# NOTE(harlowja): note that even though we are connecting
# once it does not mean that we will be able to connect in
# the future, so this is more of a sanity test and is not
# complete connection insurance.
with contextlib.closing(self._engine.connect()):
pass
except sa_exc.OperationalError as ex:
if _is_db_connection_error(six.text_type(ex.args[0])):
failures.append(failure.Failure())
return False
return True
def _retry_on_exception(exc):
LOG.warn("Engine connection (validate) failed due to '%s'", exc)
if isinstance(exc, sa_exc.OperationalError) and \
_is_db_connection_error(six.text_type(exc.args[0])):
# We may be able to fix this by retrying...
return True
if isinstance(exc, (sa_exc.TimeoutError,
sa_exc.ResourceClosedError,
sa_exc.DisconnectionError)):
# We may be able to fix this by retrying...
return True
# Other failures we likely can't fix by retrying...
return False
failures = []
if verify_connect(failures):
return
@retrying.retry(stop_max_attempt_number=max(0, int(max_retries)),
# Ensure that the 2 ** retry number
# is converted into milliseconds (thus why this
# multiplies by 1000.0) because thats what retrying
# lib. uses internally for whatever reason.
wait_exponential_multiplier=1000.0,
wrap_exception=False,
retry_on_exception=_retry_on_exception)
def _try_connect(engine):
# See if we can make a connection happen.
#
# NOTE(harlowja): note that even though we are connecting
# once it does not mean that we will be able to connect in
# the future, so this is more of a sanity test and is not
# complete connection insurance.
with contextlib.closing(engine.connect()):
pass
# Sorry it didn't work out...
if max_retries <= 0:
failures[-1].reraise()
# Go through the exponential backoff loop and see if we can connect
# after a given number of backoffs (with a backoff sleeping period
# between each attempt)...
attempts_left = max_retries
for sleepy_secs in misc.ExponentialBackoff(max_retries):
LOG.warn("SQL connection failed due to '%s', %s attempts left.",
failures[-1].exc, attempts_left)
LOG.info("Attempting to test the connection again in %s seconds.",
sleepy_secs)
time.sleep(sleepy_secs)
if verify_connect(failures):
return
attempts_left -= 1
# Sorry it didn't work out...
failures[-1].reraise()
_try_connect(self._engine)
def upgrade(self):
try:

View File

@ -35,7 +35,6 @@ from oslo_utils import importutils
from oslo_utils import netutils
from oslo_utils import reflection
import six
from six.moves import range as compat_range
from taskflow.types import failure
from taskflow.types import notifier
@ -465,29 +464,6 @@ def sequence_minus(seq1, seq2):
return result
class ExponentialBackoff(object):
    """An iterable object that will yield back an exponential delay sequence.

    This object provides for a configurable exponent, count of numbers
    to generate, and a maximum number that will be returned. This object may
    also be iterated over multiple times (yielding the same sequence each
    time).
    """

    def __init__(self, count, exponent=2, max_backoff=3600):
        """Initialize the backoff sequence.

        :param count: how many delay values to generate (clamped to >= 0)
        :param exponent: base of the exponential growth (default 2)
        :param max_backoff: cap applied to every yielded value
                            (clamped to >= 0)
        """
        # Clamp so that iteration is always well-defined even for
        # negative inputs.
        self.count = max(0, int(count))
        self.exponent = exponent
        self.max_backoff = max(0, int(max_backoff))

    def __iter__(self):
        # NOTE: the original code raised StopIteration() here when
        # count <= 0, which under PEP 479 (Python 3.7+) surfaces as a
        # RuntimeError instead of cleanly ending iteration.  An empty
        # range already terminates the generator correctly, so no
        # explicit guard is needed.
        for i in range(self.count):
            yield min(self.exponent ** i, self.max_backoff)

    def __str__(self):
        return "ExponentialBackoff: %s" % ([str(v) for v in self])
def as_int(obj, quiet=False):
"""Converts an arbitrary value into a integer."""
# Try "2" -> 2