feat: add max_dispatches arg to conductor's run

- This will cause the conductor to only do 'n' number of dispatches,
  after which it stops dispatching jobs.

- This will allow the code that call the conductor, to monitor
  conductor.dispatching, and make a decision on what is to be done with
  it. Eg: Decomission a conductor, restart the conductor etc

- Backward Compatible.

Change-Id: I3386c7050806806b5ee44a74ba93e50515a5ab7b
This commit is contained in:
Sriram Madapusi Vasudevan
2015-10-14 11:59:08 -04:00
parent 2d8b9dad24
commit 5f5fdd1811
3 changed files with 70 additions and 7 deletions

View File

@@ -11,7 +11,6 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import threading
try:
@@ -22,13 +21,13 @@ except ImportError:
from debtcollector import removals
from oslo_utils import excutils
import six
from taskflow.conductors import base
from taskflow import exceptions as excp
from taskflow.listeners import logging as logging_listener
from taskflow import logging
from taskflow.types import timing as tt
from taskflow.utils import async_utils
from taskflow.utils import iter_utils
LOG = logging.getLogger(__name__)
WAIT_TIMEOUT = 0.5
@@ -44,7 +43,7 @@ class BlockingConductor(base.Conductor):
This conductor iterates over jobs in the provided jobboard (waiting for
the given timeout if no jobs exist) and attempts to claim them, work on
those jobs in its local thread (blocking further work from being claimed
and consumed) and then consume those work units after completetion. This
and consumed) and then consume those work units after completion. This
process will repeat until the conductor has been stopped or other critical
error occurs.
@@ -160,13 +159,24 @@ class BlockingConductor(base.Conductor):
LOG.info("Job completed successfully: %s", job)
return async_utils.make_completed_future(consume)
def run(self):
def run(self, max_dispatches=None):
self._dead.clear()
total_dispatched = 0
try:
if max_dispatches is None:
# NOTE(TheSriram): if max_dispatches is not set,
# then the conductor will run indefinitely, and not
# stop after 'n' number of dispatches
max_dispatches = -1
dispatch_gen = iter_utils.iter_forever(max_dispatches)
while True:
if self._wait_timeout.is_stopped():
break
dispatched = 0
local_dispatched = 0
for job in self._jobboard.iterjobs():
if self._wait_timeout.is_stopped():
break
@@ -186,7 +196,8 @@ class BlockingConductor(base.Conductor):
LOG.warn("Job dispatching failed: %s", job,
exc_info=True)
else:
dispatched += 1
local_dispatched += 1
consume = f.result()
try:
if consume:
@@ -200,8 +211,17 @@ class BlockingConductor(base.Conductor):
else:
LOG.warn("Failed job abandonment: %s", job,
exc_info=True)
if dispatched == 0 and not self._wait_timeout.is_stopped():
total_dispatched = next(dispatch_gen)
if local_dispatched == 0 and \
not self._wait_timeout.is_stopped():
self._wait_timeout.wait()
except StopIteration:
if max_dispatches >= 0 and total_dispatched >= max_dispatches:
LOG.info("Maximum dispatch limit of %s reached",
max_dispatches)
finally:
self._dead.set()

View File

@@ -121,6 +121,34 @@ class BlockingConductorTest(test_utils.EngineTestBase, test.TestCase):
self.assertIsNotNone(fd)
self.assertEqual(st.SUCCESS, fd.state)
def test_run_max_dispatches(self):
components = self.make_components()
components.conductor.connect()
consumed_event = threading.Event()
def on_consume(state, details):
consumed_event.set()
components.board.notifier.register(base.REMOVAL, on_consume)
with close_many(components.client, components.conductor):
t = threading_utils.daemon_thread(
lambda: components.conductor.run(max_dispatches=5))
t.start()
lb, fd = pu.temporary_flow_detail(components.persistence)
engines.save_factory_details(fd, test_factory,
[False], {},
backend=components.persistence)
for _ in range(5):
components.board.post('poke', lb,
details={'flow_uuid': fd.uuid})
self.assertTrue(consumed_event.wait(
test_utils.WAIT_TIMEOUT))
components.board.post('poke', lb,
details={'flow_uuid': fd.uuid})
components.conductor.stop()
self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT))
self.assertFalse(components.conductor.dispatching)
def test_fail_run(self):
components = self.make_components()
components.conductor.connect()

View File

@@ -17,6 +17,7 @@
# under the License.
import itertools
from six.moves import range as compat_range
def count(it):
@@ -53,3 +54,17 @@ def while_is_not(it, stop_value):
yield value
if value is stop_value:
break
def iter_forever(limit):
"""Yields values from iterator until a limit is reached.
if limit is negative, we iterate forever.
"""
if limit < 0:
i = itertools.count()
while True:
yield next(i)
else:
for i in compat_range(0, limit):
yield i