diff --git a/octavia/cmd/octavia_worker.py b/octavia/cmd/octavia_worker.py index c2337dc28e..7aeee213e0 100755 --- a/octavia/cmd/octavia_worker.py +++ b/octavia/cmd/octavia_worker.py @@ -16,20 +16,21 @@ import sys import eventlet eventlet.monkey_patch() -from oslo_log import log as logging +from oslo_config import cfg from oslo_reports import guru_meditation_report as gmr +from oslo_service import service -from octavia.common import service +from octavia.common import service as octavia_service from octavia.controller.queue import consumer from octavia import version -LOG = logging.getLogger(__name__) +CONF = cfg.CONF def main(): - service.prepare_service(sys.argv) + octavia_service.prepare_service(sys.argv) gmr.TextGuruMeditation.setup_autorun(version) - c = consumer.Consumer() - c.listen() + launcher = service.launch(CONF, consumer.Consumer()) + launcher.wait() diff --git a/octavia/controller/queue/consumer.py b/octavia/controller/queue/consumer.py index ee629c18b1..e02fee2769 100644 --- a/octavia/controller/queue/consumer.py +++ b/octavia/controller/queue/consumer.py @@ -23,26 +23,36 @@ from octavia.i18n import _LI LOG = logging.getLogger(__name__) -class Consumer(object): +class Consumer(service.Service): def __init__(self): + super(Consumer, self).__init__() + self.server = None + + def start(self): topic = cfg.CONF.oslo_messaging.topic server = cfg.CONF.host transport = messaging.get_transport(cfg.CONF) target = messaging.Target(topic=topic, server=server, fanout=False) endpoints = [endpoint.Endpoint()] - self.server = messaging.get_rpc_server(transport, target, endpoints, executor='eventlet') + LOG.info(_LI('Starting consumer...')) + self.server.start() + super(Consumer, self).start() - def listen(self): - try: - LOG.info(_LI('Starting consumer...')) - service.launch(cfg.CONF, self.server).wait() - finally: + def stop(self, graceful=False): + if self.server: LOG.info(_LI('Stopping consumer...')) self.server.stop() - LOG.info(_LI('Consumer successfully stopped. Waiting for final ' - 'messages to be processed...')) - self.server.wait() - LOG.info(_LI('Finished waiting.')) + if graceful: + LOG.info( + _LI('Consumer successfully stopped. Waiting for final ' + 'messages to be processed...')) + self.server.wait() + super(Consumer, self).stop(graceful=graceful) + + def reset(self): + if self.server: + self.server.reset() + super(Consumer, self).reset() diff --git a/octavia/tests/unit/controller/queue/test_consumer.py b/octavia/tests/unit/controller/queue/test_consumer.py index 5692a96b3e..44bb294356 100644 --- a/octavia/tests/unit/controller/queue/test_consumer.py +++ b/octavia/tests/unit/controller/queue/test_consumer.py @@ -21,6 +21,10 @@ from octavia.controller.queue import endpoint from octavia.tests.unit import base +@mock.patch.object(messaging, 'get_transport') +@mock.patch.object(messaging, 'Target') +@mock.patch.object(endpoint, 'Endpoint') +@mock.patch.object(messaging, 'get_rpc_server') class TestConsumer(base.TestCase): def setUp(self): @@ -29,12 +33,8 @@ class TestConsumer(base.TestCase): cfg.CONF.set_override('topic', 'foo_topic', group='oslo_messaging') cfg.CONF.set_override('host', 'foo_host') - @mock.patch.object(messaging, 'get_transport') - @mock.patch.object(messaging, 'Target') - @mock.patch.object(endpoint, 'Endpoint') - @mock.patch.object(messaging, 'get_rpc_server') - def test_config_setup(self, mock_rpc_server, mock_endpoint, mock_target, - mock_get_transport): + def test_consumer_start(self, mock_rpc_server, mock_endpoint, mock_target, + mock_get_transport): mock_get_transport_rv = mock.Mock() mock_get_transport.return_value = mock_get_transport_rv mock_rpc_server_rv = mock.Mock() @@ -44,7 +44,7 @@ class TestConsumer(base.TestCase): mock_target_rv = mock.Mock() mock_target.return_value = mock_target_rv - consumer.Consumer() + consumer.Consumer().start() mock_get_transport.assert_called_once_with(cfg.CONF) mock_target.assert_called_once_with(topic='foo_topic', @@ -54,3 +54,25 @@ class TestConsumer(base.TestCase): mock_target_rv, [mock_endpoint_rv], executor='eventlet') + + def test_consumer_stop(self, mock_rpc_server, mock_endpoint, mock_target, + mock_get_transport): + mock_rpc_server_rv = mock.Mock() + mock_rpc_server.return_value = mock_rpc_server_rv + + cons = consumer.Consumer() + cons.start() + cons.stop() + mock_rpc_server_rv.stop.assert_called_once_with() + self.assertFalse(mock_rpc_server_rv.wait.called) + + def test_consumer_graceful_stop(self, mock_rpc_server, mock_endpoint, + mock_target, mock_get_transport): + mock_rpc_server_rv = mock.Mock() + mock_rpc_server.return_value = mock_rpc_server_rv + + cons = consumer.Consumer() + cons.start() + cons.stop(graceful=True) + mock_rpc_server_rv.stop.assert_called_once_with() + mock_rpc_server_rv.wait.assert_called_once_with()