Browse Source

Merge "Use retry for AmphoraComputeConnectivityWait" into stable/train

changes/74/741974/1
Zuul 2 weeks ago
committed by Gerrit Code Review
parent
commit
d69a6f776a
10 changed files with 86 additions and 37 deletions
  1. +6
    -0
      octavia/amphorae/driver_exceptions/exceptions.py
  2. +3
    -1
      octavia/amphorae/drivers/driver_base.py
  3. +24
    -13
      octavia/amphorae/drivers/haproxy/rest_api_driver.py
  4. +4
    -3
      octavia/amphorae/drivers/noop_driver/driver.py
  5. +1
    -0
      octavia/common/constants.py
  6. +2
    -6
      octavia/controller/queue/v2/endpoints.py
  7. +15
    -5
      octavia/controller/worker/v2/flows/amphora_flows.py
  8. +24
    -2
      octavia/controller/worker/v2/tasks/amphora_driver_tasks.py
  9. +3
    -5
      octavia/tests/unit/controller/queue/v2/test_endpoints.py
  10. +4
    -2
      octavia/tests/unit/controller/worker/v2/tasks/test_amphora_driver_tasks.py

+ 6
- 0
octavia/amphorae/driver_exceptions/exceptions.py View File

@@ -121,3 +121,9 @@ class NodeProvisioningError(ProvisioningErrors):
class AmpDriverNotImplementedError(AmphoraDriverError):

message = _('Amphora does not implement this feature.')


class AmpConnectionRetry(AmphoraDriverError):
    """Signals a failed connection attempt to an amphora.

    The message template takes an ``exception`` keyword holding a
    description of the underlying error that was caught.
    """

    message = _(
        'Could not connect to amphora, exception caught: %(exception)s')

+ 3
- 1
octavia/amphorae/drivers/driver_base.py View File

@@ -82,11 +82,13 @@ class AmphoraLoadBalancerDriver(object):
"""

@abc.abstractmethod
def get_info(self, amphora):
def get_info(self, amphora, raise_retry_exception=False):
"""Returns information about the amphora.

:param amphora: amphora object, need to use its id property
:type amphora: octavia.db.models.Amphora
:param raise_retry_exception: Flag if outside task should be retried
                              (False by default)
:type raise_retry_exception: boolean
:returns: return a value list (amphora.id, status flag--'info')

At this moment, we just build the basic structure for testing, will


+ 24
- 13
octavia/amphorae/drivers/haproxy/rest_api_driver.py View File

