Add bulk pull OVO interface

Add an RPC interface to retrieve many OVO resources
at once based on filters.

Partially-Implements: blueprint push-notifications
Change-Id: I5e765712ab8b8065d71653c563f004c7a9ce9021
This commit is contained in:
Kevin Benton 2017-01-22 16:45:41 -08:00
parent 8a7909080c
commit 9645877da7
3 changed files with 72 additions and 11 deletions

View File

@ -49,6 +49,14 @@ def _validate_resource_type(resource_type):
raise InvalidResourceTypeClass(resource_type=resource_type)
def _resource_to_class(resource_type):
_validate_resource_type(resource_type)
# we've already validated the resource type, so we are pretty sure the
# class is there => no need to validate it specifically
return resources.get_resource_cls(resource_type)
def resource_type_versioned_topic(resource_type, version=None):
"""Return the topic for a resource type.
@ -74,19 +82,14 @@ class ResourcesPullRpcApi(object):
if not hasattr(cls, '_instance'):
cls._instance = super(ResourcesPullRpcApi, cls).__new__(cls)
target = oslo_messaging.Target(
topic=topics.PLUGIN, version='1.0',
topic=topics.PLUGIN, version='1.1',
namespace=constants.RPC_NAMESPACE_RESOURCES)
cls._instance.client = n_rpc.get_client(target)
return cls._instance
@log_helpers.log_method_call
def pull(self, context, resource_type, resource_id):
_validate_resource_type(resource_type)
# we've already validated the resource type, so we are pretty sure the
# class is there => no need to validate it specifically
resource_type_cls = resources.get_resource_cls(resource_type)
resource_type_cls = _resource_to_class(resource_type)
cctxt = self.client.prepare()
primitive = cctxt.call(context, 'pull',
resource_type=resource_type,
@ -95,9 +98,18 @@ class ResourcesPullRpcApi(object):
if primitive is None:
raise ResourceNotFound(resource_type=resource_type,
resource_id=resource_id)
return resource_type_cls.clean_obj_from_primitive(primitive)
@log_helpers.log_method_call
def bulk_pull(self, context, resource_type, filter_kwargs=None):
resource_type_cls = _resource_to_class(resource_type)
cctxt = self.client.prepare()
primitives = cctxt.call(context, 'bulk_pull',
resource_type=resource_type,
version=resource_type_cls.VERSION, filter_kwargs=filter_kwargs)
return [resource_type_cls.clean_obj_from_primitive(primitive)
for primitive in primitives]
class ResourcesPullRpcCallback(object):
"""Plugin-side RPC (implementation) for agent-to-plugin interaction.
@ -109,9 +121,10 @@ class ResourcesPullRpcCallback(object):
# History
# 1.0 Initial version
# 1.1 Added bulk_pull
target = oslo_messaging.Target(
version='1.0', namespace=constants.RPC_NAMESPACE_RESOURCES)
version='1.1', namespace=constants.RPC_NAMESPACE_RESOURCES)
@oslo_messaging.expected_exceptions(rpc_exc.CallbackNotFound)
def pull(self, context, resource_type, version, resource_id):
@ -119,6 +132,16 @@ class ResourcesPullRpcCallback(object):
if obj:
return obj.obj_to_primitive(target_version=version)
@oslo_messaging.expected_exceptions(rpc_exc.CallbackNotFound)
def bulk_pull(self, context, resource_type, version, filter_kwargs=None):
filter_kwargs = filter_kwargs or {}
resource_type_cls = _resource_to_class(resource_type)
# TODO(kevinbenton): add in producer registry so producers can add
# hooks to mangle these things like they can with 'pull'.
return [obj.obj_to_primitive(target_version=version)
for obj in resource_type_cls.get_objects(context, _pager=None,
**filter_kwargs)]
class ResourcesPushToServersRpcApi(object):
"""Publisher-side RPC (stub) for plugin-to-plugin fanout interaction.

View File

@ -98,8 +98,6 @@ class OVOServerRpcInterface(object):
"""ML2 server-side RPC interface.
Generates RPC callback notifications on ML2 object changes.
TODO(kevinbenton): interface to query server for these objects
"""
def __init__(self):

View File

@ -151,6 +151,23 @@ class ResourcesPullRpcApiTestCase(ResourcesRpcBaseTestCase):
version=TEST_VERSION, resource_id=resource_id)
self.assertEqual(expected_obj, result)
def test_bulk_pull(self):
self.obj_registry.register(FakeResource)
expected_objs = [_create_test_resource(self.context),
_create_test_resource(self.context)]
self.cctxt_mock.call.return_value = [
e.obj_to_primitive() for e in expected_objs]
filter_kwargs = {'a': 'b', 'c': 'd'}
result = self.rpc.bulk_pull(
self.context, FakeResource.obj_name(),
filter_kwargs=filter_kwargs)
self.cctxt_mock.call.assert_called_once_with(
self.context, 'bulk_pull', resource_type='FakeResource',
version=TEST_VERSION, filter_kwargs=filter_kwargs)
self.assertEqual(expected_objs, result)
def test_pull_resource_not_found(self):
resource_dict = _create_test_dict()
resource_id = resource_dict['id']
@ -198,6 +215,29 @@ class ResourcesPullRpcCallbackTestCase(ResourcesRpcBaseTestCase):
primitive['versioned_object.data'])
self.assertEqual(self.resource_obj.obj_to_primitive(), primitive)
def test_bulk_pull(self):
r1 = self.resource_obj
r2 = _create_test_resource(self.context)
@classmethod
def get_objs(*args, **kwargs):
if 'id' not in kwargs:
return [r1, r2]
return [r for r in [r1, r2] if r.id == kwargs['id']]
# the bulk interface currently retrieves directly from the registry
with mock.patch.object(FakeResource, 'get_objects', new=get_objs):
objs = self.callbacks.bulk_pull(
self.context, resource_type=FakeResource.obj_name(),
version=TEST_VERSION)
self.assertItemsEqual([r1.obj_to_primitive(),
r2.obj_to_primitive()],
objs)
objs = self.callbacks.bulk_pull(
self.context, resource_type=FakeResource.obj_name(),
version=TEST_VERSION, filter_kwargs={'id': r1.id})
self.assertEqual([r1.obj_to_primitive()], objs)
@mock.patch.object(FakeResource, 'obj_to_primitive')
def test_pull_backports_to_older_version(self, to_prim_mock):
with mock.patch.object(resources_rpc.prod_registry, 'pull',