Refactoring worker and tests
This commit is contained in:
@@ -21,9 +21,6 @@ class Deployment(models.Model):
|
|||||||
name = models.CharField(max_length=50)
|
name = models.CharField(max_length=50)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class RawData(models.Model):
|
class RawData(models.Model):
|
||||||
deployment = models.ForeignKey(Deployment)
|
deployment = models.ForeignKey(Deployment)
|
||||||
tenant = models.CharField(max_length=50, null=True, blank=True,
|
tenant = models.CharField(max_length=50, null=True, blank=True,
|
||||||
|
@@ -26,19 +26,17 @@ class NovaConsumerTestCase(unittest.TestCase):
|
|||||||
consumer = self.mox.CreateMockAnything()
|
consumer = self.mox.CreateMockAnything()
|
||||||
created_consumers.append(consumer)
|
created_consumers.append(consumer)
|
||||||
return consumer
|
return consumer
|
||||||
self.mox.StubOutClassWithMocks(kombu.entity, 'Exchange')
|
self.mox.StubOutWithMock(worker.NovaConsumer, '_create_exchange')
|
||||||
self.mox.StubOutClassWithMocks(kombu, 'Queue')
|
self.mox.StubOutWithMock(worker.NovaConsumer, '_create_queue')
|
||||||
kombu.entity.Exchange('nova', type='topic', exclusive=False,
|
|
||||||
durable=True, auto_delete=False)
|
|
||||||
info_queue = kombu.Queue('monitor.info', kombu.entity.Exchange,
|
|
||||||
auto_delete=False, durable=True,
|
|
||||||
exclusive=False, routing_key='monitor.info',
|
|
||||||
queue_arguments={})
|
|
||||||
error_queue = kombu.Queue('monitor.error', kombu.entity.Exchange,
|
|
||||||
auto_delete=False, durable=True,
|
|
||||||
exclusive=False, routing_key='monitor.error',
|
|
||||||
queue_arguments={})
|
|
||||||
consumer = worker.NovaConsumer('test', None, None, True, {})
|
consumer = worker.NovaConsumer('test', None, None, True, {})
|
||||||
|
exchange = self.mox.CreateMockAnything()
|
||||||
|
consumer._create_exchange('nova', 'topic').AndReturn(exchange)
|
||||||
|
info_queue = self.mox.CreateMockAnything()
|
||||||
|
error_queue = self.mox.CreateMockAnything()
|
||||||
|
consumer._create_queue('monitor.info', exchange, 'monitor.info')\
|
||||||
|
.AndReturn(info_queue)
|
||||||
|
consumer._create_queue('monitor.error', exchange, 'monitor.error')\
|
||||||
|
.AndReturn(error_queue)
|
||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
consumers = consumer.get_consumers(Consumer, None)
|
consumers = consumer.get_consumers(Consumer, None)
|
||||||
self.assertEqual(len(consumers), 1)
|
self.assertEqual(len(consumers), 1)
|
||||||
@@ -50,39 +48,47 @@ class NovaConsumerTestCase(unittest.TestCase):
|
|||||||
self.assertTrue(consumer.on_nova in created_callbacks)
|
self.assertTrue(consumer.on_nova in created_callbacks)
|
||||||
self.mox.VerifyAll()
|
self.mox.VerifyAll()
|
||||||
|
|
||||||
def test_get_consumers_queue_args(self):
|
def test_create_exchange(self):
|
||||||
created_queues = []
|
args = {'key': 'value'}
|
||||||
created_callbacks = []
|
consumer = worker.NovaConsumer('test', None, None, True, args)
|
||||||
created_consumers = []
|
|
||||||
def Consumer(queues=None, callbacks=None):
|
|
||||||
created_queues.extend(queues)
|
|
||||||
created_callbacks.extend(callbacks)
|
|
||||||
consumer = self.mox.CreateMockAnything()
|
|
||||||
created_consumers.append(consumer)
|
|
||||||
return consumer
|
|
||||||
self.mox.StubOutClassWithMocks(kombu.entity, 'Exchange')
|
self.mox.StubOutClassWithMocks(kombu.entity, 'Exchange')
|
||||||
self.mox.StubOutClassWithMocks(kombu, 'Queue')
|
exchange = kombu.entity.Exchange('nova', type='topic', exclusive=False,
|
||||||
kombu.entity.Exchange('nova', type='topic', exclusive=False,
|
|
||||||
durable=True, auto_delete=False)
|
durable=True, auto_delete=False)
|
||||||
queue_args = {'arg': 'val'}
|
self.mox.ReplayAll()
|
||||||
info_queue = kombu.Queue('monitor.info', kombu.entity.Exchange,
|
actual_exchange = consumer._create_exchange('nova', 'topic')
|
||||||
auto_delete=False, durable=True,
|
self.assertEqual(actual_exchange, exchange)
|
||||||
exclusive=False, routing_key='monitor.info',
|
self.mox.VerifyAll()
|
||||||
queue_arguments=queue_args)
|
|
||||||
error_queue = kombu.Queue('monitor.error', kombu.entity.Exchange,
|
def test_create_queue(self):
|
||||||
auto_delete=False, durable=True,
|
self.mox.StubOutClassWithMocks(kombu, 'Queue')
|
||||||
exclusive=False, routing_key='monitor.error',
|
exchange = self.mox.CreateMockAnything()
|
||||||
|
queue = kombu.Queue('name', exchange, auto_delete=False, durable=True,
|
||||||
|
exclusive=False, routing_key='routing.key',
|
||||||
|
queue_arguments={})
|
||||||
|
consumer = worker.NovaConsumer('test', None, None, True, {})
|
||||||
|
self.mox.ReplayAll()
|
||||||
|
actual_queue = consumer._create_queue('name', exchange, 'routing.key',
|
||||||
|
exclusive=False,
|
||||||
|
auto_delete=False)
|
||||||
|
self.assertEqual(actual_queue, queue)
|
||||||
|
self.mox.VerifyAll()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def test_create_queue_with_queue_args(self):
|
||||||
|
self.mox.StubOutClassWithMocks(kombu, 'Queue')
|
||||||
|
exchange = self.mox.CreateMockAnything()
|
||||||
|
queue_args = {'key': 'value'}
|
||||||
|
queue = kombu.Queue('name', exchange, auto_delete=False, durable=True,
|
||||||
|
exclusive=False, routing_key='routing.key',
|
||||||
queue_arguments=queue_args)
|
queue_arguments=queue_args)
|
||||||
consumer = worker.NovaConsumer('test', None, None, True, queue_args)
|
consumer = worker.NovaConsumer('test', None, None, True, queue_args)
|
||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
consumers = consumer.get_consumers(Consumer, None)
|
actual_queue = consumer._create_queue('name', exchange, 'routing.key',
|
||||||
self.assertEqual(len(consumers), 1)
|
exclusive=False,
|
||||||
self.assertEqual(consumers[0], created_consumers[0])
|
auto_delete=False)
|
||||||
self.assertEqual(len(created_queues), 2)
|
self.assertEqual(actual_queue, queue)
|
||||||
self.assertTrue(info_queue in created_queues)
|
|
||||||
self.assertTrue(error_queue in created_queues)
|
|
||||||
self.assertEqual(len(created_callbacks), 1)
|
|
||||||
self.assertTrue(consumer.on_nova in created_callbacks)
|
|
||||||
self.mox.VerifyAll()
|
self.mox.VerifyAll()
|
||||||
|
|
||||||
def test_process(self):
|
def test_process(self):
|
||||||
|
@@ -48,20 +48,23 @@ class NovaConsumer(kombu.mixins.ConsumerMixin):
|
|||||||
self.processed = 0
|
self.processed = 0
|
||||||
self.total_processed = 0
|
self.total_processed = 0
|
||||||
|
|
||||||
|
def _create_exchange(self, name, type, exclusive=False, auto_delete=False):
|
||||||
|
return kombu.entity.Exchange(name, type=type, exclusive=exclusive,
|
||||||
|
durable=self.durable, auto_delete=auto_delete)
|
||||||
|
|
||||||
|
def _create_queue(self, name, nova_exchange, routing_key, exclusive=False,
|
||||||
|
auto_delete=False):
|
||||||
|
return kombu.Queue(name, nova_exchange, durable=self.durable,
|
||||||
|
auto_delete=exclusive, exclusive=auto_delete,
|
||||||
|
queue_arguments=self.queue_arguments,
|
||||||
|
routing_key=routing_key)
|
||||||
|
|
||||||
def get_consumers(self, Consumer, channel):
|
def get_consumers(self, Consumer, channel):
|
||||||
nova_exchange = kombu.entity.Exchange("nova", type="topic",
|
nova_exchange = self._create_exchange("nova", "topic")
|
||||||
exclusive=False, durable=self.durable,
|
|
||||||
auto_delete=False)
|
|
||||||
|
|
||||||
nova_queues = [
|
nova_queues = [
|
||||||
kombu.Queue("monitor.info", nova_exchange, durable=self.durable,
|
self._create_queue('monitor.info', nova_exchange, 'monitor.info'),
|
||||||
auto_delete=False, exclusive=False,
|
self._create_queue('monitor.error', nova_exchange, 'monitor.error')
|
||||||
queue_arguments=self.queue_arguments,
|
|
||||||
routing_key='monitor.info'),
|
|
||||||
kombu.Queue("monitor.error", nova_exchange, durable=self.durable,
|
|
||||||
auto_delete=False,
|
|
||||||
queue_arguments=self.queue_arguments,
|
|
||||||
exclusive=False, routing_key='monitor.error'),
|
|
||||||
]
|
]
|
||||||
|
|
||||||
return [Consumer(queues=nova_queues, callbacks=[self.on_nova])]
|
return [Consumer(queues=nova_queues, callbacks=[self.on_nova])]
|
||||||
|
Reference in New Issue
Block a user