diff --git a/doc/source/devref/rpc_callbacks.rst b/doc/source/devref/rpc_callbacks.rst index 865aef9ace3..514c8224594 100644 --- a/doc/source/devref/rpc_callbacks.rst +++ b/doc/source/devref/rpc_callbacks.rst @@ -154,6 +154,15 @@ Starting at Mitaka, any agent interested in versioned objects via this API should report their resource/version tuples of interest (the resource type/ version pairs they're subscribed to). +The plugins interested in this RPC mechanism must inherit AgentDbMixin, +since this mechanism is only intended to be used from agents at the moment, +while it could be extended to be consumed from other components if necessary. + +The AgentDbMixin provides:: + + def get_agents_resource_versions(self, tracker): + ... + Caching mechanism ''''''''''''''''' The version subset per object will be cached to avoid DB requests on every push diff --git a/neutron/api/rpc/callbacks/exceptions.py b/neutron/api/rpc/callbacks/exceptions.py index f6511ac0c78..b5a6b21bca7 100644 --- a/neutron/api/rpc/callbacks/exceptions.py +++ b/neutron/api/rpc/callbacks/exceptions.py @@ -27,5 +27,7 @@ class CallbacksMaxLimitReached(exceptions.NeutronException): message = _("Cannot add multiple callbacks for %(resource_type)s") -class VersionsCallbackNotFound(exceptions.NeutronException): - message = _("No versions callback provided in ResourceVersionsManager") +class NoAgentDbMixinImplemented(exceptions.NeutronException): + message = _("RPC callbacks mechanism needs the implementation of " + "AgentDbMixin in the plugin, as so far it's only designed " + "to work with agents") diff --git a/neutron/api/rpc/callbacks/version_manager.py b/neutron/api/rpc/callbacks/version_manager.py index abe91bf297a..2e0ffcc7743 100644 --- a/neutron/api/rpc/callbacks/version_manager.py +++ b/neutron/api/rpc/callbacks/version_manager.py @@ -20,6 +20,7 @@ from oslo_log import log as logging from oslo_utils import importutils from neutron.api.rpc.callbacks import exceptions +from neutron import manager LOG = logging.getLogger(__name__) @@ -43,18 +44,25 @@ def _import_resources(): return importutils.import_module('neutron.api.rpc.callbacks.resources') +def _import_agents_db(): + return importutils.import_module('neutron.db.agents_db') + + AgentConsumer = collections.namedtuple('AgentConsumer', ['agent_type', 'host']) AgentConsumer.__repr__ = lambda self: '%s@%s' % self class ResourceConsumerTracker(object): - """Class to be provided back by consumer_versions_callback. + """Class passed down to collect consumer's resource versions. This class is responsible for fetching the local versions of - resources, and letting the callback register every consumer's + resources, and letting the called function register every consumer's resource version. + This class is passed down to the plugin get_agents_resource_versions + currently, as the only expected consumers are agents so far. + Later on, this class can also be used to recalculate, for each resource type, the collection of versions that are local or known by one or more consumers. @@ -119,7 +127,16 @@ class ResourceConsumerTracker(object): 'consumer': consumer}) def set_versions(self, consumer, versions): - """Set or update an specific consumer resource types.""" + """Set or update an specific consumer resource types. + + :param consumer: should be an AgentConsumer object, with agent_type + and host set. This acts as the unique ID for the + agent. + :param versions: should be a dictionary in the following format: + {'QosPolicy': '1.1', + 'SecurityGroup': '1.0', + 'Port': '1.0'} + """ for resource_type, resource_version in versions.items(): self._set_version(consumer, resource_type, resource_version) @@ -190,28 +207,31 @@ class CachedResourceConsumerTracker(object): """This class takes care of the caching logic of versions.""" def __init__(self): - self._consumer_versions_callback = None # This is TTL expiration time, 0 means it will be expired at start self._expires_at = 0 self._versions = ResourceConsumerTracker() def _update_consumer_versions(self): - if self._consumer_versions_callback: - new_tracker = ResourceConsumerTracker() - self._consumer_versions_callback(new_tracker) - self._versions = new_tracker - self._versions.report() + new_tracker = ResourceConsumerTracker() + neutron_plugin = manager.NeutronManager.get_plugin() + agents_db = _import_agents_db() + # If you use RPC callbacks, your plugin needs to implement + # AgentsDbMixin so that we know which resource versions your + # agents consume via RPC, please note that rpc_callbacks are + # only designed to work with agents currently. + if isinstance(neutron_plugin, agents_db.AgentDbMixin): + neutron_plugin.get_agents_resource_versions(new_tracker) else: - raise exceptions.VersionsCallbackNotFound() + raise exceptions.NoAgentDbMixinImplemented() + + self._versions = new_tracker + self._versions.report() def _check_expiration(self): if time.time() > self._expires_at: self._update_consumer_versions() self._expires_at = time.time() + VERSIONS_TTL - def set_consumer_versions_callback(self, callback): - self._consumer_versions_callback = callback - def get_resource_versions(self, resource_type): self._check_expiration() return self._versions.get_resource_versions(resource_type) @@ -234,21 +254,6 @@ def _get_cached_tracker(): return _cached_version_tracker -def set_consumer_versions_callback(callback): - """Register a callback to retrieve the system consumer versions. - - Specific consumer logic has been decoupled from this, so we could reuse - in other places. - - The callback will receive a ResourceConsumerTracker object, - and the ResourceConsumerTracker methods must be used to provide - each consumer versions. Consumer ids can be obtained from this - module via the next functions: - * get_agent_consumer_id - """ - _get_cached_tracker().set_consumer_versions_callback(callback) - - def get_resource_versions(resource_type): """Return the set of versions expected by the consumers of a resource.""" return _get_cached_tracker().get_resource_versions(resource_type) diff --git a/neutron/db/agents_db.py b/neutron/db/agents_db.py index 567fbc013db..fce08ec5b5a 100644 --- a/neutron/db/agents_db.py +++ b/neutron/db/agents_db.py @@ -180,11 +180,6 @@ class AgentAvailabilityZoneMixin(az_ext.AvailabilityZonePluginBase): class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin): """Mixin class to add agent extension to db_base_plugin_v2.""" - def __init__(self, *args, **kwargs): - version_manager.set_consumer_versions_callback( - self._get_agents_resource_versions) - super(AgentDbMixin, self).__init__(*args, **kwargs) - def _get_agent(self, context, id): try: agent = self._get_by_id(context, Agent, id) @@ -425,14 +420,15 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin): filters={'admin_state_up': [True]}) return filter(self.is_agent_considered_for_versions, up_agents) - def _get_agents_resource_versions(self, tracker): + def get_agents_resource_versions(self, tracker): """Get the known agent resource versions and update the tracker. - Receives a version_manager.ResourceConsumerTracker instance and it's - expected to look up in to the database and update every agent resource - versions. + This function looks up into the database and updates every agent + resource versions. This method is called from version_manager when the cached information has passed TTL. + + :param tracker: receives a version_manager.ResourceConsumerTracker """ for agent in self._get_agents_considered_for_versions(): resource_versions = agent.get('resource_versions', {}) diff --git a/neutron/tests/unit/api/rpc/callbacks/test_version_manager.py b/neutron/tests/unit/api/rpc/callbacks/test_version_manager.py index 31e15b6eca4..6400cbe7133 100644 --- a/neutron/tests/unit/api/rpc/callbacks/test_version_manager.py +++ b/neutron/tests/unit/api/rpc/callbacks/test_version_manager.py @@ -16,6 +16,7 @@ import mock from neutron.api.rpc.callbacks import exceptions from neutron.api.rpc.callbacks import resources from neutron.api.rpc.callbacks import version_manager +from neutron.db import agents_db from neutron.tests import base @@ -118,22 +119,32 @@ class ResourceConsumerTrackerTest(base.BaseTestCase): class CachedResourceConsumerTrackerTest(base.BaseTestCase): - def test_exception_with_no_callback(self): + def setUp(self): + super(CachedResourceConsumerTrackerTest, self).setUp() + + self.refreshed = False + + class _FakePlugin(agents_db.AgentDbMixin): + @staticmethod + def get_agents_resource_versions(tracker): + self.refreshed = True + tracker.set_versions(CONSUMER_1, + {TEST_RESOURCE_TYPE: TEST_VERSION_A}) + + self.get_plugin = mock.patch('neutron.manager.NeutronManager' + '.get_plugin').start() + + self.get_plugin.return_value = _FakePlugin() + + def test_plugin_does_not_implement_agentsdb_exception(self): + self.get_plugin.return_value = object() cached_tracker = version_manager.CachedResourceConsumerTracker() - self.assertRaises( - exceptions.VersionsCallbackNotFound, - cached_tracker.get_resource_versions, [mock.ANY]) - - def _set_consumer_versions_callback(self, cached_tracker): - def consumer_versions(rct): - rct.set_versions(CONSUMER_1, - {TEST_RESOURCE_TYPE: TEST_VERSION_A}) - - cached_tracker.set_consumer_versions_callback(consumer_versions) + self.assertRaises(exceptions.NoAgentDbMixinImplemented, + cached_tracker.get_resource_versions, + resources.QOS_POLICY) def test_consumer_versions_callback(self): cached_tracker = version_manager.CachedResourceConsumerTracker() - self._set_consumer_versions_callback(cached_tracker) self.assertIn(TEST_VERSION_A, cached_tracker.get_resource_versions( @@ -141,7 +152,6 @@ class CachedResourceConsumerTrackerTest(base.BaseTestCase): def test_update_versions(self): cached_tracker = version_manager.CachedResourceConsumerTracker() - self._set_consumer_versions_callback(cached_tracker) initial_versions = cached_tracker.get_resource_versions( TEST_RESOURCE_TYPE) @@ -162,16 +172,8 @@ class CachedResourceConsumerTrackerTest(base.BaseTestCase): self.assertNotEqual(initial_versions_2, final_versions_2) def test_versions_ttl(self): - self.refreshed = False - - def consumer_versions_callback(consumer_tracker): - consumer_tracker.set_versions( - CONSUMER_1, {TEST_RESOURCE_TYPE: TEST_VERSION_A}) - self.refreshed = True cached_tracker = version_manager.CachedResourceConsumerTracker() - cached_tracker.set_consumer_versions_callback( - consumer_versions_callback) with mock.patch('time.time') as time_patch: time_patch.return_value = 1 cached_tracker.get_resource_versions(TEST_RESOURCE_TYPE) diff --git a/neutron/tests/unit/db/test_agents_db.py b/neutron/tests/unit/db/test_agents_db.py index f3c0c726fb1..8fcd31a2e1a 100644 --- a/neutron/tests/unit/db/test_agents_db.py +++ b/neutron/tests/unit/db/test_agents_db.py @@ -228,14 +228,14 @@ class TestAgentsDbMixin(TestAgentsDbBase): cfg = self.plugin.get_configuration_dict(db_obj) self.assertIn('cfg', cfg) - def test__get_agents_resource_versions(self): + def test_get_agents_resource_versions(self): tracker = mock.Mock() self._create_and_save_agents( ['host-%d' % i for i in range(5)], constants.AGENT_TYPE_L3, down_agents_count=3, down_but_version_considered=2) - self.plugin._get_agents_resource_versions(tracker) + self.plugin.get_agents_resource_versions(tracker) self.assertEqual(tracker.set_versions.call_count, 2)