Add rpc agent api and callbacks to resources_rpc

This patch also refactors existing test cases for server side rpc
classes in order to test code in generic manner. Finally, we remove
notify() and get_resource() from consumers or producers modules
respectively in order to remove circular dependencies. The notificitaion
driver will send events directly using RPC api class instead of going
through registry.

Co-Authored-By: Miguel Angel Ajo <mangelajo@redhat.com>
Partially-Implements: blueprint quantum-qos-api
Change-Id: I9120748505856acc7aa8d15d896697dd8487bb02
This commit is contained in:
Jakub Libosvar 2015-08-03 15:48:02 +00:00 committed by Ihar Hrachyshka
parent 11e22a435a
commit ac3e1e1256
5 changed files with 227 additions and 73 deletions

View File

@ -17,12 +17,14 @@ from oslo_log import helpers as log_helpers
from oslo_log import log as logging
import oslo_messaging
from neutron.api.rpc.callbacks.producer import registry
from neutron.api.rpc.callbacks.consumer import registry as cons_registry
from neutron.api.rpc.callbacks.producer import registry as prod_registry
from neutron.api.rpc.callbacks import resources
from neutron.common import constants
from neutron.common import exceptions
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.objects import base as obj_base
LOG = logging.getLogger(__name__)
@ -83,9 +85,7 @@ class ResourcesPullRpcApi(object):
raise ResourceNotFound(resource_type=resource_type,
resource_id=resource_id)
obj = resource_type_cls.obj_from_primitive(primitive)
obj.obj_reset_changes()
return obj
return resource_type_cls.clean_obj_from_primitive(primitive)
class ResourcesPullRpcCallback(object):
@ -103,11 +103,73 @@ class ResourcesPullRpcCallback(object):
version='1.0', namespace=constants.RPC_NAMESPACE_RESOURCES)
def pull(self, context, resource_type, version, resource_id):
_validate_resource_type(resource_type)
obj = registry.pull(resource_type, resource_id, context=context)
obj = prod_registry.pull(resource_type, resource_id, context=context)
if obj:
# don't request a backport for the latest known version
#TODO(QoS): Remove in the future with new version of
# versionedobjects containing
# https://review.openstack.org/#/c/207998/
if version == obj.VERSION:
version = None
return obj.obj_to_primitive(target_version=version)
def _object_topic(obj):
resource_type = resources.get_resource_type(obj)
return topics.RESOURCE_TOPIC_PATTERN % {
'resource_type': resource_type, 'version': obj.VERSION}
class ResourcesPushRpcApi(object):
"""Plugin-side RPC for plugin-to-agents interaction.
This interface is designed to push versioned object updates to interested
agents using fanout topics.
This class implements the caller side of an rpc interface. The receiver
side can be found below: ResourcesPushRpcCallback.
"""
def __init__(self):
target = oslo_messaging.Target(
version='1.0',
namespace=constants.RPC_NAMESPACE_RESOURCES)
self.client = n_rpc.get_client(target)
def _prepare_object_fanout_context(self, obj):
"""Prepare fanout context, one topic per object type."""
obj_topic = _object_topic(obj)
return self.client.prepare(fanout=True, topic=obj_topic)
@log_helpers.log_method_call
def push(self, context, resource, event_type):
resource_type = resources.get_resource_type(resource)
_validate_resource_type(resource_type)
cctxt = self._prepare_object_fanout_context(resource)
#TODO(QoS): Push notifications for every known version once we have
# multiple of those
dehydrated_resource = resource.obj_to_primitive()
cctxt.cast(context, 'push',
resource=dehydrated_resource,
event_type=event_type)
class ResourcesPushRpcCallback(object):
"""Agent-side RPC for plugin-to-agents interaction.
This class implements the receiver for notification about versioned objects
resource updates used by neutron.api.rpc.callbacks. You can find the
caller side in ResourcesPushRpcApi.
"""
# History
# 1.0 Initial version
target = oslo_messaging.Target(version='1.0',
namespace=constants.RPC_NAMESPACE_RESOURCES)
def push(self, context, resource, event_type):
resource_obj = obj_base.NeutronObject.clean_obj_from_primitive(
resource)
LOG.debug("Resources notification (%(event_type)s): %(resource)s",
{'event_type': event_type, 'resource': repr(resource_obj)})
resource_type = resources.get_resource_type(resource_obj)
cons_registry.push(resource_type, resource_obj, event_type)

