From 0bf196505733e4379ad6007e05b004531b8487e8 Mon Sep 17 00:00:00 2001 From: "Carlos D. Garza" Date: Tue, 3 Nov 2015 23:07:16 -0600 Subject: [PATCH] Implementing EventStreamer reciever for octavia Listens to oslo_messaging for database updates from octavia. When neutron-lbaas is being used with octavia this queing system will be used to so that health check and status updates from octavia are sent to neutron-lbaas so that neutron updates its database in sync with octavia. See CR https://review.openstack.org/218735 for details on the octavia side. This event handler can actually be used for other drivers as long, but for now its only used for octavia. Co-Authored-By: Brandon Logan Change-Id: I4694d9830d37c6432ff64497396be120dda31501 --- neutron_lbaas/common/exceptions.py | 4 + neutron_lbaas/drivers/driver_base.py | 29 +++ neutron_lbaas/drivers/octavia/driver.py | 9 +- .../octavia/octavia_messaging_consumer.py | 112 +++++++++++ .../services/loadbalancer/constants.py | 8 + .../test_octavia_messaging_consumer.py | 177 ++++++++++++++++++ 6 files changed, 337 insertions(+), 2 deletions(-) create mode 100644 neutron_lbaas/drivers/octavia/octavia_messaging_consumer.py create mode 100644 neutron_lbaas/tests/unit/drivers/octavia/test_octavia_messaging_consumer.py diff --git a/neutron_lbaas/common/exceptions.py b/neutron_lbaas/common/exceptions.py index 94a6ad4da..1ea9b1a2c 100644 --- a/neutron_lbaas/common/exceptions.py +++ b/neutron_lbaas/common/exceptions.py @@ -22,6 +22,10 @@ from neutron.common import exceptions from neutron_lbaas._i18n import _LE +class ModelMapException(exceptions.NeutronException): + message = _LE("Unable to map model class %(target_name)s") + + class LbaasException(exceptions.NeutronException): pass diff --git a/neutron_lbaas/drivers/driver_base.py b/neutron_lbaas/drivers/driver_base.py index b733ed374..4afaf81c3 100644 --- a/neutron_lbaas/drivers/driver_base.py +++ b/neutron_lbaas/drivers/driver_base.py @@ -14,10 +14,17 @@ from functools import wraps +from neutron import context as ncontext +from oslo_log import log as logging from oslo_utils import excutils +from neutron_lbaas.common import exceptions from neutron_lbaas.db.loadbalancer import models from neutron_lbaas.drivers import driver_mixins +from neutron_lbaas.services.loadbalancer import constants + + +LOG = logging.getLogger(__name__) class NotImplementedManager(object): @@ -41,6 +48,11 @@ class LoadBalancerBaseDriver(object): the various load balancer objects. """ + model_map = {constants.LOADBALANCER_EVENT: models.LoadBalancer, + constants.LISTENER_EVENT: models.Listener, + constants.POOL_EVENT: models.PoolV2, + constants.MEMBER_EVENT: models.MemberV2} + load_balancer = NotImplementedManager() listener = NotImplementedManager() pool = NotImplementedManager() @@ -50,6 +62,23 @@ class LoadBalancerBaseDriver(object): def __init__(self, plugin): self.plugin = plugin + def handle_streamed_event(self, container): + # TODO(crc32): update_stats will be implemented here in the future + if container.info_type not in LoadBalancerBaseDriver.model_map: + if container.info_type == constants.LISTENER_STATS_EVENT: + return + else: + exc = exceptions.ModelMapException( + target_name=container.info_type) + raise exc + else: + model_class = LoadBalancerBaseDriver.model_map[ + container.info_type] + context = ncontext.get_admin_context() + self.plugin.db.update_status(context, model_class, + container.info_id, + **container.info_payload) + class BaseLoadBalancerManager(driver_mixins.BaseRefreshMixin, driver_mixins.BaseStatsMixin, diff --git a/neutron_lbaas/drivers/octavia/driver.py b/neutron_lbaas/drivers/octavia/driver.py index 03435691d..5435fb1f9 100644 --- a/neutron_lbaas/drivers/octavia/driver.py +++ b/neutron_lbaas/drivers/octavia/driver.py @@ -20,12 +20,14 @@ from neutron import context as ncontext from oslo_config import cfg from oslo_log import log as logging from oslo_serialization import jsonutils +from oslo_service import service from oslo_utils import excutils import requests from neutron_lbaas._i18n import _ from neutron_lbaas.common import keystone from neutron_lbaas.drivers import driver_base +from neutron_lbaas.drivers.octavia import octavia_messaging_consumer LOG = logging.getLogger(__name__) VERSION = "1.0.1" @@ -53,8 +55,9 @@ OPTS = [ default=False, help=_('True if Octavia will be responsible for allocating the VIP.' ' False if neutron-lbaas will allocate it and pass to Octavia.') - ), + ) ] + cfg.CONF.register_opts(OPTS, 'octavia') @@ -167,7 +170,9 @@ class OctaviaDriver(driver_base.LoadBalancerBaseDriver): self.pool = PoolManager(self) self.member = MemberManager(self) self.health_monitor = HealthMonitorManager(self) - + self.octavia_consumer = octavia_messaging_consumer.OctaviaConsumer( + self) + service.launch(cfg.CONF, self.octavia_consumer) LOG.debug("OctaviaDriver: initialized, version=%s", VERSION) @property diff --git a/neutron_lbaas/drivers/octavia/octavia_messaging_consumer.py b/neutron_lbaas/drivers/octavia/octavia_messaging_consumer.py new file mode 100644 index 000000000..216fd0155 --- /dev/null +++ b/neutron_lbaas/drivers/octavia/octavia_messaging_consumer.py @@ -0,0 +1,112 @@ +# Copyright 2016 Rackspace +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from neutron_lbaas._i18n import _LI +from oslo_config import cfg +from oslo_log import log as logging +import oslo_messaging as messaging +from oslo_service import service + + +oslo_messaging_opts = [ + cfg.StrOpt('event_stream_topic', + default='neutron_lbaas_event', + help=_('topic name for receiving events from a queue')) +] + +cfg.CONF.register_opts(oslo_messaging_opts, group='oslo_messaging') + + +LOG = logging.getLogger(__name__) + + +class InfoContainer(object): + @staticmethod + def from_dict(dict_obj): + return InfoContainer(dict_obj['info_type'], + dict_obj['info_id'], + dict_obj['info_payload']) + + def __init__(self, info_type, info_id, info_payload): + self.info_type = info_type + self.info_id = info_id + self.info_payload = info_payload + + def to_dict(self): + return {'info_type': self.info_type, + 'info_id': self.info_id, + 'info_payload': self.info_payload} + + def __eq__(self, other): + if not isinstance(other, InfoContainer): + return False + if self.info_type != other.info_type: + return False + if self.info_id != other.info_id: + return False + if self.info_payload != other.info_payload: + return False + return True + + def __ne__(self, other): + return not self == other + + +class ConsumerEndPoint(object): + target = messaging.Target(namespace="control", version='1.0') + + def __init__(self, driver): + self.driver = driver + + def update_info(self, ctx, container): + LOG.debug("Received event from stream %s", container) + container_inst = InfoContainer.from_dict(container) + self.driver.handle_streamed_event(container_inst) + + +class OctaviaConsumer(service.Service): + def __init__(self, driver, **kwargs): + super(OctaviaConsumer, self).__init__(**kwargs) + topic = cfg.CONF.oslo_messaging.event_stream_topic + server = cfg.CONF.host + self.driver = driver + self.transport = messaging.get_transport(cfg.CONF) + self.target = messaging.Target(topic=topic, server=server, + exchange="common", fanout=False) + self.endpoints = [ConsumerEndPoint(self.driver)] + self.server = None + + def start(self): + super(OctaviaConsumer, self).start() + LOG.info(_LI("Starting octavia consumer...")) + self.server = messaging.get_rpc_server(self.transport, self.target, + self.endpoints, + executor='eventlet') + self.server.start() + + def stop(self, graceful=False): + if self.server: + LOG.info(_LI('Stopping consumer...')) + self.server.stop() + if graceful: + LOG.info( + _LI('Consumer successfully stopped. Waiting for final ' + 'messages to be processed...')) + self.server.wait() + super(OctaviaConsumer, self).stop(graceful=graceful) + + def reset(self): + if self.server: + self.server.reset() + super(OctaviaConsumer, self).reset() diff --git a/neutron_lbaas/services/loadbalancer/constants.py b/neutron_lbaas/services/loadbalancer/constants.py index 0a726ef90..98a814963 100644 --- a/neutron_lbaas/services/loadbalancer/constants.py +++ b/neutron_lbaas/services/loadbalancer/constants.py @@ -118,3 +118,11 @@ LOADBALANCERV2 = "LOADBALANCERV2" # for the LBaaS V1 vip and LBaaS V2 listeners. -1 indicates # no limit, the value cannot be less than -1. MIN_CONNECT_VALUE = -1 + +# LBaas V2 Table entities +LISTENER_EVENT = 'listener' +LISTENER_STATS_EVENT = 'listener_stats' +LOADBALANCER_EVENT = 'loadbalancer' +MEMBER_EVENT = 'member' +OPERATING_STATUS = 'operating_status' +POOL_EVENT = 'pool' diff --git a/neutron_lbaas/tests/unit/drivers/octavia/test_octavia_messaging_consumer.py b/neutron_lbaas/tests/unit/drivers/octavia/test_octavia_messaging_consumer.py new file mode 100644 index 000000000..e693f167e --- /dev/null +++ b/neutron_lbaas/tests/unit/drivers/octavia/test_octavia_messaging_consumer.py @@ -0,0 +1,177 @@ +# Copyright 2016 Rackspace +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +from oslo_config import cfg + +from neutron_lbaas.common import exceptions +from neutron_lbaas.db.loadbalancer import models +import neutron_lbaas.drivers.octavia.driver as odriver +from neutron_lbaas.drivers.octavia.driver import octavia_messaging_consumer +from neutron_lbaas.services.loadbalancer import constants +from neutron_lbaas.tests.unit.drivers.octavia import test_octavia_driver + +InfoContainer = octavia_messaging_consumer.InfoContainer + + +class TestOctaviaMessagingConsumer(test_octavia_driver.BaseOctaviaDriverTest): + def setUp(self): + super(test_octavia_driver.BaseOctaviaDriverTest, self).setUp() + self.plugin = mock.Mock() + self.driver = odriver.OctaviaDriver(self.plugin) + + def assert_handle_streamed_event_called(self, model_class, id_param, + payload): + call_args_list = self.driver.plugin.db.update_status.call_args_list[0] + self.assertEqual(len(call_args_list), 2) + self.assertEqual(len(call_args_list[0]), 3) + self.assertEqual(model_class, call_args_list[0][1]) + self.assertEqual(call_args_list[0][2], id_param) + self.assertEqual(call_args_list[1], payload) + + def test_info_container_constructor(self): + ID = 'test_id' + PAYLOAD = 'test_payload' + TYPE = 'test_type' + cnt = InfoContainer(TYPE, ID, PAYLOAD) + self.assertEqual(cnt.info_type, TYPE) + self.assertEqual(cnt.info_id, ID) + self.assertEqual(cnt.info_payload, PAYLOAD) + self.assertEqual(cnt.to_dict(), {'info_type': TYPE, 'info_id': ID, + 'info_payload': PAYLOAD}) + + def test_info_container_from_dict(self): + ID = 'test_id' + PAYLOAD = 'test_payload' + TYPE = 'test_type' + cnt = InfoContainer.from_dict({'info_type': TYPE, 'info_id': ID, + 'info_payload': PAYLOAD}) + self.assertEqual(cnt.info_type, TYPE) + self.assertEqual(cnt.info_id, ID) + self.assertEqual(cnt.info_payload, PAYLOAD) + + def test_set_consumer_topic(self): + TOPIC = 'neutron_lbaas_event' + self.addCleanup(cfg.CONF.clear_override, 'event_stream_topic', + group='oslo_messaging') + cfg.CONF.set_override('event_stream_topic', TOPIC, + group='oslo_messaging') + consumer = octavia_messaging_consumer.OctaviaConsumer(self.driver) + self.assertIsNotNone(consumer.transport) + self.assertEqual(TOPIC, consumer.target.topic) + self.assertEqual(cfg.CONF.host, consumer.target.server) + + @mock.patch.object(octavia_messaging_consumer.messaging, 'get_rpc_server') + def test_consumer_start(self, mock_get_rpc_server): + mock_server = mock.Mock() + mock_get_rpc_server.return_value = mock_server + TOPIC = 'neutron_lbaas_event' + self.addCleanup(cfg.CONF.clear_override, 'event_stream_topic', + group='oslo_messaging') + cfg.CONF.set_override('event_stream_topic', TOPIC, + group='oslo_messaging') + consumer = octavia_messaging_consumer.OctaviaConsumer(self.driver) + consumer.start() + mock_get_rpc_server.assert_called_once_with( + consumer.transport, consumer.target, consumer.endpoints, + executor='eventlet' + ) + mock_server.start.assert_called_once_with() + + @mock.patch.object(octavia_messaging_consumer.messaging, 'get_rpc_server') + def test_consumer_stop(self, mock_get_rpc_server): + mock_server = mock.Mock() + mock_get_rpc_server.return_value = mock_server + consumer = octavia_messaging_consumer.OctaviaConsumer(self.driver) + consumer.start() + consumer.stop() + mock_server.stop.assert_called_once_with() + mock_server.wait.assert_not_called() + + @mock.patch.object(octavia_messaging_consumer.messaging, 'get_rpc_server') + def test_consumer_graceful_stop(self, mock_get_rpc_server): + mock_server = mock.Mock() + mock_get_rpc_server.return_value = mock_server + consumer = octavia_messaging_consumer.OctaviaConsumer(self.driver) + consumer.start() + consumer.stop(graceful=True) + mock_server.stop.assert_called_once_with() + mock_server.wait.assert_called_once_with() + + @mock.patch.object(octavia_messaging_consumer.messaging, 'get_rpc_server') + def test_consumer_reset(self, mock_get_rpc_server): + mock_server = mock.Mock() + mock_get_rpc_server.return_value = mock_server + consumer = octavia_messaging_consumer.OctaviaConsumer(self.driver) + consumer.start() + consumer.reset() + mock_server.reset.assert_called_once_with() + + def set_db_mocks(self): + TOPIC = 'neutron_lbaas_event' + self.addCleanup(cfg.CONF.clear_override, 'event_stream_topic', + group='oslo_messaging') + cfg.CONF.set_override('event_stream_topic', TOPIC, + group='oslo_messaging') + self.payload = {'operating_status': 'ONLINE'} + self.consumer = octavia_messaging_consumer.OctaviaConsumer( + self.driver) + + def test_updatedb_with_raises_exception_with_bad_model_name(self): + self.set_db_mocks() + + cnt = InfoContainer('listener_statsX', 'id', + self.payload).to_dict() + self.assertRaises(exceptions.ModelMapException, + self.consumer.endpoints[0].update_info, {}, cnt) + + def test_updatedb_ignores_listener_stats(self): + self.set_db_mocks() + cnt = InfoContainer('listener_stats', 'id', self.payload).to_dict() + self.consumer.endpoints[0].update_info({}, cnt) + call_len = len(self.driver.plugin.db.update_status.call_args_list) + self.assertEqual(call_len, 0) # See didn't do anything + + def test_updatedb_loadbalancer(self): + self.set_db_mocks() + cnt = InfoContainer(constants.LOADBALANCER_EVENT, 'lb_id', + self.payload).to_dict() + self.consumer.endpoints[0].update_info({}, cnt) + self.assert_handle_streamed_event_called(models.LoadBalancer, 'lb_id', + self.payload) + + def test_updatedb_listener(self): + self.set_db_mocks() + cnt = InfoContainer(constants.LISTENER_EVENT, 'listener_id', + self.payload).to_dict() + self.consumer.endpoints[0].update_info({}, cnt) + self.assert_handle_streamed_event_called(models.Listener, + 'listener_id', + self.payload) + + def test_updatedb_pool(self): + self.set_db_mocks() + cnt = InfoContainer(constants.POOL_EVENT, 'pool_id', + self.payload).to_dict() + self.consumer.endpoints[0].update_info({}, cnt) + self.assert_handle_streamed_event_called(models.PoolV2, 'pool_id', + self.payload) + + def test_updatedb_member(self): + self.set_db_mocks() + cnt = InfoContainer(constants.MEMBER_EVENT, 'pool_id', + self.payload).to_dict() + self.consumer.endpoints[0].update_info({}, cnt) + self.assert_handle_streamed_event_called(models.MemberV2, 'pool_id', + self.payload)