From 9e8e987e6c3d2fa753701689b9ff5d1227fb3c24 Mon Sep 17 00:00:00 2001
From: Bence Romsics
Date: Fri, 6 Jul 2018 16:02:15 +0200
Subject: [PATCH] Placement reporting service plugin

This service plugin synchronizes ML2 mechanism driver agents' resource
information to Placement. To use this service an agent must add
'resource_provider_bandwidths' to the 'configurations' field of its RPC
heartbeat. It may also add 'resource_provider_inventory_defaults' to
fine-tune Placement inventory parameters. Also, to use this service a
mechanism driver must implement get_standard_device_mappings() and
allocate a UUID as the mechanism driver property
'resource_provider_uuid5_namespace'.

The synchronization is triggered by:
* any new agent object in the DB
* restart of an agent (via 'start_flag' in the RPC heartbeat)
* an agent's 'resources_synced' attribute not being True (None/False)

The last condition should autoheal transient errors of the
synchronization process. That is, if a sync attempt fails, we store
resources_synced=False, which triggers a sync retry at each new
heartbeat message until a sync attempt finally succeeds and we can set
resources_synced=True.

Since this code functionally depends on ML2 we could also consider
making it part of ML2, but at the moment it is a service plugin for
better decoupling. Even if you load the service plugin, the logic
gracefully degrades for heartbeat messages not containing resource
provider info.

If needed, the sync can be forced in multiple ways. First, if you
restart an agent, the RPs belonging to that agent will be re-synced.
You may also delete the agent by 'openstack network agent delete' and
let the next heartbeat message re-create the agent object; on
re-creation the RPs belonging to that agent will be re-synced. On the
other hand, a neutron-server restart does not trigger a re-sync in any
way. Depending on the trade-off between the admin's need to force
re-syncs and the performance cost of (not absolutely necessary)
Placement updates, the re-sync conditions may be further fine-tuned.
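For illustration, the relevant part of an agent heartbeat's
'configurations' field could look roughly like the sketch below. The
device name, the numbers and the exact inventory keys are made-up
examples, not a fixed schema:

    'configurations': {
        'resource_provider_bandwidths': {
            # available bandwidth per device, e.g. in kbps
            'eth0': {'egress': 10000000, 'ingress': 10000000},
        },
        # optional overrides for Placement inventory fields, e.g.
        'resource_provider_inventory_defaults': {
            'allocation_ratio': 1.0,
            'min_unit': 1,
            'step_size': 1,
        },
    }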
Example config for neutron-server:

neutron.conf:
[DEFAULT]
service_plugins = placement

Change-Id: Ia1ff6f7559ab77913ddb9c3b134420a401b8eb43
Co-Authored-By: Lajos Katona
Depends-On: https://review.openstack.org/586567
Partial-Bug: #1578989
See-Also: https://review.openstack.org/502306 (nova spec)
See-Also: https://review.openstack.org/508149 (neutron spec)
---
 neutron/services/placement_report/__init__.py |   0
 neutron/services/placement_report/plugin.py   | 227 ++++++++++++++++++
 .../plugins/ml2/drivers/mechanism_test.py     |  17 ++
 .../services/placement_report/__init__.py     |   0
 .../services/placement_report/test_plugin.py  | 198 +++++++++++++++
 setup.cfg                                     |   1 +
 6 files changed, 443 insertions(+)
 create mode 100644 neutron/services/placement_report/__init__.py
 create mode 100644 neutron/services/placement_report/plugin.py
 create mode 100644 neutron/tests/unit/services/placement_report/__init__.py
 create mode 100644 neutron/tests/unit/services/placement_report/test_plugin.py

diff --git a/neutron/services/placement_report/__init__.py b/neutron/services/placement_report/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/neutron/services/placement_report/plugin.py b/neutron/services/placement_report/plugin.py
new file mode 100644
index 00000000000..8a6e3f737e4
--- /dev/null
+++ b/neutron/services/placement_report/plugin.py
@@ -0,0 +1,227 @@
+# Copyright 2018 Ericsson
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from keystoneauth1 import exceptions as ks_exc
+from neutron_lib.agent import constants as agent_const
+from neutron_lib.api.definitions import agent_resources_synced
+from neutron_lib.callbacks import events
+from neutron_lib.callbacks import registry
+from neutron_lib.callbacks import resources
+from neutron_lib.placement import client as place_client
+from neutron_lib.plugins import directory
+from neutron_lib.services import base as service_base
+from oslo_config import cfg
+from oslo_log import log as logging
+
+from neutron.agent.common import placement_report
+from neutron.notifiers import batch_notifier
+
+LOG = logging.getLogger(__name__)
+
+PLUGIN_TYPE = "placement_report"
+
+
+@registry.has_registry_receivers
+class PlacementReportPlugin(service_base.ServicePluginBase):
+
+    supported_extension_aliases = []
+
+    # A service plugin without claiming support for filter validation would
+    # disable filter validation for all other plugins, so we report support
+    # although this plugin doesn't have filters.
+    __filter_validation_support = True
+
+    @classmethod
+    def get_plugin_type(cls):
+        return PLUGIN_TYPE
+
+    def get_plugin_description(self):
+        return "Sync placement info from agent to server to placement."
+
+    def __init__(self):
+        self._core_plugin = directory.get_plugin()
+        # NOTE(bence romsics): The following bug and fix may be relevant here.
+        # https://bugs.launchpad.net/nova/+bug/1697825
+        # https://review.openstack.org/493536
+        self._placement_client = place_client.PlacementAPIClient(cfg.CONF)
+        self._agents = PlacementReporterAgents(self._core_plugin)
+        self._batch_notifier = batch_notifier.BatchNotifier(
+            cfg.CONF.send_events_interval, self._execute_deferred)
+
+    def _execute_deferred(self, deferred_batch):
+        for deferred in deferred_batch:
+            deferred()
+
+    def _get_rp_by_name(self, name):
+        rps = self._placement_client.list_resource_providers(
+            name=name)['resource_providers']
+        # RP names are unique, therefore we can get 0 or 1. But not many.
+        if len(rps) != 1:
+            # NOTE(bence romsics): While we could raise() here and thereby
+            # detect an error a bit earlier, we want the error to surface in
+            # the sync batch below so it is going to be properly caught and
+            # is going to influence the agent's resources_synced attribute.
+            LOG.warning(
+                'placement client: no such resource provider: %s', name)
+            return {'uuid': None}
+        return rps[0]
+
+    def _sync_placement_state(self, agent, agent_db):
+        configurations = agent['configurations']
+        mech_driver = self._agents.mechanism_driver_by_agent_type(
+            agent['agent_type'])
+        uuid_ns = mech_driver.resource_provider_uuid5_namespace
+        supported_vnic_types = mech_driver.supported_vnic_types
+        device_mappings = mech_driver.get_standard_device_mappings(agent)
+
+        try:
+            agent_host_rp_uuid = self._get_rp_by_name(
+                name=agent['host'])['uuid']
+        except ks_exc.HttpError:
+            # Delay the error for the same reason as in _get_rp_by_name().
+            agent_host_rp_uuid = None
+
+        state = placement_report.PlacementState(
+            rp_bandwidths=configurations[
+                'resource_provider_bandwidths'],
+            rp_inventory_defaults=configurations[
+                'resource_provider_inventory_defaults'],
+            driver_uuid_namespace=uuid_ns,
+            agent_type=agent['agent_type'],
+            agent_host=agent['host'],
+            agent_host_rp_uuid=agent_host_rp_uuid,
+            device_mappings=device_mappings,
+            supported_vnic_types=supported_vnic_types,
+            client=self._placement_client)
+
+        deferred_batch = state.deferred_sync()
+
+        # NOTE(bence romsics): Some client calls depend on earlier
+        # ones, but not all. There are calls in a batch that can succeed
+        # independently of earlier calls. Therefore even if a call fails
+        # we have to suppress its failure so the later independent calls
+        # have a chance to succeed. If we queue up the deferred client
+        # calls one by one then we cannot handle errors at the end of
+        # a batch. So instead we should wrap the deferred client calls
+        # in a single deferred batch which executes the client calls,
+        # continuing to the next client call even if there was an error
+        # but remembering if an error happened. Then at the end of the
+        # batch (also having access to the agent object) set the agent's
+        # resources_synced attribute according to the success/failure
+        # of the batch. Since each client call does monkey patched I/O
+        # we'll yield to other eventlet threads in each call, therefore
+        # the performance should not be affected by the wrapping.
+        def batch():
+            errors = False
+
+            for deferred in deferred_batch:
+                try:
+                    LOG.debug('placement client: {}'.format(deferred))
+                    deferred.execute()
+                except Exception:
+                    errors = True
+                    LOG.exception(
+                        'placement client call failed: %s',
+                        str(deferred))
+
+            resources_synced = not errors
+            agent_db.resources_synced = resources_synced
+            agent_db.update()
+
+            LOG.debug(
+                'Synchronization of resources'
+                ' of agent type %(type)s'
+                ' at host %(host)s'
+                ' to placement %(result)s.',
+                {'type': agent['agent_type'],
+                 'host': agent['host'],
+                 'result': 'succeeded' if resources_synced else 'failed'})
+
+        self._batch_notifier.queue_event(batch)
+
+    @registry.receives(resources.AGENT,
+                       [events.AFTER_CREATE, events.AFTER_UPDATE])
+    def handle_placement_config(self, resource, event, trigger, payload):
+        # NOTE(bence romsics): This method gets called a lot, keep it quick.
+        agent = payload.desired_state
+        status = payload.metadata.get('status')
+        context = payload.context
+        if agent['agent_type'] not in self._agents.supported_agent_types:
+            return
+        if 'resource_provider_bandwidths' not in agent['configurations']:
+            LOG.warning(
+                "The mechanism driver claims agent type supports "
+                "placement reports, but the agent does not report "
+                "'resource_provider_bandwidths' in its configurations. "
+                "host: %(host)s, type: %(type)s",
+                {'host': agent['host'],
+                 'type': agent['agent_type']})
+            return
+
+        # We need to get the same agent as in
+        # neutron.db.agents_db.AgentDbMixin.create_or_update_agent()
+        agent_db = self._core_plugin._get_agent_by_type_and_host(
+            context, agent['agent_type'], agent['host'])
+
+        # sync the state known by us to placement
+        if (
+                # agent object in API (re-)created
+                status == agent_const.AGENT_NEW or
+                # agent (re-)started (even without config change)
+                'start_flag' in agent or
+                # never tried to sync yet or last sync failed
+                not agent_db[agent_resources_synced.RESOURCES_SYNCED]):
+            LOG.debug(
+                'placement: syncing state for agent type %s on host %s',
+                agent['agent_type'], agent['host'])
+            self._sync_placement_state(agent, agent_db)
+        else:
+            LOG.debug(
+                'placement: nothing to sync for agent type %s on host %s',
+                agent['agent_type'], agent['host'])
+
+
+class PlacementReporterAgents(object):
+
+    # Yep, this is meant to depend on ML2.
+    def __init__(self, ml2_plugin):
+        self._mechanism_drivers = ml2_plugin.mechanism_manager.\
+            ordered_mech_drivers
+        self._supported_agent_types = []
+        self._agent_type_to_mech_driver = {}
+
+    @property
+    def supported_agent_types(self):
+        if not self._supported_agent_types:
+            # NOTE(bence romsics): We treat the presence of the
+            # RP uuid namespace as a proxy for supporting placement reports
+            # from the driver's agent type. But we could introduce a
+            # property/logic explicitly describing the agent types supporting
+            # placement reports any time if this proves to be insufficient.
+            self._supported_agent_types = [
+                driver.obj.agent_type
+                for driver in self._mechanism_drivers
+                if driver.obj.resource_provider_uuid5_namespace is not None]
+            LOG.debug('agent types supporting placement reports: %s',
+                      ', '.join(self._supported_agent_types))
+        return self._supported_agent_types
+
+    def mechanism_driver_by_agent_type(self, agent_type):
+        if agent_type not in self._agent_type_to_mech_driver:
+            for driver in self._mechanism_drivers:
+                if (hasattr(driver.obj, 'agent_type') and
+                        agent_type == driver.obj.agent_type):
+                    self._agent_type_to_mech_driver[agent_type] = driver.obj
+                    break
+        return self._agent_type_to_mech_driver[agent_type]
diff --git a/neutron/tests/unit/plugins/ml2/drivers/mechanism_test.py b/neutron/tests/unit/plugins/ml2/drivers/mechanism_test.py
index ac632b665ce..3e6ebe2ae0b 100644
--- a/neutron/tests/unit/plugins/ml2/drivers/mechanism_test.py
+++ b/neutron/tests/unit/plugins/ml2/drivers/mechanism_test.py
@@ -13,6 +13,8 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import uuid
+
 from neutron_lib.api.definitions import portbindings
 from neutron_lib import constants as const
 from neutron_lib.plugins.ml2 import api
@@ -243,3 +245,18 @@ class TestMechanismDriver(api.MechanismDriver):
     def filter_hosts_with_segment_access(
             self, context, segments, candidate_hosts, agent_getter):
         return set()
+
+    @property
+    def resource_provider_uuid5_namespace(self):
+        return uuid.UUID('7f0ce65c-1f13-11e9-8921-3c6aa7b21d17')
+
+    @property
+    def agent_type(self):
+        return 'test_mechanism_driver_agent'
+
+    @property
+    def supported_vnic_types(self):
+        return ('test_mechanism_driver_vnic_type',)
+
+    def get_standard_device_mappings(self, agent):
+        return {}
diff --git a/neutron/tests/unit/services/placement_report/__init__.py b/neutron/tests/unit/services/placement_report/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/neutron/tests/unit/services/placement_report/test_plugin.py b/neutron/tests/unit/services/placement_report/test_plugin.py
new file mode 100644
index 00000000000..2eae9445d8a
--- /dev/null
+++ b/neutron/tests/unit/services/placement_report/test_plugin.py
@@ -0,0 +1,198 @@
+# Copyright 2019 Ericsson
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import mock
+
+from neutron_lib.agent import constants as agent_const
+from oslo_log import log as logging
+
+from neutron.services.placement_report import plugin
+from neutron.tests.unit.plugins.ml2.drivers import mechanism_test
+from neutron.tests.unit.plugins.ml2 import test_plugin
+
+LOG = logging.getLogger(__name__)
+
+
+class PlacementReportPluginTestCases(test_plugin.Ml2PluginV2TestCase):
+
+    def setUp(self):
+        super(PlacementReportPluginTestCases, self).setUp()
+        self.service_plugin = plugin.PlacementReportPlugin()
+
+    def test__get_rp_by_name_found(self):
+        with mock.patch.object(
+                self.service_plugin._placement_client,
+                'list_resource_providers',
+                return_value={'resource_providers': ['fake_rp']}):
+            rp = self.service_plugin._get_rp_by_name('whatever')
+        self.assertEqual('fake_rp', rp)
+
+    def test__get_rp_by_name_not_found(self):
+        with mock.patch.object(
+                self.service_plugin._placement_client,
+                'list_resource_providers',
+                return_value={'resource_providers': []}), \
+                mock.patch.object(plugin.LOG, 'warning') as log_mock:
+            rp = self.service_plugin._get_rp_by_name('whatever')
+        self.assertEqual(1, log_mock.call_count)
+        self.assertEqual({'uuid': None}, rp)
+
+    def test_no_sync_for_unsupported_agent_type(self):
+        payload = mock.Mock(
+            # looking all good, but agent type not supported
+            desired_state={
+                'agent_type': 'unsupported agent type',
+                'configurations': {'resource_provider_bandwidths': {}},
+                'host': 'fake host',
+            })
+
+        with mock.patch.object(self.service_plugin._core_plugin,
+                '_get_agent_by_type_and_host') as mock_get_agent, \
+            mock.patch.object(self.service_plugin,
+                '_sync_placement_state') as mock_sync:
+
+            self.service_plugin.handle_placement_config(
+                mock.ANY, mock.ANY, mock.ANY, payload)
+
+        mock_get_agent.assert_not_called()
+        mock_sync.assert_not_called()
+
+    def test_no_sync_without_resource_info(self):
+        payload = mock.Mock(
+            # looking all good, but 'configurations' has no
+            # 'resource_provider_bandwidths'
+            desired_state={
+                'agent_type': 'test_mechanism_driver_agent',
+                'configurations': {},
+                'host': 'fake host',
+            })
+
+        with mock.patch.object(self.service_plugin._core_plugin,
+                '_get_agent_by_type_and_host') as mock_get_agent, \
+            mock.patch.object(self.service_plugin,
+                '_sync_placement_state') as mock_sync:
+
+            self.service_plugin.handle_placement_config(
+                mock.ANY, mock.ANY, mock.ANY, payload)
+
+        mock_get_agent.assert_not_called()
+        mock_sync.assert_not_called()
+
+    def test_sync_if_agent_is_new(self):
+        payload = mock.Mock(
+            desired_state={
+                'agent_type': 'test_mechanism_driver_agent',
+                'configurations': {'resource_provider_bandwidths': {}},
+                'host': 'fake host',
+            },
+            metadata={
+                'status': agent_const.AGENT_NEW,
+            },
+        )
+
+        with mock.patch.object(self.service_plugin._core_plugin,
+                '_get_agent_by_type_and_host') as mock_get_agent, \
+            mock.patch.object(self.service_plugin,
+                '_sync_placement_state') as mock_sync:
+
+            self.service_plugin.handle_placement_config(
+                mock.ANY, mock.ANY, mock.ANY, payload)
+
+        self.assertEqual(1, mock_get_agent.call_count)
+        self.assertEqual(1, mock_sync.call_count)
+
+    def test_sync_if_agent_is_restarted(self):
+        payload = mock.Mock(
+            desired_state={
+                'agent_type': 'test_mechanism_driver_agent',
+                'configurations': {'resource_provider_bandwidths': {}},
+                'host': 'fake host',
+                'start_flag': True,
+            },
+        )
+
+        with mock.patch.object(self.service_plugin._core_plugin,
+                '_get_agent_by_type_and_host') as mock_get_agent, \
+            mock.patch.object(self.service_plugin,
+                '_sync_placement_state') as mock_sync:
+
+            self.service_plugin.handle_placement_config(
+                mock.ANY, mock.ANY, mock.ANY, payload)
+
+        self.assertEqual(1, mock_get_agent.call_count)
+        self.assertEqual(1, mock_sync.call_count)
+
+    def test_sync_after_transient_error(self):
+        payload = mock.Mock(
+            desired_state={
+                'agent_type': 'test_mechanism_driver_agent',
+                'configurations': {'resource_provider_bandwidths': {}},
+                'host': 'fake host',
+            },
+        )
+
+        with mock.patch.object(self.service_plugin._core_plugin,
+                '_get_agent_by_type_and_host',
+                return_value={'resources_synced': False}) as mock_get_agent, \
+            mock.patch.object(self.service_plugin,
+                '_sync_placement_state') as mock_sync:
+
+            self.service_plugin.handle_placement_config(
+                mock.ANY, mock.ANY, mock.ANY, payload)
+
+        self.assertEqual(1, mock_get_agent.call_count)
+        self.assertEqual(1, mock_sync.call_count)
+
+    def test__sync_placement_state(self):
+        agent = {
+            'agent_type': 'test_mechanism_driver_agent',
+            'configurations': {
+                'resource_provider_bandwidths': {},
+                'resource_provider_inventory_defaults': {},
+            },
+            'host': 'fake host',
+        }
+        agent_db = mock.Mock()
+
+        with mock.patch.object(self.service_plugin._batch_notifier,
+                'queue_event') as mock_queue_event, \
+            mock.patch.object(self.service_plugin._placement_client,
+                'list_resource_providers',
+                return_value={'resource_providers': [{'uuid': 'fake uuid'}]}):
+
+            self.service_plugin._sync_placement_state(agent, agent_db)
+
+        self.assertEqual(1, mock_queue_event.call_count)
+
+
+class PlacementReporterAgentsTestCases(test_plugin.Ml2PluginV2TestCase):
+
+    def test_supported_agent_types(self):
+        self.agents = plugin.PlacementReporterAgents(ml2_plugin=self.plugin)
+        self.assertEqual(
+            ['test_mechanism_driver_agent'],
+            self.agents.supported_agent_types)
+
+    def test_mechanism_driver_by_agent_type_found(self):
+        self.agents = plugin.PlacementReporterAgents(ml2_plugin=self.plugin)
+        mech_driver = self.agents.mechanism_driver_by_agent_type(
+            'test_mechanism_driver_agent')
+        self.assertIsInstance(mech_driver, mechanism_test.TestMechanismDriver)
+
+    def test_mechanism_driver_by_agent_type_not_found(self):
+        self.agents = plugin.PlacementReporterAgents(ml2_plugin=self.plugin)
+        self.assertRaises(
+            Exception,  # noqa
+            self.agents.mechanism_driver_by_agent_type,
+            'agent_not_belonging_to_any_mechanism_driver')
diff --git a/setup.cfg b/setup.cfg
index 7f3da2a1e3d..46118d8b4cd 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -74,6 +74,7 @@ neutron.service_plugins =
     loki = neutron.services.loki.loki_plugin:LokiPlugin
     log = neutron.services.logapi.logging_plugin:LoggingPlugin
     port_forwarding = neutron.services.portforwarding.pf_plugin:PortForwardingPlugin
+    placement = neutron.services.placement_report.plugin:PlacementReportPlugin
 neutron.ml2.type_drivers =
     flat = neutron.plugins.ml2.drivers.type_flat:FlatTypeDriver
     local = neutron.plugins.ml2.drivers.type_local:LocalTypeDriver