Add keepalive for redis-based taskflow boards

When using amphorav2+persistence, taskflow maintains the ownership of a
task using a TTL in redis for each job.

When a task is longer than expected, it is released in redis and another
controller can take it over while it is still running on the first
controller.

Add a keepalive mechanism in the worker to periodically reset the TTL
while the worker is still processing a job

Story 2009761
Task 44242

Change-Id: I6619ba117f7051fe44086389789bc6e2810fd23d
(cherry picked from commit f13c3ca6b8480792e2ab86a5413843b56eb453c6)
This commit is contained in:
Gregory Thiemonge 2022-01-04 09:31:36 +01:00
parent 4039d35ce2
commit 987d6a346f
5 changed files with 52 additions and 42 deletions

View File

@ -126,7 +126,7 @@ class TaskFlowServiceController(object):
def __init__(self, driver): def __init__(self, driver):
self.driver = driver self.driver = driver
def run_poster(self, flow_factory, *args, wait=False, **kwargs): def run_poster(self, flow_factory, *args, **kwargs):
with self.driver.persistence_driver.get_persistence() as persistence: with self.driver.persistence_driver.get_persistence() as persistence:
with self.driver.job_board(persistence) as job_board: with self.driver.job_board(persistence) as job_board:
job_id = uuidutils.generate_uuid() job_id = uuidutils.generate_uuid()
@ -145,16 +145,23 @@ class TaskFlowServiceController(object):
job_board.post(job_name, book=job_logbook, job_board.post(job_name, book=job_logbook,
details=job_details) details=job_details)
if wait: self._wait_for_job(job_board)
self._wait_for_job(job_board)
return job_id return job_id
def _wait_for_job(self, job_board): def _wait_for_job(self, job_board):
# Wait for job to its complete state # Wait for job to its complete state
for job in job_board.iterjobs(): expiration_time = CONF.task_flow.jobboard_expiration_time
LOG.debug("Waiting for job %s to finish", job.name)
job.wait() need_wait = True
while need_wait:
need_wait = False
for job in job_board.iterjobs():
# If job hasn't finished in expiration_time/2 seconds,
# extend its TTL
if not job.wait(timeout=expiration_time / 2):
job.extend_expiry(expiration_time)
need_wait = True
def run_conductor(self, name): def run_conductor(self, name):
with self.driver.persistence_driver.get_persistence() as persistence: with self.driver.persistence_driver.get_persistence() as persistence:

View File

