Merge "RPC Callback rolling upgrades logic"
This commit is contained in:
@@ -24,3 +24,7 @@ class CallbackNotFound(exceptions.NeutronException):
|
||||
|
||||
class CallbacksMaxLimitReached(exceptions.NeutronException):
|
||||
message = _("Cannot add multiple callbacks for %(resource_type)s")
|
||||
|
||||
|
||||
class VersionsCallbackNotFound(exceptions.NeutronException):
|
||||
message = _("No versions callback provided in ResourceVersionsManager")
|
||||
|
||||
@@ -30,6 +30,11 @@ _TYPE_TO_CLS_MAP = {
|
||||
QOS_POLICY: _QOS_POLICY_CLS,
|
||||
}
|
||||
|
||||
LOCAL_RESOURCE_VERSIONS = {
|
||||
resource_type: cls.VERSION
|
||||
for resource_type, cls in _TYPE_TO_CLS_MAP.items()
|
||||
}
|
||||
|
||||
|
||||
def get_resource_type(resource_cls):
|
||||
if not resource_cls:
|
||||
|
||||
189
neutron/api/rpc/callbacks/version_manager.py
Normal file
189
neutron/api/rpc/callbacks/version_manager.py
Normal file
@@ -0,0 +1,189 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import copy
|
||||
import time
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from neutron.api.rpc.callbacks import resources
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
VERSIONS_TTL = 60
|
||||
|
||||
|
||||
class ResourceConsumerTracker(object):
|
||||
"""Class to be provided back by consumer_versions_callback.
|
||||
|
||||
This class is responsible for fetching the local versions of
|
||||
resources, and letting the callback register every consumer's
|
||||
resource version.
|
||||
|
||||
Later on, this class can also be used to recalculate, for each
|
||||
resource type, the collection of versions that are local or
|
||||
known by one or more consumers.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
# Initialize with the local (server) versions, as we always want
|
||||
# to send those. Agents, as they upgrade, will need the latest version,
|
||||
# and there is a corner case we'd not be covering otherwise:
|
||||
# 1) one or several neutron-servers get disconnected from rpc (while
|
||||
# running)
|
||||
# 2) a new agent comes up, with the latest version and it reports
|
||||
# 2 ways:
|
||||
# a) via status report (which will be stored in the database)
|
||||
# b) via fanout call to all neutron servers, this way, all of them
|
||||
# get their version set updated right away without the need to
|
||||
# re-fetch anything from the database.
|
||||
# 3) the neutron-servers get back online to the rpc bus, but they
|
||||
# lost the fanout message.
|
||||
#
|
||||
# TODO(mangelajo) To cover this case we may need a callback from oslo
|
||||
# messaging to get notified about disconnections/reconnections to the
|
||||
# rpc bus, invalidating the consumer version cache when we receive such
|
||||
# callback.
|
||||
self._versions = self._get_local_resource_versions()
|
||||
self._versions_by_consumer = collections.defaultdict(dict)
|
||||
self._needs_recalculation = False
|
||||
|
||||
def _get_local_resource_versions(self):
|
||||
local_resource_versions = collections.defaultdict(set)
|
||||
for resource_type, version in (
|
||||
resources.LOCAL_RESOURCE_VERSIONS.items()):
|
||||
local_resource_versions[resource_type].add(version)
|
||||
return local_resource_versions
|
||||
|
||||
# TODO(mangelajo): add locking with _recalculate_versions if we ever
|
||||
# move out of green threads.
|
||||
def _set_version(self, consumer_id, resource_type, version):
|
||||
"""Set or update a consumer resource type version."""
|
||||
self._versions[resource_type].add(version)
|
||||
prev_version = (
|
||||
self._versions_by_consumer[consumer_id].get(resource_type, None))
|
||||
self._versions_by_consumer[consumer_id][resource_type] = version
|
||||
if prev_version and (prev_version != version):
|
||||
# If a version got updated/changed in a consumer, we need to
|
||||
# recalculate the main dictionary of versions based on the
|
||||
# new _versions_by_consumer.
|
||||
# We defer the recalculation until every consumer version has
|
||||
# been set for all of its resource types.
|
||||
self._needs_recalculation = True
|
||||
LOG.debug("Version for resource type %(resource_type)s changed "
|
||||
"%(prev_version)s to %(version)s on "
|
||||
"consumer %(consumer_id)s",
|
||||
{'resource_type': resource_type,
|
||||
'version': version,
|
||||
'prev_version': prev_version,
|
||||
'consumer_id': consumer_id})
|
||||
|
||||
def set_versions(self, consumer_id, versions):
|
||||
"""Set or update an specific consumer resource types."""
|
||||
for resource_type, resource_version in versions.items():
|
||||
self._set_version(consumer_id, resource_type,
|
||||
resource_version)
|
||||
|
||||
def get_resource_versions(self, resource_type):
|
||||
"""Fetch the versions necessary to notify all consumers."""
|
||||
if self._needs_recalculation:
|
||||
self._recalculate_versions()
|
||||
self._needs_recalculation = False
|
||||
|
||||
return copy.copy(self._versions[resource_type])
|
||||
|
||||
# TODO(mangelajo): Add locking if we ever move out of greenthreads.
|
||||
def _recalculate_versions(self):
|
||||
"""Recalculate the _versions set.
|
||||
|
||||
Re-fetch the local (server) versions and expand with consumers'
|
||||
versions.
|
||||
"""
|
||||
versions = self._get_local_resource_versions()
|
||||
for versions_dict in self._versions_by_consumer.values():
|
||||
for res_type, res_version in versions_dict.items():
|
||||
versions[res_type].add(res_version)
|
||||
self._versions = versions
|
||||
|
||||
|
||||
class CachedResourceConsumerTracker(object):
|
||||
"""This class takes care of the caching logic of versions."""
|
||||
|
||||
def __init__(self):
|
||||
self._consumer_versions_callback = None
|
||||
# This is TTL expiration time, 0 means it will be expired at start
|
||||
self._expires_at = 0
|
||||
self._versions = ResourceConsumerTracker()
|
||||
|
||||
def _update_consumer_versions(self):
|
||||
if self._consumer_versions_callback:
|
||||
new_tracker = ResourceConsumerTracker()
|
||||
self._consumer_versions_callback(new_tracker)
|
||||
self._versions = new_tracker
|
||||
else:
|
||||
pass # TODO(mangelajo): throw exception if callback not provided
|
||||
|
||||
def _check_expiration(self):
|
||||
if time.time() > self._expires_at:
|
||||
self._update_consumer_versions()
|
||||
self._expires_at = time.time() + VERSIONS_TTL
|
||||
|
||||
def set_consumer_versions_callback(self, callback):
|
||||
self._consumer_versions_callback = callback
|
||||
|
||||
def get_resource_versions(self, resource_type):
|
||||
self._check_expiration()
|
||||
return self._versions.get_resource_versions(resource_type)
|
||||
|
||||
def update_versions(self, consumer_id, resource_versions):
|
||||
self._versions.set_versions(consumer_id, resource_versions)
|
||||
|
||||
|
||||
cached_version_tracker = CachedResourceConsumerTracker()
|
||||
|
||||
|
||||
def set_consumer_versions_callback(callback):
|
||||
"""Register a callback to retrieve the system consumer versions.
|
||||
|
||||
Specific consumer logic has been decoupled from this, so we could reuse
|
||||
in other places.
|
||||
|
||||
The callback will receive a ResourceConsumerTracker object,
|
||||
and the ResourceConsumerTracker methods must be used to provide
|
||||
each consumer_id versions. Consumer ids can be obtained from this
|
||||
module via the next functions:
|
||||
* get_agent_consumer_id
|
||||
"""
|
||||
cached_version_tracker.set_consumer_versions_callback(callback)
|
||||
|
||||
|
||||
def get_resource_versions(resource_type):
|
||||
"""Return the set of versions expected by the consumers of a resource."""
|
||||
return cached_version_tracker.get_resource_versions(resource_type)
|
||||
|
||||
|
||||
def update_versions(consumer_id, resource_versions):
|
||||
"""Update the resources' versions for a consumer id."""
|
||||
cached_version_tracker.set_versions(consumer_id, resource_versions)
|
||||
|
||||
|
||||
def get_agent_consumer_id(agent_type, agent_host):
|
||||
"""Return a consumer id string for an agent type + host tuple.
|
||||
|
||||
The logic behind this function, is that, eventually we could have
|
||||
consumers of RPC callbacks which are not agents, thus we want
|
||||
to totally collate all the different consumer types and provide
|
||||
unique consumer ids.
|
||||
"""
|
||||
return "%(agent_type)s@%(agent_host)s" % {'agent_type': agent_type,
|
||||
'agent_host': agent_host}
|
||||
@@ -21,6 +21,7 @@ from neutron._i18n import _
|
||||
from neutron.api.rpc.callbacks.consumer import registry as cons_registry
|
||||
from neutron.api.rpc.callbacks.producer import registry as prod_registry
|
||||
from neutron.api.rpc.callbacks import resources
|
||||
from neutron.api.rpc.callbacks import version_manager
|
||||
from neutron.common import constants
|
||||
from neutron.common import exceptions
|
||||
from neutron.common import rpc as n_rpc
|
||||
@@ -49,11 +50,16 @@ def _validate_resource_type(resource_type):
|
||||
raise InvalidResourceTypeClass(resource_type=resource_type)
|
||||
|
||||
|
||||
def resource_type_versioned_topic(resource_type):
|
||||
def resource_type_versioned_topic(resource_type, version=None):
|
||||
"""Return the topic for a resource type.
|
||||
|
||||
If no version is provided, the latest version of the object will
|
||||
be used.
|
||||
"""
|
||||
_validate_resource_type(resource_type)
|
||||
cls = resources.get_resource_cls(resource_type)
|
||||
return topics.RESOURCE_TOPIC_PATTERN % {'resource_type': resource_type,
|
||||
'version': cls.VERSION}
|
||||
'version': version or cls.VERSION}
|
||||
|
||||
|
||||
class ResourcesPullRpcApi(object):
|
||||
@@ -130,22 +136,23 @@ class ResourcesPushRpcApi(object):
|
||||
namespace=constants.RPC_NAMESPACE_RESOURCES)
|
||||
self.client = n_rpc.get_client(target)
|
||||
|
||||
def _prepare_object_fanout_context(self, obj):
|
||||
def _prepare_object_fanout_context(self, obj, version):
|
||||
"""Prepare fanout context, one topic per object type."""
|
||||
obj_topic = resource_type_versioned_topic(obj.obj_name())
|
||||
obj_topic = resource_type_versioned_topic(obj.obj_name(), version)
|
||||
return self.client.prepare(fanout=True, topic=obj_topic)
|
||||
|
||||
@log_helpers.log_method_call
|
||||
def push(self, context, resource, event_type):
|
||||
resource_type = resources.get_resource_type(resource)
|
||||
_validate_resource_type(resource_type)
|
||||
cctxt = self._prepare_object_fanout_context(resource)
|
||||
#TODO(QoS): Push notifications for every known version once we have
|
||||
# multiple of those
|
||||
dehydrated_resource = resource.obj_to_primitive()
|
||||
cctxt.cast(context, 'push',
|
||||
resource=dehydrated_resource,
|
||||
event_type=event_type)
|
||||
versions = version_manager.get_resource_versions(resource_type)
|
||||
for version in versions:
|
||||
cctxt = self._prepare_object_fanout_context(resource, version)
|
||||
dehydrated_resource = resource.obj_to_primitive(
|
||||
target_version=version)
|
||||
cctxt.cast(context, 'push',
|
||||
resource=dehydrated_resource,
|
||||
event_type=event_type)
|
||||
|
||||
|
||||
class ResourcesPushRpcCallback(object):
|
||||
|
||||
145
neutron/tests/unit/api/rpc/callbacks/test_version_manager.py
Normal file
145
neutron/tests/unit/api/rpc/callbacks/test_version_manager.py
Normal file
@@ -0,0 +1,145 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from neutron.api.rpc.callbacks import exceptions
|
||||
from neutron.api.rpc.callbacks import version_manager
|
||||
from neutron.tests import base
|
||||
|
||||
|
||||
TEST_RESOURCE_TYPE = 'TestResourceType'
|
||||
TEST_RESOURCE_VERSION_A = '1.11'
|
||||
TEST_RESOURCE_VERSION_B = '1.12'
|
||||
|
||||
TEST_RESOURCE_TYPE_2 = 'AnotherResource'
|
||||
|
||||
AGENT_HOST_1 = 'host-1'
|
||||
AGENT_HOST_2 = 'host-2'
|
||||
AGENT_TYPE_1 = 'dhcp-agent'
|
||||
AGENT_TYPE_2 = 'openvswitch-agent'
|
||||
CONSUMER_1 = version_manager.get_agent_consumer_id(AGENT_HOST_1, AGENT_TYPE_1)
|
||||
CONSUMER_2 = version_manager.get_agent_consumer_id(AGENT_HOST_2, AGENT_TYPE_2)
|
||||
|
||||
|
||||
class ResourceConsumerTrackerTest(base.BaseTestCase):
|
||||
|
||||
def test_consumer_set_versions(self):
|
||||
cv = version_manager.ResourceConsumerTracker()
|
||||
cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE:
|
||||
TEST_RESOURCE_VERSION_A})
|
||||
self.assertIn(TEST_RESOURCE_VERSION_A,
|
||||
cv.get_resource_versions(TEST_RESOURCE_TYPE))
|
||||
|
||||
def test_consumer_updates_version(self):
|
||||
cv = version_manager.ResourceConsumerTracker()
|
||||
for version in [TEST_RESOURCE_VERSION_A, TEST_RESOURCE_VERSION_B]:
|
||||
cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE: version})
|
||||
|
||||
self.assertEqual(set([TEST_RESOURCE_VERSION_B]),
|
||||
cv.get_resource_versions(TEST_RESOURCE_TYPE))
|
||||
|
||||
def test_multiple_consumer_version_update(self):
|
||||
cv = version_manager.ResourceConsumerTracker()
|
||||
cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE:
|
||||
TEST_RESOURCE_VERSION_A})
|
||||
cv.set_versions(CONSUMER_2, {TEST_RESOURCE_TYPE:
|
||||
TEST_RESOURCE_VERSION_A})
|
||||
cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE:
|
||||
TEST_RESOURCE_VERSION_B})
|
||||
|
||||
self.assertEqual(set([TEST_RESOURCE_VERSION_A,
|
||||
TEST_RESOURCE_VERSION_B]),
|
||||
cv.get_resource_versions(TEST_RESOURCE_TYPE))
|
||||
|
||||
def test_different_adds_triggers_recalculation(self):
|
||||
cv = version_manager.ResourceConsumerTracker()
|
||||
|
||||
for version in [TEST_RESOURCE_VERSION_A, TEST_RESOURCE_VERSION_B]:
|
||||
cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE: version})
|
||||
|
||||
self.assertTrue(cv._needs_recalculation)
|
||||
cv._recalculate_versions = mock.Mock()
|
||||
cv.get_resource_versions(TEST_RESOURCE_TYPE)
|
||||
cv._recalculate_versions.assert_called_once_with()
|
||||
|
||||
|
||||
class CachedResourceConsumerTrackerTest(base.BaseTestCase):
|
||||
|
||||
# TODO(mangelajo): re-enable once we provide the callback
|
||||
def _test_exception_with_no_callback(self):
|
||||
cached_tracker = version_manager.CachedResourceConsumerTracker()
|
||||
self.assertRaises(
|
||||
exceptions.VersionsCallbackNotFound,
|
||||
cached_tracker.get_resource_versions, [mock.ANY])
|
||||
|
||||
def _set_consumer_versions_callback(self, cached_tracker):
|
||||
def consumer_versions(rct):
|
||||
rct.set_versions(CONSUMER_1,
|
||||
{TEST_RESOURCE_TYPE: TEST_RESOURCE_VERSION_A})
|
||||
|
||||
cached_tracker.set_consumer_versions_callback(consumer_versions)
|
||||
|
||||
def test_consumer_versions_callback(self):
|
||||
cached_tracker = version_manager.CachedResourceConsumerTracker()
|
||||
self._set_consumer_versions_callback(cached_tracker)
|
||||
|
||||
self.assertIn(TEST_RESOURCE_VERSION_A,
|
||||
cached_tracker.get_resource_versions(
|
||||
TEST_RESOURCE_TYPE))
|
||||
|
||||
def test_update_versions(self):
|
||||
cached_tracker = version_manager.CachedResourceConsumerTracker()
|
||||
self._set_consumer_versions_callback(cached_tracker)
|
||||
|
||||
initial_versions = cached_tracker.get_resource_versions(
|
||||
TEST_RESOURCE_TYPE)
|
||||
|
||||
initial_versions_2 = cached_tracker.get_resource_versions(
|
||||
TEST_RESOURCE_TYPE_2)
|
||||
|
||||
cached_tracker.update_versions(
|
||||
CONSUMER_1, {TEST_RESOURCE_TYPE: TEST_RESOURCE_VERSION_B,
|
||||
TEST_RESOURCE_TYPE_2: TEST_RESOURCE_VERSION_A})
|
||||
|
||||
final_versions = cached_tracker.get_resource_versions(
|
||||
TEST_RESOURCE_TYPE)
|
||||
final_versions_2 = cached_tracker.get_resource_versions(
|
||||
TEST_RESOURCE_TYPE_2)
|
||||
|
||||
self.assertNotEqual(initial_versions, final_versions)
|
||||
self.assertNotEqual(initial_versions_2, final_versions_2)
|
||||
|
||||
def test_versions_ttl(self):
|
||||
self.refreshed = False
|
||||
|
||||
def consumer_versions_callback(consumer_tracker):
|
||||
consumer_tracker.set_versions(
|
||||
CONSUMER_1, {TEST_RESOURCE_TYPE: TEST_RESOURCE_VERSION_A})
|
||||
self.refreshed = True
|
||||
|
||||
cached_tracker = version_manager.CachedResourceConsumerTracker()
|
||||
cached_tracker.set_consumer_versions_callback(
|
||||
consumer_versions_callback)
|
||||
with mock.patch('time.time') as time_patch:
|
||||
time_patch.return_value = 1
|
||||
cached_tracker.get_resource_versions(TEST_RESOURCE_TYPE)
|
||||
self.assertTrue(self.refreshed)
|
||||
self.refreshed = False
|
||||
|
||||
time_patch.return_value = 2
|
||||
cached_tracker.get_resource_versions(TEST_RESOURCE_TYPE)
|
||||
self.assertFalse(self.refreshed)
|
||||
|
||||
time_patch.return_value = 2 + version_manager.VERSIONS_TTL
|
||||
cached_tracker.get_resource_versions(TEST_RESOURCE_TYPE)
|
||||
self.assertTrue(self.refreshed)
|
||||
@@ -21,6 +21,7 @@ from oslo_versionedobjects import fields as obj_fields
|
||||
import testtools
|
||||
|
||||
from neutron.api.rpc.callbacks import resources
|
||||
from neutron.api.rpc.callbacks import version_manager
|
||||
from neutron.api.rpc.handlers import resources_rpc
|
||||
from neutron.common import topics
|
||||
from neutron import context
|
||||
@@ -194,7 +195,7 @@ class ResourcesPushRpcApiTestCase(ResourcesRpcBaseTestCase):
|
||||
with mock.patch.object(resources_rpc.resources, 'get_resource_cls',
|
||||
return_value=FakeResource):
|
||||
observed = self.rpc._prepare_object_fanout_context(
|
||||
self.resource_obj)
|
||||
self.resource_obj, self.resource_obj.VERSION)
|
||||
|
||||
self.rpc.client.prepare.assert_called_once_with(
|
||||
fanout=True, topic=expected_topic)
|
||||
@@ -203,8 +204,10 @@ class ResourcesPushRpcApiTestCase(ResourcesRpcBaseTestCase):
|
||||
def test_pushy(self):
|
||||
with mock.patch.object(resources_rpc.resources, 'get_resource_cls',
|
||||
return_value=FakeResource):
|
||||
self.rpc.push(
|
||||
self.context, self.resource_obj, 'TYPE')
|
||||
with mock.patch.object(version_manager, 'get_resource_versions',
|
||||
return_value=set([FakeResource.VERSION])):
|
||||
self.rpc.push(
|
||||
self.context, self.resource_obj, 'TYPE')
|
||||
|
||||
self.cctxt_mock.cast.assert_called_once_with(
|
||||
self.context, 'push',
|
||||
|
||||
Reference in New Issue
Block a user