From 9e7ec87113c686c88438f8e77b69b6e4bddf7c71 Mon Sep 17 00:00:00 2001 From: "Ivan A. Melnikov" Date: Wed, 26 Mar 2014 17:36:01 +0400 Subject: [PATCH] Run worker-based engine tests faster This change reduces polling intervals for transport and proxy from default 1 second to 0.01 second. On my machine this reduces time needed to run WorkerBasedEngineTest almost 5 times, which, in turn, makes testr almost twice faster. Change-Id: I8dfe08c06234f33e838059b2f8d9d6a7c7819e06 --- taskflow/engines/worker_based/proxy.py | 8 +++++++- taskflow/tests/unit/test_action_engine.py | 3 +++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/taskflow/engines/worker_based/proxy.py b/taskflow/engines/worker_based/proxy.py index 1265f9e4..4d5282ee 100644 --- a/taskflow/engines/worker_based/proxy.py +++ b/taskflow/engines/worker_based/proxy.py @@ -44,6 +44,12 @@ class Proxy(object): self._transport = kwargs.get('transport') self._transport_opts = kwargs.get('transport_options') + self._drain_events_timeout = DRAIN_EVENTS_PERIOD + if self._transport == 'memory' and self._transport_opts: + polling_interval = self._transport_opts.get('polling_interval') + if polling_interval: + self._drain_events_timeout = polling_interval + # create connection self._conn = kombu.Connection(self._url, transport=self._transport, transport_options=self._transport_opts) @@ -95,7 +101,7 @@ class Proxy(object): self._running.set() while self.is_running: try: - conn.drain_events(timeout=DRAIN_EVENTS_PERIOD) + conn.drain_events(timeout=self._drain_events_timeout) except socket.timeout: pass if self._on_wait is not None: diff --git a/taskflow/tests/unit/test_action_engine.py b/taskflow/tests/unit/test_action_engine.py index c095f086..3691ed05 100644 --- a/taskflow/tests/unit/test_action_engine.py +++ b/taskflow/tests/unit/test_action_engine.py @@ -587,6 +587,9 @@ class WorkerBasedEngineTest(EngineTaskTest, 'taskflow.tests.utils', ], 'transport': self.transport, + 'transport_options': { + 'polling_interval': 0.01 + } } self.worker = wkr.Worker(**worker_conf) self.worker_thread = threading.Thread(target=self.worker.run)