Enable taskflow retry feature when waiting for compute

The ComputeActiveWait task may be blocked for 300 seconds (with the
default settings for amp_active_retries and amp_active_wait_sec) but
that triggers trigger issues when persistence is enabled if this delay
exceeds the jobboard_expiration_time value.

ComputeActiveWait is renamed to ComputeWait and now uses the retry
feature of taskflow so it would not block taskflow for a long period.

Story 2008956
Task 42584

Change-Id: I174e63cfe176e1792d0bbcd348dbc9115544e297
This commit is contained in:
Gregory Thiemonge 2021-11-03 09:16:01 +01:00
parent 81134bdfec
commit a9ee09a676
6 changed files with 67 additions and 23 deletions

View File

@ -28,7 +28,8 @@ from taskflow.listeners import logging
from taskflow.persistence import models from taskflow.persistence import models
from taskflow import states from taskflow import states
from octavia.amphorae.driver_exceptions import exceptions from octavia.amphorae.driver_exceptions import exceptions as drv_exceptions
from octavia.common import exceptions
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@ -39,7 +40,9 @@ CONF = cfg.CONF
# to instance" will be logged as usual. # to instance" will be logged as usual.
def retryMaskFilter(record): def retryMaskFilter(record):
if record.exc_info is not None and isinstance( if record.exc_info is not None and isinstance(
record.exc_info[1], exceptions.AmpConnectionRetry): record.exc_info[1], (
drv_exceptions.AmpConnectionRetry,
exceptions.ComputeWaitTimeoutException)):
return False return False
return True return True

View File

@ -496,6 +496,7 @@ GENERATE_SERVER_PEM = 'octavia-generate-serverpem'
UPDATE_CERT_EXPIRATION = 'octavia-update-cert-expiration' UPDATE_CERT_EXPIRATION = 'octavia-update-cert-expiration'
CERT_COMPUTE_CREATE = 'octavia-cert-compute-create' CERT_COMPUTE_CREATE = 'octavia-cert-compute-create'
COMPUTE_CREATE = 'octavia-compute-create' COMPUTE_CREATE = 'octavia-compute-create'
COMPUTE_CREATE_RETRY_SUBFLOW = 'octavia-compute-create-retry-subflow'
UPDATE_AMPHORA_COMPUTEID = 'octavia-update-amphora-computeid' UPDATE_AMPHORA_COMPUTEID = 'octavia-update-amphora-computeid'
MARK_AMPHORA_BOOTING_INDB = 'octavia-mark-amphora-booting-indb' MARK_AMPHORA_BOOTING_INDB = 'octavia-mark-amphora-booting-indb'
WAIT_FOR_AMPHORA = 'octavia-wait_for_amphora' WAIT_FOR_AMPHORA = 'octavia-wait_for_amphora'

View File

