diff --git a/octavia/common/base_taskflow.py b/octavia/common/base_taskflow.py index 29a5062215..4419b15d1f 100644 --- a/octavia/common/base_taskflow.py +++ b/octavia/common/base_taskflow.py @@ -126,7 +126,7 @@ class TaskFlowServiceController(object): def __init__(self, driver): self.driver = driver - def run_poster(self, flow_factory, *args, wait=False, **kwargs): + def run_poster(self, flow_factory, *args, **kwargs): with self.driver.persistence_driver.get_persistence() as persistence: with self.driver.job_board(persistence) as job_board: job_id = uuidutils.generate_uuid() @@ -145,16 +145,23 @@ class TaskFlowServiceController(object): job_board.post(job_name, book=job_logbook, details=job_details) - if wait: - self._wait_for_job(job_board) + self._wait_for_job(job_board) return job_id def _wait_for_job(self, job_board): # Wait for job to its complete state - for job in job_board.iterjobs(): - LOG.debug("Waiting for job %s to finish", job.name) - job.wait() + expiration_time = CONF.task_flow.jobboard_expiration_time + + need_wait = True + while need_wait: + need_wait = False + for job in job_board.iterjobs(): + # If job hasn't finished in expiration_time/2 seconds, + # extend its TTL + if not job.wait(timeout=expiration_time / 2): + job.extend_expiry(expiration_time) + need_wait = True def run_conductor(self, name): with self.driver.persistence_driver.get_persistence() as persistence: diff --git a/octavia/controller/worker/v2/controller_worker.py b/octavia/controller/worker/v2/controller_worker.py index 746ea1b271..72809322ac 100644 --- a/octavia/controller/worker/v2/controller_worker.py +++ b/octavia/controller/worker/v2/controller_worker.py @@ -948,7 +948,7 @@ class ControllerWorker(object): self.run_flow( flow_utils.get_failover_amphora_flow, amphora.to_dict(), lb_amp_count, - store=stored_params, wait=True) + store=stored_params) LOG.info("Successfully completed the failover for an amphora: %s", {"id": amphora_id, @@ -1099,7 +1099,7 @@ class ControllerWorker(object): self.run_flow( flow_utils.get_failover_LB_flow, amps, provider_lb_dict, - store=stored_params, wait=True) + store=stored_params) LOG.info('Failover of load balancer %s completed successfully.', lb.id) diff --git a/octavia/tests/unit/common/test_base_taskflow.py b/octavia/tests/unit/common/test_base_taskflow.py index 794128769b..0af3f24977 100644 --- a/octavia/tests/unit/common/test_base_taskflow.py +++ b/octavia/tests/unit/common/test_base_taskflow.py @@ -114,25 +114,26 @@ class TestTaskFlowServiceController(base.TestCase): post_args = self.jobboard_mock.__enter__().post.call_args self.assertEqual(job_name, post_args[0][0]) self.assertEqual(job_details, post_args[1]['details']) - wait.assert_not_called() + wait.assert_called() self.assertEqual(self._mock_uuid, uuid) - @mock.patch('oslo_utils.uuidutils.generate_uuid', return_value=_mock_uuid) - @mock.patch('taskflow.engines.save_factory_details') - def test_run_poster_wait(self, mock_engines, mockuuid): - flow_factory = mock.MagicMock() - flow_factory.__name__ = 'testname' - job_details = {'store': 'test'} - with mock.patch.object(self.service_controller, '_wait_for_job' - ) as wait: - uuid = self.service_controller.run_poster(flow_factory, wait=True, - **job_details) - self.persistence_mock.__enter__().get_connection( - ).save_logbook.assert_called() - mock_engines.assert_called() - self.jobboard_mock.__enter__().post.assert_called() - wait.assert_called_once_with(self.jobboard_mock.__enter__()) - self.assertEqual(self._mock_uuid, uuid) + def test__wait_for_job(self): + job1 = mock.MagicMock() + job1.wait.side_effect = [False, True] + job2 = mock.MagicMock() + job2.wait.side_effect = [False, True] + job3 = mock.MagicMock() + job3.wait.return_value = True + job_board = mock.MagicMock() + job_board.iterjobs.side_effect = [ + [job1, job2, job3], + [job1, job2] + ] + self.service_controller._wait_for_job(job_board) + + job1.extend_expiry.assert_called_once() + job2.extend_expiry.assert_called_once() + job3.extend_expiry.assert_not_called() @mock.patch('octavia.common.base_taskflow.RedisDynamicLoggingConductor') @mock.patch('octavia.common.base_taskflow.DynamicLoggingConductor') diff --git a/octavia/tests/unit/controller/worker/v2/test_controller_worker.py b/octavia/tests/unit/controller/worker/v2/test_controller_worker.py index 70343fa7b5..be5b51b175 100644 --- a/octavia/tests/unit/controller/worker/v2/test_controller_worker.py +++ b/octavia/tests/unit/controller/worker/v2/test_controller_worker.py @@ -1210,8 +1210,7 @@ class TestControllerWorker(base.TestCase): cw.services_controller.run_poster.assert_called_once_with( flow_utils.get_failover_amphora_flow, - mock_amphora.to_dict(), 1, store=expected_stored_params, - wait=True) + mock_amphora.to_dict(), 1, store=expected_stored_params) @mock.patch('octavia.db.repositories.AvailabilityZoneRepository.' 'get_availability_zone_metadata_dict', return_value={}) @@ -1265,8 +1264,7 @@ class TestControllerWorker(base.TestCase): cw.services_controller.run_poster.assert_called_once_with( flow_utils.get_failover_amphora_flow, - mock_amphora.to_dict(), 2, store=expected_stored_params, - wait=True) + mock_amphora.to_dict(), 2, store=expected_stored_params) @mock.patch('octavia.db.repositories.AvailabilityZoneRepository.' 'get_availability_zone_metadata_dict', return_value={}) @@ -1320,8 +1318,7 @@ class TestControllerWorker(base.TestCase): cw.services_controller.run_poster.assert_called_once_with( flow_utils.get_failover_amphora_flow, - mock_amphora.to_dict(), 2, store=expected_stored_params, - wait=True) + mock_amphora.to_dict(), 2, store=expected_stored_params) @mock.patch('octavia.api.drivers.utils.' 'db_loadbalancer_to_provider_loadbalancer') @@ -1372,8 +1369,7 @@ class TestControllerWorker(base.TestCase): cw.services_controller.run_poster.assert_called_once_with( flow_utils.get_failover_amphora_flow, - mock_amphora.to_dict(), None, store=expected_stored_params, - wait=True) + mock_amphora.to_dict(), None, store=expected_stored_params) @mock.patch('octavia.db.repositories.FlavorRepository.' 'get_flavor_metadata_dict', return_value={}) @@ -1429,8 +1425,7 @@ class TestControllerWorker(base.TestCase): cw.services_controller.run_poster.assert_called_once_with( flow_utils.get_failover_amphora_flow, - mock_amphora.to_dict(), 1, store=expected_stored_params, - wait=True) + mock_amphora.to_dict(), 1, store=expected_stored_params) @mock.patch('octavia.db.repositories.AvailabilityZoneRepository.' 'get_availability_zone_metadata_dict', return_value={}) @@ -1488,8 +1483,7 @@ class TestControllerWorker(base.TestCase): print(cw.services_controller.run_poster, flush=True) cw.services_controller.run_poster.assert_called_once_with( flow_utils.get_failover_amphora_flow, - mock_amphora.to_dict(), 1, store=expected_stored_params, - wait=True) + mock_amphora.to_dict(), 1, store=expected_stored_params) @mock.patch('octavia.controller.worker.v1.flows.amphora_flows.' 'AmphoraFlows.get_failover_amphora_flow') @@ -1601,7 +1595,7 @@ class TestControllerWorker(base.TestCase): cw.services_controller.run_poster.assert_called_once_with( flow_utils.get_failover_amphora_flow, mock_amphora.to_dict(), - None, store=expected_stored_params, wait=True) + None, store=expected_stored_params) @mock.patch('octavia.db.repositories.AmphoraHealthRepository.delete') def test_failover_deleted_amphora(self, @@ -1800,7 +1794,7 @@ class TestControllerWorker(base.TestCase): cw.services_controller.run_poster.assert_called_once_with( flow_utils.get_failover_LB_flow, [_amphora_mock], provider_lb, - wait=True, store=expected_flow_store) + store=expected_flow_store) @mock.patch('octavia.controller.worker.v2.controller_worker.' 'ControllerWorker._get_amphorae_for_failover') @@ -1851,7 +1845,7 @@ class TestControllerWorker(base.TestCase): cw.services_controller.run_poster.assert_called_once_with( flow_utils.get_failover_LB_flow, [_amphora_mock, _amphora_mock], - provider_lb, wait=True, store=expected_flow_store) + provider_lb, store=expected_flow_store) @mock.patch('octavia.db.repositories.LoadBalancerRepository.update') def test_failover_loadbalancer_no_lb(self, @@ -1952,7 +1946,7 @@ class TestControllerWorker(base.TestCase): cw.services_controller.run_poster.assert_called_once_with( flow_utils.get_failover_LB_flow, [_amphora_mock], provider_lb, - wait=True, store=expected_flow_store) + store=expected_flow_store) @mock.patch('octavia.db.repositories.FlavorRepository.' 'get_flavor_metadata_dict', return_value={'taste': 'spicy'}) @@ -2006,7 +2000,7 @@ class TestControllerWorker(base.TestCase): cw.services_controller.run_poster.assert_called_once_with( flow_utils.get_failover_LB_flow, [_amphora_mock, _amphora_mock], - provider_lb, wait=True, store=expected_flow_store) + provider_lb, store=expected_flow_store) def test_amphora_cert_rotation(self, mock_api_get_session, diff --git a/releasenotes/notes/fix-long-tasks-with-redis-keepalive-af18211334c14f54.yaml b/releasenotes/notes/fix-long-tasks-with-redis-keepalive-af18211334c14f54.yaml new file mode 100644 index 0000000000..5ca61f3c40 --- /dev/null +++ b/releasenotes/notes/fix-long-tasks-with-redis-keepalive-af18211334c14f54.yaml @@ -0,0 +1,8 @@ +--- +fixes: + - | + Fix an issue with amphorav2 and persistence, some long tasks executed by a + controller might have been released in taskflow and rescheduled on another + controller. Octavia now ensures that a task is never released early by + using a keepalive mechanism to notify taskflow (and its redis backend) that + a job is still running.