@ -948,7 +948,7 @@ class ControllerWorker(object):
self.run_flow( self.run_flow(
flow_utils.get_failover_amphora_flow, flow_utils.get_failover_amphora_flow,
amphora.to_dict(), lb_amp_count, amphora.to_dict(), lb_amp_count,
store=stored_params, wait=True) store=stored_params)
LOG.info("Successfully completed the failover for an amphora: %s", LOG.info("Successfully completed the failover for an amphora: %s",
{"id": amphora_id, {"id": amphora_id,
@ -1099,7 +1099,7 @@ class ControllerWorker(object):
self.run_flow( self.run_flow(
flow_utils.get_failover_LB_flow, amps, provider_lb_dict, flow_utils.get_failover_LB_flow, amps, provider_lb_dict,
store=stored_params, wait=True) store=stored_params)
LOG.info('Failover of load balancer %s completed successfully.', LOG.info('Failover of load balancer %s completed successfully.',
lb.id) lb.id)

View File

@ -114,25 +114,26 @@ class TestTaskFlowServiceController(base.TestCase):
post_args = self.jobboard_mock.__enter__().post.call_args post_args = self.jobboard_mock.__enter__().post.call_args
self.assertEqual(job_name, post_args[0][0]) self.assertEqual(job_name, post_args[0][0])
self.assertEqual(job_details, post_args[1]['details']) self.assertEqual(job_details, post_args[1]['details'])
wait.assert_not_called() wait.assert_called()
self.assertEqual(self._mock_uuid, uuid) self.assertEqual(self._mock_uuid, uuid)
@mock.patch('oslo_utils.uuidutils.generate_uuid', return_value=_mock_uuid) def test__wait_for_job(self):
@mock.patch('taskflow.engines.save_factory_details') job1 = mock.MagicMock()
def test_run_poster_wait(self, mock_engines, mockuuid): job1.wait.side_effect = [False, True]
flow_factory = mock.MagicMock() job2 = mock.MagicMock()
flow_factory.__name__ = 'testname' job2.wait.side_effect = [False, True]
job_details = {'store': 'test'} job3 = mock.MagicMock()
with mock.patch.object(self.service_controller, '_wait_for_job' job3.wait.return_value = True
) as wait: job_board = mock.MagicMock()
uuid = self.service_controller.run_poster(flow_factory, wait=True, job_board.iterjobs.side_effect = [
**job_details) [job1, job2, job3],
self.persistence_mock.__enter__().get_connection( [job1, job2]
).save_logbook.assert_called() ]
mock_engines.assert_called() self.service_controller._wait_for_job(job_board)
self.jobboard_mock.__enter__().post.assert_called()
wait.assert_called_once_with(self.jobboard_mock.__enter__()) job1.extend_expiry.assert_called_once()
self.assertEqual(self._mock_uuid, uuid) job2.extend_expiry.assert_called_once()
job3.extend_expiry.assert_not_called()
@mock.patch('octavia.common.base_taskflow.RedisDynamicLoggingConductor') @mock.patch('octavia.common.base_taskflow.RedisDynamicLoggingConductor')
@mock.patch('octavia.common.base_taskflow.DynamicLoggingConductor') @mock.patch('octavia.common.base_taskflow.DynamicLoggingConductor')

View File

@ -1210,8 +1210,7 @@ class TestControllerWorker(base.TestCase):
cw.services_controller.run_poster.assert_called_once_with( cw.services_controller.run_poster.assert_called_once_with(
flow_utils.get_failover_amphora_flow, flow_utils.get_failover_amphora_flow,
mock_amphora.to_dict(), 1, store=expected_stored_params, mock_amphora.to_dict(), 1, store=expected_stored_params)
wait=True)
@mock.patch('octavia.db.repositories.AvailabilityZoneRepository.' @mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
'get_availability_zone_metadata_dict', return_value={}) 'get_availability_zone_metadata_dict', return_value={})
@ -1265,8 +1264,7 @@ class TestControllerWorker(base.TestCase):
cw.services_controller.run_poster.assert_called_once_with( cw.services_controller.run_poster.assert_called_once_with(
flow_utils.get_failover_amphora_flow, flow_utils.get_failover_amphora_flow,
mock_amphora.to_dict(), 2, store=expected_stored_params, mock_amphora.to_dict(), 2, store=expected_stored_params)
wait=True)
@mock.patch('octavia.db.repositories.AvailabilityZoneRepository.' @mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
'get_availability_zone_metadata_dict', return_value={}) 'get_availability_zone_metadata_dict', return_value={})
@ -1320,8 +1318,7 @@ class TestControllerWorker(base.TestCase):
cw.services_controller.run_poster.assert_called_once_with( cw.services_controller.run_poster.assert_called_once_with(
flow_utils.get_failover_amphora_flow, flow_utils.get_failover_amphora_flow,
mock_amphora.to_dict(), 2, store=expected_stored_params, mock_amphora.to_dict(), 2, store=expected_stored_params)
wait=True)
@mock.patch('octavia.api.drivers.utils.' @mock.patch('octavia.api.drivers.utils.'
'db_loadbalancer_to_provider_loadbalancer') 'db_loadbalancer_to_provider_loadbalancer')
@ -1372,8 +1369,7 @@ class TestControllerWorker(base.TestCase):
cw.services_controller.run_poster.assert_called_once_with( cw.services_controller.run_poster.assert_called_once_with(
flow_utils.get_failover_amphora_flow, flow_utils.get_failover_amphora_flow,
mock_amphora.to_dict(), None, store=expected_stored_params, mock_amphora.to_dict(), None, store=expected_stored_params)
wait=True)
@mock.patch('octavia.db.repositories.FlavorRepository.' @mock.patch('octavia.db.repositories.FlavorRepository.'
'get_flavor_metadata_dict', return_value={}) 'get_flavor_metadata_dict', return_value={})
@ -1429,8 +1425,7 @@ class TestControllerWorker(base.TestCase):
cw.services_controller.run_poster.assert_called_once_with( cw.services_controller.run_poster.assert_called_once_with(
flow_utils.get_failover_amphora_flow, flow_utils.get_failover_amphora_flow,
mock_amphora.to_dict(), 1, store=expected_stored_params, mock_amphora.to_dict(), 1, store=expected_stored_params)
wait=True)
@mock.patch('octavia.db.repositories.AvailabilityZoneRepository.' @mock.patch('octavia.db.repositories.AvailabilityZoneRepository.'
'get_availability_zone_metadata_dict', return_value={}) 'get_availability_zone_metadata_dict', return_value={})
@ -1488,8 +1483,7 @@ class TestControllerWorker(base.TestCase):
print(cw.services_controller.run_poster, flush=True) print(cw.services_controller.run_poster, flush=True)
cw.services_controller.run_poster.assert_called_once_with( cw.services_controller.run_poster.assert_called_once_with(
flow_utils.get_failover_amphora_flow, flow_utils.get_failover_amphora_flow,
mock_amphora.to_dict(), 1, store=expected_stored_params, mock_amphora.to_dict(), 1, store=expected_stored_params)
wait=True)
@mock.patch('octavia.controller.worker.v1.flows.amphora_flows.' @mock.patch('octavia.controller.worker.v1.flows.amphora_flows.'
'AmphoraFlows.get_failover_amphora_flow') 'AmphoraFlows.get_failover_amphora_flow')
@ -1601,7 +1595,7 @@ class TestControllerWorker(base.TestCase):
cw.services_controller.run_poster.assert_called_once_with( cw.services_controller.run_poster.assert_called_once_with(
flow_utils.get_failover_amphora_flow, flow_utils.get_failover_amphora_flow,
mock_amphora.to_dict(), mock_amphora.to_dict(),
None, store=expected_stored_params, wait=True) None, store=expected_stored_params)
@mock.patch('octavia.db.repositories.AmphoraHealthRepository.delete') @mock.patch('octavia.db.repositories.AmphoraHealthRepository.delete')
def test_failover_deleted_amphora(self, def test_failover_deleted_amphora(self,
@ -1800,7 +1794,7 @@ class TestControllerWorker(base.TestCase):
cw.services_controller.run_poster.assert_called_once_with( cw.services_controller.run_poster.assert_called_once_with(
flow_utils.get_failover_LB_flow, [_amphora_mock], provider_lb, flow_utils.get_failover_LB_flow, [_amphora_mock], provider_lb,
wait=True, store=expected_flow_store) store=expected_flow_store)
@mock.patch('octavia.controller.worker.v2.controller_worker.' @mock.patch('octavia.controller.worker.v2.controller_worker.'
'ControllerWorker._get_amphorae_for_failover') 'ControllerWorker._get_amphorae_for_failover')
@ -1851,7 +1845,7 @@ class TestControllerWorker(base.TestCase):
cw.services_controller.run_poster.assert_called_once_with( cw.services_controller.run_poster.assert_called_once_with(
flow_utils.get_failover_LB_flow, [_amphora_mock, _amphora_mock], flow_utils.get_failover_LB_flow, [_amphora_mock, _amphora_mock],
provider_lb, wait=True, store=expected_flow_store) provider_lb, store=expected_flow_store)
@mock.patch('octavia.db.repositories.LoadBalancerRepository.update') @mock.patch('octavia.db.repositories.LoadBalancerRepository.update')
def test_failover_loadbalancer_no_lb(self, def test_failover_loadbalancer_no_lb(self,
@ -1952,7 +1946,7 @@ class TestControllerWorker(base.TestCase):
cw.services_controller.run_poster.assert_called_once_with( cw.services_controller.run_poster.assert_called_once_with(
flow_utils.get_failover_LB_flow, [_amphora_mock], provider_lb, flow_utils.get_failover_LB_flow, [_amphora_mock], provider_lb,
wait=True, store=expected_flow_store) store=expected_flow_store)
@mock.patch('octavia.db.repositories.FlavorRepository.' @mock.patch('octavia.db.repositories.FlavorRepository.'
'get_flavor_metadata_dict', return_value={'taste': 'spicy'}) 'get_flavor_metadata_dict', return_value={'taste': 'spicy'})
@ -2006,7 +2000,7 @@ class TestControllerWorker(base.TestCase):
cw.services_controller.run_poster.assert_called_once_with( cw.services_controller.run_poster.assert_called_once_with(
flow_utils.get_failover_LB_flow, [_amphora_mock, _amphora_mock], flow_utils.get_failover_LB_flow, [_amphora_mock, _amphora_mock],
provider_lb, wait=True, store=expected_flow_store) provider_lb, store=expected_flow_store)
def test_amphora_cert_rotation(self, def test_amphora_cert_rotation(self,
mock_api_get_session, mock_api_get_session,

View File

@ -0,0 +1,8 @@
---
fixes:
- |
Fix an issue with amphorav2 and persistence, some long tasks executed by a
controller might have been released in taskflow and rescheduled on another
controller. Octavia now ensures that a task is never released early by
using a keepalive mechanism to notify taskflow (and its redis backend) that
a job is still running.