diff --git a/taskflow/conductors/single_threaded.py b/taskflow/conductors/single_threaded.py index cd8022df..5deeba30 100644 --- a/taskflow/conductors/single_threaded.py +++ b/taskflow/conductors/single_threaded.py @@ -67,20 +67,22 @@ class SingleThreadedConductor(base.Conductor): @lock_utils.locked def stop(self, timeout=None): - """Stops dispatching and returns whether the dispatcher loop is active - or whether it has ceased. If a timeout is provided the dispatcher - loop may not have ceased by the timeout reached (the request to cease - will be honored in the future). + """Requests the conductor to stop dispatching and returns whether the + stop request was successfully completed. If the dispatching is still + occurring then False is returned otherwise True will be returned to + signal that the conductor is no longer dispatching job requests. + + NOTE(harlowja): If a timeout is provided the dispatcher loop may + not have ceased by the timeout reached (the request to cease will + be honored in the future) and False will be returned indicating this. """ self._wait_timeout.interrupt() self._dead.wait(timeout) - return self.dispatching + return self._dead.is_set() @property def dispatching(self): - if self._dead.is_set(): - return False - return True + return not self._dead.is_set() def _dispatch_job(self, job): engine = self._engine_from_job(job) diff --git a/taskflow/tests/unit/conductor/test_conductor.py b/taskflow/tests/unit/conductor/test_conductor.py index 7ac75d91..b43ba035 100644 --- a/taskflow/tests/unit/conductor/test_conductor.py +++ b/taskflow/tests/unit/conductor/test_conductor.py @@ -88,7 +88,8 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase): with close_many(components.conductor, components.client): t = make_thread(components.conductor) t.start() - self.assertFalse(components.conductor.stop(0.5)) + self.assertTrue(components.conductor.stop(0.5)) + self.assertFalse(components.conductor.dispatching) t.join() def test_run(self): @@ -111,7 +112,7 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase): details={'flow_uuid': fd.uuid}) consumed_event.wait(1.0) self.assertTrue(consumed_event.is_set()) - components.conductor.stop(1.0) + self.assertTrue(components.conductor.stop(1.0)) self.assertFalse(components.conductor.dispatching) persistence = components.persistence @@ -142,7 +143,7 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase): details={'flow_uuid': fd.uuid}) consumed_event.wait(1.0) self.assertTrue(consumed_event.is_set()) - components.conductor.stop(1.0) + self.assertTrue(components.conductor.stop(1.0)) self.assertFalse(components.conductor.dispatching) persistence = components.persistence