RPC Callback rolling upgrades logic

Introduces a version manager in the callbacks rpc, which tracks
the agents' known resource versions across the distributed system,
making rpc callback push notifications smart about which resource
versions need to be serialized and sent over the wire during a
cloud upgrade process.

Subsequent patches will implement the callback to fetch agent
resource versions from database, the status updates with
agent resource versions included, and the fast path agent to
neutron-servers fanout call with version details as defined
in change I02b694137eb2d58e5f2f3e7631f0e4b90f7c17ad

Related-Bug: #1535247

Change-Id: I3fb49ae6fe237a926225b508bc8f0286426bf532
This commit is contained in:
Miguel Angel Ajo 2016-01-08 17:47:01 +01:00
parent 2c599814fb
commit cb18881770
6 changed files with 367 additions and 14 deletions

View File

@ -24,3 +24,7 @@ class CallbackNotFound(exceptions.NeutronException):
class CallbacksMaxLimitReached(exceptions.NeutronException):
message = _("Cannot add multiple callbacks for %(resource_type)s")
class VersionsCallbackNotFound(exceptions.NeutronException):
message = _("No versions callback provided in ResourceVersionsManager")

View File

@ -30,6 +30,11 @@ _TYPE_TO_CLS_MAP = {
QOS_POLICY: _QOS_POLICY_CLS,
}
LOCAL_RESOURCE_VERSIONS = {
resource_type: cls.VERSION
for resource_type, cls in _TYPE_TO_CLS_MAP.items()
}
def get_resource_type(resource_cls):
if not resource_cls:

View File

