diff --git a/octavia/common/base_taskflow.py b/octavia/common/base_taskflow.py index 82dfb4b834..07e05099ee 100644 --- a/octavia/common/base_taskflow.py +++ b/octavia/common/base_taskflow.py @@ -16,6 +16,7 @@ import concurrent.futures import datetime import functools +import time from oslo_config import cfg from oslo_log import log @@ -195,17 +196,9 @@ class TaskFlowServiceController(object): def _wait_for_job(self, job_board): # Wait for job to its complete state - expiration_time = CONF.task_flow.jobboard_expiration_time - - need_wait = True - while need_wait: - need_wait = False - for job in job_board.iterjobs(): - # If job hasn't finished in expiration_time/2 seconds, - # extend its TTL - if not job.wait(timeout=expiration_time / 2): - job.extend_expiry(expiration_time) - need_wait = True + for job in job_board.iterjobs(): + LOG.debug("Waiting for job %s to finish", job.name) + job.wait() def run_conductor(self, name): with self.driver.persistence_driver.get_persistence() as persistence: @@ -229,4 +222,35 @@ class TaskFlowServiceController(object): name, board, persistence=persistence, engine=CONF.task_flow.engine) + waiter_th = concurrent.futures.ThreadPoolExecutor( + max_workers=1) + waiter_th.submit(self._waiter, conductor) + conductor.run() + + def _extend_jobs(self, conductor, expiration_time): + conductor_name = conductor._name + + with self.driver.persistence_driver.get_persistence() as persistence: + with self.driver.job_board(persistence) as board: + for job in board.iterjobs(): + try: + owner = board.find_owner(job) + except TypeError: + # taskflow throws an exception if a job is not owned + # (probably a bug in taskflow) + continue + # Only extend expiry for jobs that are owned by our + # conductor (from the same process) + if owner == conductor_name: + if job.expires_in() < expiration_time / 2: + LOG.debug("Extend expiry for job %s", job.name) + job.extend_expiry(expiration_time) + + def _waiter(self, conductor): + expiration_time = 
CONF.task_flow.jobboard_expiration_time + + while True: + self._extend_jobs(conductor, expiration_time) + + time.sleep(expiration_time / 4) diff --git a/octavia/tests/unit/common/test_base_taskflow.py b/octavia/tests/unit/common/test_base_taskflow.py index 70273da413..779e9d0035 100644 --- a/octavia/tests/unit/common/test_base_taskflow.py +++ b/octavia/tests/unit/common/test_base_taskflow.py @@ -120,25 +120,20 @@ class TestTaskFlowServiceController(base.TestCase): def test__wait_for_job(self): job1 = mock.MagicMock() - job1.wait.side_effect = [False, True] job2 = mock.MagicMock() - job2.wait.side_effect = [False, True] - job3 = mock.MagicMock() - job3.wait.return_value = True job_board = mock.MagicMock() job_board.iterjobs.side_effect = [ - [job1, job2, job3], [job1, job2] ] self.service_controller._wait_for_job(job_board) - job1.extend_expiry.assert_called_once() - job2.extend_expiry.assert_called_once() - job3.extend_expiry.assert_not_called() + job1.wait.assert_called_once() + job2.wait.assert_called_once() @mock.patch('octavia.common.base_taskflow.RedisDynamicLoggingConductor') @mock.patch('octavia.common.base_taskflow.DynamicLoggingConductor') - def test_run_conductor(self, dynamiccond, rediscond): + @mock.patch('concurrent.futures.ThreadPoolExecutor') + def test_run_conductor(self, mock_threadpoolexec, dynamiccond, rediscond): self.service_controller.run_conductor("test") rediscond.assert_called_once_with( "test", self.jobboard_mock.__enter__(), @@ -156,6 +151,30 @@ class TestTaskFlowServiceController(base.TestCase): persistence=self.persistence_mock.__enter__(), engine='parallel') + def test__extend_jobs(self): + conductor = mock.MagicMock() + conductor._name = 'mycontroller' + + job1 = mock.MagicMock() + job1.expires_in.return_value = 10 + job2 = mock.MagicMock() + job2.expires_in.return_value = 10 + job3 = mock.MagicMock() + job3.expires_in.return_value = 30 + self.jobboard_mock.__enter__().iterjobs.return_value = [ + job1, job2, job3] + + 
self.jobboard_mock.__enter__().find_owner.side_effect = [ + 'mycontroller', + TypeError('no owner'), + 'mycontroller'] + + self.service_controller._extend_jobs(conductor, 30) + + job1.extend_expiry.assert_called_once_with(30) + job2.extend_expiry.assert_not_called() + job3.extend_expiry.assert_not_called() + class TestJobDetailsFilter(base.TestCase): def test_filter(self): diff --git a/releasenotes/notes/fix-reschedule-of-jobboard-tasks-929c066dea9267fd.yaml b/releasenotes/notes/fix-reschedule-of-jobboard-tasks-929c066dea9267fd.yaml new file mode 100644 index 0000000000..67f3c395a7 --- /dev/null +++ b/releasenotes/notes/fix-reschedule-of-jobboard-tasks-929c066dea9267fd.yaml @@ -0,0 +1,5 @@ +--- +fixes: + - | + Fix the rescheduling of taskflow tasks that have been resumed after being + interrupted.