Implementing EventStreamer reciever for octavia

Listens to oslo_messaging for database updates from octavia.

When neutron-lbaas is being used with octavia this queing system will be
used to so that health check and status updates from octavia are sent to
neutron-lbaas so that neutron updates its database in sync with octavia.
See CR https://review.openstack.org/218735 for details on the octavia
side.

This event handler can actually be used for other drivers as long,
but for now its only used for octavia.

Co-Authored-By: Brandon Logan <brandon.logan@rackspace.com>
Change-Id: I4694d9830d37c6432ff64497396be120dda31501
This commit is contained in:
Carlos D. Garza 2015-11-03 23:07:16 -06:00 committed by Brandon Logan
parent dbf61f0217
commit 0bf1965057
6 changed files with 337 additions and 2 deletions

View File

@ -22,6 +22,10 @@ from neutron.common import exceptions
from neutron_lbaas._i18n import _LE
class ModelMapException(exceptions.NeutronException):
message = _LE("Unable to map model class %(target_name)s")
class LbaasException(exceptions.NeutronException):
pass

View File

@ -14,10 +14,17 @@
from functools import wraps
from neutron import context as ncontext
from oslo_log import log as logging
from oslo_utils import excutils
from neutron_lbaas.common import exceptions
from neutron_lbaas.db.loadbalancer import models
from neutron_lbaas.drivers import driver_mixins
from neutron_lbaas.services.loadbalancer import constants
LOG = logging.getLogger(__name__)
class NotImplementedManager(object):
@ -41,6 +48,11 @@ class LoadBalancerBaseDriver(object):
the various load balancer objects.
"""
model_map = {constants.LOADBALANCER_EVENT: models.LoadBalancer,
constants.LISTENER_EVENT: models.Listener,
constants.POOL_EVENT: models.PoolV2,
constants.MEMBER_EVENT: models.MemberV2}
load_balancer = NotImplementedManager()
listener = NotImplementedManager()
pool = NotImplementedManager()
@ -50,6 +62,23 @@ class LoadBalancerBaseDriver(object):
def __init__(self, plugin):
self.plugin = plugin
def handle_streamed_event(self, container):
# TODO(crc32): update_stats will be implemented here in the future
if container.info_type not in LoadBalancerBaseDriver.model_map:
if container.info_type == constants.LISTENER_STATS_EVENT:
return
else:
exc = exceptions.ModelMapException(
target_name=container.info_type)
raise exc
else:
model_class = LoadBalancerBaseDriver.model_map[
container.info_type]
context = ncontext.get_admin_context()
self.plugin.db.update_status(context, model_class,
container.info_id,
**container.info_payload)
class BaseLoadBalancerManager(driver_mixins.BaseRefreshMixin,
driver_mixins.BaseStatsMixin,

View File

@ -20,12 +20,14 @@ from neutron import context as ncontext
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
from oslo_service import service
from oslo_utils import excutils
import requests
from neutron_lbaas._i18n import _
from neutron_lbaas.common import keystone
from neutron_lbaas.drivers import driver_base
from neutron_lbaas.drivers.octavia import octavia_messaging_consumer
LOG = logging.getLogger(__name__)
VERSION = "1.0.1"
@ -53,8 +55,9 @@ OPTS = [
default=False,
help=_('True if Octavia will be responsible for allocating the VIP.'
' False if neutron-lbaas will allocate it and pass to Octavia.')
),
)
]
cfg.CONF.register_opts(OPTS, 'octavia')
@ -167,7 +170,9 @@ class OctaviaDriver(driver_base.LoadBalancerBaseDriver):
self.pool = PoolManager(self)
self.member = MemberManager(self)
self.health_monitor = HealthMonitorManager(self)
self.octavia_consumer = octavia_messaging_consumer.OctaviaConsumer(
self)
service.launch(cfg.CONF, self.octavia_consumer)
LOG.debug("OctaviaDriver: initialized, version=%s", VERSION)
@property

View File

@ -0,0 +1,112 @@
# Copyright 2016 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lbaas._i18n import _LI
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_service import service
oslo_messaging_opts = [
cfg.StrOpt('event_stream_topic',
default='neutron_lbaas_event',
help=_('topic name for receiving events from a queue'))
]
cfg.CONF.register_opts(oslo_messaging_opts, group='oslo_messaging')
LOG = logging.getLogger(__name__)
class InfoContainer(object):
@staticmethod
def from_dict(dict_obj):
return InfoContainer(dict_obj['info_type'],
dict_obj['info_id'],
dict_obj['info_payload'])
def __init__(self, info_type, info_id, info_payload):
self.info_type = info_type
self.info_id = info_id
self.info_payload = info_payload
def to_dict(self):
return {'info_type': self.info_type,
'info_id': self.info_id,
'info_payload': self.info_payload}
def __eq__(self, other):
if not isinstance(other, InfoContainer):
return False
if self.info_type != other.info_type:
return False
if self.info_id != other.info_id:
return False
if self.info_payload != other.info_payload:
return False
return True
def __ne__(self, other):
return not self == other
class ConsumerEndPoint(object):
target = messaging.Target(namespace="control", version='1.0')
def __init__(self, driver):
self.driver = driver
def update_info(self, ctx, container):
LOG.debug("Received event from stream %s", container)
container_inst = InfoContainer.from_dict(container)
self.driver.handle_streamed_event(container_inst)
class OctaviaConsumer(service.Service):
def __init__(self, driver, **kwargs):
super(OctaviaConsumer, self).__init__(**kwargs)
topic = cfg.CONF.oslo_messaging.event_stream_topic
server = cfg.CONF.host
self.driver = driver
self.transport = messaging.get_transport(cfg.CONF)
self.target = messaging.Target(topic=topic, server=server,
exchange="common", fanout=False)
self.endpoints = [ConsumerEndPoint(self.driver)]
self.server = None
def start(self):
super(OctaviaConsumer, self).start()
LOG.info(_LI("Starting octavia consumer..."))
self.server = messaging.get_rpc_server(self.transport, self.target,
self.endpoints,
executor='eventlet')
self.server.start()
def stop(self, graceful=False):
if self.server:
LOG.info(_LI('Stopping consumer...'))
self.server.stop()
if graceful:
LOG.info(
_LI('Consumer successfully stopped. Waiting for final '
'messages to be processed...'))
self.server.wait()
super(OctaviaConsumer, self).stop(graceful=graceful)
def reset(self):
if self.server:
self.server.reset()
super(OctaviaConsumer, self).reset()

View File

@ -118,3 +118,11 @@ LOADBALANCERV2 = "LOADBALANCERV2"
# for the LBaaS V1 vip and LBaaS V2 listeners. -1 indicates
# no limit, the value cannot be less than -1.
MIN_CONNECT_VALUE = -1
# LBaas V2 Table entities
LISTENER_EVENT = 'listener'
LISTENER_STATS_EVENT = 'listener_stats'
LOADBALANCER_EVENT = 'loadbalancer'
MEMBER_EVENT = 'member'
OPERATING_STATUS = 'operating_status'
POOL_EVENT = 'pool'

View File

@ -0,0 +1,177 @@
# Copyright 2016 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_config import cfg
from neutron_lbaas.common import exceptions
from neutron_lbaas.db.loadbalancer import models
import neutron_lbaas.drivers.octavia.driver as odriver
from neutron_lbaas.drivers.octavia.driver import octavia_messaging_consumer
from neutron_lbaas.services.loadbalancer import constants
from neutron_lbaas.tests.unit.drivers.octavia import test_octavia_driver
InfoContainer = octavia_messaging_consumer.InfoContainer
class TestOctaviaMessagingConsumer(test_octavia_driver.BaseOctaviaDriverTest):
def setUp(self):
super(test_octavia_driver.BaseOctaviaDriverTest, self).setUp()
self.plugin = mock.Mock()
self.driver = odriver.OctaviaDriver(self.plugin)
def assert_handle_streamed_event_called(self, model_class, id_param,
payload):
call_args_list = self.driver.plugin.db.update_status.call_args_list[0]
self.assertEqual(len(call_args_list), 2)
self.assertEqual(len(call_args_list[0]), 3)
self.assertEqual(model_class, call_args_list[0][1])
self.assertEqual(call_args_list[0][2], id_param)
self.assertEqual(call_args_list[1], payload)
def test_info_container_constructor(self):
ID = 'test_id'
PAYLOAD = 'test_payload'
TYPE = 'test_type'
cnt = InfoContainer(TYPE, ID, PAYLOAD)
self.assertEqual(cnt.info_type, TYPE)
self.assertEqual(cnt.info_id, ID)
self.assertEqual(cnt.info_payload, PAYLOAD)
self.assertEqual(cnt.to_dict(), {'info_type': TYPE, 'info_id': ID,
'info_payload': PAYLOAD})
def test_info_container_from_dict(self):
ID = 'test_id'
PAYLOAD = 'test_payload'
TYPE = 'test_type'
cnt = InfoContainer.from_dict({'info_type': TYPE, 'info_id': ID,
'info_payload': PAYLOAD})
self.assertEqual(cnt.info_type, TYPE)
self.assertEqual(cnt.info_id, ID)
self.assertEqual(cnt.info_payload, PAYLOAD)
def test_set_consumer_topic(self):
TOPIC = 'neutron_lbaas_event'
self.addCleanup(cfg.CONF.clear_override, 'event_stream_topic',
group='oslo_messaging')
cfg.CONF.set_override('event_stream_topic', TOPIC,
group='oslo_messaging')
consumer = octavia_messaging_consumer.OctaviaConsumer(self.driver)
self.assertIsNotNone(consumer.transport)
self.assertEqual(TOPIC, consumer.target.topic)
self.assertEqual(cfg.CONF.host, consumer.target.server)
@mock.patch.object(octavia_messaging_consumer.messaging, 'get_rpc_server')
def test_consumer_start(self, mock_get_rpc_server):
mock_server = mock.Mock()
mock_get_rpc_server.return_value = mock_server
TOPIC = 'neutron_lbaas_event'
self.addCleanup(cfg.CONF.clear_override, 'event_stream_topic',
group='oslo_messaging')
cfg.CONF.set_override('event_stream_topic', TOPIC,
group='oslo_messaging')
consumer = octavia_messaging_consumer.OctaviaConsumer(self.driver)
consumer.start()
mock_get_rpc_server.assert_called_once_with(
consumer.transport, consumer.target, consumer.endpoints,
executor='eventlet'
)
mock_server.start.assert_called_once_with()
@mock.patch.object(octavia_messaging_consumer.messaging, 'get_rpc_server')
def test_consumer_stop(self, mock_get_rpc_server):
mock_server = mock.Mock()
mock_get_rpc_server.return_value = mock_server
consumer = octavia_messaging_consumer.OctaviaConsumer(self.driver)
consumer.start()
consumer.stop()
mock_server.stop.assert_called_once_with()
mock_server.wait.assert_not_called()
@mock.patch.object(octavia_messaging_consumer.messaging, 'get_rpc_server')
def test_consumer_graceful_stop(self, mock_get_rpc_server):
mock_server = mock.Mock()
mock_get_rpc_server.return_value = mock_server
consumer = octavia_messaging_consumer.OctaviaConsumer(self.driver)
consumer.start()
consumer.stop(graceful=True)
mock_server.stop.assert_called_once_with()
mock_server.wait.assert_called_once_with()
@mock.patch.object(octavia_messaging_consumer.messaging, 'get_rpc_server')
def test_consumer_reset(self, mock_get_rpc_server):
mock_server = mock.Mock()
mock_get_rpc_server.return_value = mock_server
consumer = octavia_messaging_consumer.OctaviaConsumer(self.driver)
consumer.start()
consumer.reset()
mock_server.reset.assert_called_once_with()
def set_db_mocks(self):
TOPIC = 'neutron_lbaas_event'
self.addCleanup(cfg.CONF.clear_override, 'event_stream_topic',
group='oslo_messaging')
cfg.CONF.set_override('event_stream_topic', TOPIC,
group='oslo_messaging')
self.payload = {'operating_status': 'ONLINE'}
self.consumer = octavia_messaging_consumer.OctaviaConsumer(
self.driver)
def test_updatedb_with_raises_exception_with_bad_model_name(self):
self.set_db_mocks()
cnt = InfoContainer('listener_statsX', 'id',
self.payload).to_dict()
self.assertRaises(exceptions.ModelMapException,
self.consumer.endpoints[0].update_info, {}, cnt)
def test_updatedb_ignores_listener_stats(self):
self.set_db_mocks()
cnt = InfoContainer('listener_stats', 'id', self.payload).to_dict()
self.consumer.endpoints[0].update_info({}, cnt)
call_len = len(self.driver.plugin.db.update_status.call_args_list)
self.assertEqual(call_len, 0) # See didn't do anything
def test_updatedb_loadbalancer(self):
self.set_db_mocks()
cnt = InfoContainer(constants.LOADBALANCER_EVENT, 'lb_id',
self.payload).to_dict()
self.consumer.endpoints[0].update_info({}, cnt)
self.assert_handle_streamed_event_called(models.LoadBalancer, 'lb_id',
self.payload)
def test_updatedb_listener(self):
self.set_db_mocks()
cnt = InfoContainer(constants.LISTENER_EVENT, 'listener_id',
self.payload).to_dict()
self.consumer.endpoints[0].update_info({}, cnt)
self.assert_handle_streamed_event_called(models.Listener,
'listener_id',
self.payload)
def test_updatedb_pool(self):
self.set_db_mocks()
cnt = InfoContainer(constants.POOL_EVENT, 'pool_id',
self.payload).to_dict()
self.consumer.endpoints[0].update_info({}, cnt)
self.assert_handle_streamed_event_called(models.PoolV2, 'pool_id',
self.payload)
def test_updatedb_member(self):
self.set_db_mocks()
cnt = InfoContainer(constants.MEMBER_EVENT, 'pool_id',
self.payload).to_dict()
self.consumer.endpoints[0].update_info({}, cnt)
self.assert_handle_streamed_event_called(models.MemberV2, 'pool_id',
self.payload)