Make Consumer an oslo_service

This makes more sense and also suppresses the error messages when
launching the service returned from oslo_messaing.get_rpc_server
service.  Instead of that service wait() being called, the Consumer's
wait will be called.

Change-Id: I63816e92fbe26a4213946e6ab584531bdc3b7dd2
Closes-Bug: #1527418
This commit is contained in:
Brandon Logan 2015-12-17 17:42:25 -06:00
parent ea4018b51f
commit 4a6e5a3f21
3 changed files with 57 additions and 24 deletions

View File

@ -16,20 +16,21 @@ import sys
import eventlet
eventlet.monkey_patch()
from oslo_log import log as logging
from oslo_config import cfg
from oslo_reports import guru_meditation_report as gmr
from oslo_service import service
from octavia.common import service
from octavia.common import service as octavia_service
from octavia.controller.queue import consumer
from octavia import version
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
def main():
service.prepare_service(sys.argv)
octavia_service.prepare_service(sys.argv)
gmr.TextGuruMeditation.setup_autorun(version)
c = consumer.Consumer()
c.listen()
launcher = service.launch(CONF, consumer.Consumer())
launcher.wait()

View File

@ -23,26 +23,36 @@ from octavia.i18n import _LI
LOG = logging.getLogger(__name__)
class Consumer(object):
class Consumer(service.Service):
def __init__(self):
super(Consumer, self).__init__()
self.server = None
def start(self):
topic = cfg.CONF.oslo_messaging.topic
server = cfg.CONF.host
transport = messaging.get_transport(cfg.CONF)
target = messaging.Target(topic=topic, server=server, fanout=False)
endpoints = [endpoint.Endpoint()]
self.server = messaging.get_rpc_server(transport, target, endpoints,
executor='eventlet')
LOG.info(_LI('Starting consumer...'))
self.server.start()
super(Consumer, self).start()
def listen(self):
try:
LOG.info(_LI('Starting consumer...'))
service.launch(cfg.CONF, self.server).wait()
finally:
def stop(self, graceful=False):
if self.server:
LOG.info(_LI('Stopping consumer...'))
self.server.stop()
LOG.info(_LI('Consumer successfully stopped. Waiting for final '
'messages to be processed...'))
self.server.wait()
LOG.info(_LI('Finished waiting.'))
if graceful:
LOG.info(
_LI('Consumer successfully stopped. Waiting for final '
'messages to be processed...'))
self.server.wait()
super(Consumer, self).stop(graceful=graceful)
def reset(self):
if self.server:
self.server.reset()
super(Consumer, self).reset()

View File

@ -21,6 +21,10 @@ from octavia.controller.queue import endpoint
from octavia.tests.unit import base
@mock.patch.object(messaging, 'get_transport')
@mock.patch.object(messaging, 'Target')
@mock.patch.object(endpoint, 'Endpoint')
@mock.patch.object(messaging, 'get_rpc_server')
class TestConsumer(base.TestCase):
def setUp(self):
@ -29,12 +33,8 @@ class TestConsumer(base.TestCase):
cfg.CONF.set_override('topic', 'foo_topic', group='oslo_messaging')
cfg.CONF.set_override('host', 'foo_host')
@mock.patch.object(messaging, 'get_transport')
@mock.patch.object(messaging, 'Target')
@mock.patch.object(endpoint, 'Endpoint')
@mock.patch.object(messaging, 'get_rpc_server')
def test_config_setup(self, mock_rpc_server, mock_endpoint, mock_target,
mock_get_transport):
def test_consumer_start(self, mock_rpc_server, mock_endpoint, mock_target,
mock_get_transport):
mock_get_transport_rv = mock.Mock()
mock_get_transport.return_value = mock_get_transport_rv
mock_rpc_server_rv = mock.Mock()
@ -44,7 +44,7 @@ class TestConsumer(base.TestCase):
mock_target_rv = mock.Mock()
mock_target.return_value = mock_target_rv
consumer.Consumer()
consumer.Consumer().start()
mock_get_transport.assert_called_once_with(cfg.CONF)
mock_target.assert_called_once_with(topic='foo_topic',
@ -54,3 +54,25 @@ class TestConsumer(base.TestCase):
mock_target_rv,
[mock_endpoint_rv],
executor='eventlet')
def test_consumer_stop(self, mock_rpc_server, mock_endpoint, mock_target,
mock_get_transport):
mock_rpc_server_rv = mock.Mock()
mock_rpc_server.return_value = mock_rpc_server_rv
cons = consumer.Consumer()
cons.start()
cons.stop()
mock_rpc_server_rv.stop.assert_called_once_with()
self.assertFalse(mock_rpc_server_rv.wait.called)
def test_consumer_graceful_stop(self, mock_rpc_server, mock_endpoint,
mock_target, mock_get_transport):
mock_rpc_server_rv = mock.Mock()
mock_rpc_server.return_value = mock_rpc_server_rv
cons = consumer.Consumer()
cons.start()
cons.stop(graceful=True)
mock_rpc_server_rv.stop.assert_called_once_with()
mock_rpc_server_rv.wait.assert_called_once_with()