diff --git a/tests/unit/test_worker.py b/tests/unit/test_worker.py index 9b32ea5..a03a080 100644 --- a/tests/unit/test_worker.py +++ b/tests/unit/test_worker.py @@ -36,6 +36,12 @@ class ConsumerTestCase(StacktachBaseTestCase): def tearDown(self): self.mox.UnsetStubs() + def _test_topics(self): + return [ + dict(queue="queue1", routing_key="monitor.info"), + dict(queue="queue2", routing_key="monitor.error") + ] + def test_get_consumers(self): created_queues = [] created_callbacks = [] @@ -49,15 +55,14 @@ class ConsumerTestCase(StacktachBaseTestCase): self.mox.StubOutWithMock(worker.Consumer, '_create_exchange') self.mox.StubOutWithMock(worker.Consumer, '_create_queue') consumer = worker.Consumer('test', None, None, True, {}, "nova", - ["monitor.info", "monitor.error"], - "stacktach_") + self._test_topics()) exchange = self.mox.CreateMockAnything() consumer._create_exchange('nova', 'topic').AndReturn(exchange) info_queue = self.mox.CreateMockAnything() error_queue = self.mox.CreateMockAnything() - consumer._create_queue('stacktach_nova', exchange, 'monitor.info')\ + consumer._create_queue('queue1', exchange, 'monitor.info')\ .AndReturn(info_queue) - consumer._create_queue('stacktach_nova', exchange, 'monitor.error')\ + consumer._create_queue('queue2', exchange, 'monitor.error')\ .AndReturn(error_queue) self.mox.ReplayAll() consumers = consumer.get_consumers(Consumer, None) @@ -73,8 +78,7 @@ class ConsumerTestCase(StacktachBaseTestCase): def test_create_exchange(self): args = {'key': 'value'} consumer = worker.Consumer('test', None, None, True, args, 'nova', - ["monitor.info", "monitor.error"], - "stacktach_") + self._test_topics()) self.mox.StubOutClassWithMocks(kombu.entity, 'Exchange') exchange = kombu.entity.Exchange('nova', type='topic', exclusive=False, @@ -91,8 +95,7 @@ class ConsumerTestCase(StacktachBaseTestCase): exclusive=False, routing_key='routing.key', queue_arguments={}) consumer = worker.Consumer('test', None, None, True, {}, 'nova', - ["monitor.info", "monitor.error"], - "stacktach_") + self._test_topics()) self.mox.ReplayAll() actual_queue = consumer._create_queue('name', exchange, 'routing.key', exclusive=False, @@ -109,8 +112,7 @@ class ConsumerTestCase(StacktachBaseTestCase): exclusive=False, routing_key='routing.key', queue_arguments=queue_args) consumer = worker.Consumer('test', None, None, True, queue_args, - 'nova', ["monitor.info", "monitor.error"], - "stacktach_") + 'nova', self._test_topics()) self.mox.ReplayAll() actual_queue = consumer._create_queue('name', exchange, 'routing.key', exclusive=False, @@ -126,8 +128,7 @@ class ConsumerTestCase(StacktachBaseTestCase): exchange = 'nova' consumer = worker.Consumer('test', None, deployment, True, {}, - exchange, ["monitor.info", "monitor.error"], - "stacktach_") + exchange, self._test_topics()) routing_key = 'monitor.info' message.delivery_info = {'routing_key': routing_key} body_dict = {u'key': u'value'} @@ -165,7 +166,7 @@ class ConsumerTestCase(StacktachBaseTestCase): 'rabbit_password': 'rabbit', 'rabbit_virtual_host': '/', "services": ["nova"], - "topics": {"nova": ["monitor.info", "monitor.error"]} + "topics": {"nova": self._test_topics()} } self.mox.StubOutWithMock(db, 'get_or_create_deployment') deployment = self.mox.CreateMockAnything() @@ -188,8 +189,7 @@ class ConsumerTestCase(StacktachBaseTestCase): exchange = 'nova' consumer = worker.Consumer(config['name'], conn, deployment, config['durable_queue'], {}, exchange, - ["monitor.info", "monitor.error"], - "stacktach_") + self._test_topics()) consumer.run() worker.continue_running().AndReturn(False) self.mox.ReplayAll() @@ -208,7 +208,7 @@ class ConsumerTestCase(StacktachBaseTestCase): 'queue_arguments': {'x-ha-policy': 'all'}, 'queue_name_prefix': "test_name_", "services": ["nova"], - "topics": {"nova": ["monitor.info", "monitor.error"]} + "topics": {"nova": self._test_topics()} } self.mox.StubOutWithMock(db, 'get_or_create_deployment') deployment = self.mox.CreateMockAnything() @@ -232,8 +232,7 @@ class ConsumerTestCase(StacktachBaseTestCase): consumer = worker.Consumer(config['name'], conn, deployment, config['durable_queue'], config['queue_arguments'], exchange, - ["monitor.info", "monitor.error"], - "test_name_") + self._test_topics()) consumer.run() worker.continue_running().AndReturn(False) self.mox.ReplayAll() diff --git a/worker/worker.py b/worker/worker.py index fe29407..1d20da8 100644 --- a/worker/worker.py +++ b/worker/worker.py @@ -44,7 +44,7 @@ LOG = stacklog.get_logger() class Consumer(kombu.mixins.ConsumerMixin): def __init__(self, name, connection, deployment, durable, queue_arguments, - exchange, topics, queue_name_prefix): + exchange, topics): self.connection = connection self.deployment = deployment self.durable = durable @@ -56,7 +56,6 @@ class Consumer(kombu.mixins.ConsumerMixin): self.total_processed = 0 self.topics = topics self.exchange = exchange - self.queue_name_prefix = queue_name_prefix def _create_exchange(self, name, type, exclusive=False, auto_delete=False): return message_service.create_exchange(name, exchange_type=type, exclusive=exclusive, @@ -73,8 +72,8 @@ class Consumer(kombu.mixins.ConsumerMixin): def get_consumers(self, Consumer, channel): exchange = self._create_exchange(self.exchange, "topic") - queue_name = "%s%s" % (self.queue_name_prefix, self.exchange) - queues = [self._create_queue(queue_name, exchange, topic) + queues = [self._create_queue(topic['queue'], exchange, + topic['routing_key']) for topic in self.topics] return [Consumer(queues=queues, callbacks=[self.on_nova])] @@ -154,7 +153,6 @@ def run(deployment_config, exchange): queue_arguments = deployment_config.get('queue_arguments', {}) exit_on_exception = deployment_config.get('exit_on_exception', False) topics = deployment_config.get('topics', {}) - queue_name_prefix = deployment_config.get('queue_name_prefix', 'stacktach_') deployment, new = db.get_or_create_deployment(name) @@ -177,8 +175,7 @@ def run(deployment_config, exchange): try: consumer = Consumer(name, conn, deployment, durable, queue_arguments, exchange, - topics[exchange], - queue_name_prefix) + topics[exchange]) consumer.run() except Exception as e: LOG.error("!!!!Exception!!!!")