Use retry for AmphoraComputeConnectivityWait

Use taskflow retry for connectivity wait. [1]

This is required for the redis jobboard implementation, as each retry extends
the worker's claim on the job. This signals that the worker is still
processing the job and that it should not be released for other workers to
work on it.

Adopted for v2 flows.

[1] - https://docs.openstack.org/taskflow/latest/user/atoms.html#retry

Story: 2005072
Task: 33477

Change-Id: I2cf241ea965ad56ed70ebde83632ab855f5d859e
This commit is contained in:
Ann Taraday 2019-06-03 19:41:47 +04:00
parent 728550103f
commit 314b43af9a
10 changed files with 86 additions and 37 deletions

View File

@ -121,3 +121,9 @@ class NodeProvisioningError(ProvisioningErrors):
class AmpDriverNotImplementedError(AmphoraDriverError):
message = _('Amphora does not implement this feature.')
class AmpConnectionRetry(AmphoraDriverError):
message = _('Could not connect to amphora, exception caught: '
'%(exception)s')

View File

@ -82,11 +82,13 @@ class AmphoraLoadBalancerDriver(object):
"""
@abc.abstractmethod
def get_info(self, amphora):
def get_info(self, amphora, raise_retry_exception=False):
"""Returns information about the amphora.
:param amphora: amphora object, need to use its id property
:type amphora: octavia.db.models.Amphora
:param raise_retry_exception: Flag if outside task should be retried,
False by default
:type raise_retry_exception: boolean
:returns: return a value list (amphora.id, status flag--'info')
At this moment, we just build the basic structure for testing, will

View File

