diff --git a/taskflow/engines/worker_based/proxy.py b/taskflow/engines/worker_based/proxy.py index 1265f9e4..4d5282ee 100644 --- a/taskflow/engines/worker_based/proxy.py +++ b/taskflow/engines/worker_based/proxy.py @@ -44,6 +44,12 @@ class Proxy(object): self._transport = kwargs.get('transport') self._transport_opts = kwargs.get('transport_options') + self._drain_events_timeout = DRAIN_EVENTS_PERIOD + if self._transport == 'memory' and self._transport_opts: + polling_interval = self._transport_opts.get('polling_interval') + if polling_interval: + self._drain_events_timeout = polling_interval + # create connection self._conn = kombu.Connection(self._url, transport=self._transport, transport_options=self._transport_opts) @@ -95,7 +101,7 @@ class Proxy(object): self._running.set() while self.is_running: try: - conn.drain_events(timeout=DRAIN_EVENTS_PERIOD) + conn.drain_events(timeout=self._drain_events_timeout) except socket.timeout: pass if self._on_wait is not None: diff --git a/taskflow/tests/unit/test_action_engine.py b/taskflow/tests/unit/test_action_engine.py index 65882944..4218571c 100644 --- a/taskflow/tests/unit/test_action_engine.py +++ b/taskflow/tests/unit/test_action_engine.py @@ -587,6 +587,9 @@ class WorkerBasedEngineTest(EngineTaskTest, 'taskflow.tests.utils', ], 'transport': self.transport, + 'transport_options': { + 'polling_interval': 0.01 + } } self.worker = wkr.Worker(**worker_conf) self.worker_thread = threading.Thread(target=self.worker.run)