Introduce bulk push to rpc callback mechanism
There are usage patterns which would benefit from having the capability to send a list of resources in bulk instead of using individual fanout messages. From now on, the rpc callback subscriber receives a list of resources (single or multiple), and the pushers must always push a list. Backwards compatibility for QoSPolicy consumers in mitaka is provided by calling push with "resource" parameter for single item lists during one release cycle. That will be dropped when Ocata opens. Partially-implements: blueprint vlan-aware-vms Change-Id: I1117925360a29ecbd1902fa527b2f24f94ce81ec
This commit is contained in:
parent
2a64fabd52
commit
7f617e6a21
@ -211,10 +211,10 @@ The agent code processing port updates may look like::
|
|||||||
from neutron.api.rpc.callbacks import resources
|
from neutron.api.rpc.callbacks import resources
|
||||||
|
|
||||||
|
|
||||||
def process_resource_updates(resource_type, resource, event_type):
|
def process_resource_updates(resource_type, resource_list, event_type):
|
||||||
|
|
||||||
# send to the right handler which will update any control plane
|
# send to the right handler which will update any control plane
|
||||||
# details related to the updated resource...
|
# details related to the updated resources...
|
||||||
|
|
||||||
|
|
||||||
def subscribe_resources():
|
def subscribe_resources():
|
||||||
@ -238,7 +238,7 @@ The relevant function is:
|
|||||||
The callback function will receive the following arguments:
|
The callback function will receive the following arguments:
|
||||||
|
|
||||||
* resource_type: the type of resource which is receiving the update.
|
* resource_type: the type of resource which is receiving the update.
|
||||||
* resource: resource of supported object
|
* resource_list: list of resources which have been pushed by server.
|
||||||
* event_type: will be one of CREATED, UPDATED, or DELETED, see
|
* event_type: will be one of CREATED, UPDATED, or DELETED, see
|
||||||
neutron.api.rpc.callbacks.events for details.
|
neutron.api.rpc.callbacks.events for details.
|
||||||
|
|
||||||
@ -263,9 +263,22 @@ Sending resource events
|
|||||||
-----------------------
|
-----------------------
|
||||||
|
|
||||||
On the server side, resource updates could come from anywhere, a service plugin,
|
On the server side, resource updates could come from anywhere, a service plugin,
|
||||||
an extension, anything that updates, creates, or destroys the resource and that
|
an extension, anything that updates, creates, or destroys the resources and that
|
||||||
is of any interest to subscribed agents.
|
is of any interest to subscribed agents.
|
||||||
|
|
||||||
|
A callback is expected to receive a list of resources. When resources in the list
|
||||||
|
belong to the same resource type, a single push RPC message is sent; if the list
|
||||||
|
contains objects of different resource types, resources of each type are grouped
|
||||||
|
and sent separately, one push RPC message per type. On the receiver side,
|
||||||
|
resources in a list always belong to the same type. In other words, a server-side
|
||||||
|
push of a list of heterogenous objects will result into N messages on bus and
|
||||||
|
N client-side callback invocations, where N is the number of unique resource
|
||||||
|
types in the given list, e.g. L(A, A, B, C, C, C) would be fragmented into
|
||||||
|
L1(A, A), L2(B), L3(C, C, C), and each list pushed separately.
|
||||||
|
|
||||||
|
Note: there is no guarantee in terms of order in which separate resource lists
|
||||||
|
will be delivered to consumers.
|
||||||
|
|
||||||
The server/publisher side may look like::
|
The server/publisher side may look like::
|
||||||
|
|
||||||
from neutron.api.rpc.callbacks.producer import registry
|
from neutron.api.rpc.callbacks.producer import registry
|
||||||
@ -274,17 +287,17 @@ The server/publisher side may look like::
|
|||||||
def create_qos_policy(...):
|
def create_qos_policy(...):
|
||||||
policy = fetch_policy(...)
|
policy = fetch_policy(...)
|
||||||
update_the_db(...)
|
update_the_db(...)
|
||||||
registry.push(policy, events.CREATED)
|
registry.push([policy], events.CREATED)
|
||||||
|
|
||||||
def update_qos_policy(...):
|
def update_qos_policy(...):
|
||||||
policy = fetch_policy(...)
|
policy = fetch_policy(...)
|
||||||
update_the_db(...)
|
update_the_db(...)
|
||||||
registry.push(policy, events.UPDATED)
|
registry.push([policy], events.UPDATED)
|
||||||
|
|
||||||
def delete_qos_policy(...):
|
def delete_qos_policy(...):
|
||||||
policy = fetch_policy(...)
|
policy = fetch_policy(...)
|
||||||
update_the_db(...)
|
update_the_db(...)
|
||||||
registry.push(policy, events.DELETED)
|
registry.push([policy], events.DELETED)
|
||||||
|
|
||||||
|
|
||||||
References
|
References
|
||||||
|
@ -220,13 +220,14 @@ class QosAgentExtension(l2_agent_extension.L2AgentExtension):
|
|||||||
connection.create_consumer(topic, endpoints, fanout=True)
|
connection.create_consumer(topic, endpoints, fanout=True)
|
||||||
|
|
||||||
@lockutils.synchronized('qos-port')
|
@lockutils.synchronized('qos-port')
|
||||||
def _handle_notification(self, qos_policy, event_type):
|
def _handle_notification(self, qos_policies, event_type):
|
||||||
# server does not allow to remove a policy that is attached to any
|
# server does not allow to remove a policy that is attached to any
|
||||||
# port, so we ignore DELETED events. Also, if we receive a CREATED
|
# port, so we ignore DELETED events. Also, if we receive a CREATED
|
||||||
# event for a policy, it means that there are no ports so far that are
|
# event for a policy, it means that there are no ports so far that are
|
||||||
# attached to it. That's why we are interested in UPDATED events only
|
# attached to it. That's why we are interested in UPDATED events only
|
||||||
if event_type == events.UPDATED:
|
if event_type == events.UPDATED:
|
||||||
self._process_update_policy(qos_policy)
|
for qos_policy in qos_policies:
|
||||||
|
self._process_update_policy(qos_policy)
|
||||||
|
|
||||||
@lockutils.synchronized('qos-port')
|
@lockutils.synchronized('qos-port')
|
||||||
def handle_port(self, context, port):
|
def handle_port(self, context, port):
|
||||||
|
@ -27,12 +27,12 @@ def unsubscribe(callback, resource_type):
|
|||||||
_get_manager().unregister(callback, resource_type)
|
_get_manager().unregister(callback, resource_type)
|
||||||
|
|
||||||
|
|
||||||
def push(resource_type, resource, event_type):
|
def push(resource_type, resource_list, event_type):
|
||||||
"""Push resource events into all registered callbacks for the type."""
|
"""Push resource list into all registered callbacks for the event type."""
|
||||||
|
|
||||||
callbacks = _get_manager().get_callbacks(resource_type)
|
callbacks = _get_manager().get_callbacks(resource_type)
|
||||||
for callback in callbacks:
|
for callback in callbacks:
|
||||||
callback(resource, event_type)
|
callback(resource_list, event_type)
|
||||||
|
|
||||||
|
|
||||||
def clear():
|
def clear():
|
||||||
|
@ -13,6 +13,8 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
import collections
|
||||||
|
|
||||||
from neutron_lib import exceptions
|
from neutron_lib import exceptions
|
||||||
from oslo_log import helpers as log_helpers
|
from oslo_log import helpers as log_helpers
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
@ -179,27 +181,63 @@ class ResourcesPushRpcApi(object):
|
|||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
target = oslo_messaging.Target(
|
target = oslo_messaging.Target(
|
||||||
version='1.0',
|
|
||||||
namespace=constants.RPC_NAMESPACE_RESOURCES)
|
namespace=constants.RPC_NAMESPACE_RESOURCES)
|
||||||
self.client = n_rpc.get_client(target)
|
self.client = n_rpc.get_client(target)
|
||||||
|
|
||||||
def _prepare_object_fanout_context(self, obj, version):
|
def _prepare_object_fanout_context(self, obj, resource_version,
|
||||||
|
rpc_version):
|
||||||
"""Prepare fanout context, one topic per object type."""
|
"""Prepare fanout context, one topic per object type."""
|
||||||
obj_topic = resource_type_versioned_topic(obj.obj_name(), version)
|
obj_topic = resource_type_versioned_topic(obj.obj_name(),
|
||||||
return self.client.prepare(fanout=True, topic=obj_topic)
|
resource_version)
|
||||||
|
return self.client.prepare(fanout=True, topic=obj_topic,
|
||||||
|
version=rpc_version)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _classify_resources_by_type(resource_list):
|
||||||
|
resources_by_type = collections.defaultdict(list)
|
||||||
|
for resource in resource_list:
|
||||||
|
resource_type = resources.get_resource_type(resource)
|
||||||
|
resources_by_type[resource_type].append(resource)
|
||||||
|
return resources_by_type
|
||||||
|
|
||||||
@log_helpers.log_method_call
|
@log_helpers.log_method_call
|
||||||
def push(self, context, resource, event_type):
|
def push(self, context, resource_list, event_type):
|
||||||
resource_type = resources.get_resource_type(resource)
|
"""Push an event and list of resources to agents, batched per type.
|
||||||
|
When a list of different resource types is passed to this method,
|
||||||
|
the push will be sent as separate individual list pushes, one per
|
||||||
|
resource type.
|
||||||
|
"""
|
||||||
|
|
||||||
|
resources_by_type = self._classify_resources_by_type(resource_list)
|
||||||
|
for resource_type, type_resources in resources_by_type.items():
|
||||||
|
self._push(context, resource_type, type_resources, event_type)
|
||||||
|
|
||||||
|
def _push(self, context, resource_type, resource_list, event_type):
|
||||||
|
"""Push an event and list of resources of the same type to agents."""
|
||||||
_validate_resource_type(resource_type)
|
_validate_resource_type(resource_type)
|
||||||
versions = version_manager.get_resource_versions(resource_type)
|
compat_call = len(resource_list) == 1
|
||||||
for version in versions:
|
|
||||||
cctxt = self._prepare_object_fanout_context(resource, version)
|
for version in version_manager.get_resource_versions(resource_type):
|
||||||
dehydrated_resource = resource.obj_to_primitive(
|
cctxt = self._prepare_object_fanout_context(
|
||||||
target_version=version)
|
resource_list[0], version,
|
||||||
cctxt.cast(context, 'push',
|
rpc_version='1.0' if compat_call else '1.1')
|
||||||
resource=dehydrated_resource,
|
|
||||||
event_type=event_type)
|
dehydrated_resources = [
|
||||||
|
resource.obj_to_primitive(target_version=version)
|
||||||
|
for resource in resource_list]
|
||||||
|
|
||||||
|
if compat_call:
|
||||||
|
#TODO(mangelajo): remove in Ocata, backwards compatibility
|
||||||
|
# for agents expecting a single element as
|
||||||
|
# a single element instead of a list, this
|
||||||
|
# is only relevant to the QoSPolicy topic queue
|
||||||
|
cctxt.cast(context, 'push',
|
||||||
|
resource=dehydrated_resources[0],
|
||||||
|
event_type=event_type)
|
||||||
|
else:
|
||||||
|
cctxt.cast(context, 'push',
|
||||||
|
resource_list=dehydrated_resources,
|
||||||
|
event_type=event_type)
|
||||||
|
|
||||||
|
|
||||||
class ResourcesPushRpcCallback(object):
|
class ResourcesPushRpcCallback(object):
|
||||||
@ -211,14 +249,22 @@ class ResourcesPushRpcCallback(object):
|
|||||||
"""
|
"""
|
||||||
# History
|
# History
|
||||||
# 1.0 Initial version
|
# 1.0 Initial version
|
||||||
|
# 1.1 push method introduces resource_list support
|
||||||
|
|
||||||
target = oslo_messaging.Target(version='1.0',
|
target = oslo_messaging.Target(version='1.1',
|
||||||
namespace=constants.RPC_NAMESPACE_RESOURCES)
|
namespace=constants.RPC_NAMESPACE_RESOURCES)
|
||||||
|
|
||||||
def push(self, context, resource, event_type):
|
def push(self, context, **kwargs):
|
||||||
resource_obj = obj_base.NeutronObject.clean_obj_from_primitive(
|
"""Push receiver, will always receive resources of the same type."""
|
||||||
resource)
|
# TODO(mangelajo): accept single 'resource' parameter for backwards
|
||||||
LOG.debug("Resources notification (%(event_type)s): %(resource)s",
|
# compatibility during Newton, remove in Ocata
|
||||||
{'event_type': event_type, 'resource': repr(resource_obj)})
|
resource_list = ([kwargs['resource']] if 'resource' in kwargs else
|
||||||
resource_type = resources.get_resource_type(resource_obj)
|
kwargs['resource_list'])
|
||||||
cons_registry.push(resource_type, resource_obj, event_type)
|
event_type = kwargs['event_type']
|
||||||
|
|
||||||
|
resource_objs = [
|
||||||
|
obj_base.NeutronObject.clean_obj_from_primitive(resource)
|
||||||
|
for resource in resource_list]
|
||||||
|
|
||||||
|
resource_type = resources.get_resource_type(resource_objs[0])
|
||||||
|
cons_registry.push(resource_type, resource_objs, event_type)
|
||||||
|
@ -53,7 +53,7 @@ class RpcQosServiceNotificationDriver(
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def update_policy(self, context, policy):
|
def update_policy(self, context, policy):
|
||||||
self.notification_api.push(context, policy, events.UPDATED)
|
self.notification_api.push(context, [policy], events.UPDATED)
|
||||||
|
|
||||||
def delete_policy(self, context, policy):
|
def delete_policy(self, context, policy):
|
||||||
self.notification_api.push(context, policy, events.DELETED)
|
self.notification_api.push(context, [policy], events.DELETED)
|
||||||
|
@ -229,7 +229,7 @@ class TestOVSAgentQosExtension(OVSAgentQoSExtensionTestFramework):
|
|||||||
policy_copy.rules[0].max_kbps = 500
|
policy_copy.rules[0].max_kbps = 500
|
||||||
policy_copy.rules[0].max_burst_kbps = 5
|
policy_copy.rules[0].max_burst_kbps = 5
|
||||||
policy_copy.rules[1].dscp_mark = TEST_DSCP_MARK_2
|
policy_copy.rules[1].dscp_mark = TEST_DSCP_MARK_2
|
||||||
consumer_reg.push(resources.QOS_POLICY, policy_copy, events.UPDATED)
|
consumer_reg.push(resources.QOS_POLICY, [policy_copy], events.UPDATED)
|
||||||
self.wait_until_bandwidth_limit_rule_applied(self.ports[0],
|
self.wait_until_bandwidth_limit_rule_applied(self.ports[0],
|
||||||
policy_copy.rules[0])
|
policy_copy.rules[0])
|
||||||
self._assert_bandwidth_limit_rule_is_set(self.ports[0],
|
self._assert_bandwidth_limit_rule_is_set(self.ports[0],
|
||||||
@ -265,6 +265,6 @@ class TestOVSAgentQosExtension(OVSAgentQoSExtensionTestFramework):
|
|||||||
|
|
||||||
policy_copy = copy.deepcopy(self.qos_policies[TEST_POLICY_ID1])
|
policy_copy = copy.deepcopy(self.qos_policies[TEST_POLICY_ID1])
|
||||||
policy_copy.rules = list()
|
policy_copy.rules = list()
|
||||||
consumer_reg.push(resources.QOS_POLICY, policy_copy, events.UPDATED)
|
consumer_reg.push(resources.QOS_POLICY, [policy_copy], events.UPDATED)
|
||||||
|
|
||||||
self.wait_until_bandwidth_limit_rule_applied(port_dict, None)
|
self.wait_until_bandwidth_limit_rule_applied(port_dict, None)
|
||||||
|
@ -238,7 +238,7 @@ class QosExtensionRpcTestCase(QosExtensionBaseTestCase):
|
|||||||
self.qos_ext, '_process_update_policy') as update_mock:
|
self.qos_ext, '_process_update_policy') as update_mock:
|
||||||
|
|
||||||
policy_obj = mock.Mock()
|
policy_obj = mock.Mock()
|
||||||
self.qos_ext._handle_notification(policy_obj, events.UPDATED)
|
self.qos_ext._handle_notification([policy_obj], events.UPDATED)
|
||||||
update_mock.assert_called_with(policy_obj)
|
update_mock.assert_called_with(policy_obj)
|
||||||
|
|
||||||
def test__process_update_policy(self):
|
def test__process_update_policy(self):
|
||||||
|
@ -51,6 +51,6 @@ class ConsumerRegistryTestCase(base.BaseTestCase):
|
|||||||
callback2 = mock.Mock()
|
callback2 = mock.Mock()
|
||||||
callbacks = {callback1, callback2}
|
callbacks = {callback1, callback2}
|
||||||
manager_mock().get_callbacks.return_value = callbacks
|
manager_mock().get_callbacks.return_value = callbacks
|
||||||
registry.push(resource_type_, resource_, event_type_)
|
registry.push(resource_type_, [resource_], event_type_)
|
||||||
for callback in callbacks:
|
for callback in callbacks:
|
||||||
callback.assert_called_with(resource_, event_type_)
|
callback.assert_called_with([resource_], event_type_)
|
||||||
|
@ -28,30 +28,45 @@ from neutron.objects import base as objects_base
|
|||||||
from neutron.tests import base
|
from neutron.tests import base
|
||||||
|
|
||||||
|
|
||||||
|
TEST_EVENT = 'test_event'
|
||||||
|
TEST_VERSION = '1.0'
|
||||||
|
|
||||||
|
|
||||||
def _create_test_dict(uuid=None):
|
def _create_test_dict(uuid=None):
|
||||||
return {'id': uuid or uuidutils.generate_uuid(),
|
return {'id': uuid or uuidutils.generate_uuid(),
|
||||||
'field': 'foo'}
|
'field': 'foo'}
|
||||||
|
|
||||||
|
|
||||||
def _create_test_resource(context=None):
|
def _create_test_resource(context=None, resource_cls=None):
|
||||||
|
resource_cls = resource_cls or FakeResource
|
||||||
resource_dict = _create_test_dict()
|
resource_dict = _create_test_dict()
|
||||||
resource = FakeResource(context, **resource_dict)
|
resource = resource_cls(context, **resource_dict)
|
||||||
resource.obj_reset_changes()
|
resource.obj_reset_changes()
|
||||||
return resource
|
return resource
|
||||||
|
|
||||||
|
|
||||||
class FakeResource(objects_base.NeutronObject):
|
class BaseFakeResource(objects_base.NeutronObject):
|
||||||
# Version 1.0: Initial version
|
@classmethod
|
||||||
VERSION = '1.0'
|
def get_objects(cls, context, **kwargs):
|
||||||
|
return list()
|
||||||
|
|
||||||
|
|
||||||
|
class FakeResource(BaseFakeResource):
|
||||||
|
VERSION = TEST_VERSION
|
||||||
|
|
||||||
fields = {
|
fields = {
|
||||||
'id': obj_fields.UUIDField(),
|
'id': obj_fields.UUIDField(),
|
||||||
'field': obj_fields.StringField()
|
'field': obj_fields.StringField()
|
||||||
}
|
}
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def get_objects(cls, context, **kwargs):
|
class FakeResource2(BaseFakeResource):
|
||||||
return list()
|
VERSION = TEST_VERSION
|
||||||
|
|
||||||
|
fields = {
|
||||||
|
'id': obj_fields.UUIDField(),
|
||||||
|
'field': obj_fields.StringField()
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
class ResourcesRpcBaseTestCase(base.BaseTestCase):
|
class ResourcesRpcBaseTestCase(base.BaseTestCase):
|
||||||
@ -63,6 +78,21 @@ class ResourcesRpcBaseTestCase(base.BaseTestCase):
|
|||||||
fixture.VersionedObjectRegistryFixture())
|
fixture.VersionedObjectRegistryFixture())
|
||||||
|
|
||||||
self.context = context.get_admin_context()
|
self.context = context.get_admin_context()
|
||||||
|
mock.patch.object(resources_rpc.resources,
|
||||||
|
'is_valid_resource_type').start()
|
||||||
|
mock.patch.object(resources_rpc.resources, 'get_resource_cls',
|
||||||
|
side_effect=self._get_resource_cls).start()
|
||||||
|
|
||||||
|
self.resource_objs = [_create_test_resource(self.context)
|
||||||
|
for _ in range(2)]
|
||||||
|
self.resource_objs2 = [_create_test_resource(self.context,
|
||||||
|
FakeResource2)
|
||||||
|
for _ in range(2)]
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _get_resource_cls(resource_type):
|
||||||
|
return {FakeResource.obj_name(): FakeResource,
|
||||||
|
FakeResource2.obj_name(): FakeResource2}.get(resource_type)
|
||||||
|
|
||||||
|
|
||||||
class _ValidateResourceTypeTestCase(base.BaseTestCase):
|
class _ValidateResourceTypeTestCase(base.BaseTestCase):
|
||||||
@ -99,9 +129,6 @@ class ResourcesPullRpcApiTestCase(ResourcesRpcBaseTestCase):
|
|||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(ResourcesPullRpcApiTestCase, self).setUp()
|
super(ResourcesPullRpcApiTestCase, self).setUp()
|
||||||
mock.patch.object(resources_rpc, '_validate_resource_type').start()
|
|
||||||
mock.patch('neutron.api.rpc.callbacks.resources.get_resource_cls',
|
|
||||||
return_value=FakeResource).start()
|
|
||||||
self.rpc = resources_rpc.ResourcesPullRpcApi()
|
self.rpc = resources_rpc.ResourcesPullRpcApi()
|
||||||
mock.patch.object(self.rpc, 'client').start()
|
mock.patch.object(self.rpc, 'client').start()
|
||||||
self.cctxt_mock = self.rpc.client.prepare.return_value
|
self.cctxt_mock = self.rpc.client.prepare.return_value
|
||||||
@ -120,7 +147,7 @@ class ResourcesPullRpcApiTestCase(ResourcesRpcBaseTestCase):
|
|||||||
|
|
||||||
self.cctxt_mock.call.assert_called_once_with(
|
self.cctxt_mock.call.assert_called_once_with(
|
||||||
self.context, 'pull', resource_type='FakeResource',
|
self.context, 'pull', resource_type='FakeResource',
|
||||||
version=FakeResource.VERSION, resource_id=resource_id)
|
version=TEST_VERSION, resource_id=resource_id)
|
||||||
self.assertEqual(expected_obj, result)
|
self.assertEqual(expected_obj, result)
|
||||||
|
|
||||||
def test_pull_resource_not_found(self):
|
def test_pull_resource_not_found(self):
|
||||||
@ -162,7 +189,7 @@ class ResourcesPullRpcCallbackTestCase(ResourcesRpcBaseTestCase):
|
|||||||
return_value=self.resource_obj) as registry_mock:
|
return_value=self.resource_obj) as registry_mock:
|
||||||
primitive = self.callbacks.pull(
|
primitive = self.callbacks.pull(
|
||||||
self.context, resource_type=FakeResource.obj_name(),
|
self.context, resource_type=FakeResource.obj_name(),
|
||||||
version=FakeResource.VERSION,
|
version=TEST_VERSION,
|
||||||
resource_id=self.resource_obj.id)
|
resource_id=self.resource_obj.id)
|
||||||
registry_mock.assert_called_once_with(
|
registry_mock.assert_called_once_with(
|
||||||
'FakeResource', self.resource_obj.id, context=self.context)
|
'FakeResource', self.resource_obj.id, context=self.context)
|
||||||
@ -182,58 +209,96 @@ class ResourcesPullRpcCallbackTestCase(ResourcesRpcBaseTestCase):
|
|||||||
|
|
||||||
|
|
||||||
class ResourcesPushRpcApiTestCase(ResourcesRpcBaseTestCase):
|
class ResourcesPushRpcApiTestCase(ResourcesRpcBaseTestCase):
|
||||||
|
"""Tests the neutron server side of the RPC interface."""
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(ResourcesPushRpcApiTestCase, self).setUp()
|
super(ResourcesPushRpcApiTestCase, self).setUp()
|
||||||
mock.patch.object(resources_rpc.n_rpc, 'get_client').start()
|
mock.patch.object(resources_rpc.n_rpc, 'get_client').start()
|
||||||
mock.patch.object(resources_rpc, '_validate_resource_type').start()
|
|
||||||
self.rpc = resources_rpc.ResourcesPushRpcApi()
|
self.rpc = resources_rpc.ResourcesPushRpcApi()
|
||||||
self.cctxt_mock = self.rpc.client.prepare.return_value
|
self.cctxt_mock = self.rpc.client.prepare.return_value
|
||||||
self.resource_obj = _create_test_resource(self.context)
|
mock.patch.object(version_manager, 'get_resource_versions',
|
||||||
|
return_value=set([TEST_VERSION])).start()
|
||||||
|
|
||||||
def test__prepare_object_fanout_context(self):
|
def test__prepare_object_fanout_context(self):
|
||||||
expected_topic = topics.RESOURCE_TOPIC_PATTERN % {
|
expected_topic = topics.RESOURCE_TOPIC_PATTERN % {
|
||||||
'resource_type': resources.get_resource_type(self.resource_obj),
|
'resource_type': resources.get_resource_type(
|
||||||
'version': self.resource_obj.VERSION}
|
self.resource_objs[0]),
|
||||||
|
'version': TEST_VERSION}
|
||||||
|
|
||||||
with mock.patch.object(resources_rpc.resources, 'get_resource_cls',
|
observed = self.rpc._prepare_object_fanout_context(
|
||||||
return_value=FakeResource):
|
self.resource_objs[0], self.resource_objs[0].VERSION, '1.0')
|
||||||
observed = self.rpc._prepare_object_fanout_context(
|
|
||||||
self.resource_obj, self.resource_obj.VERSION)
|
|
||||||
|
|
||||||
self.rpc.client.prepare.assert_called_once_with(
|
self.rpc.client.prepare.assert_called_once_with(
|
||||||
fanout=True, topic=expected_topic)
|
fanout=True, topic=expected_topic, version='1.0')
|
||||||
self.assertEqual(self.cctxt_mock, observed)
|
self.assertEqual(self.cctxt_mock, observed)
|
||||||
|
|
||||||
def test_pushy(self):
|
def test_push_single_type(self):
|
||||||
with mock.patch.object(resources_rpc.resources, 'get_resource_cls',
|
self.rpc.push(
|
||||||
return_value=FakeResource):
|
self.context, self.resource_objs, TEST_EVENT)
|
||||||
with mock.patch.object(version_manager, 'get_resource_versions',
|
|
||||||
return_value=set([FakeResource.VERSION])):
|
|
||||||
self.rpc.push(
|
|
||||||
self.context, self.resource_obj, 'TYPE')
|
|
||||||
|
|
||||||
self.cctxt_mock.cast.assert_called_once_with(
|
self.cctxt_mock.cast.assert_called_once_with(
|
||||||
self.context, 'push',
|
self.context, 'push',
|
||||||
resource=self.resource_obj.obj_to_primitive(),
|
resource_list=[resource.obj_to_primitive()
|
||||||
event_type='TYPE')
|
for resource in self.resource_objs],
|
||||||
|
event_type=TEST_EVENT)
|
||||||
|
|
||||||
|
def test_push_mixed(self):
|
||||||
|
self.rpc.push(
|
||||||
|
self.context, self.resource_objs + self.resource_objs2,
|
||||||
|
event_type=TEST_EVENT)
|
||||||
|
|
||||||
|
self.cctxt_mock.cast.assert_any_call(
|
||||||
|
self.context, 'push',
|
||||||
|
resource_list=[resource.obj_to_primitive()
|
||||||
|
for resource in self.resource_objs],
|
||||||
|
event_type=TEST_EVENT)
|
||||||
|
|
||||||
|
self.cctxt_mock.cast.assert_any_call(
|
||||||
|
self.context, 'push',
|
||||||
|
resource_list=[resource.obj_to_primitive()
|
||||||
|
for resource in self.resource_objs2],
|
||||||
|
event_type=TEST_EVENT)
|
||||||
|
|
||||||
|
def test_push_mitaka_backwardscompat(self):
|
||||||
|
#TODO(mangelajo) remove in Ocata, since the 'resource' parameter
|
||||||
|
# is just for backwards compatibility with Mitaka
|
||||||
|
# agents.
|
||||||
|
self.rpc.push(
|
||||||
|
self.context, [self.resource_objs[0]], TEST_EVENT)
|
||||||
|
|
||||||
|
self.cctxt_mock.cast.assert_called_once_with(
|
||||||
|
self.context, 'push',
|
||||||
|
resource=self.resource_objs[0].obj_to_primitive(),
|
||||||
|
event_type=TEST_EVENT)
|
||||||
|
|
||||||
|
|
||||||
class ResourcesPushRpcCallbackTestCase(ResourcesRpcBaseTestCase):
|
class ResourcesPushRpcCallbackTestCase(ResourcesRpcBaseTestCase):
|
||||||
|
"""Tests the agent-side of the RPC interface."""
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(ResourcesPushRpcCallbackTestCase, self).setUp()
|
super(ResourcesPushRpcCallbackTestCase, self).setUp()
|
||||||
mock.patch.object(resources_rpc, '_validate_resource_type').start()
|
|
||||||
mock.patch.object(
|
|
||||||
resources_rpc.resources,
|
|
||||||
'get_resource_cls', return_value=FakeResource).start()
|
|
||||||
self.resource_obj = _create_test_resource(self.context)
|
|
||||||
self.resource_prim = self.resource_obj.obj_to_primitive()
|
|
||||||
self.callbacks = resources_rpc.ResourcesPushRpcCallback()
|
self.callbacks = resources_rpc.ResourcesPushRpcCallback()
|
||||||
|
|
||||||
@mock.patch.object(resources_rpc.cons_registry, 'push')
|
@mock.patch.object(resources_rpc.cons_registry, 'push')
|
||||||
def test_push(self, reg_push_mock):
|
def test_push(self, reg_push_mock):
|
||||||
self.obj_registry.register(FakeResource)
|
self.obj_registry.register(FakeResource)
|
||||||
self.callbacks.push(self.context, self.resource_prim, 'TYPE')
|
self.callbacks.push(self.context,
|
||||||
reg_push_mock.assert_called_once_with(self.resource_obj.obj_name(),
|
resource_list=[resource.obj_to_primitive()
|
||||||
self.resource_obj, 'TYPE')
|
for resource in self.resource_objs],
|
||||||
|
event_type=TEST_EVENT)
|
||||||
|
reg_push_mock.assert_called_once_with(self.resource_objs[0].obj_name(),
|
||||||
|
self.resource_objs,
|
||||||
|
TEST_EVENT)
|
||||||
|
|
||||||
|
@mock.patch.object(resources_rpc.cons_registry, 'push')
|
||||||
|
def test_push_mitaka_backwardscompat(self, reg_push_mock):
|
||||||
|
#TODO(mangelajo) remove in Ocata, since the 'resource' parameter
|
||||||
|
# is just for backwards compatibility with Mitaka
|
||||||
|
# agents.
|
||||||
|
self.obj_registry.register(FakeResource)
|
||||||
|
self.callbacks.push(self.context,
|
||||||
|
resource=self.resource_objs[0].obj_to_primitive(),
|
||||||
|
event_type=TEST_EVENT)
|
||||||
|
reg_push_mock.assert_called_once_with(self.resource_objs[0].obj_name(),
|
||||||
|
[self.resource_objs[0]],
|
||||||
|
TEST_EVENT)
|
||||||
|
@ -66,7 +66,7 @@ class TestQosDriversManager(TestQosDriversManagerBase):
|
|||||||
self.driver_manager = driver_mgr.QosServiceNotificationDriverManager()
|
self.driver_manager = driver_mgr.QosServiceNotificationDriverManager()
|
||||||
|
|
||||||
def _validate_registry_params(self, event_type, policy):
|
def _validate_registry_params(self, event_type, policy):
|
||||||
self.rpc_api.push.assert_called_with(self.context, policy,
|
self.rpc_api.push.assert_called_with(self.context, [policy],
|
||||||
event_type)
|
event_type)
|
||||||
|
|
||||||
def test_create_policy_default_configuration(self):
|
def test_create_policy_default_configuration(self):
|
||||||
|
@ -53,7 +53,7 @@ class TestQosRpcNotificationDriver(base.BaseQosTestCase):
|
|||||||
**self.rule_data['bandwidth_limit_rule'])
|
**self.rule_data['bandwidth_limit_rule'])
|
||||||
|
|
||||||
def _validate_push_params(self, event_type, policy):
|
def _validate_push_params(self, event_type, policy):
|
||||||
self.rpc_api.push.assert_called_once_with(self.context, policy,
|
self.rpc_api.push.assert_called_once_with(self.context, [policy],
|
||||||
event_type)
|
event_type)
|
||||||
|
|
||||||
def test_create_policy(self):
|
def test_create_policy(self):
|
||||||
|
Loading…
Reference in New Issue
Block a user