diff --git a/octavia/common/base_taskflow.py b/octavia/common/base_taskflow.py index 9b0eb45315..524ac96ed5 100644 --- a/octavia/common/base_taskflow.py +++ b/octavia/common/base_taskflow.py @@ -126,7 +126,7 @@ class TaskFlowServiceController(object): def __init__(self, driver): self.driver = driver - def run_poster(self, flow_factory, *args, wait=False, **kwargs): + def run_poster(self, flow_factory, *args, **kwargs): with self.driver.persistence_driver.get_persistence() as persistence: with self.driver.job_board(persistence) as job_board: job_id = uuidutils.generate_uuid() @@ -145,16 +145,23 @@ class TaskFlowServiceController(object): job_board.post(job_name, book=job_logbook, details=job_details) - if wait: - self._wait_for_job(job_board) + self._wait_for_job(job_board) return job_id def _wait_for_job(self, job_board): # Wait for job to its complete state - for job in job_board.iterjobs(): - LOG.debug("Waiting for job %s to finish", job.name) - job.wait() + expiration_time = CONF.task_flow.jobboard_expiration_time + + need_wait = True + while need_wait: + need_wait = False + for job in job_board.iterjobs(): + # If job hasn't finished in expiration_time/2 seconds, + # extend its TTL + if not job.wait(timeout=expiration_time / 2): + job.extend_expiry(expiration_time) + need_wait = True def run_conductor(self, name): with self.driver.persistence_driver.get_persistence() as persistence: diff --git a/octavia/controller/worker/v2/controller_worker.py b/octavia/controller/worker/v2/controller_worker.py index 8ec5843628..a6186385d4 100644 --- a/octavia/controller/worker/v2/controller_worker.py +++ b/octavia/controller/worker/v2/controller_worker.py @@ -100,7 +100,7 @@ class ControllerWorker(object): db_apis.get_session(), availability_zone)) job_id = self.services_controller.run_poster( flow_utils.get_create_amphora_flow, - store=store, wait=True) + store=store) return job_id except Exception as e: @@ -906,7 +906,7 @@ class ControllerWorker(object): self.services_controller.run_poster( flow_utils.get_failover_amphora_flow, amphora.to_dict(), lb_amp_count, - store=stored_params, wait=True) + store=stored_params) LOG.info("Successfully completed the failover for an amphora: %s", {"id": amphora_id, @@ -1057,7 +1057,7 @@ class ControllerWorker(object): self.services_controller.run_poster( flow_utils.get_failover_LB_flow, amps, provider_lb_dict, - store=stored_params, wait=True) + store=stored_params) LOG.info('Failover of load balancer %s completed successfully.', lb.id) diff --git a/octavia/tests/unit/common/test_base_taskflow.py b/octavia/tests/unit/common/test_base_taskflow.py index b319dfa8e3..a113a35b30 100644 --- a/octavia/tests/unit/common/test_base_taskflow.py +++ b/octavia/tests/unit/common/test_base_taskflow.py @@ -114,25 +114,26 @@ class TestTaskFlowServiceController(base.TestCase): post_args = self.jobboard_mock.__enter__().post.call_args self.assertEqual(job_name, post_args[0][0]) self.assertEqual(job_details, post_args[1]['details']) - wait.assert_not_called() + wait.assert_called() self.assertEqual(self._mock_uuid, uuid) - @mock.patch('oslo_utils.uuidutils.generate_uuid', return_value=_mock_uuid) - @mock.patch('taskflow.engines.save_factory_details') - def test_run_poster_wait(self, mock_engines, mockuuid): - flow_factory = mock.MagicMock() - flow_factory.__name__ = 'testname' - job_details = {'store': 'test'} - with mock.patch.object(self.service_controller, '_wait_for_job' - ) as wait: - uuid = self.service_controller.run_poster(flow_factory, wait=True, - **job_details) - self.persistence_mock.__enter__().get_connection( - ).save_logbook.assert_called() - mock_engines.assert_called() - self.jobboard_mock.__enter__().post.assert_called() - wait.assert_called_once_with(self.jobboard_mock.__enter__()) - self.assertEqual(self._mock_uuid, uuid) + def test__wait_for_job(self): + job1 = mock.MagicMock() + job1.wait.side_effect = [False, True] + job2 = mock.MagicMock() + job2.wait.side_effect = [False, True] + job3 = mock.MagicMock() + job3.wait.return_value = True + job_board = mock.MagicMock() + job_board.iterjobs.side_effect = [ + [job1, job2, job3], + [job1, job2] + ] + self.service_controller._wait_for_job(job_board) + + job1.extend_expiry.assert_called_once() + job2.extend_expiry.assert_called_once() + job3.extend_expiry.assert_not_called() @mock.patch('octavia.common.base_taskflow.RedisDynamicLoggingConductor') @mock.patch('octavia.common.base_taskflow.DynamicLoggingConductor') diff --git a/octavia/tests/unit/controller/worker/v2/test_controller_worker.py b/octavia/tests/unit/controller/worker/v2/test_controller_worker.py index e03d483362..c5bb7af5fd 100644 --- a/octavia/tests/unit/controller/worker/v2/test_controller_worker.py +++ b/octavia/tests/unit/controller/worker/v2/test_controller_worker.py @@ -189,7 +189,6 @@ class TestControllerWorker(base.TestCase): (cw.services_controller.run_poster. assert_called_once_with( flow_utils.get_create_amphora_flow, - wait=True, store={constants.BUILD_TYPE_PRIORITY: constants.LB_CREATE_SPARES_POOL_PRIORITY, constants.FLAVOR: None, @@ -226,7 +225,6 @@ class TestControllerWorker(base.TestCase): (cw.services_controller.run_poster. assert_called_once_with( flow_utils.get_create_amphora_flow, - wait=True, store={constants.BUILD_TYPE_PRIORITY: constants.LB_CREATE_SPARES_POOL_PRIORITY, constants.FLAVOR: None, @@ -1252,8 +1250,7 @@ class TestControllerWorker(base.TestCase): cw.services_controller.run_poster.assert_called_once_with( flow_utils.get_failover_amphora_flow, - mock_amphora.to_dict(), 1, store=expected_stored_params, - wait=True) + mock_amphora.to_dict(), 1, store=expected_stored_params) @mock.patch('octavia.db.repositories.AvailabilityZoneRepository.' 'get_availability_zone_metadata_dict', return_value={}) @@ -1308,8 +1305,7 @@ class TestControllerWorker(base.TestCase): cw.services_controller.run_poster.assert_called_once_with( flow_utils.get_failover_amphora_flow, - mock_amphora.to_dict(), 2, store=expected_stored_params, - wait=True) + mock_amphora.to_dict(), 2, store=expected_stored_params) @mock.patch('octavia.db.repositories.AvailabilityZoneRepository.' 'get_availability_zone_metadata_dict', return_value={}) @@ -1364,8 +1360,7 @@ class TestControllerWorker(base.TestCase): cw.services_controller.run_poster.assert_called_once_with( flow_utils.get_failover_amphora_flow, - mock_amphora.to_dict(), 2, store=expected_stored_params, - wait=True) + mock_amphora.to_dict(), 2, store=expected_stored_params) @mock.patch('octavia.api.drivers.utils.' 'db_loadbalancer_to_provider_loadbalancer') @@ -1417,8 +1412,7 @@ class TestControllerWorker(base.TestCase): cw.services_controller.run_poster.assert_called_once_with( flow_utils.get_failover_amphora_flow, - mock_amphora.to_dict(), None, store=expected_stored_params, - wait=True) + mock_amphora.to_dict(), None, store=expected_stored_params) @mock.patch('octavia.db.repositories.FlavorRepository.' 'get_flavor_metadata_dict', return_value={}) @@ -1475,8 +1469,7 @@ class TestControllerWorker(base.TestCase): cw.services_controller.run_poster.assert_called_once_with( flow_utils.get_failover_amphora_flow, - mock_amphora.to_dict(), 1, store=expected_stored_params, - wait=True) + mock_amphora.to_dict(), 1, store=expected_stored_params) @mock.patch('octavia.db.repositories.AvailabilityZoneRepository.' 'get_availability_zone_metadata_dict', return_value={}) @@ -1535,8 +1528,7 @@ class TestControllerWorker(base.TestCase): print(cw.services_controller.run_poster, flush=True) cw.services_controller.run_poster.assert_called_once_with( flow_utils.get_failover_amphora_flow, - mock_amphora.to_dict(), 1, store=expected_stored_params, - wait=True) + mock_amphora.to_dict(), 1, store=expected_stored_params) @mock.patch('octavia.controller.worker.v1.flows.amphora_flows.' 'AmphoraFlows.get_failover_amphora_flow') @@ -1624,7 +1616,7 @@ class TestControllerWorker(base.TestCase): cw.services_controller.run_poster.assert_called_once_with( flow_utils.get_failover_amphora_flow, mock_amphora.to_dict(), - None, store=expected_stored_params, wait=True) + None, store=expected_stored_params) @mock.patch('octavia.db.repositories.AmphoraHealthRepository.delete') def test_failover_deleted_amphora(self, @@ -1823,7 +1815,7 @@ class TestControllerWorker(base.TestCase): cw.services_controller.run_poster.assert_called_once_with( flow_utils.get_failover_LB_flow, [_amphora_mock], provider_lb, - wait=True, store=expected_flow_store) + store=expected_flow_store) @mock.patch('octavia.controller.worker.v2.controller_worker.' 'ControllerWorker._get_amphorae_for_failover') @@ -1874,7 +1866,7 @@ class TestControllerWorker(base.TestCase): cw.services_controller.run_poster.assert_called_once_with( flow_utils.get_failover_LB_flow, [_amphora_mock, _amphora_mock], - provider_lb, wait=True, store=expected_flow_store) + provider_lb, store=expected_flow_store) @mock.patch('octavia.db.repositories.LoadBalancerRepository.update') def test_failover_loadbalancer_no_lb(self, @@ -1975,7 +1967,7 @@ class TestControllerWorker(base.TestCase): cw.services_controller.run_poster.assert_called_once_with( flow_utils.get_failover_LB_flow, [_amphora_mock], provider_lb, - wait=True, store=expected_flow_store) + store=expected_flow_store) @mock.patch('octavia.db.repositories.FlavorRepository.' 'get_flavor_metadata_dict', return_value={'taste': 'spicy'}) @@ -2029,7 +2021,7 @@ class TestControllerWorker(base.TestCase): cw.services_controller.run_poster.assert_called_once_with( flow_utils.get_failover_LB_flow, [_amphora_mock, _amphora_mock], - provider_lb, wait=True, store=expected_flow_store) + provider_lb, store=expected_flow_store) def test_amphora_cert_rotation(self, mock_api_get_session, diff --git a/releasenotes/notes/fix-long-tasks-with-redis-keepalive-af18211334c14f54.yaml b/releasenotes/notes/fix-long-tasks-with-redis-keepalive-af18211334c14f54.yaml new file mode 100644 index 0000000000..5ca61f3c40 --- /dev/null +++ b/releasenotes/notes/fix-long-tasks-with-redis-keepalive-af18211334c14f54.yaml @@ -0,0 +1,8 @@ +--- +fixes: + - | + Fix an issue with amphorav2 and persistence, some long tasks executed by a + controller might have been released in taskflow and rescheduled on another + controller. Octavia now ensures that a task is never released early by + using a keepalive mechanism to notify taskflow (and its redis backend) that + a job is still running.