Implementation for bp/api-to-oslo-messing-handler

Implements: blueprint api-to-oslo-messaging-handler

Change-Id: Ia23418943c6c21032bf204132a8118ace237ef10
This commit is contained in:
Carlos D. Garza 2015-01-21 16:58:59 -06:00 committed by Carlos Garza
parent 39d20059ca
commit 0969dcb7a8
9 changed files with 322 additions and 6 deletions

1
.gitignore vendored
View File

@ -7,6 +7,7 @@ cover/
covhtml/
dist/
doc/build
.idea/*
*.DS_Store
*.pyc
*.egg-info/

View File

@ -21,17 +21,17 @@ import six
class BaseObjectHandler(object):
"""Base class for any object handler."""
@abc.abstractmethod
def create(self, data_model):
def create(self, model_id):
"""Begins process of actually creating data_model."""
pass
@abc.abstractmethod
def update(self, data_model):
def update(self, model_id, updated_dict):
"""Begins process of actually updating data_model."""
pass
@abc.abstractmethod
def delete(self, data_model):
def delete(self, model_id):
"""Begins process of actually deleting data_model."""
pass
@ -42,13 +42,13 @@ class NotImplementedObjectHandler(BaseObjectHandler):
Helper class to make any subclass of AbstractHandler explode if it
is missing any of the required object managers.
"""
def update(self, data_model):
def update(self, model_id, updated_dict):
raise NotImplementedError()
def delete(self, data_model):
def delete(self, model_id):
raise NotImplementedError()
def create(self, data_model):
def create(self, model_id):
raise NotImplementedError()

View File

@ -0,0 +1,143 @@
# Copyright 2014 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.# Copyright 2014 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
from oslo.config import cfg
from oslo import messaging
import six
from octavia.api.v1.handlers import abstract_handler
from octavia.common import constants
cfg.CONF.import_group('oslo_messaging', 'octavia.common.config')
@six.add_metaclass(abc.ABCMeta)
class BaseProducer(abstract_handler.BaseObjectHandler):
"""Base queue producer class."""
@abc.abstractproperty
def payload_class(self):
"""returns a string representing the container class."""
pass
def __init__(self):
topic = cfg.CONF.oslo_messaging.topic
self.transport = messaging.get_transport(cfg.CONF)
self.target = messaging.Target(
namespace=constants.RPC_NAMESPACE_CONTROLLER_AGENT,
topic=topic, version="1.0", fanout=False)
self.client = messaging.RPCClient(self.transport, target=self.target)
def create(self, model_id):
"""Sends a create message to the controller via oslo.messaging
:param data_model:
"""
kw = {"{0}_id".format(self.payload_class): model_id}
method_name = "create_{0}".format(self.payload_class)
self.client.cast({}, method_name, **kw)
def update(self, model_id, updated_dict):
"""sends an update message to the controller via oslo.messaging
:param updated_model:
:param data_model:
"""
kw = {"{0}_updates".format(self.payload_class): updated_dict,
"{0}_id".format(self.payload_class): model_id}
method_name = "update_{0}".format(self.payload_class)
self.client.cast({}, method_name, **kw)
def delete(self, model_id):
"""sends a delete message to the controller via oslo.messaging
:param updated_model:
:param data_model:
"""
kw = {"{0}_id".format(self.payload_class): model_id}
method_name = "delete_{0}".format(self.payload_class)
self.client.cast({}, method_name, **kw)
class LoadBalancerProducer(BaseProducer):
"""Sends updates,deletes and creates to the RPC end of the queue consumer
"""
@property
def payload_class(self):
return "load_balancer"
class ListenerProducer(BaseProducer):
"""Sends updates,deletes and creates to the RPC end of the queue consumer
"""
@property
def payload_class(self):
return "listener"
class PoolProducer(BaseProducer):
"""Sends updates,deletes and creates to the RPC end of the queue consumer
"""
@property
def payload_class(self):
return "pool"
class HealthMonitorProducer(BaseProducer):
"""Sends updates,deletes and creates to the RPC end of the queue consumer
"""
@property
def payload_class(self):
return "health_monitor"
class MemberProducer(BaseProducer):
"""Sends updates,deletes and creates to the RPC end of the queue consumer
"""
@property
def payload_class(self):
return "member"
class ProducerHandler(abstract_handler.BaseHandler):
"""Base class for all QueueProducers.
used to send messages via the Class variables load_balancer, listener,
health_monitor, and member.
"""
load_balancer = LoadBalancerProducer()
listener = ListenerProducer()
pool = PoolProducer()
health_monitor = HealthMonitorProducer()
member = MemberProducer()

View File

@ -79,11 +79,16 @@ networking_opts = [
cfg.StrOpt('lb_network_name', help=_('Name of amphora internal network')),
]
oslo_messaging_opts = [
cfg.StrOpt('topic'),
]
core_cli_opts = []
# Register the configuration options
cfg.CONF.register_opts(core_opts)
cfg.CONF.register_opts(networking_opts, group='networking')
cfg.CONF.register_opts(oslo_messaging_opts, group='oslo_messaging')
cfg.CONF.register_cli_opts(core_cli_opts)
cfg.CONF.import_group('keystone_authtoken', 'keystonemiddleware.auth_token')

View File

@ -64,3 +64,5 @@ NOVA_1 = '1.1'
NOVA_2 = '2'
NOVA_3 = '3'
NOVA_VERSIONS = (NOVA_1, NOVA_2, NOVA_3)
RPC_NAMESPACE_CONTROLLER_AGENT = 'controller'

View File

@ -0,0 +1,165 @@
# Copyright 2014 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.# Copyright 2014 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_config import fixture
import oslo_messaging as messaging
from octavia.api.v1.handlers.queue import producer
from octavia.common import config
from octavia.tests.unit import base
class TestProducer(base.TestCase):
def setUp(self):
super(TestProducer, self).setUp()
self.config = fixture.Config()
config.cfg.CONF.set_override('topic', 'OCTAVIA_PROV',
group='oslo_messaging')
mck_target = mock.patch(
'octavia.api.v1.handlers.queue.producer.messaging.Target')
mck_transport = mock.patch(
'octavia.api.v1.handlers.queue.producer.messaging.get_transport')
self.mck_client = mock.create_autospec(messaging.RPCClient)
mck_client = mock.patch(
'octavia.api.v1.handlers.queue.producer.messaging.RPCClient',
return_value=self.mck_client)
mck_target.start()
mck_transport.start()
mck_client.start()
self.addCleanup(mck_target.stop)
self.addCleanup(mck_transport.stop)
self.addCleanup(mck_client.stop)
def test_create_loadbalancer(self):
p = producer.LoadBalancerProducer()
p.create(10)
kw = {'load_balancer_id': 10}
self.mck_client.cast.assert_called_once_with(
{}, 'create_load_balancer', **kw)
def test_delete_loadbalancer(self):
p = producer.LoadBalancerProducer()
p.delete(10)
kw = {'load_balancer_id': 10}
self.mck_client.cast.assert_called_once_with(
{}, 'delete_load_balancer', **kw)
def test_update_loadbalancer(self):
p = producer.LoadBalancerProducer()
p.update(10, {'admin_state_up': False})
kw = {'load_balancer_updates': {'admin_state_up': False},
'load_balancer_id': 10}
self.mck_client.cast.assert_called_once_with(
{}, 'update_load_balancer', **kw)
def test_create_listener(self):
p = producer.ListenerProducer()
p.create(10)
kw = {'listener_id': 10}
self.mck_client.cast.assert_called_once_with(
{}, 'create_listener', **kw)
def test_delete_listener(self):
p = producer.ListenerProducer()
p.delete(10)
kw = {'listener_id': 10}
self.mck_client.cast.assert_called_once_with(
{}, 'delete_listener', **kw)
def test_update_listener(self):
p = producer.ListenerProducer()
p.update(10, {'admin_state_up': False})
kw = {'listener_updates': {'admin_state_up': False},
'listener_id': 10}
self.mck_client.cast.assert_called_once_with(
{}, 'update_listener', **kw)
def test_create_pool(self):
p = producer.PoolProducer()
p.create(10)
kw = {'pool_id': 10}
self.mck_client.cast.assert_called_once_with(
{}, 'create_pool', **kw)
def test_delete_pool(self):
p = producer.PoolProducer()
p.delete(10)
kw = {'pool_id': 10}
self.mck_client.cast.assert_called_once_with(
{}, 'delete_pool', **kw)
def test_update_pool(self):
p = producer.PoolProducer()
p.update(10, {'admin_state_up': False})
kw = {'pool_updates': {'admin_state_up': False},
'pool_id': 10}
self.mck_client.cast.assert_called_once_with(
{}, 'update_pool', **kw)
def test_create_healthmonitor(self):
p = producer.HealthMonitorProducer()
p.create(10)
kw = {'health_monitor_id': 10}
self.mck_client.cast.assert_called_once_with(
{}, 'create_health_monitor', **kw)
def test_delete_healthmonitor(self):
p = producer.HealthMonitorProducer()
p.delete(10)
kw = {'health_monitor_id': 10}
self.mck_client.cast.assert_called_once_with(
{}, 'delete_health_monitor', **kw)
def test_update_healthmonitor(self):
p = producer.HealthMonitorProducer()
p.update(10, {'admin_state_up': False})
kw = {'health_monitor_updates': {'admin_state_up': False},
'health_monitor_id': 10}
self.mck_client.cast.assert_called_once_with(
{}, 'update_health_monitor', **kw)
def test_create_member(self):
p = producer.MemberProducer()
p.create(10)
kw = {'member_id': 10}
self.mck_client.cast.assert_called_once_with(
{}, 'create_member', **kw)
def test_delete_member(self):
p = producer.MemberProducer()
p.delete(10)
kw = {'member_id': 10}
self.mck_client.cast.assert_called_once_with(
{}, 'delete_member', **kw)
def test_update_member(self):
p = producer.MemberProducer()
p.update(10, {'admin_state_up': False})
kw = {'member_updates': {'admin_state_up': False},
'member_id': 10}
self.mck_client.cast.assert_called_once_with(
{}, 'update_member', **kw)