Merge "Add keepalive for redis-based taskflow boards" into stable/ussuri

This commit is contained in:
Zuul 2022-04-27 16:56:05 +00:00 committed by Gerrit Code Review
commit be6062bfa4
5 changed files with 53 additions and 45 deletions

View File

@ -126,7 +126,7 @@ class TaskFlowServiceController(object):
def __init__(self, driver):
self.driver = driver
def run_poster(self, flow_factory, *args, wait=False, **kwargs):
def run_poster(self, flow_factory, *args, **kwargs):
with self.driver.persistence_driver.get_persistence() as persistence:
with self.driver.job_board(persistence) as job_board:
job_id = uuidutils.generate_uuid()
@ -145,16 +145,23 @@ class TaskFlowServiceController(object):
job_board.post(job_name, book=job_logbook,
details=job_details)
if wait:
self._wait_for_job(job_board)
self._wait_for_job(job_board)
return job_id
def _wait_for_job(self, job_board):
# Wait for job to its complete state
for job in job_board.iterjobs():
LOG.debug("Waiting for job %s to finish", job.name)
job.wait()
expiration_time = CONF.task_flow.jobboard_expiration_time
need_wait = True
while need_wait:
need_wait = False
for job in job_board.iterjobs():
# If job hasn't finished in expiration_time/2 seconds,
# extend its TTL
if not job.wait(timeout=expiration_time / 2):
job.extend_expiry(expiration_time)
need_wait = True
def run_conductor(self, name):
with self.driver.persistence_driver.get_persistence() as persistence:

View File