@ -88,7 +88,8 @@ class HaproxyAmphoraLoadBalancerDriver(
return haproxy_version_string.split('.')[:2]
def _populate_amphora_api_version(self, amphora):
def _populate_amphora_api_version(self, amphora,
raise_retry_exception=False):
"""Populate the amphora object with the api_version
This will query the amphora for version discovery and populate
@ -99,7 +100,8 @@ class HaproxyAmphoraLoadBalancerDriver(
if not getattr(amphora, 'api_version', None):
try:
amphora.api_version = self.clients['base'].get_api_version(
amphora)['api_version']
amphora,
raise_retry_exception=raise_retry_exception)['api_version']
except exc.NotFound:
# Amphora is too old for version discovery, default to 0.5
amphora.api_version = '0.5'
@ -334,9 +336,11 @@ class HaproxyAmphoraLoadBalancerDriver(
self.clients[amphora.api_version].delete_listener(
amphora, listener.load_balancer.id)
def get_info(self, amphora):
self._populate_amphora_api_version(amphora)
return self.clients[amphora.api_version].get_info(amphora)
def get_info(self, amphora, raise_retry_exception=False):
self._populate_amphora_api_version(
amphora, raise_retry_exception=raise_retry_exception)
return self.clients[amphora.api_version].get_info(
amphora, raise_retry_exception=raise_retry_exception)
def get_diagnostics(self, amphora):
pass
@ -594,7 +598,7 @@ class AmphoraAPIClientBase(object):
port=CONF.haproxy_amphora.bind_port)
def request(self, method, amp, path='/', timeout_dict=None,
retry_404=True, **kwargs):
retry_404=True, raise_retry_exception=False, **kwargs):
cfg_ha_amp = CONF.haproxy_amphora
if timeout_dict is None:
timeout_dict = {}
@ -659,7 +663,13 @@ class AmphoraAPIClientBase(object):
exception = e
LOG.warning("Could not connect to instance. Retrying.")
time.sleep(conn_retry_interval)
if raise_retry_exception:
# For taskflow persistence, the cause attribute should
# be serializable to JSON. Pass None, as the cause exception
# is described in the exception message.
six.raise_from(
driver_except.AmpConnectionRetry(exception=str(e)),
None)
LOG.error("Connection retries (currently set to %(max_retries)s) "
"exhausted. The amphora is unavailable. Reason: "
"%(exception)s",
@ -667,9 +677,10 @@ class AmphoraAPIClientBase(object):
'exception': exception})
raise driver_except.TimeOutException()
def get_api_version(self, amp):
def get_api_version(self, amp, raise_retry_exception=False):
amp.api_version = None
r = self.get(amp, retry_404=False)
r = self.get(amp, retry_404=False,
raise_retry_exception=raise_retry_exception)
# Handle 404 special as we don't want to log an ERROR on 404
exc.check_exception(r, (404,))
if r.status_code == 404:
@ -736,8 +747,8 @@ class AmphoraAPIClient0_5(AmphoraAPIClientBase):
amp, 'listeners/{listener_id}'.format(listener_id=listener_id))
return exc.check_exception(r, (404,))
def get_info(self, amp):
r = self.get(amp, "info")
def get_info(self, amp, raise_retry_exception=False):
r = self.get(amp, "info", raise_retry_exception=raise_retry_exception)
if exc.check_exception(r):
return r.json()
return None
@ -866,8 +877,8 @@ class AmphoraAPIClient1_0(AmphoraAPIClientBase):
amp, 'listeners/{object_id}'.format(object_id=object_id))
return exc.check_exception(r, (404,))
def get_info(self, amp):
r = self.get(amp, "info")
def get_info(self, amp, raise_retry_exception=False):
r = self.get(amp, "info", raise_retry_exception=raise_retry_exception)
if exc.check_exception(r):
return r.json()
return None

View File

@ -73,7 +73,7 @@ class NoopManager(object):
listener.load_balancer.vip.ip_address)] = (
listener, listener.load_balancer.vip, 'delete')
def get_info(self, amphora):
def get_info(self, amphora, raise_retry_exception=False):
LOG.debug("Amphora %s no-op, info amphora %s",
self.__class__.__name__, amphora.id)
self.amphoraconfig[amphora.id] = (amphora.id, 'get_info')
@ -138,9 +138,10 @@ class NoopAmphoraLoadBalancerDriver(
self.driver.delete(listener)
def get_info(self, amphora):
def get_info(self, amphora, raise_retry_exception=False):
self.driver.get_info(amphora)
self.driver.get_info(amphora,
raise_retry_exception=raise_retry_exception)
def get_diagnostics(self, amphora):

View File

@ -375,6 +375,7 @@ VRRP_GROUP = 'vrrp_group'
# Taskflow flow and task names
CERT_ROTATE_AMPHORA_FLOW = 'octavia-cert-rotate-amphora-flow'
CREATE_AMPHORA_FLOW = 'octavia-create-amphora-flow'
CREATE_AMPHORA_RETRY_SUBFLOW = 'octavia-create-amphora-retry-subflow'
CREATE_AMPHORA_FOR_LB_FLOW = 'octavia-create-amp-for-lb-flow'
CREATE_HEALTH_MONITOR_FLOW = 'octavia-create-health-monitor-flow'
CREATE_LISTENER_FLOW = 'octavia-create-listener_flow'

View File

@ -15,9 +15,9 @@
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from stevedore import driver as stevedore_driver
from octavia.common import constants
from octavia.controller.worker.v2 import controller_worker
CONF = cfg.CONF
@ -34,11 +34,7 @@ class Endpoints(object):
version='2.0')
def __init__(self):
self.worker = stevedore_driver.DriverManager(
namespace='octavia.plugins',
name=CONF.octavia_plugins,
invoke_on_load=True
).driver
self.worker = controller_worker.ControllerWorker()
def create_load_balancer(self, context, load_balancer_id,
flavor=None):

View File

@ -70,9 +70,14 @@ class AmphoraFlows(object):
create_amphora_flow.add(database_tasks.UpdateAmphoraInfo(
requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ),
provides=constants.AMPHORA))
create_amphora_flow.add(
retry_subflow = linear_flow.Flow(
constants.CREATE_AMPHORA_RETRY_SUBFLOW,
retry=amphora_driver_tasks.AmpRetry())
retry_subflow.add(
amphora_driver_tasks.AmphoraComputeConnectivityWait(
requires=constants.AMPHORA))
requires=constants.AMPHORA,
inject={'raise_retry_exception': True}))
create_amphora_flow.add(retry_subflow)
create_amphora_flow.add(database_tasks.ReloadAmphora(
requires=constants.AMPHORA_ID,
provides=constants.AMPHORA))
@ -193,10 +198,15 @@ class AmphoraFlows(object):
name=sf_name + '-' + constants.UPDATE_AMPHORA_INFO,
requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ),
provides=constants.AMPHORA))
create_amp_for_lb_subflow.add(
retry_task = sf_name + '-' + constants.AMP_COMPUTE_CONNECTIVITY_WAIT
retry_subflow = linear_flow.Flow(
constants.CREATE_AMPHORA_RETRY_SUBFLOW,
retry=amphora_driver_tasks.AmpRetry())
retry_subflow.add(
amphora_driver_tasks.AmphoraComputeConnectivityWait(
name=sf_name + '-' + constants.AMP_COMPUTE_CONNECTIVITY_WAIT,
requires=constants.AMPHORA))
name=retry_task, requires=constants.AMPHORA,
inject={'raise_retry_exception': True}))
create_amp_for_lb_subflow.add(retry_subflow)
create_amp_for_lb_subflow.add(amphora_driver_tasks.AmphoraFinalize(
name=sf_name + '-' + constants.AMPHORA_FINALIZE,
requires=constants.AMPHORA))

View File

@ -18,6 +18,7 @@ from oslo_config import cfg
from oslo_log import log as logging
import six
from stevedore import driver as stevedore_driver
from taskflow import retry
from taskflow import task
from taskflow.types import failure
@ -49,6 +50,26 @@ class BaseAmphoraTask(task.Task):
self.task_utils = task_utilities.TaskUtils()
class AmpRetry(retry.Times):
    """Retry controller used around the amphora connectivity wait task.

    Decides, after a failure inside the guarded subflow, whether the
    subflow should be run again or the whole flow reverted.
    """

    def on_failure(self, history, *args, **kwargs):
        """Choose between retrying the subflow and reverting everything.

        :param history: retry history; only its length and the failure
                        mapping stored at ``history[-1][1]`` are read here.
        :returns: ``retry.RETRY`` while the number of recorded attempts does
                  not exceed ``CONF.haproxy_amphora.connection_max_retries``
                  and a failure is either missing its exception info or was
                  caused by ``AmpConnectionRetry``; ``retry.REVERT_ALL``
                  otherwise.
        """
        recent_failures = history[-1][1].items()
        attempt_limit = CONF.haproxy_amphora.connection_max_retries

        # Once the attempt budget is used up no failure can trigger a retry.
        if len(history) > attempt_limit:
            return retry.REVERT_ALL

        for _task_name, failure_info in recent_failures:
            # When taskflow persistence is enabled the flow/task state is
            # saved in the backend. If the flow (task) is restored (worker
            # restart, etc.) the failure comes back without exception info,
            # so the task has to be retried to find out its real state.
            if failure_info is None or failure_info._exc_info is None:
                return retry.RETRY
            raised = failure_info._exc_info[1]
            if isinstance(raised, driver_except.AmpConnectionRetry):
                return retry.RETRY

        return retry.REVERT_ALL
class AmpListenersUpdate(BaseAmphoraTask):
"""Task to update the listeners on one amphora."""
@ -323,10 +344,11 @@ class AmphoraVRRPStart(BaseAmphoraTask):
class AmphoraComputeConnectivityWait(BaseAmphoraTask):
"""Task to wait for the compute instance to be up."""
def execute(self, amphora):
def execute(self, amphora, raise_retry_exception=False):
"""Execute get_info routine for an amphora until it responds."""
try:
amp_info = self.amphora_driver.get_info(amphora)
amp_info = self.amphora_driver.get_info(
amphora, raise_retry_exception=raise_retry_exception)
LOG.debug('Successfuly connected to amphora %s: %s',
amphora.id, amp_info)
except driver_except.TimeOutException:

View File

@ -18,7 +18,6 @@ from oslo_config import fixture as oslo_fixture
from oslo_utils import uuidutils
from octavia.controller.queue.v2 import endpoints
from octavia.controller.worker.v2 import controller_worker
from octavia.tests.unit import base
@ -30,10 +29,9 @@ class TestEndpoints(base.TestCase):
conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
conf.config(octavia_plugins='hot_plug_plugin')
mock_class = mock.create_autospec(controller_worker.ControllerWorker)
self.worker_patcher = mock.patch('octavia.controller.queue.v2.'
'endpoints.stevedore_driver')
self.worker_patcher.start().ControllerWorker = mock_class
self.worker_patcher = mock.patch('octavia.controller.worker.v2.'
'controller_worker.ControllerWorker')
self.worker_patcher.start()
self.ep = endpoints.Endpoints()
self.context = {}

View File

@ -580,8 +580,10 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_amphora_repo_update):
amp_compute_conn_wait_obj = (
amphora_driver_tasks.AmphoraComputeConnectivityWait())
amp_compute_conn_wait_obj.execute(_amphora_mock)
mock_driver.get_info.assert_called_once_with(_amphora_mock)
amp_compute_conn_wait_obj.execute(_amphora_mock,
raise_retry_exception=True)
mock_driver.get_info.assert_called_once_with(
_amphora_mock, raise_retry_exception=True)
mock_driver.get_info.side_effect = driver_except.TimeOutException()
self.assertRaises(driver_except.TimeOutException,