From dad1357ebfa5600844b15a22a3d10f82a2d1ddbf Mon Sep 17 00:00:00 2001 From: Ann Taraday Date: Mon, 3 Jun 2019 19:41:47 +0400 Subject: [PATCH] Use retry for AmphoraComputeConnectivityWait Use taskflow retry for connectivity wait. [1] This is required for the redis jobboard implementation, as each retry extends the worker's claim on the job. This means that the worker is still processing the job and it should not be released for other workers to work on it. Adopted for v2 flows. [1] - https://docs.openstack.org/taskflow/latest/user/atoms.html#retry Story: 2005072 Task: 33477 Change-Id: I2cf241ea965ad56ed70ebde83632ab855f5d859e (cherry picked from commit 314b43af9a57a698a66d46c11892380bff315166) --- .../amphorae/driver_exceptions/exceptions.py | 6 +++ octavia/amphorae/drivers/driver_base.py | 4 +- .../drivers/haproxy/rest_api_driver.py | 37 ++++++++++++------- .../amphorae/drivers/noop_driver/driver.py | 7 ++-- octavia/common/constants.py | 1 + octavia/controller/queue/v2/endpoints.py | 8 +--- .../worker/v2/flows/amphora_flows.py | 20 +++++++--- .../worker/v2/tasks/amphora_driver_tasks.py | 26 ++++++++++++- .../controller/queue/v2/test_endpoints.py | 8 ++-- .../v2/tasks/test_amphora_driver_tasks.py | 6 ++- 10 files changed, 86 insertions(+), 37 deletions(-) diff --git a/octavia/amphorae/driver_exceptions/exceptions.py b/octavia/amphorae/driver_exceptions/exceptions.py index b93b434017..3380e5d2cd 100644 --- a/octavia/amphorae/driver_exceptions/exceptions.py +++ b/octavia/amphorae/driver_exceptions/exceptions.py @@ -121,3 +121,9 @@ class NodeProvisioningError(ProvisioningErrors): class AmpDriverNotImplementedError(AmphoraDriverError): message = _('Amphora does not implement this feature.') + + +class AmpConnectionRetry(AmphoraDriverError): + + message = _('Could not connect to amphora, exception caught: ' + '%(exception)s') diff --git a/octavia/amphorae/drivers/driver_base.py b/octavia/amphorae/drivers/driver_base.py index f2c72ed807..215b95ef2a 100644 --- a/octavia/amphorae/drivers/driver_base.py 
+++ b/octavia/amphorae/drivers/driver_base.py @@ -82,11 +82,13 @@ class AmphoraLoadBalancerDriver(object): """ @abc.abstractmethod - def get_info(self, amphora): + def get_info(self, amphora, raise_retry_exception=False): """Returns information about the amphora. :param amphora: amphora object, need to use its id property :type amphora: octavia.db.models.Amphora + :param raise_retry_exception: Flag if outside task should be retried + :type raise_retry_exception: boolean, False by default :returns: return a value list (amphora.id, status flag--'info') At this moment, we just build the basic structure for testing, will diff --git a/octavia/amphorae/drivers/haproxy/rest_api_driver.py b/octavia/amphorae/drivers/haproxy/rest_api_driver.py index 47c720a331..aee2fb3ef0 100644 --- a/octavia/amphorae/drivers/haproxy/rest_api_driver.py +++ b/octavia/amphorae/drivers/haproxy/rest_api_driver.py @@ -91,7 +91,8 @@ class HaproxyAmphoraLoadBalancerDriver( return haproxy_version_string.split('.')[:2] - def _populate_amphora_api_version(self, amphora): + def _populate_amphora_api_version(self, amphora, + raise_retry_exception=False): """Populate the amphora object with the api_version This will query the amphora for version discovery and populate @@ -102,7 +103,8 @@ class HaproxyAmphoraLoadBalancerDriver( if not getattr(amphora, 'api_version', None): try: amphora.api_version = self.clients['base'].get_api_version( - amphora)['api_version'] + amphora, + raise_retry_exception=raise_retry_exception)['api_version'] except exc.NotFound: # Amphora is too old for version discovery, default to 0.5 amphora.api_version = '0.5' @@ -364,9 +366,11 @@ class HaproxyAmphoraLoadBalancerDriver( self.clients[amphora.api_version].delete_listener( amphora, listener.load_balancer.id) - def get_info(self, amphora): - self._populate_amphora_api_version(amphora) - return self.clients[amphora.api_version].get_info(amphora) + def get_info(self, amphora, raise_retry_exception=False): + self._populate_amphora_api_version( + amphora, 
raise_retry_exception=raise_retry_exception) + return self.clients[amphora.api_version].get_info( + amphora, raise_retry_exception=raise_retry_exception) def get_diagnostics(self, amphora): pass @@ -624,7 +628,7 @@ class AmphoraAPIClientBase(object): port=CONF.haproxy_amphora.bind_port) def request(self, method, amp, path='/', timeout_dict=None, - retry_404=True, **kwargs): + retry_404=True, raise_retry_exception=False, **kwargs): cfg_ha_amp = CONF.haproxy_amphora if timeout_dict is None: timeout_dict = {} @@ -689,7 +693,13 @@ class AmphoraAPIClientBase(object): exception = e LOG.warning("Could not connect to instance. Retrying.") time.sleep(conn_retry_interval) - + if raise_retry_exception: + # For taskflow persistence cause attribute should + # be serializable to JSON. Pass None, as cause exception + # is described in the exception message. + six.raise_from( + driver_except.AmpConnectionRetry(exception=str(e)), + None) LOG.error("Connection retries (currently set to %(max_retries)s) " "exhausted. The amphora is unavailable. 
Reason: " "%(exception)s", @@ -697,9 +707,10 @@ class AmphoraAPIClientBase(object): 'exception': exception}) raise driver_except.TimeOutException() - def get_api_version(self, amp): + def get_api_version(self, amp, raise_retry_exception=False): amp.api_version = None - r = self.get(amp, retry_404=False) + r = self.get(amp, retry_404=False, + raise_retry_exception=raise_retry_exception) # Handle 404 special as we don't want to log an ERROR on 404 exc.check_exception(r, (404,)) if r.status_code == 404: @@ -766,8 +777,8 @@ class AmphoraAPIClient0_5(AmphoraAPIClientBase): amp, 'listeners/{listener_id}'.format(listener_id=listener_id)) return exc.check_exception(r, (404,)) - def get_info(self, amp): - r = self.get(amp, "info") + def get_info(self, amp, raise_retry_exception=False): + r = self.get(amp, "info", raise_retry_exception=raise_retry_exception) if exc.check_exception(r): return r.json() return None @@ -896,8 +907,8 @@ class AmphoraAPIClient1_0(AmphoraAPIClientBase): amp, 'listeners/{object_id}'.format(object_id=object_id)) return exc.check_exception(r, (404,)) - def get_info(self, amp): - r = self.get(amp, "info") + def get_info(self, amp, raise_retry_exception=False): + r = self.get(amp, "info", raise_retry_exception=raise_retry_exception) if exc.check_exception(r): return r.json() return None diff --git a/octavia/amphorae/drivers/noop_driver/driver.py b/octavia/amphorae/drivers/noop_driver/driver.py index bd53ee5c5c..6ce3de70bf 100644 --- a/octavia/amphorae/drivers/noop_driver/driver.py +++ b/octavia/amphorae/drivers/noop_driver/driver.py @@ -73,7 +73,7 @@ class NoopManager(object): listener.load_balancer.vip.ip_address)] = ( listener, listener.load_balancer.vip, 'delete') - def get_info(self, amphora): + def get_info(self, amphora, raise_retry_exception=False): LOG.debug("Amphora %s no-op, info amphora %s", self.__class__.__name__, amphora.id) self.amphoraconfig[amphora.id] = (amphora.id, 'get_info') @@ -138,9 +138,10 @@ class NoopAmphoraLoadBalancerDriver( 
self.driver.delete(listener) - def get_info(self, amphora): + def get_info(self, amphora, raise_retry_exception=False): - self.driver.get_info(amphora) + self.driver.get_info(amphora, + raise_retry_exception=raise_retry_exception) def get_diagnostics(self, amphora): diff --git a/octavia/common/constants.py b/octavia/common/constants.py index 1c98116fd1..2c505c1132 100644 --- a/octavia/common/constants.py +++ b/octavia/common/constants.py @@ -383,6 +383,7 @@ VRRP_GROUP = 'vrrp_group' # Taskflow flow and task names CERT_ROTATE_AMPHORA_FLOW = 'octavia-cert-rotate-amphora-flow' CREATE_AMPHORA_FLOW = 'octavia-create-amphora-flow' +CREATE_AMPHORA_RETRY_SUBFLOW = 'octavia-create-amphora-retry-subflow' CREATE_AMPHORA_FOR_LB_FLOW = 'octavia-create-amp-for-lb-flow' CREATE_HEALTH_MONITOR_FLOW = 'octavia-create-health-monitor-flow' CREATE_LISTENER_FLOW = 'octavia-create-listener_flow' diff --git a/octavia/controller/queue/v2/endpoints.py b/octavia/controller/queue/v2/endpoints.py index 00eaef7487..fa4583f185 100644 --- a/octavia/controller/queue/v2/endpoints.py +++ b/octavia/controller/queue/v2/endpoints.py @@ -15,9 +15,9 @@ from oslo_config import cfg from oslo_log import log as logging import oslo_messaging as messaging -from stevedore import driver as stevedore_driver from octavia.common import constants +from octavia.controller.worker.v2 import controller_worker CONF = cfg.CONF @@ -34,11 +34,7 @@ class Endpoints(object): version='2.0') def __init__(self): - self.worker = stevedore_driver.DriverManager( - namespace='octavia.plugins', - name=CONF.octavia_plugins, - invoke_on_load=True - ).driver + self.worker = controller_worker.ControllerWorker() def create_load_balancer(self, context, load_balancer_id, flavor=None): diff --git a/octavia/controller/worker/v2/flows/amphora_flows.py b/octavia/controller/worker/v2/flows/amphora_flows.py index a983b30213..608ddc4398 100644 --- a/octavia/controller/worker/v2/flows/amphora_flows.py +++ 
b/octavia/controller/worker/v2/flows/amphora_flows.py @@ -70,9 +70,14 @@ class AmphoraFlows(object): create_amphora_flow.add(database_tasks.UpdateAmphoraInfo( requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ), provides=constants.AMPHORA)) - create_amphora_flow.add( + retry_subflow = linear_flow.Flow( + constants.CREATE_AMPHORA_RETRY_SUBFLOW, + retry=amphora_driver_tasks.AmpRetry()) + retry_subflow.add( amphora_driver_tasks.AmphoraComputeConnectivityWait( - requires=constants.AMPHORA)) + requires=constants.AMPHORA, + inject={'raise_retry_exception': True})) + create_amphora_flow.add(retry_subflow) create_amphora_flow.add(database_tasks.ReloadAmphora( requires=constants.AMPHORA_ID, provides=constants.AMPHORA)) @@ -194,10 +199,15 @@ class AmphoraFlows(object): name=sf_name + '-' + constants.UPDATE_AMPHORA_INFO, requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ), provides=constants.AMPHORA)) - create_amp_for_lb_subflow.add( + retry_task = sf_name + '-' + constants.AMP_COMPUTE_CONNECTIVITY_WAIT + retry_subflow = linear_flow.Flow( + constants.CREATE_AMPHORA_RETRY_SUBFLOW, + retry=amphora_driver_tasks.AmpRetry()) + retry_subflow.add( amphora_driver_tasks.AmphoraComputeConnectivityWait( - name=sf_name + '-' + constants.AMP_COMPUTE_CONNECTIVITY_WAIT, - requires=constants.AMPHORA)) + name=retry_task, requires=constants.AMPHORA, + inject={'raise_retry_exception': True})) + create_amp_for_lb_subflow.add(retry_subflow) create_amp_for_lb_subflow.add(amphora_driver_tasks.AmphoraFinalize( name=sf_name + '-' + constants.AMPHORA_FINALIZE, requires=constants.AMPHORA)) diff --git a/octavia/controller/worker/v2/tasks/amphora_driver_tasks.py b/octavia/controller/worker/v2/tasks/amphora_driver_tasks.py index 5a2f07683d..8f9ed12379 100644 --- a/octavia/controller/worker/v2/tasks/amphora_driver_tasks.py +++ b/octavia/controller/worker/v2/tasks/amphora_driver_tasks.py @@ -18,6 +18,7 @@ from oslo_config import cfg from oslo_log import log as logging import six from stevedore import 
driver as stevedore_driver +from taskflow import retry from taskflow import task from taskflow.types import failure @@ -49,6 +50,26 @@ class BaseAmphoraTask(task.Task): self.task_utils = task_utilities.TaskUtils() +class AmpRetry(retry.Times): + + def on_failure(self, history, *args, **kwargs): + last_errors = history[-1][1] + max_retry_attempt = CONF.haproxy_amphora.connection_max_retries + for task_name, ex_info in last_errors.items(): + if len(history) <= max_retry_attempt: + # When taskflow persistence is enabled and flow/task state is + # saved in the backend. If flow(task) is restored(restart of + # worker,etc) we are getting ex_info as None - we need to RETRY + # task to check its real state. + if ex_info is None or ex_info._exc_info is None: + return retry.RETRY + excp = ex_info._exc_info[1] + if isinstance(excp, driver_except.AmpConnectionRetry): + return retry.RETRY + + return retry.REVERT_ALL + + class AmpListenersUpdate(BaseAmphoraTask): """Task to update the listeners on one amphora.""" @@ -323,10 +344,11 @@ class AmphoraVRRPStart(BaseAmphoraTask): class AmphoraComputeConnectivityWait(BaseAmphoraTask): """Task to wait for the compute instance to be up.""" - def execute(self, amphora): + def execute(self, amphora, raise_retry_exception=False): """Execute get_info routine for an amphora until it responds.""" try: - amp_info = self.amphora_driver.get_info(amphora) + amp_info = self.amphora_driver.get_info( + amphora, raise_retry_exception=raise_retry_exception) LOG.debug('Successfuly connected to amphora %s: %s', amphora.id, amp_info) except driver_except.TimeOutException: diff --git a/octavia/tests/unit/controller/queue/v2/test_endpoints.py b/octavia/tests/unit/controller/queue/v2/test_endpoints.py index 226f164eff..a5207bc98c 100644 --- a/octavia/tests/unit/controller/queue/v2/test_endpoints.py +++ b/octavia/tests/unit/controller/queue/v2/test_endpoints.py @@ -18,7 +18,6 @@ from oslo_config import fixture as oslo_fixture from oslo_utils import uuidutils 
from octavia.controller.queue.v2 import endpoints -from octavia.controller.worker.v2 import controller_worker from octavia.tests.unit import base @@ -30,10 +29,9 @@ class TestEndpoints(base.TestCase): conf = self.useFixture(oslo_fixture.Config(cfg.CONF)) conf.config(octavia_plugins='hot_plug_plugin') - mock_class = mock.create_autospec(controller_worker.ControllerWorker) - self.worker_patcher = mock.patch('octavia.controller.queue.v2.' - 'endpoints.stevedore_driver') - self.worker_patcher.start().ControllerWorker = mock_class + self.worker_patcher = mock.patch('octavia.controller.worker.v2.' + 'controller_worker.ControllerWorker') + self.worker_patcher.start() self.ep = endpoints.Endpoints() self.context = {} diff --git a/octavia/tests/unit/controller/worker/v2/tasks/test_amphora_driver_tasks.py b/octavia/tests/unit/controller/worker/v2/tasks/test_amphora_driver_tasks.py index 960b8e454c..170b60e6f1 100644 --- a/octavia/tests/unit/controller/worker/v2/tasks/test_amphora_driver_tasks.py +++ b/octavia/tests/unit/controller/worker/v2/tasks/test_amphora_driver_tasks.py @@ -580,8 +580,10 @@ class TestAmphoraDriverTasks(base.TestCase): mock_amphora_repo_update): amp_compute_conn_wait_obj = ( amphora_driver_tasks.AmphoraComputeConnectivityWait()) - amp_compute_conn_wait_obj.execute(_amphora_mock) - mock_driver.get_info.assert_called_once_with(_amphora_mock) + amp_compute_conn_wait_obj.execute(_amphora_mock, + raise_retry_exception=True) + mock_driver.get_info.assert_called_once_with( + _amphora_mock, raise_retry_exception=True) mock_driver.get_info.side_effect = driver_except.TimeOutException() self.assertRaises(driver_except.TimeOutException,