@ -0,0 +1,189 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import copy
import time
from oslo_log import log as logging
from neutron.api.rpc.callbacks import resources
LOG = logging.getLogger(__name__)
VERSIONS_TTL = 60
class ResourceConsumerTracker(object):
"""Class to be provided back by consumer_versions_callback.
This class is responsible for fetching the local versions of
resources, and letting the callback register every consumer's
resource version.
Later on, this class can also be used to recalculate, for each
resource type, the collection of versions that are local or
known by one or more consumers.
"""
def __init__(self):
# Initialize with the local (server) versions, as we always want
# to send those. Agents, as they upgrade, will need the latest version,
# and there is a corner case we'd not be covering otherwise:
# 1) one or several neutron-servers get disconnected from rpc (while
# running)
# 2) a new agent comes up, with the latest version and it reports
# 2 ways:
# a) via status report (which will be stored in the database)
# b) via fanout call to all neutron servers, this way, all of them
# get their version set updated right away without the need to
# re-fetch anything from the database.
# 3) the neutron-servers get back online to the rpc bus, but they
# lost the fanout message.
#
# TODO(mangelajo) To cover this case we may need a callback from oslo
# messaging to get notified about disconnections/reconnections to the
# rpc bus, invalidating the consumer version cache when we receive such
# callback.
self._versions = self._get_local_resource_versions()
self._versions_by_consumer = collections.defaultdict(dict)
self._needs_recalculation = False
def _get_local_resource_versions(self):
local_resource_versions = collections.defaultdict(set)
for resource_type, version in (
resources.LOCAL_RESOURCE_VERSIONS.items()):
local_resource_versions[resource_type].add(version)
return local_resource_versions
# TODO(mangelajo): add locking with _recalculate_versions if we ever
# move out of green threads.
def _set_version(self, consumer_id, resource_type, version):
"""Set or update a consumer resource type version."""
self._versions[resource_type].add(version)
prev_version = (
self._versions_by_consumer[consumer_id].get(resource_type, None))
self._versions_by_consumer[consumer_id][resource_type] = version
if prev_version and (prev_version != version):
# If a version got updated/changed in a consumer, we need to
# recalculate the main dictionary of versions based on the
# new _versions_by_consumer.
# We defer the recalculation until every consumer version has
# been set for all of its resource types.
self._needs_recalculation = True
LOG.debug("Version for resource type %(resource_type)s changed "
"%(prev_version)s to %(version)s on "
"consumer %(consumer_id)s",
{'resource_type': resource_type,
'version': version,
'prev_version': prev_version,
'consumer_id': consumer_id})
def set_versions(self, consumer_id, versions):
"""Set or update an specific consumer resource types."""
for resource_type, resource_version in versions.items():
self._set_version(consumer_id, resource_type,
resource_version)
def get_resource_versions(self, resource_type):
"""Fetch the versions necessary to notify all consumers."""
if self._needs_recalculation:
self._recalculate_versions()
self._needs_recalculation = False
return copy.copy(self._versions[resource_type])
# TODO(mangelajo): Add locking if we ever move out of greenthreads.
def _recalculate_versions(self):
"""Recalculate the _versions set.
Re-fetch the local (server) versions and expand with consumers'
versions.
"""
versions = self._get_local_resource_versions()
for versions_dict in self._versions_by_consumer.values():
for res_type, res_version in versions_dict.items():
versions[res_type].add(res_version)
self._versions = versions
class CachedResourceConsumerTracker(object):
"""This class takes care of the caching logic of versions."""
def __init__(self):
self._consumer_versions_callback = None
# This is TTL expiration time, 0 means it will be expired at start
self._expires_at = 0
self._versions = ResourceConsumerTracker()
def _update_consumer_versions(self):
if self._consumer_versions_callback:
new_tracker = ResourceConsumerTracker()
self._consumer_versions_callback(new_tracker)
self._versions = new_tracker
else:
pass # TODO(mangelajo): throw exception if callback not provided
def _check_expiration(self):
if time.time() > self._expires_at:
self._update_consumer_versions()
self._expires_at = time.time() + VERSIONS_TTL
def set_consumer_versions_callback(self, callback):
self._consumer_versions_callback = callback
def get_resource_versions(self, resource_type):
self._check_expiration()
return self._versions.get_resource_versions(resource_type)
def update_versions(self, consumer_id, resource_versions):
self._versions.set_versions(consumer_id, resource_versions)
cached_version_tracker = CachedResourceConsumerTracker()
def set_consumer_versions_callback(callback):
"""Register a callback to retrieve the system consumer versions.
Specific consumer logic has been decoupled from this, so we could reuse
in other places.
The callback will receive a ResourceConsumerTracker object,
and the ResourceConsumerTracker methods must be used to provide
each consumer_id versions. Consumer ids can be obtained from this
module via the next functions:
* get_agent_consumer_id
"""
cached_version_tracker.set_consumer_versions_callback(callback)
def get_resource_versions(resource_type):
"""Return the set of versions expected by the consumers of a resource."""
return cached_version_tracker.get_resource_versions(resource_type)
def update_versions(consumer_id, resource_versions):
"""Update the resources' versions for a consumer id."""
cached_version_tracker.set_versions(consumer_id, resource_versions)
def get_agent_consumer_id(agent_type, agent_host):
"""Return a consumer id string for an agent type + host tuple.
The logic behind this function, is that, eventually we could have
consumers of RPC callbacks which are not agents, thus we want
to totally collate all the different consumer types and provide
unique consumer ids.
"""
return "%(agent_type)s@%(agent_host)s" % {'agent_type': agent_type,
'agent_host': agent_host}

View File

