Prevent incorrect reschedule of resumed tasks with jobboard

A previous patch [0] introduced a keepalive mechanism that allows a
controller to notify taskflow that a task is being executed.
But it doesn't handle "resumed" tasks (tasks that have been rescheduled
from one controller to another controller).

This patch moves this keepalive task into a thread that is attached to a
taskflow conductor, the thread will extend the expiration of the tasks
that are executed by this conductor.

[0] I6619ba117f7051fe44086389789bc6e2810fd23d

Story 2009998
Task 45089

Change-Id: I29fcad9e121a30d6e8f8178f2f078cf10771a32a
This commit is contained in:
Gregory Thiemonge 2022-04-15 14:53:05 +02:00
parent 8caca9219f
commit d367b47639
3 changed files with 68 additions and 20 deletions

View File

@ -16,6 +16,7 @@
import concurrent.futures import concurrent.futures
import datetime import datetime
import functools import functools
import time
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log from oslo_log import log
@ -153,17 +154,9 @@ class TaskFlowServiceController(object):
def _wait_for_job(self, job_board): def _wait_for_job(self, job_board):
# Wait for the job to reach its complete state # Wait for the job to reach its complete state
expiration_time = CONF.task_flow.jobboard_expiration_time
need_wait = True
while need_wait:
need_wait = False
for job in job_board.iterjobs(): for job in job_board.iterjobs():
# If job hasn't finished in expiration_time/2 seconds, LOG.debug("Waiting for job %s to finish", job.name)
# extend its TTL job.wait()
if not job.wait(timeout=expiration_time / 2):
job.extend_expiry(expiration_time)
need_wait = True
def run_conductor(self, name): def run_conductor(self, name):
with self.driver.persistence_driver.get_persistence() as persistence: with self.driver.persistence_driver.get_persistence() as persistence:
@ -187,4 +180,35 @@ class TaskFlowServiceController(object):
name, board, persistence=persistence, name, board, persistence=persistence,
engine=CONF.task_flow.engine) engine=CONF.task_flow.engine)
waiter_th = concurrent.futures.ThreadPoolExecutor(
max_workers=1)
waiter_th.submit(self._waiter, conductor)
conductor.run() conductor.run()
def _extend_jobs(self, conductor, expiration_time):
conductor_name = conductor._name
with self.driver.persistence_driver.get_persistence() as persistence:
with self.driver.job_board(persistence) as board:
for job in board.iterjobs():
try:
owner = board.find_owner(job)
except TypeError:
# taskflow throws an exception if a job is not owned
# (probably a bug in taskflow)
continue
# Only extend expiry for jobs that are owned by our
# conductor (from the same process)
if owner == conductor_name:
if job.expires_in() < expiration_time / 2:
LOG.debug("Extend expiry for job %s", job.name)
job.extend_expiry(expiration_time)
def _waiter(self, conductor):
expiration_time = CONF.task_flow.jobboard_expiration_time
while True:
self._extend_jobs(conductor, expiration_time)
time.sleep(expiration_time / 4)

View File

@ -119,25 +119,20 @@ class TestTaskFlowServiceController(base.TestCase):
def test__wait_for_job(self): def test__wait_for_job(self):
job1 = mock.MagicMock() job1 = mock.MagicMock()
job1.wait.side_effect = [False, True]
job2 = mock.MagicMock() job2 = mock.MagicMock()
job2.wait.side_effect = [False, True]
job3 = mock.MagicMock()
job3.wait.return_value = True
job_board = mock.MagicMock() job_board = mock.MagicMock()
job_board.iterjobs.side_effect = [ job_board.iterjobs.side_effect = [
[job1, job2, job3],
[job1, job2] [job1, job2]
] ]
self.service_controller._wait_for_job(job_board) self.service_controller._wait_for_job(job_board)
job1.extend_expiry.assert_called_once() job1.wait.assert_called_once()
job2.extend_expiry.assert_called_once() job2.wait.assert_called_once()
job3.extend_expiry.assert_not_called()
@mock.patch('octavia.common.base_taskflow.RedisDynamicLoggingConductor') @mock.patch('octavia.common.base_taskflow.RedisDynamicLoggingConductor')
@mock.patch('octavia.common.base_taskflow.DynamicLoggingConductor') @mock.patch('octavia.common.base_taskflow.DynamicLoggingConductor')
def test_run_conductor(self, dynamiccond, rediscond): @mock.patch('concurrent.futures.ThreadPoolExecutor')
def test_run_conductor(self, mock_threadpoolexec, dynamiccond, rediscond):
self.service_controller.run_conductor("test") self.service_controller.run_conductor("test")
rediscond.assert_called_once_with( rediscond.assert_called_once_with(
"test", self.jobboard_mock.__enter__(), "test", self.jobboard_mock.__enter__(),
@ -154,3 +149,27 @@ class TestTaskFlowServiceController(base.TestCase):
"test2", self.jobboard_mock.__enter__(), "test2", self.jobboard_mock.__enter__(),
persistence=self.persistence_mock.__enter__(), persistence=self.persistence_mock.__enter__(),
engine='parallel') engine='parallel')
def test__extend_jobs(self):
conductor = mock.MagicMock()
conductor._name = 'mycontroller'
job1 = mock.MagicMock()
job1.expires_in.return_value = 10
job2 = mock.MagicMock()
job2.expires_in.return_value = 10
job3 = mock.MagicMock()
job3.expires_in.return_value = 30
self.jobboard_mock.__enter__().iterjobs.return_value = [
job1, job2, job3]
self.jobboard_mock.__enter__().find_owner.side_effect = [
'mycontroller',
TypeError('no owner'),
'mycontroller']
self.service_controller._extend_jobs(conductor, 30)
job1.extend_expiry.assert_called_once_with(30)
job2.extend_expiry.assert_not_called()
job3.extend_expiry.assert_not_called()

View File

@ -0,0 +1,5 @@
---
fixes:
- |
Fix the rescheduling of taskflow tasks that have been resumed after being
interrupted.