@ -100,7 +100,7 @@ class ControllerWorker(object):
db_apis.get_session(), availability_zone))
job_id = self.services_controller.run_poster(
flow_utils.get_create_amphora_flow,
store=store, wait=True)
store=store)
return job_id
except Exception as e:
@ -906,7 +906,7 @@ class ControllerWorker(object):
self.services_controller.run_poster(
flow_utils.get_failover_amphora_flow,
amphora.to_dict(), lb_amp_count,
store=stored_params, wait=True)
store=stored_params)
LOG.info("Successfully completed the failover for an amphora: %s",
{"id": amphora_id,
@ -1057,7 +1057,7 @@ class ControllerWorker(object):
self.services_controller.run_poster(
flow_utils.get_failover_LB_flow, amps, provider_lb_dict,
store=stored_params, wait=True)
store=stored_params)
LOG.info('Failover of load balancer %s completed successfully.',
lb.id)

View File

@ -114,25 +114,26 @@ class TestTaskFlowServiceController(base.TestCase):
post_args = self.jobboard_mock.__enter__().post.call_args
self.assertEqual(job_name, post_args[0][0])
self.assertEqual(job_details, post_args[1]['details'])
wait.assert_not_called()
wait.assert_called()
self.assertEqual(self._mock_uuid, uuid)
@mock.patch('oslo_utils.uuidutils.generate_uuid', return_value=_mock_uuid)
@mock.patch('taskflow.engines.save_factory_details')
def test_run_poster_wait(self, mock_engines, mockuuid):
flow_factory = mock.MagicMock()
flow_factory.__name__ = 'testname'
job_details = {'store': 'test'}
with mock.patch.object(self.service_controller, '_wait_for_job'
) as wait:
uuid = self.service_controller.run_poster(flow_factory, wait=True,
**job_details)
self.persistence_mock.__enter__().get_connection(
).save_logbook.assert_called()
mock_engines.assert_called()
self.jobboard_mock.__enter__().post.assert_called()
wait.assert_called_once_with(self.jobboard_mock.__enter__())
self.assertEqual(self._mock_uuid, uuid)
def test__wait_for_job(self):
job1 = mock.MagicMock()
job1.wait.side_effect = [False, True]
job2 = mock.MagicMock()
job2.wait.side_effect = [False, True]
job3 = mock.MagicMock()
job3.wait.return_value = True
job_board = mock.MagicMock()
job_board.iterjobs.side_effect = [
[job1, job2, job3],
[job1, job2]
]
self.service_controller._wait_for_job(job_board)
job1.extend_expiry.assert_called_once()
job2.extend_expiry.assert_called_once()
job3.extend_expiry.assert_not_called()
@mock.patch('octavia.common.base_taskflow.RedisDynamicLoggingConductor')
@mock.patch('octavia.common.base_taskflow.DynamicLoggingConductor')

View File

@ -189,7 +189,6 @@ class TestControllerWorker(base.TestCase):
(cw.services_controller.run_poster.
assert_called_once_with(
flow_utils.get_create_amphora_flow,
wait=True,
store={constants.BUILD_TYPE_PRIORITY:
constants.LB_CREATE_SPARES_POOL_PRIORITY,
constants.FLAVOR: None,
@ -226,7 +225,6 @@ class TestControllerWorker(base.TestCase):
(cw.services_controller.run_poster.
assert_called_once_with(
flow_utils.get_create_amphora_flow,
wait=True,
store={constants.BUILD_TYPE_PRIORITY:
constants.LB_CREATE_SPARES_POOL_PRIORITY,
constants.FLAVOR: None,
@ -1252,8 +1250,7 @@ class TestControllerWorker(base.TestCase):
cw.services_controller.run_poster.assert_called_once_with(
flow_utils.get_failover_amphora_flow,
mock_amphora.to_dict(), 1, store=expected_stored_params,
wait=True)
mock_amphora.to_dict(), 1, store=expected_stored_params)
@mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
'get_availability_zone_metadata_dict', return_value={})
@ -1308,8 +1305,7 @@ class TestControllerWorker(base.TestCase):
cw.services_controller.run_poster.assert_called_once_with(
flow_utils.get_failover_amphora_flow,
mock_amphora.to_dict(), 2, store=expected_stored_params,
wait=True)
mock_amphora.to_dict(), 2, store=expected_stored_params)
@mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
'get_availability_zone_metadata_dict', return_value={})
@ -1364,8 +1360,7 @@ class TestControllerWorker(base.TestCase):
cw.services_controller.run_poster.assert_called_once_with(
flow_utils.get_failover_amphora_flow,
mock_amphora.to_dict(), 2, store=expected_stored_params,
wait=True)
mock_amphora.to_dict(), 2, store=expected_stored_params)
@mock.patch('octavia.api.drivers.utils.'
'db_loadbalancer_to_provider_loadbalancer')
@ -1417,8 +1412,7 @@ class TestControllerWorker(base.TestCase):
cw.services_controller.run_poster.assert_called_once_with(
flow_utils.get_failover_amphora_flow,
mock_amphora.to_dict(), None, store=expected_stored_params,
wait=True)
mock_amphora.to_dict(), None, store=expected_stored_params)
@mock.patch('octavia.db.repositories.FlavorRepository.'
'get_flavor_metadata_dict', return_value={})
@ -1475,8 +1469,7 @@ class TestControllerWorker(base.TestCase):
cw.services_controller.run_poster.assert_called_once_with(
flow_utils.get_failover_amphora_flow,
mock_amphora.to_dict(), 1, store=expected_stored_params,
wait=True)
mock_amphora.to_dict(), 1, store=expected_stored_params)
@mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
'get_availability_zone_metadata_dict', return_value={})
@ -1535,8 +1528,7 @@ class TestControllerWorker(base.TestCase):
print(cw.services_controller.run_poster, flush=True)
cw.services_controller.run_poster.assert_called_once_with(
flow_utils.get_failover_amphora_flow,
mock_amphora.to_dict(), 1, store=expected_stored_params,
wait=True)
mock_amphora.to_dict(), 1, store=expected_stored_params)
@mock.patch('octavia.controller.worker.v1.flows.amphora_flows.'
'AmphoraFlows.get_failover_amphora_flow')
@ -1624,7 +1616,7 @@ class TestControllerWorker(base.TestCase):
cw.services_controller.run_poster.assert_called_once_with(
flow_utils.get_failover_amphora_flow,
mock_amphora.to_dict(),
None, store=expected_stored_params, wait=True)
None, store=expected_stored_params)
@mock.patch('octavia.db.repositories.AmphoraHealthRepository.delete')
def test_failover_deleted_amphora(self,
@ -1823,7 +1815,7 @@ class TestControllerWorker(base.TestCase):
cw.services_controller.run_poster.assert_called_once_with(
flow_utils.get_failover_LB_flow, [_amphora_mock], provider_lb,
wait=True, store=expected_flow_store)
store=expected_flow_store)
@mock.patch('octavia.controller.worker.v2.controller_worker.'
'ControllerWorker._get_amphorae_for_failover')
@ -1874,7 +1866,7 @@ class TestControllerWorker(base.TestCase):
cw.services_controller.run_poster.assert_called_once_with(
flow_utils.get_failover_LB_flow, [_amphora_mock, _amphora_mock],
provider_lb, wait=True, store=expected_flow_store)
provider_lb, store=expected_flow_store)
@mock.patch('octavia.db.repositories.LoadBalancerRepository.update')
def test_failover_loadbalancer_no_lb(self,
@ -1975,7 +1967,7 @@ class TestControllerWorker(base.TestCase):
cw.services_controller.run_poster.assert_called_once_with(
flow_utils.get_failover_LB_flow, [_amphora_mock], provider_lb,
wait=True, store=expected_flow_store)
store=expected_flow_store)
@mock.patch('octavia.db.repositories.FlavorRepository.'
'get_flavor_metadata_dict', return_value={'taste': 'spicy'})
@ -2029,7 +2021,7 @@ class TestControllerWorker(base.TestCase):
cw.services_controller.run_poster.assert_called_once_with(
flow_utils.get_failover_LB_flow, [_amphora_mock, _amphora_mock],
provider_lb, wait=True, store=expected_flow_store)
provider_lb, store=expected_flow_store)
def test_amphora_cert_rotation(self,
mock_api_get_session,

View File

@ -0,0 +1,8 @@
---
fixes:
- |
Fix an issue with amphorav2 and persistence, some long tasks executed by a
controller might have been released in taskflow and rescheduled on another
controller. Octavia now ensures that a task is never released early by
using a keepalive mechanism to notify taskflow (and its redis backend) that
a job is still running.