View File

@ -38,6 +38,8 @@ DHCP_AGENT = 'dhcp_agent'
METERING_AGENT = 'metering_agent'
LOADBALANCER_AGENT = 'n-lbaas_agent'
RESOURCE_TOPIC_PATTERN = "neutron-vo-%(resource_type)s-%(version)s"
def get_topic_name(prefix, table, operation, host=None):
"""Create a topic name.

View File

@ -48,6 +48,12 @@ class NeutronObject(obj_base.VersionedObject,
def to_dict(self):
return dict(self.items())
@classmethod
def clean_obj_from_primitive(cls, primitive, context=None):
obj = cls.obj_from_primitive(primitive, context)
obj.obj_reset_changes()
return obj
@classmethod
def get_by_id(cls, context, id):
raise NotImplementedError()

View File

@ -15,71 +15,100 @@
import mock
from oslo_utils import uuidutils
from oslo_versionedobjects import base as obj_base
from oslo_versionedobjects import fields as obj_fields
import testtools
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import resources_rpc
from neutron.common import topics
from neutron import context
from neutron.objects.qos import policy
from neutron.objects import base as objects_base
from neutron.tests import base
@obj_base.VersionedObjectRegistry.register
class FakeResource(objects_base.NeutronObject):
fields = {
'id': obj_fields.UUIDField(),
'field': obj_fields.StringField()
}
@classmethod
def get_objects(cls, context, **kwargs):
return list()
class ResourcesRpcBaseTestCase(base.BaseTestCase):
def setUp(self):
super(ResourcesRpcBaseTestCase, self).setUp()
self.context = context.get_admin_context()
def _create_test_policy_dict(self):
def _create_test_dict(self):
return {'id': uuidutils.generate_uuid(),
'tenant_id': uuidutils.generate_uuid(),
'name': 'test',
'description': 'test',
'shared': False}
'field': 'foo'}
def _create_test_policy(self, policy_dict):
policy_obj = policy.QosPolicy(self.context, **policy_dict)
policy_obj.obj_reset_changes()
return policy_obj
def _create_test_resource(self, **kwargs):
resource = FakeResource(self.context, **kwargs)
resource.obj_reset_changes()
return resource
class _ValidateResourceTypeTestCase(base.BaseTestCase):
def setUp(self):
super(_ValidateResourceTypeTestCase, self).setUp()
self.is_valid_mock = mock.patch.object(
resources_rpc.resources, 'is_valid_resource_type').start()
def test_valid_type(self):
self.is_valid_mock.return_value = True
resources_rpc._validate_resource_type('foo')
def test_invalid_type(self):
self.is_valid_mock.return_value = False
with testtools.ExpectedException(
resources_rpc.InvalidResourceTypeClass):
resources_rpc._validate_resource_type('foo')
class ResourcesPullRpcApiTestCase(ResourcesRpcBaseTestCase):
def setUp(self):
super(ResourcesPullRpcApiTestCase, self).setUp()
self.client_p = mock.patch.object(resources_rpc.n_rpc, 'get_client')
self.client = self.client_p.start()
mock.patch.object(resources_rpc.n_rpc, 'get_client').start()
mock.patch.object(resources_rpc, '_validate_resource_type').start()
mock.patch('neutron.api.rpc.callbacks.resources.get_resource_cls',
return_value=FakeResource).start()
self.rpc = resources_rpc.ResourcesPullRpcApi()
self.mock_cctxt = self.rpc.client.prepare.return_value
self.cctxt_mock = self.rpc.client.prepare.return_value
def test_is_singleton(self):
self.assertEqual(id(self.rpc),
id(resources_rpc.ResourcesPullRpcApi()))
def test_pull(self):
policy_dict = self._create_test_policy_dict()
expected_policy_obj = self._create_test_policy(policy_dict)
qos_policy_id = policy_dict['id']
self.mock_cctxt.call.return_value = (
expected_policy_obj.obj_to_primitive())
pull_result = self.rpc.pull(
self.context, resources.QOS_POLICY, qos_policy_id)
self.mock_cctxt.call.assert_called_once_with(
self.context, 'pull', resource_type=resources.QOS_POLICY,
version=policy.QosPolicy.VERSION, resource_id=qos_policy_id)
self.assertEqual(expected_policy_obj, pull_result)
resource_dict = self._create_test_dict()
expected_obj = self._create_test_resource(**resource_dict)
resource_id = resource_dict['id']
self.cctxt_mock.call.return_value = expected_obj.obj_to_primitive()
def test_pull_invalid_resource_type_cls(self):
self.assertRaises(
resources_rpc.InvalidResourceTypeClass, self.rpc.pull,
self.context, 'foo_type', 'foo_id')
result = self.rpc.pull(
self.context, FakeResource.obj_name(), resource_id)
self.cctxt_mock.call.assert_called_once_with(
self.context, 'pull', resource_type='FakeResource',
version=FakeResource.VERSION, resource_id=resource_id)
self.assertEqual(expected_obj, result)
def test_pull_resource_not_found(self):
policy_dict = self._create_test_policy_dict()
qos_policy_id = policy_dict['id']
self.mock_cctxt.call.return_value = None
self.assertRaises(
resources_rpc.ResourceNotFound, self.rpc.pull,
self.context, resources.QOS_POLICY, qos_policy_id)
resource_dict = self._create_test_dict()
resource_id = resource_dict['id']
self.cctxt_mock.call.return_value = None
with testtools.ExpectedException(resources_rpc.ResourceNotFound):
self.rpc.pull(self.context, FakeResource.obj_name(),
resource_id)
class ResourcesPullRpcCallbackTestCase(ResourcesRpcBaseTestCase):
@ -87,45 +116,91 @@ class ResourcesPullRpcCallbackTestCase(ResourcesRpcBaseTestCase):
def setUp(self):
super(ResourcesPullRpcCallbackTestCase, self).setUp()
self.callbacks = resources_rpc.ResourcesPullRpcCallback()
self.resource_dict = self._create_test_dict()
self.resource_obj = self._create_test_resource(**self.resource_dict)
def test_pull(self):
policy_dict = self._create_test_policy_dict()
policy_obj = self._create_test_policy(policy_dict)
qos_policy_id = policy_dict['id']
with mock.patch.object(resources_rpc.registry, 'pull',
return_value=policy_obj) as registry_mock:
with mock.patch.object(
resources_rpc.prod_registry, 'pull',
return_value=self.resource_obj) as registry_mock:
primitive = self.callbacks.pull(
self.context, resource_type=resources.QOS_POLICY,
version=policy.QosPolicy.VERSION,
resource_id=qos_policy_id)
registry_mock.assert_called_once_with(
resources.QOS_POLICY,
qos_policy_id, context=self.context)
self.assertEqual(policy_dict, primitive['versioned_object.data'])
self.assertEqual(policy_obj.obj_to_primitive(), primitive)
self.context, resource_type=FakeResource.obj_name(),
version=FakeResource.VERSION,
resource_id=self.resource_dict['id'])
registry_mock.assert_called_once_with(
'FakeResource', self.resource_dict['id'], context=self.context)
self.assertEqual(self.resource_dict,
primitive['versioned_object.data'])
self.assertEqual(self.resource_obj.obj_to_primitive(), primitive)
@mock.patch.object(policy.QosPolicy, 'obj_to_primitive')
@mock.patch.object(FakeResource, 'obj_to_primitive')
def test_pull_no_backport_for_latest_version(self, to_prim_mock):
policy_dict = self._create_test_policy_dict()
policy_obj = self._create_test_policy(policy_dict)
qos_policy_id = policy_dict['id']
with mock.patch.object(resources_rpc.registry, 'pull',
return_value=policy_obj):
with mock.patch.object(resources_rpc.prod_registry, 'pull',
return_value=self.resource_obj):
self.callbacks.pull(
self.context, resource_type=resources.QOS_POLICY,
version=policy.QosPolicy.VERSION,
resource_id=qos_policy_id)
to_prim_mock.assert_called_with(target_version=None)
self.context, resource_type=FakeResource.obj_name(),
version=FakeResource.VERSION,
resource_id=self.resource_obj.id)
to_prim_mock.assert_called_with(target_version=None)
@mock.patch.object(policy.QosPolicy, 'obj_to_primitive')
@mock.patch.object(FakeResource, 'obj_to_primitive')
def test_pull_backports_to_older_version(self, to_prim_mock):
policy_dict = self._create_test_policy_dict()
policy_obj = self._create_test_policy(policy_dict)
qos_policy_id = policy_dict['id']
with mock.patch.object(resources_rpc.registry, 'pull',
return_value=policy_obj):
with mock.patch.object(resources_rpc.prod_registry, 'pull',
return_value=self.resource_obj):
self.callbacks.pull(
self.context, resource_type=resources.QOS_POLICY,
self.context, resource_type=FakeResource.obj_name(),
version='0.9', # less than initial version 1.0
resource_id=qos_policy_id)
resource_id=self.resource_dict['id'])
to_prim_mock.assert_called_with(target_version='0.9')
class ResourcesPushRpcApiTestCase(ResourcesRpcBaseTestCase):
def setUp(self):
super(ResourcesPushRpcApiTestCase, self).setUp()
mock.patch.object(resources_rpc.n_rpc, 'get_client').start()
mock.patch.object(resources_rpc, '_validate_resource_type').start()
self.rpc = resources_rpc.ResourcesPushRpcApi()
self.cctxt_mock = self.rpc.client.prepare.return_value
resource_dict = self._create_test_dict()
self.resource_obj = self._create_test_resource(**resource_dict)
def test__prepare_object_fanout_context(self):
expected_topic = topics.RESOURCE_TOPIC_PATTERN % {
'resource_type': resources.get_resource_type(self.resource_obj),
'version': self.resource_obj.VERSION}
observed = self.rpc._prepare_object_fanout_context(self.resource_obj)
self.rpc.client.prepare.assert_called_once_with(
fanout=True, topic=expected_topic)
self.assertEqual(self.cctxt_mock, observed)
def test_push(self):
self.rpc.push(
self.context, self.resource_obj, 'TYPE')
self.cctxt_mock.cast.assert_called_once_with(
self.context, 'push',
resource=self.resource_obj.obj_to_primitive(),
event_type='TYPE')
class ResourcesPushRpcCallbackTestCase(ResourcesRpcBaseTestCase):
def setUp(self):
super(ResourcesPushRpcCallbackTestCase, self).setUp()
mock.patch.object(resources_rpc, '_validate_resource_type').start()
mock.patch.object(
resources_rpc.resources,
'get_resource_cls', return_value=FakeResource).start()
resource_dict = self._create_test_dict()
self.resource_obj = self._create_test_resource(**resource_dict)
self.resource_prim = self.resource_obj.obj_to_primitive()
self.callbacks = resources_rpc.ResourcesPushRpcCallback()
@mock.patch.object(resources_rpc.cons_registry, 'push')
def test_push(self, reg_push_mock):
self.callbacks.push(self.context, self.resource_prim, 'TYPE')
reg_push_mock.assert_called_once_with(self.resource_obj.obj_name(),
self.resource_obj, 'TYPE')

View File

@ -26,6 +26,8 @@ from neutron.tests import base as test_base
SQLALCHEMY_COMMIT = 'sqlalchemy.engine.Connection._commit_impl'
OBJECTS_BASE_OBJ_FROM_PRIMITIVE = ('oslo_versionedobjects.base.'
'VersionedObject.obj_from_primitive')
class FakeModel(object):
@ -214,6 +216,13 @@ class BaseObjectIfaceTestCase(_BaseObjectTestCase, test_base.BaseTestCase):
delete_mock.assert_called_once_with(
self.context, self._test_class.db_model, self.db_obj['id'])
@mock.patch(OBJECTS_BASE_OBJ_FROM_PRIMITIVE)
def test_clean_obj_from_primitive(self, get_prim_m):
expected_obj = get_prim_m.return_value
observed_obj = self._test_class.clean_obj_from_primitive('foo', 'bar')
self.assertIs(expected_obj, observed_obj)
self.assertTrue(observed_obj.obj_reset_changes.called)
class BaseDbObjectTestCase(_BaseObjectTestCase):