Merge "Add simple multi thread broker"

This commit is contained in:
Jenkins 2014-10-24 13:56:52 +00:00 committed by Gerrit Code Review
commit 2adf9a9c05
2 changed files with 186 additions and 0 deletions

101
rally/broker.py Normal file
View File

@ -0,0 +1,101 @@
# Copyright 2014: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import threading
import time
from oslo.config import cfg
from rally.i18n import _
from rally.openstack.common import log as logging
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
def _consumer(consume, queue, is_published):
"""Infinity worker that consumes tasks from queue.
This finishes it's work only in case if is_published.isSet().
:param consume: method that consumes an object removed from the queue
:param queue: deque object to popleft() objects from
:param is_published: threading.Event that is used to stop the consumer
when the queue is empty
"""
cache = {}
while True:
if queue:
try:
consume(cache, queue.popleft())
except Exception as e:
LOG.warning(_("Failed to consume a task from the queue: "
"%s") % e)
if CONF.debug:
LOG.exception(e)
elif is_published.isSet():
break
else:
time.sleep(0.1)
def _publisher(publish, queue, is_published):
"""Calls a publish method that fills queue with jobs.
After running publish method it sets is_published variable, that is used to
stop workers (consumers).
:param publish: method that fills the queue
:param queue: deque object to be filled by the publish() method
:param is_published: threading.Event that is used to stop consumers and
finish task
"""
try:
publish(queue)
except Exception as e:
LOG.warning(_("Failed to publish a task to the queue: %s") % e)
if CONF.debug:
LOG.exception(e)
finally:
is_published.set()
def run(publish, consume, consumers_count=1):
"""Run broker.
publish() put to queue, consume() process one element from queue.
When publish() is finished and elements from queue are processed process
is finished all consumers threads are cleaned.
:param publish: Function that puts values to the queue
:param consume: Function that processes a single value from the queue
:param consumers_count: Number of consumers
"""
queue = collections.deque()
is_published = threading.Event()
consumers = []
for i in range(consumers_count):
consumer = threading.Thread(target=_consumer,
args=(consume, queue, is_published))
consumer.start()
consumers.append(consumer)
_publisher(publish, queue, is_published)
for consumer in consumers:
consumer.join()

85
tests/unit/test_broker.py Normal file
View File

@ -0,0 +1,85 @@
# Copyright 2014: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import mock
from rally import broker
from tests.unit import test
class BrokerTestCase(test.TestCase):
def test__publisher(self):
mock_publish = mock.MagicMock()
mock_is_published = mock.MagicMock()
queue = collections.deque()
broker._publisher(mock_publish, queue, mock_is_published)
mock_publish.assert_called_once_with(queue)
mock_is_published.set.assert_called_once_with()
def test__publisher_fails(self):
mock_publish = mock.MagicMock(side_effect=Exception())
mock_is_published = mock.MagicMock()
queue = collections.deque()
broker._publisher(mock_publish, queue, mock_is_published)
mock_is_published.set.assert_called_once_with()
def test__consumer(self):
queue = collections.deque([1, 2, 3])
mock_consume = mock.MagicMock()
mock_is_published = mock.MagicMock()
mock_is_published.isSet = mock.MagicMock(return_value=True)
broker._consumer(mock_consume, queue, mock_is_published)
self.assertEqual(3, mock_consume.call_count)
self.assertEqual(0, len(queue))
def test__consumer_cache(self):
cache_keys_history = []
def consume(cache, item):
cache[item] = True
cache_keys_history.append(cache.keys())
queue = collections.deque([1, 2, 3])
mock_is_published = mock.MagicMock()
mock_is_published.isSet = mock.MagicMock(return_value=True)
broker._consumer(consume, queue, mock_is_published)
self.assertEqual([[1], [1, 2], [1, 2, 3]], cache_keys_history)
def test__consumer_fails(self):
queue = collections.deque([1, 2, 3])
mock_consume = mock.MagicMock(side_effect=Exception())
mock_is_published = mock.MagicMock()
mock_is_published.isSet = mock.MagicMock(return_value=True)
broker._consumer(mock_consume, queue, mock_is_published)
self.assertEqual(0, len(queue))
def test_run(self):
def publish(queue):
queue.append(1)
queue.append(2)
queue.append(3)
consumed = set()
def consume(cache, item):
consumed.add(item)
consumer_count = 2
broker.run(publish, consume, consumer_count)
self.assertEqual(set([1, 2, 3]), consumed)