diff --git a/rally/common/broker.py b/rally/common/broker.py index 6e55a06972..fe8452cc98 100644 --- a/rally/common/broker.py +++ b/rally/common/broker.py @@ -36,23 +36,23 @@ def _consumer(consume, queue, is_published): """ cache = {} while True: - if queue: - try: - consume(cache, queue.popleft()) - except IndexError: - # NOTE(boris-42): queue is accessed from multiple threads so - # it's quite possible to have 2 queue accessing - # at the same point queue with only 1 element - pass - except Exception as e: - LOG.warning(_("Failed to consume a task from the queue: " - "%s") % e) - if logging.is_debug(): - LOG.exception(e) - elif is_published.isSet(): - break - else: + if not queue: + if is_published.isSet(): + break time.sleep(0.1) + continue + else: + try: + args = queue.popleft() + except IndexError: + # consumed by other thread + continue + try: + consume(cache, args) + except Exception as e: + LOG.warning(_("Failed to consume a task from the queue: %s") % e) + if logging.is_debug(): + LOG.exception(e) def _publisher(publish, queue, is_published): diff --git a/tests/unit/common/test_broker.py b/tests/unit/common/test_broker.py index c5fa1057ce..730f170f57 100644 --- a/tests/unit/common/test_broker.py +++ b/tests/unit/common/test_broker.py @@ -68,6 +68,19 @@ class BrokerTestCase(test.TestCase): broker._consumer(mock_consume, queue, mock_is_published) self.assertEqual(0, len(queue)) + @mock.patch("rally.common.broker.LOG") + def test__consumer_indexerror(self, m_log): + consume = mock.Mock() + consume.side_effect = IndexError() + queue = collections.deque([1, 2, 3]) + is_published = mock.Mock() + is_published.isSet.side_effect = [False, False, True] + broker._consumer(consume, queue, is_published) + self.assertTrue(m_log.warning.called) + self.assertFalse(queue) + expected = [mock.call({}, 1), mock.call({}, 2), mock.call({}, 3)] + self.assertEqual(expected, consume.mock_calls) + def test_run(self): def publish(queue):