@ -57,9 +57,14 @@ class AmphoraFlows(object):
provides=constants.COMPUTE_ID)) provides=constants.COMPUTE_ID))
create_amphora_flow.add(database_tasks.MarkAmphoraBootingInDB( create_amphora_flow.add(database_tasks.MarkAmphoraBootingInDB(
requires=(constants.AMPHORA_ID, constants.COMPUTE_ID))) requires=(constants.AMPHORA_ID, constants.COMPUTE_ID)))
create_amphora_flow.add(compute_tasks.ComputeActiveWait( retry_subflow = linear_flow.Flow(
requires=(constants.COMPUTE_ID, constants.AMPHORA_ID), constants.COMPUTE_CREATE_RETRY_SUBFLOW,
provides=constants.COMPUTE_OBJ)) retry=compute_tasks.ComputeRetry())
retry_subflow.add(
compute_tasks.ComputeWait(
requires=(constants.COMPUTE_ID, constants.AMPHORA_ID),
provides=constants.COMPUTE_OBJ))
create_amphora_flow.add(retry_subflow)
create_amphora_flow.add(database_tasks.UpdateAmphoraInfo( create_amphora_flow.add(database_tasks.UpdateAmphoraInfo(
requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ), requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ),
provides=constants.AMPHORA)) provides=constants.AMPHORA))
@ -113,11 +118,7 @@ class AmphoraFlows(object):
create_amp_for_lb_subflow.add(database_tasks.MarkAmphoraBootingInDB( create_amp_for_lb_subflow.add(database_tasks.MarkAmphoraBootingInDB(
name=sf_name + '-' + constants.MARK_AMPHORA_BOOTING_INDB, name=sf_name + '-' + constants.MARK_AMPHORA_BOOTING_INDB,
requires=(constants.AMPHORA_ID, constants.COMPUTE_ID))) requires=(constants.AMPHORA_ID, constants.COMPUTE_ID)))
create_amp_for_lb_subflow.add(compute_tasks.ComputeActiveWait( create_amp_for_lb_subflow.add(self._retry_compute_wait_flow(sf_name))
name=sf_name + '-' + constants.COMPUTE_WAIT,
requires=(constants.COMPUTE_ID, constants.AMPHORA_ID,
constants.AVAILABILITY_ZONE),
provides=constants.COMPUTE_OBJ))
create_amp_for_lb_subflow.add(database_tasks.UpdateAmphoraInfo( create_amp_for_lb_subflow.add(database_tasks.UpdateAmphoraInfo(
name=sf_name + '-' + constants.UPDATE_AMPHORA_INFO, name=sf_name + '-' + constants.UPDATE_AMPHORA_INFO,
requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ), requires=(constants.AMPHORA_ID, constants.COMPUTE_OBJ),
@ -146,6 +147,18 @@ class AmphoraFlows(object):
return create_amp_for_lb_subflow return create_amp_for_lb_subflow
def _retry_compute_wait_flow(self, sf_name):
retry_task = sf_name + '-' + constants.COMPUTE_WAIT
retry_subflow = linear_flow.Flow(
sf_name + '-' + constants.COMPUTE_CREATE_RETRY_SUBFLOW,
retry=compute_tasks.ComputeRetry())
retry_subflow.add(
compute_tasks.ComputeWait(
name=retry_task,
requires=(constants.COMPUTE_ID, constants.AMPHORA_ID),
provides=constants.COMPUTE_OBJ))
return retry_subflow
def _retry_flow(self, sf_name): def _retry_flow(self, sf_name):
retry_task = sf_name + '-' + constants.AMP_COMPUTE_CONNECTIVITY_WAIT retry_task = sf_name + '-' + constants.AMP_COMPUTE_CONNECTIVITY_WAIT
retry_subflow = linear_flow.Flow( retry_subflow = linear_flow.Flow(

View File

@ -19,6 +19,7 @@ from cryptography import fernet
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
from stevedore import driver as stevedore_driver from stevedore import driver as stevedore_driver
from taskflow import retry
from taskflow import task from taskflow import task
from taskflow.types import failure from taskflow.types import failure
import tenacity import tenacity
@ -51,6 +52,26 @@ class BaseComputeTask(task.Task):
self.rate_limit = amphora_rate_limit.AmphoraBuildRateLimit() self.rate_limit = amphora_rate_limit.AmphoraBuildRateLimit()
class ComputeRetry(retry.Times):
def on_failure(self, history, *args, **kwargs):
last_errors = history[-1][1]
max_retry_attempt = CONF.controller_worker.amp_active_retries
for task_name, ex_info in last_errors.items():
if len(history) <= max_retry_attempt:
# When taskflow persistance is enabled and flow/task state is
# saved in the backend. If flow(task) is restored(restart of
# worker,etc) we are getting ex_info as None - we need to RETRY
# task to check its real state.
if ex_info is None or ex_info._exc_info is None:
return retry.RETRY
excp = ex_info._exc_info[1]
if isinstance(excp, exceptions.ComputeWaitTimeoutException):
return retry.RETRY
return retry.REVERT_ALL
class ComputeCreate(BaseComputeTask): class ComputeCreate(BaseComputeTask):
"""Create the compute instance for a new amphora.""" """Create the compute instance for a new amphora."""
@ -251,7 +272,7 @@ class ComputeDelete(BaseComputeTask):
raise raise
class ComputeActiveWait(BaseComputeTask): class ComputeWait(BaseComputeTask):
"""Wait for the compute driver to mark the amphora active.""" """Wait for the compute driver to mark the amphora active."""
def execute(self, compute_id, amphora_id, availability_zone): def execute(self, compute_id, amphora_id, availability_zone):
@ -268,16 +289,16 @@ class ComputeActiveWait(BaseComputeTask):
amp_network = availability_zone.get(constants.MANAGEMENT_NETWORK) amp_network = availability_zone.get(constants.MANAGEMENT_NETWORK)
else: else:
amp_network = None amp_network = None
for i in range(CONF.controller_worker.amp_active_retries):
amp, fault = self.compute.get_amphora(compute_id, amp_network)
if amp.status == constants.ACTIVE:
if CONF.haproxy_amphora.build_rate_limit != -1:
self.rate_limit.remove_from_build_req_queue(amphora_id)
return amp.to_dict()
if amp.status == constants.ERROR:
raise exceptions.ComputeBuildException(fault=fault)
time.sleep(CONF.controller_worker.amp_active_wait_sec)
amp, fault = self.compute.get_amphora(compute_id, amp_network)
if amp.status == constants.ACTIVE:
if CONF.haproxy_amphora.build_rate_limit != -1:
self.rate_limit.remove_from_build_req_queue(amphora_id)
return amp.to_dict()
if amp.status == constants.ERROR:
raise exceptions.ComputeBuildException(fault=fault)
time.sleep(CONF.controller_worker.amp_active_wait_sec)
raise exceptions.ComputeWaitTimeoutException(id=compute_id) raise exceptions.ComputeWaitTimeoutException(id=compute_id)

View File

@ -438,7 +438,7 @@ class TestComputeTasks(base.TestCase):
mock_driver.get_amphora.return_value = _db_amphora_mock, None mock_driver.get_amphora.return_value = _db_amphora_mock, None
computewait = compute_tasks.ComputeActiveWait() computewait = compute_tasks.ComputeWait()
# Test with no AZ # Test with no AZ
computewait.execute(COMPUTE_ID, AMPHORA_ID, None) computewait.execute(COMPUTE_ID, AMPHORA_ID, None)
@ -473,7 +473,7 @@ class TestComputeTasks(base.TestCase):
mock_driver.get_amphora.return_value = _db_amphora_mock, None mock_driver.get_amphora.return_value = _db_amphora_mock, None
computewait = compute_tasks.ComputeActiveWait() computewait = compute_tasks.ComputeWait()
computewait.execute(COMPUTE_ID, AMPHORA_ID, None) computewait.execute(COMPUTE_ID, AMPHORA_ID, None)
mock_driver.get_amphora.assert_called_once_with(COMPUTE_ID, None) mock_driver.get_amphora.assert_called_once_with(COMPUTE_ID, None)
@ -498,7 +498,7 @@ class TestComputeTasks(base.TestCase):
mock_driver.get_amphora.return_value = _db_amphora_mock, None mock_driver.get_amphora.return_value = _db_amphora_mock, None
computewait = compute_tasks.ComputeActiveWait() computewait = compute_tasks.ComputeWait()
computewait.execute(COMPUTE_ID, AMPHORA_ID, None) computewait.execute(COMPUTE_ID, AMPHORA_ID, None)
mock_driver.get_amphora.assert_called_once_with(COMPUTE_ID, None) mock_driver.get_amphora.assert_called_once_with(COMPUTE_ID, None)

View File

@ -0,0 +1,6 @@
---
fixes:
- |
Fix an issue that may have occurred when running the amphorav2 with
persistence, the ComputeActiveWait was incorrectly executed twice on
different controllers.