@ -21,6 +21,7 @@ from neutron._i18n import _
from neutron.api.rpc.callbacks.consumer import registry as cons_registry
from neutron.api.rpc.callbacks.producer import registry as prod_registry
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.callbacks import version_manager
from neutron.common import constants
from neutron.common import exceptions
from neutron.common import rpc as n_rpc
@ -49,11 +50,16 @@ def _validate_resource_type(resource_type):
raise InvalidResourceTypeClass(resource_type=resource_type)
def resource_type_versioned_topic(resource_type):
def resource_type_versioned_topic(resource_type, version=None):
"""Return the topic for a resource type.
If no version is provided, the latest version of the object will
be used.
"""
_validate_resource_type(resource_type)
cls = resources.get_resource_cls(resource_type)
return topics.RESOURCE_TOPIC_PATTERN % {'resource_type': resource_type,
'version': cls.VERSION}
'version': version or cls.VERSION}
class ResourcesPullRpcApi(object):
@ -130,22 +136,23 @@ class ResourcesPushRpcApi(object):
namespace=constants.RPC_NAMESPACE_RESOURCES)
self.client = n_rpc.get_client(target)
def _prepare_object_fanout_context(self, obj):
def _prepare_object_fanout_context(self, obj, version):
"""Prepare fanout context, one topic per object type."""
obj_topic = resource_type_versioned_topic(obj.obj_name())
obj_topic = resource_type_versioned_topic(obj.obj_name(), version)
return self.client.prepare(fanout=True, topic=obj_topic)
@log_helpers.log_method_call
def push(self, context, resource, event_type):
resource_type = resources.get_resource_type(resource)
_validate_resource_type(resource_type)
cctxt = self._prepare_object_fanout_context(resource)
#TODO(QoS): Push notifications for every known version once we have
# multiple of those
dehydrated_resource = resource.obj_to_primitive()
cctxt.cast(context, 'push',
resource=dehydrated_resource,
event_type=event_type)
versions = version_manager.get_resource_versions(resource_type)
for version in versions:
cctxt = self._prepare_object_fanout_context(resource, version)
dehydrated_resource = resource.obj_to_primitive(
target_version=version)
cctxt.cast(context, 'push',
resource=dehydrated_resource,
event_type=event_type)
class ResourcesPushRpcCallback(object):

View File