@@ -91,7 +91,8 @@ class HaproxyAmphoraLoadBalancerDriver(

return haproxy_version_string.split('.')[:2]

def _populate_amphora_api_version(self, amphora):
def _populate_amphora_api_version(self, amphora,
raise_retry_exception=False):
"""Populate the amphora object with the api_version

This will query the amphora for version discovery and populate
@@ -102,7 +103,8 @@ class HaproxyAmphoraLoadBalancerDriver(
if not getattr(amphora, 'api_version', None):
try:
amphora.api_version = self.clients['base'].get_api_version(
amphora)['api_version']
amphora,
raise_retry_exception=raise_retry_exception)['api_version']
except exc.NotFound:
# Amphora is too old for version discovery, default to 0.5
amphora.api_version = '0.5'
@@ -364,9 +366,11 @@ class HaproxyAmphoraLoadBalancerDriver(
self.clients[amphora.api_version].delete_listener(
amphora, listener.load_balancer.id)

def get_info(self, amphora):
self._populate_amphora_api_version(amphora)
return self.clients[amphora.api_version].get_info(amphora)
def get_info(self, amphora, raise_retry_exception=False):
    """Return the info reported by the amphora's versioned API client.

    The amphora's API version is populated first so that the matching
    client can be selected from ``self.clients``.

    :param amphora: amphora object to query
    :param raise_retry_exception: forwarded unchanged to both the version
                                  discovery call and the client's
                                  ``get_info`` call
    :returns: whatever the selected client's ``get_info`` returns
    """
    self._populate_amphora_api_version(
        amphora, raise_retry_exception=raise_retry_exception)
    versioned_client = self.clients[amphora.api_version]
    return versioned_client.get_info(
        amphora, raise_retry_exception=raise_retry_exception)

def get_diagnostics(self, amphora):
pass
@@ -624,7 +628,7 @@ class AmphoraAPIClientBase(object):
port=CONF.haproxy_amphora.bind_port)

def request(self, method, amp, path='/', timeout_dict=None,
retry_404=True, **kwargs):
retry_404=True, raise_retry_exception=False, **kwargs):
cfg_ha_amp = CONF.haproxy_amphora
if timeout_dict is None:
timeout_dict = {}
@@ -689,7 +693,13 @@ class AmphoraAPIClientBase(object):
exception = e
LOG.warning("Could not connect to instance. Retrying.")
time.sleep(conn_retry_interval)

if raise_retry_exception:
# For taskflow persistence, the cause attribute should
# be serializable to JSON. Pass None, as the cause exception
# is described in the exception message.
six.raise_from(
driver_except.AmpConnectionRetry(exception=str(e)),
None)
LOG.error("Connection retries (currently set to %(max_retries)s) "
"exhausted. The amphora is unavailable. Reason: "
"%(exception)s",
@@ -697,9 +707,10 @@ class AmphoraAPIClientBase(object):
'exception': exception})
raise driver_except.TimeOutException()

def get_api_version(self, amp):
def get_api_version(self, amp, raise_retry_exception=False):
amp.api_version = None
r = self.get(amp, retry_404=False)
r = self.get(amp, retry_404=False,
raise_retry_exception=raise_retry_exception)
# Handle 404 special as we don't want to log an ERROR on 404
exc.check_exception(r, (404,))
if r.status_code == 404:
@@ -766,8 +777,8 @@ class AmphoraAPIClient0_5(AmphoraAPIClientBase):
amp, 'listeners/{listener_id}'.format(listener_id=listener_id))
return exc.check_exception(r, (404,))

def get_info(self, amp):
r = self.get(amp, "info")
def get_info(self, amp, raise_retry_exception=False):
    """Request the "info" path from the amphora and decode the reply.

    :param amp: amphora to query
    :param raise_retry_exception: forwarded unchanged to ``self.get``
    :returns: the decoded JSON body when ``exc.check_exception`` accepts
              the response, otherwise None
    """
    response = self.get(
        amp, "info", raise_retry_exception=raise_retry_exception)
    return response.json() if exc.check_exception(response) else None
@@ -896,8 +907,8 @@ class AmphoraAPIClient1_0(AmphoraAPIClientBase):
amp, 'listeners/{object_id}'.format(object_id=object_id))
return exc.check_exception(r, (404,))

def get_info(self, amp):
r = self.get(amp, "info")
def get_info(self, amp, raise_retry_exception=False):
    """Request the "info" path from the amphora and decode the reply.

    :param amp: amphora to query
    :param raise_retry_exception: forwarded unchanged to ``self.get``
    :returns: the decoded JSON body when ``exc.check_exception`` accepts
              the response, otherwise None
    """
    response = self.get(
        amp, "info", raise_retry_exception=raise_retry_exception)
    return response.json() if exc.check_exception(response) else None


+ 4
- 3
octavia/amphorae/drivers/noop_driver/driver.py View File

@@ -73,7 +73,7 @@ class NoopManager(object):
listener.load_balancer.vip.ip_address)] = (
listener, listener.load_balancer.vip, 'delete')

def get_info(self, amphora):
def get_info(self, amphora, raise_retry_exception=False):
LOG.debug("Amphora %s no-op, info amphora %s",
self.__class__.__name__, amphora.id)
self.amphoraconfig[amphora.id] = (amphora.id, 'get_info')
@@ -138,9 +138,10 @@ class NoopAmphoraLoadBalancerDriver(

self.driver.delete(listener)

def get_info(self, amphora):
def get_info(self, amphora, raise_retry_exception=False):

self.driver.get_info(amphora)
self.driver.get_info(amphora,
raise_retry_exception=raise_retry_exception)

def get_diagnostics(self, amphora):



+ 1
- 0
octavia/common/constants.py View File

@@ -383,6 +383,7 @@ VRRP_GROUP = 'vrrp_group'
# Taskflow flow and task names
CERT_ROTATE_AMPHORA_FLOW = 'octavia-cert-rotate-amphora-flow'
CREATE_AMPHORA_FLOW = 'octavia-create-amphora-flow'
CREATE_AMPHORA_RETRY_SUBFLOW = 'octavia-create-amphora-retry-subflow'
CREATE_AMPHORA_FOR_LB_FLOW = 'octavia-create-amp-for-lb-flow'
CREATE_HEALTH_MONITOR_FLOW = 'octavia-create-health-monitor-flow'
CREATE_LISTENER_FLOW = 'octavia-create-listener_flow'


+ 2
- 6
octavia/controller/queue/v2/endpoints.py View File

@@ -15,9 +15,9 @@
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from stevedore import driver as stevedore_driver

from octavia.common import constants
from octavia.controller.worker.v2 import controller_worker

CONF = cfg.CONF

@@ -34,11 +34,7 @@ class Endpoints(object):
version='2.0')

def __init__(self):
self.worker = stevedore_driver.DriverManager(
namespace='octavia.plugins',
name=CONF.octavia_plugins,
invoke_on_load=True
).driver
self.worker = controller_worker.ControllerWorker()

def create_load_balancer(self, context, load_balancer_id,
flavor=None):


+ 15
- 5
octavia/controller/worker/v2/flows/amphora_flows.py View File

@@ -70,9 +70,14 @@ class AmphoraFlows(object):
create_amphora_flow.add(database_tasks.UpdateAmphoraInfo(
requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ),
provides=constants.AMPHORA))
create_amphora_flow.add(
retry_subflow = linear_flow.Flow(
constants.CREATE_AMPHORA_RETRY_SUBFLOW,
retry=amphora_driver_tasks.AmpRetry())
retry_subflow.add(
amphora_driver_tasks.AmphoraComputeConnectivityWait(
requires=constants.AMPHORA))
requires=constants.AMPHORA,
inject={'raise_retry_exception': True}))
create_amphora_flow.add(retry_subflow)
create_amphora_flow.add(database_tasks.ReloadAmphora(
requires=constants.AMPHORA_ID,
provides=constants.AMPHORA))
@@ -194,10 +199,15 @@ class AmphoraFlows(object):
name=sf_name + '-' + constants.UPDATE_AMPHORA_INFO,
requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ),
provides=constants.AMPHORA))
create_amp_for_lb_subflow.add(
retry_task = sf_name + '-' + constants.AMP_COMPUTE_CONNECTIVITY_WAIT
retry_subflow = linear_flow.Flow(
constants.CREATE_AMPHORA_RETRY_SUBFLOW,
retry=amphora_driver_tasks.AmpRetry())
retry_subflow.add(
amphora_driver_tasks.AmphoraComputeConnectivityWait(
name=sf_name + '-' + constants.AMP_COMPUTE_CONNECTIVITY_WAIT,
requires=constants.AMPHORA))
name=retry_task, requires=constants.AMPHORA,
inject={'raise_retry_exception': True}))
create_amp_for_lb_subflow.add(retry_subflow)
create_amp_for_lb_subflow.add(amphora_driver_tasks.AmphoraFinalize(
name=sf_name + '-' + constants.AMPHORA_FINALIZE,
requires=constants.AMPHORA))


+ 24
- 2
octavia/controller/worker/v2/tasks/amphora_driver_tasks.py View File

@@ -18,6 +18,7 @@ from oslo_config import cfg
from oslo_log import log as logging
import six
from stevedore import driver as stevedore_driver
from taskflow import retry
from taskflow import task
from taskflow.types import failure

@@ -49,6 +50,26 @@ class BaseAmphoraTask(task.Task):
self.task_utils = task_utilities.TaskUtils()


class AmpRetry(retry.Times):
    """Retry controller that retries on amphora connection failures."""

    def on_failure(self, history, *args, **kwargs):
        """Decide whether the failed subflow is retried or reverted.

        Only the most recent entry of ``history`` is inspected. A retry is
        requested while the number of history entries does not exceed
        ``CONF.haproxy_amphora.connection_max_retries`` and a recorded
        failure either has no exception information or was caused by
        ``AmpConnectionRetry``.

        :param history: retry history; the last entry's second element maps
                        task names to their failure objects
        :returns: ``retry.RETRY`` or ``retry.REVERT_ALL``
        """
        latest_failures = history[-1][1]
        attempt_limit = CONF.haproxy_amphora.connection_max_retries
        for failure in latest_failures.values():
            if len(history) > attempt_limit:
                # Retry budget is used up; nothing to evaluate for this entry.
                continue
            # When taskflow persistence is enabled, the flow/task state is
            # saved in the backend. If the flow (task) is restored (worker
            # restart, etc.) the failure comes back as None, so the task
            # must be retried to check its real state.
            if failure is None or failure._exc_info is None:
                return retry.RETRY
            raised = failure._exc_info[1]
            if isinstance(raised, driver_except.AmpConnectionRetry):
                return retry.RETRY

        return retry.REVERT_ALL


class AmpListenersUpdate(BaseAmphoraTask):
"""Task to update the listeners on one amphora."""

@@ -323,10 +344,11 @@ class AmphoraVRRPStart(BaseAmphoraTask):
class AmphoraComputeConnectivityWait(BaseAmphoraTask):
"""Task to wait for the compute instance to be up."""

def execute(self, amphora):
def execute(self, amphora, raise_retry_exception=False):
"""Execute get_info routine for an amphora until it responds."""
try:
amp_info = self.amphora_driver.get_info(amphora)
amp_info = self.amphora_driver.get_info(
amphora, raise_retry_exception=raise_retry_exception)
LOG.debug('Successfuly connected to amphora %s: %s',
amphora.id, amp_info)
except driver_except.TimeOutException:


+ 3
- 5
octavia/tests/unit/controller/queue/v2/test_endpoints.py View File

@@ -18,7 +18,6 @@ from oslo_config import fixture as oslo_fixture
from oslo_utils import uuidutils

from octavia.controller.queue.v2 import endpoints
from octavia.controller.worker.v2 import controller_worker
from octavia.tests.unit import base


@@ -30,10 +29,9 @@ class TestEndpoints(base.TestCase):
conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
conf.config(octavia_plugins='hot_plug_plugin')

mock_class = mock.create_autospec(controller_worker.ControllerWorker)
self.worker_patcher = mock.patch('octavia.controller.queue.v2.'
'endpoints.stevedore_driver')
self.worker_patcher.start().ControllerWorker = mock_class
self.worker_patcher = mock.patch('octavia.controller.worker.v2.'
'controller_worker.ControllerWorker')
self.worker_patcher.start()

self.ep = endpoints.Endpoints()
self.context = {}


+ 4
- 2
octavia/tests/unit/controller/worker/v2/tasks/test_amphora_driver_tasks.py View File

@@ -580,8 +580,10 @@ class TestAmphoraDriverTasks(base.TestCase):
mock_amphora_repo_update):
amp_compute_conn_wait_obj = (
amphora_driver_tasks.AmphoraComputeConnectivityWait())
amp_compute_conn_wait_obj.execute(_amphora_mock)
mock_driver.get_info.assert_called_once_with(_amphora_mock)
amp_compute_conn_wait_obj.execute(_amphora_mock,
raise_retry_exception=True)
mock_driver.get_info.assert_called_once_with(
_amphora_mock, raise_retry_exception=True)

mock_driver.get_info.side_effect = driver_except.TimeOutException()
self.assertRaises(driver_except.TimeOutException,


Loading…
Cancel
Save