diff --git a/octavia/common/base_taskflow.py b/octavia/common/base_taskflow.py index 4b82c31cc5..d824639a58 100644 --- a/octavia/common/base_taskflow.py +++ b/octavia/common/base_taskflow.py @@ -29,7 +29,8 @@ from taskflow.listeners import logging from taskflow.persistence import models from taskflow import states -from octavia.amphorae.driver_exceptions import exceptions +from octavia.amphorae.driver_exceptions import exceptions as drv_exceptions +from octavia.common import exceptions LOG = log.getLogger(__name__) @@ -40,7 +41,9 @@ CONF = cfg.CONF # to instance" will be logged as usual. def retryMaskFilter(record): if record.exc_info is not None and isinstance( - record.exc_info[1], exceptions.AmpConnectionRetry): + record.exc_info[1], ( + drv_exceptions.AmpConnectionRetry, + exceptions.ComputeWaitTimeoutException)): return False return True diff --git a/octavia/common/constants.py b/octavia/common/constants.py index a44fecaf80..f3a17d3d0b 100644 --- a/octavia/common/constants.py +++ b/octavia/common/constants.py @@ -507,6 +507,7 @@ GENERATE_SERVER_PEM = 'octavia-generate-serverpem' UPDATE_CERT_EXPIRATION = 'octavia-update-cert-expiration' CERT_COMPUTE_CREATE = 'octavia-cert-compute-create' COMPUTE_CREATE = 'octavia-compute-create' +COMPUTE_CREATE_RETRY_SUBFLOW = 'octavia-compute-create-retry-subflow' UPDATE_AMPHORA_COMPUTEID = 'octavia-update-amphora-computeid' MARK_AMPHORA_BOOTING_INDB = 'octavia-mark-amphora-booting-indb' WAIT_FOR_AMPHORA = 'octavia-wait_for_amphora' diff --git a/octavia/controller/worker/v2/flows/amphora_flows.py b/octavia/controller/worker/v2/flows/amphora_flows.py index 0466b80e22..2883c13866 100644 --- a/octavia/controller/worker/v2/flows/amphora_flows.py +++ b/octavia/controller/worker/v2/flows/amphora_flows.py @@ -57,9 +57,14 @@ class AmphoraFlows(object): provides=constants.COMPUTE_ID)) create_amphora_flow.add(database_tasks.MarkAmphoraBootingInDB( requires=(constants.AMPHORA_ID, constants.COMPUTE_ID))) - 
create_amphora_flow.add(compute_tasks.ComputeActiveWait( - requires=(constants.COMPUTE_ID, constants.AMPHORA_ID), - provides=constants.COMPUTE_OBJ)) + retry_subflow = linear_flow.Flow( + constants.COMPUTE_CREATE_RETRY_SUBFLOW, + retry=compute_tasks.ComputeRetry()) + retry_subflow.add( + compute_tasks.ComputeWait( + requires=(constants.COMPUTE_ID, constants.AMPHORA_ID), + provides=constants.COMPUTE_OBJ)) + create_amphora_flow.add(retry_subflow) create_amphora_flow.add(database_tasks.UpdateAmphoraInfo( requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ), provides=constants.AMPHORA)) @@ -113,11 +118,7 @@ class AmphoraFlows(object): create_amp_for_lb_subflow.add(database_tasks.MarkAmphoraBootingInDB( name=sf_name + '-' + constants.MARK_AMPHORA_BOOTING_INDB, requires=(constants.AMPHORA_ID, constants.COMPUTE_ID))) - create_amp_for_lb_subflow.add(compute_tasks.ComputeActiveWait( - name=sf_name + '-' + constants.COMPUTE_WAIT, - requires=(constants.COMPUTE_ID, constants.AMPHORA_ID, - constants.AVAILABILITY_ZONE), - provides=constants.COMPUTE_OBJ)) + create_amp_for_lb_subflow.add(self._retry_compute_wait_flow(sf_name)) create_amp_for_lb_subflow.add(database_tasks.UpdateAmphoraInfo( name=sf_name + '-' + constants.UPDATE_AMPHORA_INFO, requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ), @@ -146,6 +147,18 @@ class AmphoraFlows(object): return create_amp_for_lb_subflow + def _retry_compute_wait_flow(self, sf_name): + retry_task = sf_name + '-' + constants.COMPUTE_WAIT + retry_subflow = linear_flow.Flow( + sf_name + '-' + constants.COMPUTE_CREATE_RETRY_SUBFLOW, + retry=compute_tasks.ComputeRetry()) + retry_subflow.add( + compute_tasks.ComputeWait( + name=retry_task, + requires=(constants.COMPUTE_ID, constants.AMPHORA_ID), + provides=constants.COMPUTE_OBJ)) + return retry_subflow + def _retry_flow(self, sf_name): retry_task = sf_name + '-' + constants.AMP_COMPUTE_CONNECTIVITY_WAIT retry_subflow = linear_flow.Flow( diff --git 
a/octavia/controller/worker/v2/tasks/compute_tasks.py b/octavia/controller/worker/v2/tasks/compute_tasks.py index 7b675ee268..8ffe9bf1ae 100644 --- a/octavia/controller/worker/v2/tasks/compute_tasks.py +++ b/octavia/controller/worker/v2/tasks/compute_tasks.py @@ -19,6 +19,7 @@ from cryptography import fernet from oslo_config import cfg from oslo_log import log as logging from stevedore import driver as stevedore_driver +from taskflow import retry from taskflow import task from taskflow.types import failure import tenacity @@ -51,6 +52,26 @@ class BaseComputeTask(task.Task): self.rate_limit = amphora_rate_limit.AmphoraBuildRateLimit() +class ComputeRetry(retry.Times): + + def on_failure(self, history, *args, **kwargs): + last_errors = history[-1][1] + max_retry_attempt = CONF.controller_worker.amp_active_retries + for task_name, ex_info in last_errors.items(): + if len(history) <= max_retry_attempt: + # When taskflow persistence is enabled and flow/task state is + # saved in the backend. If flow(task) is restored(restart of + # worker, etc.) we get ex_info as None - we need to RETRY + # task to check its real state. 
+ if ex_info is None or ex_info._exc_info is None: + return retry.RETRY + excp = ex_info._exc_info[1] + if isinstance(excp, exceptions.ComputeWaitTimeoutException): + return retry.RETRY + + return retry.REVERT_ALL + + class ComputeCreate(BaseComputeTask): """Create the compute instance for a new amphora.""" @@ -251,7 +272,7 @@ class ComputeDelete(BaseComputeTask): raise -class ComputeActiveWait(BaseComputeTask): +class ComputeWait(BaseComputeTask): """Wait for the compute driver to mark the amphora active.""" def execute(self, compute_id, amphora_id, availability_zone): @@ -268,16 +289,16 @@ class ComputeActiveWait(BaseComputeTask): amp_network = availability_zone.get(constants.MANAGEMENT_NETWORK) else: amp_network = None - for i in range(CONF.controller_worker.amp_active_retries): - amp, fault = self.compute.get_amphora(compute_id, amp_network) - if amp.status == constants.ACTIVE: - if CONF.haproxy_amphora.build_rate_limit != -1: - self.rate_limit.remove_from_build_req_queue(amphora_id) - return amp.to_dict() - if amp.status == constants.ERROR: - raise exceptions.ComputeBuildException(fault=fault) - time.sleep(CONF.controller_worker.amp_active_wait_sec) + amp, fault = self.compute.get_amphora(compute_id, amp_network) + if amp.status == constants.ACTIVE: + if CONF.haproxy_amphora.build_rate_limit != -1: + self.rate_limit.remove_from_build_req_queue(amphora_id) + return amp.to_dict() + if amp.status == constants.ERROR: + raise exceptions.ComputeBuildException(fault=fault) + + time.sleep(CONF.controller_worker.amp_active_wait_sec) raise exceptions.ComputeWaitTimeoutException(id=compute_id) diff --git a/octavia/tests/unit/controller/worker/v2/tasks/test_compute_tasks.py b/octavia/tests/unit/controller/worker/v2/tasks/test_compute_tasks.py index f937c14bff..28ace32755 100644 --- a/octavia/tests/unit/controller/worker/v2/tasks/test_compute_tasks.py +++ b/octavia/tests/unit/controller/worker/v2/tasks/test_compute_tasks.py @@ -451,7 +451,7 @@ class 
TestComputeTasks(base.TestCase): mock_driver.get_amphora.return_value = _db_amphora_mock, None - computewait = compute_tasks.ComputeActiveWait() + computewait = compute_tasks.ComputeWait() # Test with no AZ computewait.execute(COMPUTE_ID, AMPHORA_ID, None) @@ -486,7 +486,7 @@ class TestComputeTasks(base.TestCase): mock_driver.get_amphora.return_value = _db_amphora_mock, None - computewait = compute_tasks.ComputeActiveWait() + computewait = compute_tasks.ComputeWait() computewait.execute(COMPUTE_ID, AMPHORA_ID, None) mock_driver.get_amphora.assert_called_once_with(COMPUTE_ID, None) @@ -511,7 +511,7 @@ class TestComputeTasks(base.TestCase): mock_driver.get_amphora.return_value = _db_amphora_mock, None - computewait = compute_tasks.ComputeActiveWait() + computewait = compute_tasks.ComputeWait() computewait.execute(COMPUTE_ID, AMPHORA_ID, None) mock_driver.get_amphora.assert_called_once_with(COMPUTE_ID, None) diff --git a/releasenotes/notes/fix-computewait-with-persistence-d10223bfb48a0ded.yaml b/releasenotes/notes/fix-computewait-with-persistence-d10223bfb48a0ded.yaml new file mode 100644 index 0000000000..6275a02811 --- /dev/null +++ b/releasenotes/notes/fix-computewait-with-persistence-d10223bfb48a0ded.yaml @@ -0,0 +1,6 @@ +--- +fixes: + - | + Fix an issue that could occur when running amphorav2 with persistence: + the ComputeActiveWait task was incorrectly executed twice on different + controllers.