diff --git a/octavia/amphorae/driver_exceptions/exceptions.py b/octavia/amphorae/driver_exceptions/exceptions.py index b93b434017..3380e5d2cd 100644 --- a/octavia/amphorae/driver_exceptions/exceptions.py +++ b/octavia/amphorae/driver_exceptions/exceptions.py @@ -121,3 +121,9 @@ class NodeProvisioningError(ProvisioningErrors): class AmpDriverNotImplementedError(AmphoraDriverError): message = _('Amphora does not implement this feature.') + + +class AmpConnectionRetry(AmphoraDriverError): + + message = _('Could not connect to amphora, exception caught: ' + '%(exception)s') diff --git a/octavia/amphorae/drivers/driver_base.py b/octavia/amphorae/drivers/driver_base.py index f2c72ed807..215b95ef2a 100644 --- a/octavia/amphorae/drivers/driver_base.py +++ b/octavia/amphorae/drivers/driver_base.py @@ -82,11 +82,13 @@ class AmphoraLoadBalancerDriver(object): """ @abc.abstractmethod - def get_info(self, amphora): + def get_info(self, amphora, raise_retry_exception=False): """Returns information about the amphora. :param amphora: amphora object, need to use its id property :type amphora: octavia.db.models.Amphora + :param raise_retry_exception: Flag if outside task should be retried + :type boolean: False by default :returns: return a value list (amphora.id, status flag--'info') At this moment, we just build the basic structure for testing, will diff --git a/octavia/amphorae/drivers/haproxy/rest_api_driver.py b/octavia/amphorae/drivers/haproxy/rest_api_driver.py index 47c720a331..aee2fb3ef0 100644 --- a/octavia/amphorae/drivers/haproxy/rest_api_driver.py +++ b/octavia/amphorae/drivers/haproxy/rest_api_driver.py @@ -91,7 +91,8 @@ class HaproxyAmphoraLoadBalancerDriver( return haproxy_version_string.split('.')[:2] - def _populate_amphora_api_version(self, amphora): + def _populate_amphora_api_version(self, amphora, + raise_retry_exception=False): """Populate the amphora object with the api_version This will query the amphora for version discovery and populate @@ -102,7 +103,8 @@ class HaproxyAmphoraLoadBalancerDriver( if not getattr(amphora, 'api_version', None): try: amphora.api_version = self.clients['base'].get_api_version( - amphora)['api_version'] + amphora, + raise_retry_exception=raise_retry_exception)['api_version'] except exc.NotFound: # Amphora is too old for version discovery, default to 0.5 amphora.api_version = '0.5' @@ -364,9 +366,11 @@ class HaproxyAmphoraLoadBalancerDriver( self.clients[amphora.api_version].delete_listener( amphora, listener.load_balancer.id) - def get_info(self, amphora): - self._populate_amphora_api_version(amphora) - return self.clients[amphora.api_version].get_info(amphora) + def get_info(self, amphora, raise_retry_exception=False): + self._populate_amphora_api_version( + amphora, raise_retry_exception=raise_retry_exception) + return self.clients[amphora.api_version].get_info( + amphora, raise_retry_exception=raise_retry_exception) def get_diagnostics(self, amphora): pass @@ -624,7 +628,7 @@ class AmphoraAPIClientBase(object): port=CONF.haproxy_amphora.bind_port) def request(self, method, amp, path='/', timeout_dict=None, - retry_404=True, **kwargs): + retry_404=True, raise_retry_exception=False, **kwargs): cfg_ha_amp = CONF.haproxy_amphora if timeout_dict is None: timeout_dict = {} @@ -689,7 +693,13 @@ class AmphoraAPIClientBase(object): exception = e LOG.warning("Could not connect to instance. Retrying.") time.sleep(conn_retry_interval) - + if raise_retry_exception: + # For taskflow persistence cause attribute should + # be serializable to JSON. Pass None, as cause exception + # is described in the expection message. + six.raise_from( + driver_except.AmpConnectionRetry(exception=str(e)), + None) LOG.error("Connection retries (currently set to %(max_retries)s) " "exhausted. The amphora is unavailable. Reason: " "%(exception)s", @@ -697,9 +707,10 @@ class AmphoraAPIClientBase(object): 'exception': exception}) raise driver_except.TimeOutException() - def get_api_version(self, amp): + def get_api_version(self, amp, raise_retry_exception=False): amp.api_version = None - r = self.get(amp, retry_404=False) + r = self.get(amp, retry_404=False, + raise_retry_exception=raise_retry_exception) # Handle 404 special as we don't want to log an ERROR on 404 exc.check_exception(r, (404,)) if r.status_code == 404: @@ -766,8 +777,8 @@ class AmphoraAPIClient0_5(AmphoraAPIClientBase): amp, 'listeners/{listener_id}'.format(listener_id=listener_id)) return exc.check_exception(r, (404,)) - def get_info(self, amp): - r = self.get(amp, "info") + def get_info(self, amp, raise_retry_exception=False): + r = self.get(amp, "info", raise_retry_exception=raise_retry_exception) if exc.check_exception(r): return r.json() return None @@ -896,8 +907,8 @@ class AmphoraAPIClient1_0(AmphoraAPIClientBase): amp, 'listeners/{object_id}'.format(object_id=object_id)) return exc.check_exception(r, (404,)) - def get_info(self, amp): - r = self.get(amp, "info") + def get_info(self, amp, raise_retry_exception=False): + r = self.get(amp, "info", raise_retry_exception=raise_retry_exception) if exc.check_exception(r): return r.json() return None diff --git a/octavia/amphorae/drivers/noop_driver/driver.py b/octavia/amphorae/drivers/noop_driver/driver.py index bd53ee5c5c..6ce3de70bf 100644 --- a/octavia/amphorae/drivers/noop_driver/driver.py +++ b/octavia/amphorae/drivers/noop_driver/driver.py @@ -73,7 +73,7 @@ class NoopManager(object): listener.load_balancer.vip.ip_address)] = ( listener, listener.load_balancer.vip, 'delete') - def get_info(self, amphora): + def get_info(self, amphora, raise_retry_exception=False): LOG.debug("Amphora %s no-op, info amphora %s", self.__class__.__name__, amphora.id) self.amphoraconfig[amphora.id] = (amphora.id, 'get_info') @@ -138,9 +138,10 @@ class NoopAmphoraLoadBalancerDriver( self.driver.delete(listener) - def get_info(self, amphora): + def get_info(self, amphora, raise_retry_exception=False): - self.driver.get_info(amphora) + self.driver.get_info(amphora, + raise_retry_exception=raise_retry_exception) def get_diagnostics(self, amphora): diff --git a/octavia/common/constants.py b/octavia/common/constants.py index 1c98116fd1..2c505c1132 100644 --- a/octavia/common/constants.py +++ b/octavia/common/constants.py @@ -383,6 +383,7 @@ VRRP_GROUP = 'vrrp_group' # Taskflow flow and task names CERT_ROTATE_AMPHORA_FLOW = 'octavia-cert-rotate-amphora-flow' CREATE_AMPHORA_FLOW = 'octavia-create-amphora-flow' +CREATE_AMPHORA_RETRY_SUBFLOW = 'octavia-create-amphora-retry-subflow' CREATE_AMPHORA_FOR_LB_FLOW = 'octavia-create-amp-for-lb-flow' CREATE_HEALTH_MONITOR_FLOW = 'octavia-create-health-monitor-flow' CREATE_LISTENER_FLOW = 'octavia-create-listener_flow' diff --git a/octavia/controller/queue/v2/endpoints.py b/octavia/controller/queue/v2/endpoints.py index 00eaef7487..fa4583f185 100644 --- a/octavia/controller/queue/v2/endpoints.py +++ b/octavia/controller/queue/v2/endpoints.py @@ -15,9 +15,9 @@ from oslo_config import cfg from oslo_log import log as logging import oslo_messaging as messaging -from stevedore import driver as stevedore_driver from octavia.common import constants +from octavia.controller.worker.v2 import controller_worker CONF = cfg.CONF @@ -34,11 +34,7 @@ class Endpoints(object): version='2.0') def __init__(self): - self.worker = stevedore_driver.DriverManager( - namespace='octavia.plugins', - name=CONF.octavia_plugins, - invoke_on_load=True - ).driver + self.worker = controller_worker.ControllerWorker() def create_load_balancer(self, context, load_balancer_id, flavor=None): diff --git a/octavia/controller/worker/v2/flows/amphora_flows.py b/octavia/controller/worker/v2/flows/amphora_flows.py index a983b30213..608ddc4398 100644 --- a/octavia/controller/worker/v2/flows/amphora_flows.py +++ b/octavia/controller/worker/v2/flows/amphora_flows.py @@ -70,9 +70,14 @@ class AmphoraFlows(object): create_amphora_flow.add(database_tasks.UpdateAmphoraInfo( requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ), provides=constants.AMPHORA)) - create_amphora_flow.add( + retry_subflow = linear_flow.Flow( + constants.CREATE_AMPHORA_RETRY_SUBFLOW, + retry=amphora_driver_tasks.AmpRetry()) + retry_subflow.add( amphora_driver_tasks.AmphoraComputeConnectivityWait( - requires=constants.AMPHORA)) + requires=constants.AMPHORA, + inject={'raise_retry_exception': True})) + create_amphora_flow.add(retry_subflow) create_amphora_flow.add(database_tasks.ReloadAmphora( requires=constants.AMPHORA_ID, provides=constants.AMPHORA)) @@ -194,10 +199,15 @@ class AmphoraFlows(object): name=sf_name + '-' + constants.UPDATE_AMPHORA_INFO, requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ), provides=constants.AMPHORA)) - create_amp_for_lb_subflow.add( + retry_task = sf_name + '-' + constants.AMP_COMPUTE_CONNECTIVITY_WAIT + retry_subflow = linear_flow.Flow( + constants.CREATE_AMPHORA_RETRY_SUBFLOW, + retry=amphora_driver_tasks.AmpRetry()) + retry_subflow.add( amphora_driver_tasks.AmphoraComputeConnectivityWait( - name=sf_name + '-' + constants.AMP_COMPUTE_CONNECTIVITY_WAIT, - requires=constants.AMPHORA)) + name=retry_task, requires=constants.AMPHORA, + inject={'raise_retry_exception': True})) + create_amp_for_lb_subflow.add(retry_subflow) create_amp_for_lb_subflow.add(amphora_driver_tasks.AmphoraFinalize( name=sf_name + '-' + constants.AMPHORA_FINALIZE, requires=constants.AMPHORA)) diff --git a/octavia/controller/worker/v2/tasks/amphora_driver_tasks.py b/octavia/controller/worker/v2/tasks/amphora_driver_tasks.py index 5a2f07683d..8f9ed12379 100644 --- a/octavia/controller/worker/v2/tasks/amphora_driver_tasks.py +++ b/octavia/controller/worker/v2/tasks/amphora_driver_tasks.py @@ -18,6 +18,7 @@ from oslo_config import cfg from oslo_log import log as logging import six from stevedore import driver as stevedore_driver +from taskflow import retry from taskflow import task from taskflow.types import failure @@ -49,6 +50,26 @@ class BaseAmphoraTask(task.Task): self.task_utils = task_utilities.TaskUtils() +class AmpRetry(retry.Times): + + def on_failure(self, history, *args, **kwargs): + last_errors = history[-1][1] + max_retry_attempt = CONF.haproxy_amphora.connection_max_retries + for task_name, ex_info in last_errors.items(): + if len(history) <= max_retry_attempt: + # When taskflow persistance is enabled and flow/task state is + # saved in the backend. If flow(task) is restored(restart of + # worker,etc) we are getting ex_info as None - we need to RETRY + # task to check its real state. + if ex_info is None or ex_info._exc_info is None: + return retry.RETRY + excp = ex_info._exc_info[1] + if isinstance(excp, driver_except.AmpConnectionRetry): + return retry.RETRY + + return retry.REVERT_ALL + + class AmpListenersUpdate(BaseAmphoraTask): """Task to update the listeners on one amphora.""" @@ -323,10 +344,11 @@ class AmphoraVRRPStart(BaseAmphoraTask): class AmphoraComputeConnectivityWait(BaseAmphoraTask): """Task to wait for the compute instance to be up.""" - def execute(self, amphora): + def execute(self, amphora, raise_retry_exception=False): """Execute get_info routine for an amphora until it responds.""" try: - amp_info = self.amphora_driver.get_info(amphora) + amp_info = self.amphora_driver.get_info( + amphora, raise_retry_exception=raise_retry_exception) LOG.debug('Successfuly connected to amphora %s: %s', amphora.id, amp_info) except driver_except.TimeOutException: diff --git a/octavia/tests/unit/controller/queue/v2/test_endpoints.py b/octavia/tests/unit/controller/queue/v2/test_endpoints.py index 226f164eff..a5207bc98c 100644 --- a/octavia/tests/unit/controller/queue/v2/test_endpoints.py +++ b/octavia/tests/unit/controller/queue/v2/test_endpoints.py @@ -18,7 +18,6 @@ from oslo_config import fixture as oslo_fixture from oslo_utils import uuidutils from octavia.controller.queue.v2 import endpoints -from octavia.controller.worker.v2 import controller_worker from octavia.tests.unit import base @@ -30,10 +29,9 @@ class TestEndpoints(base.TestCase): conf = self.useFixture(oslo_fixture.Config(cfg.CONF)) conf.config(octavia_plugins='hot_plug_plugin') - mock_class = mock.create_autospec(controller_worker.ControllerWorker) - self.worker_patcher = mock.patch('octavia.controller.queue.v2.' - 'endpoints.stevedore_driver') - self.worker_patcher.start().ControllerWorker = mock_class + self.worker_patcher = mock.patch('octavia.controller.worker.v2.' + 'controller_worker.ControllerWorker') + self.worker_patcher.start() self.ep = endpoints.Endpoints() self.context = {} diff --git a/octavia/tests/unit/controller/worker/v2/tasks/test_amphora_driver_tasks.py b/octavia/tests/unit/controller/worker/v2/tasks/test_amphora_driver_tasks.py index 960b8e454c..170b60e6f1 100644 --- a/octavia/tests/unit/controller/worker/v2/tasks/test_amphora_driver_tasks.py +++ b/octavia/tests/unit/controller/worker/v2/tasks/test_amphora_driver_tasks.py @@ -580,8 +580,10 @@ class TestAmphoraDriverTasks(base.TestCase): mock_amphora_repo_update): amp_compute_conn_wait_obj = ( amphora_driver_tasks.AmphoraComputeConnectivityWait()) - amp_compute_conn_wait_obj.execute(_amphora_mock) - mock_driver.get_info.assert_called_once_with(_amphora_mock) + amp_compute_conn_wait_obj.execute(_amphora_mock, + raise_retry_exception=True) + mock_driver.get_info.assert_called_once_with( + _amphora_mock, raise_retry_exception=True) mock_driver.get_info.side_effect = driver_except.TimeOutException() self.assertRaises(driver_except.TimeOutException,