Refactor the rpc callback version discovery mechanism

The previous version depended on the AgentDbMixin to be loaded by
any plugin, and also introduced an __init__ on the mixin which
was problematic: mixins are expected to be classes which add methods
to another class, but to implement no constructor. One of the plugins
had one of the elements of MRO not calling to super().__init__ and
hence not triggering this __init__ method.

This change requires the plugins using the rpc callback mechanism
to provide the AgentDbMixin which is used to refresh cache of known
resource consumers (agents) and versions on demand, this way
we make it more clear that the rpc_callback api is currently designed
to be used with agents only, despite of its extensibility to other
areas.

Change-Id: Ie96b52dbe3a1f32cd4c11de8d8a5eff663fbf7f6
Related-Bug: #1584204
This commit is contained in:
Miguel Angel Ajo 2016-05-30 10:34:05 -04:00
parent 4e39d3e070
commit 1d43dd217a
6 changed files with 76 additions and 62 deletions

View File

@ -154,6 +154,15 @@ Starting at Mitaka, any agent interested in versioned objects via this API
should report their resource/version tuples of interest (the resource type/
version pairs they're subscribed to).
The plugins interested in this RPC mechanism must inherit AgentDbMixin,
since this mechanism is only intended to be used from agents at the moment,
while it could be extended to be consumed from other components if necessary.
The AgentDbMixin provides::
def get_agents_resource_versions(self, tracker):
...
Caching mechanism
'''''''''''''''''
The version subset per object will be cached to avoid DB requests on every push

View File

@ -27,5 +27,7 @@ class CallbacksMaxLimitReached(exceptions.NeutronException):
message = _("Cannot add multiple callbacks for %(resource_type)s")
class VersionsCallbackNotFound(exceptions.NeutronException):
message = _("No versions callback provided in ResourceVersionsManager")
class NoAgentDbMixinImplemented(exceptions.NeutronException):
message = _("RPC callbacks mechanism needs the implementation of "
"AgentDbMixin in the plugin, as so far it's only designed "
"to work with agents")

View File

@ -20,6 +20,7 @@ from oslo_log import log as logging
from oslo_utils import importutils
from neutron.api.rpc.callbacks import exceptions
from neutron import manager
LOG = logging.getLogger(__name__)
@ -43,18 +44,25 @@ def _import_resources():
return importutils.import_module('neutron.api.rpc.callbacks.resources')
def _import_agents_db():
return importutils.import_module('neutron.db.agents_db')
AgentConsumer = collections.namedtuple('AgentConsumer', ['agent_type',
'host'])
AgentConsumer.__repr__ = lambda self: '%s@%s' % self
class ResourceConsumerTracker(object):
"""Class to be provided back by consumer_versions_callback.
"""Class passed down to collect consumer's resource versions.
This class is responsible for fetching the local versions of
resources, and letting the callback register every consumer's
resources, and letting the called function register every consumer's
resource version.
This class is passed down to the plugin get_agents_resource_versions
currently, as the only expected consumers are agents so far.
Later on, this class can also be used to recalculate, for each
resource type, the collection of versions that are local or
known by one or more consumers.
@ -119,7 +127,16 @@ class ResourceConsumerTracker(object):
'consumer': consumer})
def set_versions(self, consumer, versions):
"""Set or update an specific consumer resource types."""
"""Set or update an specific consumer resource types.
:param consumer: should be an AgentConsumer object, with agent_type
and host set. This acts as the unique ID for the
agent.
:param versions: should be a dictionary in the following format:
{'QosPolicy': '1.1',
'SecurityGroup': '1.0',
'Port': '1.0'}
"""
for resource_type, resource_version in versions.items():
self._set_version(consumer, resource_type,
resource_version)
@ -190,28 +207,31 @@ class CachedResourceConsumerTracker(object):
"""This class takes care of the caching logic of versions."""
def __init__(self):
self._consumer_versions_callback = None
# This is TTL expiration time, 0 means it will be expired at start
self._expires_at = 0
self._versions = ResourceConsumerTracker()
def _update_consumer_versions(self):
if self._consumer_versions_callback:
new_tracker = ResourceConsumerTracker()
self._consumer_versions_callback(new_tracker)
self._versions = new_tracker
self._versions.report()
new_tracker = ResourceConsumerTracker()
neutron_plugin = manager.NeutronManager.get_plugin()
agents_db = _import_agents_db()
# If you use RPC callbacks, your plugin needs to implement
# AgentsDbMixin so that we know which resource versions your
# agents consume via RPC, please note that rpc_callbacks are
# only designed to work with agents currently.
if isinstance(neutron_plugin, agents_db.AgentDbMixin):
neutron_plugin.get_agents_resource_versions(new_tracker)
else:
raise exceptions.VersionsCallbackNotFound()
raise exceptions.NoAgentDbMixinImplemented()
self._versions = new_tracker
self._versions.report()
def _check_expiration(self):
if time.time() > self._expires_at:
self._update_consumer_versions()
self._expires_at = time.time() + VERSIONS_TTL
def set_consumer_versions_callback(self, callback):
self._consumer_versions_callback = callback
def get_resource_versions(self, resource_type):
self._check_expiration()
return self._versions.get_resource_versions(resource_type)
@ -234,21 +254,6 @@ def _get_cached_tracker():
return _cached_version_tracker
def set_consumer_versions_callback(callback):
"""Register a callback to retrieve the system consumer versions.
Specific consumer logic has been decoupled from this, so we could reuse
in other places.
The callback will receive a ResourceConsumerTracker object,
and the ResourceConsumerTracker methods must be used to provide
each consumer versions. Consumer ids can be obtained from this
module via the next functions:
* get_agent_consumer_id
"""
_get_cached_tracker().set_consumer_versions_callback(callback)
def get_resource_versions(resource_type):
"""Return the set of versions expected by the consumers of a resource."""
return _get_cached_tracker().get_resource_versions(resource_type)

View File

@ -180,11 +180,6 @@ class AgentAvailabilityZoneMixin(az_ext.AvailabilityZonePluginBase):
class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
"""Mixin class to add agent extension to db_base_plugin_v2."""
def __init__(self, *args, **kwargs):
version_manager.set_consumer_versions_callback(
self._get_agents_resource_versions)
super(AgentDbMixin, self).__init__(*args, **kwargs)
def _get_agent(self, context, id):
try:
agent = self._get_by_id(context, Agent, id)
@ -425,14 +420,15 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
filters={'admin_state_up': [True]})
return filter(self.is_agent_considered_for_versions, up_agents)
def _get_agents_resource_versions(self, tracker):
def get_agents_resource_versions(self, tracker):
"""Get the known agent resource versions and update the tracker.
Receives a version_manager.ResourceConsumerTracker instance and it's
expected to look up in to the database and update every agent resource
versions.
This function looks up into the database and updates every agent
resource versions.
This method is called from version_manager when the cached information
has passed TTL.
:param tracker: receives a version_manager.ResourceConsumerTracker
"""
for agent in self._get_agents_considered_for_versions():
resource_versions = agent.get('resource_versions', {})

View File

@ -16,6 +16,7 @@ import mock
from neutron.api.rpc.callbacks import exceptions
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.callbacks import version_manager
from neutron.db import agents_db
from neutron.tests import base
@ -118,22 +119,32 @@ class ResourceConsumerTrackerTest(base.BaseTestCase):
class CachedResourceConsumerTrackerTest(base.BaseTestCase):
def test_exception_with_no_callback(self):
def setUp(self):
super(CachedResourceConsumerTrackerTest, self).setUp()
self.refreshed = False
class _FakePlugin(agents_db.AgentDbMixin):
@staticmethod
def get_agents_resource_versions(tracker):
self.refreshed = True
tracker.set_versions(CONSUMER_1,
{TEST_RESOURCE_TYPE: TEST_VERSION_A})
self.get_plugin = mock.patch('neutron.manager.NeutronManager'
'.get_plugin').start()
self.get_plugin.return_value = _FakePlugin()
def test_plugin_does_not_implement_agentsdb_exception(self):
self.get_plugin.return_value = object()
cached_tracker = version_manager.CachedResourceConsumerTracker()
self.assertRaises(
exceptions.VersionsCallbackNotFound,
cached_tracker.get_resource_versions, [mock.ANY])
def _set_consumer_versions_callback(self, cached_tracker):
def consumer_versions(rct):
rct.set_versions(CONSUMER_1,
{TEST_RESOURCE_TYPE: TEST_VERSION_A})
cached_tracker.set_consumer_versions_callback(consumer_versions)
self.assertRaises(exceptions.NoAgentDbMixinImplemented,
cached_tracker.get_resource_versions,
resources.QOS_POLICY)
def test_consumer_versions_callback(self):
cached_tracker = version_manager.CachedResourceConsumerTracker()
self._set_consumer_versions_callback(cached_tracker)
self.assertIn(TEST_VERSION_A,
cached_tracker.get_resource_versions(
@ -141,7 +152,6 @@ class CachedResourceConsumerTrackerTest(base.BaseTestCase):
def test_update_versions(self):
cached_tracker = version_manager.CachedResourceConsumerTracker()
self._set_consumer_versions_callback(cached_tracker)
initial_versions = cached_tracker.get_resource_versions(
TEST_RESOURCE_TYPE)
@ -162,16 +172,8 @@ class CachedResourceConsumerTrackerTest(base.BaseTestCase):
self.assertNotEqual(initial_versions_2, final_versions_2)
def test_versions_ttl(self):
self.refreshed = False
def consumer_versions_callback(consumer_tracker):
consumer_tracker.set_versions(
CONSUMER_1, {TEST_RESOURCE_TYPE: TEST_VERSION_A})
self.refreshed = True
cached_tracker = version_manager.CachedResourceConsumerTracker()
cached_tracker.set_consumer_versions_callback(
consumer_versions_callback)
with mock.patch('time.time') as time_patch:
time_patch.return_value = 1
cached_tracker.get_resource_versions(TEST_RESOURCE_TYPE)

View File

@ -228,14 +228,14 @@ class TestAgentsDbMixin(TestAgentsDbBase):
cfg = self.plugin.get_configuration_dict(db_obj)
self.assertIn('cfg', cfg)
def test__get_agents_resource_versions(self):
def test_get_agents_resource_versions(self):
tracker = mock.Mock()
self._create_and_save_agents(
['host-%d' % i for i in range(5)],
constants.AGENT_TYPE_L3,
down_agents_count=3,
down_but_version_considered=2)
self.plugin._get_agents_resource_versions(tracker)
self.plugin.get_agents_resource_versions(tracker)
self.assertEqual(tracker.set_versions.call_count, 2)