Fix consuming in broker

IndexError may be caused not only deque.popleft() but also inside
calling function. So IndexError should be handed separately.

Change-Id: I442d652854431c33499260362f0d4621ba0fe7c9
This commit is contained in:
Sergey Skripnick 2015-04-10 14:01:42 +03:00
parent 0c413f553c
commit 388905969f
2 changed files with 29 additions and 16 deletions

View File

@ -36,23 +36,23 @@ def _consumer(consume, queue, is_published):
"""
cache = {}
while True:
if queue:
try:
consume(cache, queue.popleft())
except IndexError:
# NOTE(boris-42): queue is accessed from multiple threads so
# it's quite possible to have 2 queue accessing
# at the same point queue with only 1 element
pass
except Exception as e:
LOG.warning(_("Failed to consume a task from the queue: "
"%s") % e)
if logging.is_debug():
LOG.exception(e)
elif is_published.isSet():
break
else:
if not queue:
if is_published.isSet():
break
time.sleep(0.1)
continue
else:
try:
args = queue.popleft()
except IndexError:
# consumed by other thread
continue
try:
consume(cache, args)
except Exception as e:
LOG.warning(_("Failed to consume a task from the queue: %s") % e)
if logging.is_debug():
LOG.exception(e)
def _publisher(publish, queue, is_published):

View File

@ -68,6 +68,19 @@ class BrokerTestCase(test.TestCase):
broker._consumer(mock_consume, queue, mock_is_published)
self.assertEqual(0, len(queue))
@mock.patch("rally.common.broker.LOG")
def test__consumer_indexerror(self, m_log):
consume = mock.Mock()
consume.side_effect = IndexError()
queue = collections.deque([1, 2, 3])
is_published = mock.Mock()
is_published.isSet.side_effect = [False, False, True]
broker._consumer(consume, queue, is_published)
self.assertTrue(m_log.warning.called)
self.assertFalse(queue)
expected = [mock.call({}, 1), mock.call({}, 2), mock.call({}, 3)]
self.assertEqual(expected, consume.mock_calls)
def test_run(self):
def publish(queue):