@ -0,0 +1,145 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron.api.rpc.callbacks import exceptions
from neutron.api.rpc.callbacks import version_manager
from neutron.tests import base
TEST_RESOURCE_TYPE = 'TestResourceType'
TEST_RESOURCE_VERSION_A = '1.11'
TEST_RESOURCE_VERSION_B = '1.12'
TEST_RESOURCE_TYPE_2 = 'AnotherResource'
AGENT_HOST_1 = 'host-1'
AGENT_HOST_2 = 'host-2'
AGENT_TYPE_1 = 'dhcp-agent'
AGENT_TYPE_2 = 'openvswitch-agent'
CONSUMER_1 = version_manager.get_agent_consumer_id(AGENT_HOST_1, AGENT_TYPE_1)
CONSUMER_2 = version_manager.get_agent_consumer_id(AGENT_HOST_2, AGENT_TYPE_2)
class ResourceConsumerTrackerTest(base.BaseTestCase):
def test_consumer_set_versions(self):
cv = version_manager.ResourceConsumerTracker()
cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE:
TEST_RESOURCE_VERSION_A})
self.assertIn(TEST_RESOURCE_VERSION_A,
cv.get_resource_versions(TEST_RESOURCE_TYPE))
def test_consumer_updates_version(self):
cv = version_manager.ResourceConsumerTracker()
for version in [TEST_RESOURCE_VERSION_A, TEST_RESOURCE_VERSION_B]:
cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE: version})
self.assertEqual(set([TEST_RESOURCE_VERSION_B]),
cv.get_resource_versions(TEST_RESOURCE_TYPE))
def test_multiple_consumer_version_update(self):
cv = version_manager.ResourceConsumerTracker()
cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE:
TEST_RESOURCE_VERSION_A})
cv.set_versions(CONSUMER_2, {TEST_RESOURCE_TYPE:
TEST_RESOURCE_VERSION_A})
cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE:
TEST_RESOURCE_VERSION_B})
self.assertEqual(set([TEST_RESOURCE_VERSION_A,
TEST_RESOURCE_VERSION_B]),
cv.get_resource_versions(TEST_RESOURCE_TYPE))
def test_different_adds_triggers_recalculation(self):
cv = version_manager.ResourceConsumerTracker()
for version in [TEST_RESOURCE_VERSION_A, TEST_RESOURCE_VERSION_B]:
cv.set_versions(CONSUMER_1, {TEST_RESOURCE_TYPE: version})
self.assertTrue(cv._needs_recalculation)
cv._recalculate_versions = mock.Mock()
cv.get_resource_versions(TEST_RESOURCE_TYPE)
cv._recalculate_versions.assert_called_once_with()
class CachedResourceConsumerTrackerTest(base.BaseTestCase):
# TODO(mangelajo): re-enable once we provide the callback
def _test_exception_with_no_callback(self):
cached_tracker = version_manager.CachedResourceConsumerTracker()
self.assertRaises(
exceptions.VersionsCallbackNotFound,
cached_tracker.get_resource_versions, [mock.ANY])
def _set_consumer_versions_callback(self, cached_tracker):
def consumer_versions(rct):
rct.set_versions(CONSUMER_1,
{TEST_RESOURCE_TYPE: TEST_RESOURCE_VERSION_A})
cached_tracker.set_consumer_versions_callback(consumer_versions)
def test_consumer_versions_callback(self):
cached_tracker = version_manager.CachedResourceConsumerTracker()
self._set_consumer_versions_callback(cached_tracker)
self.assertIn(TEST_RESOURCE_VERSION_A,
cached_tracker.get_resource_versions(
TEST_RESOURCE_TYPE))
def test_update_versions(self):
cached_tracker = version_manager.CachedResourceConsumerTracker()
self._set_consumer_versions_callback(cached_tracker)
initial_versions = cached_tracker.get_resource_versions(
TEST_RESOURCE_TYPE)
initial_versions_2 = cached_tracker.get_resource_versions(
TEST_RESOURCE_TYPE_2)
cached_tracker.update_versions(
CONSUMER_1, {TEST_RESOURCE_TYPE: TEST_RESOURCE_VERSION_B,
TEST_RESOURCE_TYPE_2: TEST_RESOURCE_VERSION_A})
final_versions = cached_tracker.get_resource_versions(
TEST_RESOURCE_TYPE)
final_versions_2 = cached_tracker.get_resource_versions(
TEST_RESOURCE_TYPE_2)
self.assertNotEqual(initial_versions, final_versions)
self.assertNotEqual(initial_versions_2, final_versions_2)
def test_versions_ttl(self):
self.refreshed = False
def consumer_versions_callback(consumer_tracker):
consumer_tracker.set_versions(
CONSUMER_1, {TEST_RESOURCE_TYPE: TEST_RESOURCE_VERSION_A})
self.refreshed = True
cached_tracker = version_manager.CachedResourceConsumerTracker()
cached_tracker.set_consumer_versions_callback(
consumer_versions_callback)
with mock.patch('time.time') as time_patch:
time_patch.return_value = 1
cached_tracker.get_resource_versions(TEST_RESOURCE_TYPE)
self.assertTrue(self.refreshed)
self.refreshed = False
time_patch.return_value = 2
cached_tracker.get_resource_versions(TEST_RESOURCE_TYPE)
self.assertFalse(self.refreshed)
time_patch.return_value = 2 + version_manager.VERSIONS_TTL
cached_tracker.get_resource_versions(TEST_RESOURCE_TYPE)
self.assertTrue(self.refreshed)

View File

@ -21,6 +21,7 @@ from oslo_versionedobjects import fields as obj_fields
import testtools
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.callbacks import version_manager
from neutron.api.rpc.handlers import resources_rpc
from neutron.common import topics
from neutron import context
@ -194,7 +195,7 @@ class ResourcesPushRpcApiTestCase(ResourcesRpcBaseTestCase):
with mock.patch.object(resources_rpc.resources, 'get_resource_cls',
return_value=FakeResource):
observed = self.rpc._prepare_object_fanout_context(
self.resource_obj)
self.resource_obj, self.resource_obj.VERSION)
self.rpc.client.prepare.assert_called_once_with(
fanout=True, topic=expected_topic)
@ -203,8 +204,10 @@ class ResourcesPushRpcApiTestCase(ResourcesRpcBaseTestCase):
def test_pushy(self):
with mock.patch.object(resources_rpc.resources, 'get_resource_cls',
return_value=FakeResource):
self.rpc.push(
self.context, self.resource_obj, 'TYPE')
with mock.patch.object(version_manager, 'get_resource_versions',
return_value=set([FakeResource.VERSION])):
self.rpc.push(
self.context, self.resource_obj, 'TYPE')
self.cctxt_mock.cast.assert_called_once_with(
self.context, 'push',