diff --git a/neutron/api/rpc/callbacks/exceptions.py b/neutron/api/rpc/callbacks/exceptions.py index c45615bca2b..87724ae419b 100644 --- a/neutron/api/rpc/callbacks/exceptions.py +++ b/neutron/api/rpc/callbacks/exceptions.py @@ -24,3 +24,7 @@ class CallbackNotFound(exceptions.NeutronException): class CallbacksMaxLimitReached(exceptions.NeutronException): message = _("Cannot add multiple callbacks for %(resource_type)s") + + +class VersionsCallbackNotFound(exceptions.NeutronException): + message = _("No versions callback provided in ResourceVersionsManager") diff --git a/neutron/api/rpc/callbacks/resources.py b/neutron/api/rpc/callbacks/resources.py index bde7aed9a7e..552f026d05c 100644 --- a/neutron/api/rpc/callbacks/resources.py +++ b/neutron/api/rpc/callbacks/resources.py @@ -30,6 +30,11 @@ _TYPE_TO_CLS_MAP = { QOS_POLICY: _QOS_POLICY_CLS, } +LOCAL_RESOURCE_VERSIONS = { + resource_type: cls.VERSION + for resource_type, cls in _TYPE_TO_CLS_MAP.items() +} + def get_resource_type(resource_cls): if not resource_cls: diff --git a/neutron/api/rpc/callbacks/version_manager.py b/neutron/api/rpc/callbacks/version_manager.py new file mode 100644 index 00000000000..409691ee1a1 --- /dev/null +++ b/neutron/api/rpc/callbacks/version_manager.py @@ -0,0 +1,189 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections +import copy +import time + +from oslo_log import log as logging + +from neutron.api.rpc.callbacks import resources + +LOG = logging.getLogger(__name__) + +VERSIONS_TTL = 60 + + +class ResourceConsumerTracker(object): + """Class to be provided back by consumer_versions_callback. + + This class is responsible for fetching the local versions of + resources, and letting the callback register every consumer's + resource version. + + Later on, this class can also be used to recalculate, for each + resource type, the collection of versions that are local or + known by one or more consumers. + """ + + def __init__(self): + # Initialize with the local (server) versions, as we always want + # to send those. Agents, as they upgrade, will need the latest version, + # and there is a corner case we'd not be covering otherwise: + # 1) one or several neutron-servers get disconnected from rpc (while + # running) + # 2) a new agent comes up, with the latest version and it reports + # 2 ways: + # a) via status report (which will be stored in the database) + # b) via fanout call to all neutron servers, this way, all of them + # get their version set updated right away without the need to + # re-fetch anything from the database. + # 3) the neutron-servers get back online to the rpc bus, but they + # lost the fanout message. + # + # TODO(mangelajo) To cover this case we may need a callback from oslo + # messaging to get notified about disconnections/reconnections to the + # rpc bus, invalidating the consumer version cache when we receive such + # callback. + self._versions = self._get_local_resource_versions() + self._versions_by_consumer = collections.defaultdict(dict) + self._needs_recalculation = False + + def _get_local_resource_versions(self): + local_resource_versions = collections.defaultdict(set) + for resource_type, version in ( + resources.LOCAL_RESOURCE_VERSIONS.items()): + local_resource_versions[resource_type].add(version) + return local_resource_versions + + # TODO(mangelajo): add locking with _recalculate_versions if we ever + # move out of green threads. + def _set_version(self, consumer_id, resource_type, version): + """Set or update a consumer resource type version.""" + self._versions[resource_type].add(version) + prev_version = ( + self._versions_by_consumer[consumer_id].get(resource_type, None)) + self._versions_by_consumer[consumer_id][resource_type] = version + if prev_version and (prev_version != version): + # If a version got updated/changed in a consumer, we need to + # recalculate the main dictionary of versions based on the + # new _versions_by_consumer. + # We defer the recalculation until every consumer version has + # been set for all of its resource types. + self._needs_recalculation = True + LOG.debug("Version for resource type %(resource_type)s changed " + "%(prev_version)s to %(version)s on " + "consumer %(consumer_id)s", + {'resource_type': resource_type, + 'version': version, + 'prev_version': prev_version, + 'consumer_id': consumer_id}) + + def set_versions(self, consumer_id, versions): + """Set or update an specific consumer resource types.""" + for resource_type, resource_version in versions.items(): + self._set_version(consumer_id, resource_type, + resource_version) + + def get_resource_versions(self, resource_type): + """Fetch the versions necessary to notify all consumers.""" + if self._needs_recalculation: + self._recalculate_versions() + self._needs_recalculation = False + + return copy.copy(self._versions[resource_type]) + + # TODO(mangelajo): Add locking if we ever move out of greenthreads. + def _recalculate_versions(self): + """Recalculate the _versions set. + + Re-fetch the local (server) versions and expand with consumers' + versions. + """ + versions = self._get_local_resource_versions() + for versions_dict in self._versions_by_consumer.values(): + for res_type, res_version in versions_dict.items(): + versions[res_type].add(res_version) + self._versions = versions + + +class CachedResourceConsumerTracker(object): + """This class takes care of the caching logic of versions.""" + + def __init__(self): + self._consumer_versions_callback = None + # This is TTL expiration time, 0 means it will be expired at start + self._expires_at = 0 + self._versions = ResourceConsumerTracker() + + def _update_consumer_versions(self): + if self._consumer_versions_callback: + new_tracker = ResourceConsumerTracker() + self._consumer_versions_callback(new_tracker) + self._versions = new_tracker + else: + pass # TODO(mangelajo): throw exception if callback not provided + + def _check_expiration(self): + if time.time() > self._expires_at: + self._update_consumer_versions() + self._expires_at = time.time() + VERSIONS_TTL + + def set_consumer_versions_callback(self, callback): + self._consumer_versions_callback = callback + + def get_resource_versions(self, resource_type): + self._check_expiration() + return self._versions.get_resource_versions(resource_type) + + def update_versions(self, consumer_id, resource_versions): + self._versions.set_versions(consumer_id, resource_versions) + + +cached_version_tracker = CachedResourceConsumerTracker() + + +def set_consumer_versions_callback(callback): + """Register a callback to retrieve the system consumer versions. + + Specific consumer logic has been decoupled from this, so we could reuse + in other places. + + The callback will receive a ResourceConsumerTracker object, + and the ResourceConsumerTracker methods must be used to provide + each consumer_id versions. Consumer ids can be obtained from this + module via the next functions: + * get_agent_consumer_id + """ + cached_version_tracker.set_consumer_versions_callback(callback) + + +def get_resource_versions(resource_type): + """Return the set of versions expected by the consumers of a resource.""" + return cached_version_tracker.get_resource_versions(resource_type) + + +def update_versions(consumer_id, resource_versions): + """Update the resources' versions for a consumer id.""" + cached_version_tracker.set_versions(consumer_id, resource_versions) + + +def get_agent_consumer_id(agent_type, agent_host): + """Return a consumer id string for an agent type + host tuple. + + The logic behind this function, is that, eventually we could have + consumers of RPC callbacks which are not agents, thus we want + to totally collate all the different consumer types and provide + unique consumer ids. + """ + return "%(agent_type)s@%(agent_host)s" % {'agent_type': agent_type, + 'agent_host': agent_host} diff --git a/neutron/api/rpc/handlers/resources_rpc.py b/neutron/api/rpc/handlers/resources_rpc.py index f49351184f3..dec46aff059 100755 --- a/neutron/api/rpc/handlers/resources_rpc.py +++ b/neutron/api/rpc/handlers/resources_rpc.py @@ -21,6 +21,7 @@ from neutron._i18n import _ from neutron.api.rpc.callbacks.consumer import registry as cons_registry from neutron.api.rpc.callbacks.producer import registry as prod_registry from neutron.api.rpc.callbacks import resources +from neutron.api.rpc.callbacks import version_manager from neutron.common import constants from neutron.common import exceptions from neutron.common import rpc as n_rpc @@ -49,11 +50,16 @@ def _validate_resource_type(resource_type): raise InvalidResourceTypeClass(resource_type=resource_type) -def resource_type_versioned_topic(resource_type): +def resource_type_versioned_topic(resource_type, version=None): + """Return the topic for a resource type. + + If no version is provided, the latest version of the object will + be used. + """ _validate_resource_type(resource_type) cls = resources.get_resource_cls(resource_type) return topics.RESOURCE_TOPIC_PATTERN % {'resource_type': resource_type, - 'version': cls.VERSION} + 'version': version or cls.VERSION} class ResourcesPullRpcApi(object): @@ -130,22 +136,23 @@ class ResourcesPushRpcApi(object): namespace=constants.RPC_NAMESPACE_RESOURCES) self.client = n_rpc.get_client(target) - def _prepare_object_fanout_context(self, obj): + def _prepare_object_fanout_context(self, obj, version): """Prepare fanout context, one topic per object type.""" - obj_topic = resource_type_versioned_topic(obj.obj_name()) + obj_topic = resource_type_versioned_topic(obj.obj_name(), version) return self.client.prepare(fanout=True, topic=obj_topic) @log_helpers.log_method_call def push(self, context, resource, event_type): resource_type = resources.get_resource_type(resource) _validate_resource_type(resource_type) - cctxt = self._prepare_object_fanout_context(resource) - #TODO(QoS): Push notifications for every known version once we have - # multiple of those - dehydrated_resource = resource.obj_to_primitive() - cctxt.cast(context, 'push', - resource=dehydrated_resource, - event_type=event_type) + versions = version_manager.get_resource_versions(resource_type) + for version in versions: + cctxt = self._prepare_object_fanout_context(resource, version) + dehydrated_resource = resource.obj_to_primitive( + target_version=version) + cctxt.cast(context, 'push', + resource=dehydrated_resource, + event_type=event_type) class ResourcesPushRpcCallback(object): diff --git a/neutron/tests/unit/api/rpc/callbacks/test_version_manager.py b/neutron/tests/unit/api/rpc/callbacks/test_version_manager.py new file mode 100644 index 00000000000..228e1621078 --- /dev/null +++ b/neutron/tests/unit/api/rpc/callbacks/test_version_manager.py @@ -0,0 +1,145 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from neutron.api.rpc.callbacks import exceptions +from neutron.api.rpc.callbacks import version_manager +from neutron.tests import base + + +TEST_RESOURCE_TYPE = 'TestResourceType' +TEST_RESOURCE_VERSION_A = '1.11' +TEST_RESOURCE_VERSION_B = '1.12' + +TEST_RESOURCE_TYPE_2 = 'AnotherResource' + +AGENT_HOST_1 = 'host-1' +AGENT_HOST_2 = 'host-2' +AGENT_TYPE_1 = 'dhcp-agent' +AGENT_TYPE_2 = 'openvswitch-agent' +CONSUMER_1 = version_manager.get_agent_consumer_id(AGENT_HOST_1, AGENT_TYPE_1) +CONSUMER_2 = version_manager.get_agent_consumer_id(AGENT_HOST_2, AGENT_TYPE_2) + + +class ResourceConsumerTrackerTest(base.BaseTestCase): + + def test_consumer_set_versions(self): + cv = version_manager.ResourceConsumerTracker() + cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE: + TEST_RESOURCE_VERSION_A}) + self.assertIn(TEST_RESOURCE_VERSION_A, + cv.get_resource_versions(TEST_RESOURCE_TYPE)) + + def test_consumer_updates_version(self): + cv = version_manager.ResourceConsumerTracker() + for version in [TEST_RESOURCE_VERSION_A, TEST_RESOURCE_VERSION_B]: + cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE: version}) + + self.assertEqual(set([TEST_RESOURCE_VERSION_B]), + cv.get_resource_versions(TEST_RESOURCE_TYPE)) + + def test_multiple_consumer_version_update(self): + cv = version_manager.ResourceConsumerTracker() + cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE: + TEST_RESOURCE_VERSION_A}) + cv.set_versions(CONSUMER_2, {TEST_RESOURCE_TYPE: + TEST_RESOURCE_VERSION_A}) + cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE: + TEST_RESOURCE_VERSION_B}) + + self.assertEqual(set([TEST_RESOURCE_VERSION_A, + TEST_RESOURCE_VERSION_B]), + cv.get_resource_versions(TEST_RESOURCE_TYPE)) + + def test_different_adds_triggers_recalculation(self): + cv = version_manager.ResourceConsumerTracker() + + for version in [TEST_RESOURCE_VERSION_A, TEST_RESOURCE_VERSION_B]: + cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE: version}) + + self.assertTrue(cv._needs_recalculation) + cv._recalculate_versions = mock.Mock() + cv.get_resource_versions(TEST_RESOURCE_TYPE) + cv._recalculate_versions.assert_called_once_with() + + +class CachedResourceConsumerTrackerTest(base.BaseTestCase): + + # TODO(mangelajo): re-enable once we provide the callback + def _test_exception_with_no_callback(self): + cached_tracker = version_manager.CachedResourceConsumerTracker() + self.assertRaises( + exceptions.VersionsCallbackNotFound, + cached_tracker.get_resource_versions, [mock.ANY]) + + def _set_consumer_versions_callback(self, cached_tracker): + def consumer_versions(rct): + rct.set_versions(CONSUMER_1, + {TEST_RESOURCE_TYPE: TEST_RESOURCE_VERSION_A}) + + cached_tracker.set_consumer_versions_callback(consumer_versions) + + def test_consumer_versions_callback(self): + cached_tracker = version_manager.CachedResourceConsumerTracker() + self._set_consumer_versions_callback(cached_tracker) + + self.assertIn(TEST_RESOURCE_VERSION_A, + cached_tracker.get_resource_versions( + TEST_RESOURCE_TYPE)) + + def test_update_versions(self): + cached_tracker = version_manager.CachedResourceConsumerTracker() + self._set_consumer_versions_callback(cached_tracker) + + initial_versions = cached_tracker.get_resource_versions( + TEST_RESOURCE_TYPE) + + initial_versions_2 = cached_tracker.get_resource_versions( + TEST_RESOURCE_TYPE_2) + + cached_tracker.update_versions( + CONSUMER_1, {TEST_RESOURCE_TYPE: TEST_RESOURCE_VERSION_B, + TEST_RESOURCE_TYPE_2: TEST_RESOURCE_VERSION_A}) + + final_versions = cached_tracker.get_resource_versions( + TEST_RESOURCE_TYPE) + final_versions_2 = cached_tracker.get_resource_versions( + TEST_RESOURCE_TYPE_2) + + self.assertNotEqual(initial_versions, final_versions) + self.assertNotEqual(initial_versions_2, final_versions_2) + + def test_versions_ttl(self): + self.refreshed = False + + def consumer_versions_callback(consumer_tracker): + consumer_tracker.set_versions( + CONSUMER_1, {TEST_RESOURCE_TYPE: TEST_RESOURCE_VERSION_A}) + self.refreshed = True + + cached_tracker = version_manager.CachedResourceConsumerTracker() + cached_tracker.set_consumer_versions_callback( + consumer_versions_callback) + with mock.patch('time.time') as time_patch: + time_patch.return_value = 1 + cached_tracker.get_resource_versions(TEST_RESOURCE_TYPE) + self.assertTrue(self.refreshed) + self.refreshed = False + + time_patch.return_value = 2 + cached_tracker.get_resource_versions(TEST_RESOURCE_TYPE) + self.assertFalse(self.refreshed) + + time_patch.return_value = 2 + version_manager.VERSIONS_TTL + cached_tracker.get_resource_versions(TEST_RESOURCE_TYPE) + self.assertTrue(self.refreshed) diff --git a/neutron/tests/unit/api/rpc/handlers/test_resources_rpc.py b/neutron/tests/unit/api/rpc/handlers/test_resources_rpc.py index bd32fe01bd1..4095bc711f5 100755 --- a/neutron/tests/unit/api/rpc/handlers/test_resources_rpc.py +++ b/neutron/tests/unit/api/rpc/handlers/test_resources_rpc.py @@ -21,6 +21,7 @@ from oslo_versionedobjects import fields as obj_fields import testtools from neutron.api.rpc.callbacks import resources +from neutron.api.rpc.callbacks import version_manager from neutron.api.rpc.handlers import resources_rpc from neutron.common import topics from neutron import context @@ -194,7 +195,7 @@ class ResourcesPushRpcApiTestCase(ResourcesRpcBaseTestCase): with mock.patch.object(resources_rpc.resources, 'get_resource_cls', return_value=FakeResource): observed = self.rpc._prepare_object_fanout_context( - self.resource_obj) + self.resource_obj, self.resource_obj.VERSION) self.rpc.client.prepare.assert_called_once_with( fanout=True, topic=expected_topic) @@ -203,8 +204,10 @@ class ResourcesPushRpcApiTestCase(ResourcesRpcBaseTestCase): def test_pushy(self): with mock.patch.object(resources_rpc.resources, 'get_resource_cls', return_value=FakeResource): - self.rpc.push( - self.context, self.resource_obj, 'TYPE') + with mock.patch.object(version_manager, 'get_resource_versions', + return_value=set([FakeResource.VERSION])): + self.rpc.push( + self.context, self.resource_obj, 'TYPE') self.cctxt_mock.cast.assert_called_once_with( self.context, 'push',