Merge "Placement reporting service plugin"

This commit is contained in:
Zuul 2019-03-03 14:57:41 +00:00 committed by Gerrit Code Review
commit c0eb55cfa6
6 changed files with 443 additions and 0 deletions

View File

@ -0,0 +1,227 @@
# Copyright 2018 Ericsson
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from keystoneauth1 import exceptions as ks_exc
from neutron_lib.agent import constants as agent_const
from neutron_lib.api.definitions import agent_resources_synced
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib.placement import client as place_client
from neutron_lib.plugins import directory
from neutron_lib.services import base as service_base
from oslo_config import cfg
from oslo_log import log as logging
from neutron.agent.common import placement_report
from neutron.notifiers import batch_notifier
# Module-level logger shared by the classes defined in this module.
LOG = logging.getLogger(__name__)
# Plugin type identifier returned by PlacementReportPlugin.get_plugin_type().
PLUGIN_TYPE = "placement_report"
@registry.has_registry_receivers
class PlacementReportPlugin(service_base.ServicePluginBase):
    """Service plugin syncing agent-reported resource info to Placement.

    The plugin receives agent AFTER_CREATE/AFTER_UPDATE events, decides
    whether the agent's resource information has to be (re-)synced, queues
    the needed placement client calls on a batch notifier and records the
    outcome of the batch in the agent's resources_synced attribute.
    """

    supported_extension_aliases = []

    # A service plugin without claiming support for filter validation would
    # disable filter validation for all other plugins, so we report support
    # although this plugin doesn't have filters.
    __filter_validation_support = True

    @classmethod
    def get_plugin_type(cls):
        """Return the type identifier of this service plugin."""
        return PLUGIN_TYPE

    def get_plugin_description(self):
        """Return a short human readable description of the plugin."""
        return "Sync placement info from agent to server to placement."

    def __init__(self):
        self._core_plugin = directory.get_plugin()
        # NOTE(bence romsics): The following bug and fix may be relevant here.
        # https://bugs.launchpad.net/nova/+bug/1697825
        # https://review.openstack.org/493536
        self._placement_client = place_client.PlacementAPIClient(cfg.CONF)
        self._agents = PlacementReporterAgents(self._core_plugin)
        self._batch_notifier = batch_notifier.BatchNotifier(
            cfg.CONF.send_events_interval, self._execute_deferred)

    def _execute_deferred(self, deferred_batch):
        """Run every queued callable; used as the batch notifier callback."""
        for deferred in deferred_batch:
            deferred()

    def _get_rp_by_name(self, name):
        """Look up a resource provider by name in Placement.

        :param name: name of the resource provider to look for.
        :returns: the resource provider as returned by the placement
                  client, or ``{'uuid': None}`` when not exactly one
                  resource provider matched.
        """
        rps = self._placement_client.list_resource_providers(
            name=name)['resource_providers']
        # RP names are unique, therefore we can get 0 or 1. But not many.
        if len(rps) != 1:
            # NOTE(bence romsics): While we could raise() here and by that
            # detect an error a bit earlier, we want the error to surface in
            # the sync batch below so it is going to be properly caught and
            # is going to influence the agent's resources_synced attribute.
            LOG.warning(
                'placement client: no such resource provider: %s', name)
            return {'uuid': None}
        return rps[0]

    def _sync_placement_state(self, agent, agent_db):
        """Queue a batch syncing the agent's resource info to Placement.

        :param agent: agent state as a dict; its 'agent_type', 'host' and
                      'configurations' items are read here.
        :param agent_db: agent DB object; its resources_synced attribute is
                         set and update() is called when the batch finishes.
        """
        configurations = agent['configurations']
        mech_driver = self._agents.mechanism_driver_by_agent_type(
            agent['agent_type'])
        uuid_ns = mech_driver.resource_provider_uuid5_namespace
        supported_vnic_types = mech_driver.supported_vnic_types
        device_mappings = mech_driver.get_standard_device_mappings(agent)

        try:
            agent_host_rp_uuid = self._get_rp_by_name(
                name=agent['host'])['uuid']
        except ks_exc.ClientException:
            # Delay the error for the same reason as in _get_rp_by_name().
            # ClientException is the base of HttpError and also covers
            # connection level failures, which otherwise would escape the
            # callback and leave resources_synced untouched.
            agent_host_rp_uuid = None

        state = placement_report.PlacementState(
            rp_bandwidths=configurations[
                'resource_provider_bandwidths'],
            rp_inventory_defaults=configurations[
                'resource_provider_inventory_defaults'],
            driver_uuid_namespace=uuid_ns,
            agent_type=agent['agent_type'],
            agent_host=agent['host'],
            agent_host_rp_uuid=agent_host_rp_uuid,
            device_mappings=device_mappings,
            supported_vnic_types=supported_vnic_types,
            client=self._placement_client)

        deferred_batch = state.deferred_sync()

        # NOTE(bence romsics): Some client calls depend on earlier
        # ones, but not all. There are calls in a batch that can succeed
        # independently of earlier calls. Therefore even if a call fails
        # we have to suppress its failure so the later independent calls
        # have a chance to succeed. If we queue up the deferred client
        # calls one by one then we cannot handle errors at the end of
        # a batch. So instead we should wrap the deferred client calls
        # in a single deferred batch which executes the client calls,
        # continuing to the next client call even if there was an error
        # but remembering if an error happened. Then at the end of the
        # batch (also having access to the agent object) set the agent's
        # resources_synced attribute according to the success/failure
        # of the batch. Since each client call does monkey patched I/O
        # we'll yield to other eventlet threads in each call therefore
        # the performance should not be affected by the wrapping.
        def batch():
            errors = False

            for deferred in deferred_batch:
                try:
                    # Lazy logging args: only formatted if debug is enabled.
                    LOG.debug('placement client: %s', deferred)
                    deferred.execute()
                except Exception:
                    errors = True
                    LOG.exception(
                        'placement client call failed: %s', deferred)

            resources_synced = not errors
            agent_db.resources_synced = resources_synced
            agent_db.update()

            LOG.debug(
                'Synchronization of resources'
                ' of agent type %(type)s'
                ' at host %(host)s'
                ' to placement %(result)s.',
                {'type': agent['agent_type'],
                 'host': agent['host'],
                 'result': 'succeeded' if resources_synced else 'failed'})

        self._batch_notifier.queue_event(batch)

    @registry.receives(resources.AGENT,
                       [events.AFTER_CREATE, events.AFTER_UPDATE])
    def handle_placement_config(self, resource, event, trigger, payload):
        """Sync an agent's resource info to Placement when it is needed.

        A sync is queued when the agent is new, when the agent (re-)started
        or when the agent's resources_synced attribute is not true.

        :param resource: callback resource (not used here).
        :param event: callback event (not used here).
        :param trigger: callback trigger (not used here).
        :param payload: event payload; its desired_state (the agent),
                        metadata ('status') and context are read.
        """
        # NOTE(bence romsics): This method gets called a lot, keep it quick.
        agent = payload.desired_state
        status = payload.metadata.get('status')
        context = payload.context

        if agent['agent_type'] not in self._agents.supported_agent_types:
            return

        if 'resource_provider_bandwidths' not in agent['configurations']:
            LOG.warning(
                "The mechanism driver claims agent type supports "
                "placement reports, but the agent does not report "
                "'resource_provider_bandwidths' in its configurations. "
                "host: %(host)s, type: %(type)s",
                {'host': agent['host'],
                 'type': agent['agent_type']})
            return

        # We need to get the same agent as in
        # neutron.db.agents_db.AgentDbMixin.create_or_update_agent()
        agent_db = self._core_plugin._get_agent_by_type_and_host(
            context, agent['agent_type'], agent['host'])

        # sync the state known by us to placement
        if (
                # agent object in API (re-)created
                status == agent_const.AGENT_NEW or
                # agent (re-)started (even without config change)
                'start_flag' in agent or
                # never tried to sync yet or last sync failed
                not agent_db[agent_resources_synced.RESOURCES_SYNCED]):
            LOG.debug(
                'placement: syncing state for agent type %s on host %s',
                agent['agent_type'], agent['host'])
            self._sync_placement_state(agent, agent_db)
        else:
            LOG.debug(
                'placement: nothing to sync for agent type %s on host %s',
                agent['agent_type'], agent['host'])
class PlacementReporterAgents(object):
    """Map agent types to the ML2 mechanism drivers handling them.

    Yep, this is meant to depend on ML2.
    """

    def __init__(self, ml2_plugin):
        """Remember the ordered mechanism drivers of the ML2 plugin.

        :param ml2_plugin: object whose
                           mechanism_manager.ordered_mech_drivers is read.
        """
        self._mechanism_drivers = (
            ml2_plugin.mechanism_manager.ordered_mech_drivers)
        # None means "not computed yet". An empty list is a valid computed
        # result, so it must not be used as the sentinel, otherwise the list
        # would be rebuilt (and logged) on each access when no driver
        # supports placement reports.
        self._supported_agent_types = None
        self._agent_type_to_mech_driver = {}

    @property
    def supported_agent_types(self):
        """List of agent types whose driver supports placement reports.

        The list is computed on first access and cached afterwards.
        """
        if self._supported_agent_types is None:
            # NOTE(bence romsics): We treat the presence of the
            # RP uuid namespace a proxy for supporting placement reports from
            # the driver's agent type. But we could introduce a property/logic
            # explicitly describing the agent types supporting placement
            # reports any time if this proved to be insufficient.
            self._supported_agent_types = [
                driver.obj.agent_type
                for driver in self._mechanism_drivers
                if driver.obj.resource_provider_uuid5_namespace is not None]
            LOG.debug('agent types supporting placement reports: %s',
                      ', '.join(self._supported_agent_types))
        return self._supported_agent_types

    def mechanism_driver_by_agent_type(self, agent_type):
        """Return the mechanism driver object handling the agent type.

        :param agent_type: agent type to find the driver for.
        :returns: the ``obj`` of the first driver with a matching
                  agent_type attribute; the result is cached.
        :raises KeyError: if no mechanism driver has this agent type.
        """
        if agent_type not in self._agent_type_to_mech_driver:
            for driver in self._mechanism_drivers:
                # Not every mechanism driver has an agent_type attribute.
                if (hasattr(driver.obj, 'agent_type') and
                        agent_type == driver.obj.agent_type):
                    self._agent_type_to_mech_driver[agent_type] = driver.obj
                    break
        return self._agent_type_to_mech_driver[agent_type]

View File

@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from neutron_lib.api.definitions import portbindings
from neutron_lib import constants as const
from neutron_lib.plugins.ml2 import api
@ -243,3 +245,18 @@ class TestMechanismDriver(api.MechanismDriver):
def filter_hosts_with_segment_access(
        self, context, segments, candidate_hosts, agent_getter):
    """Return an empty set, whatever arguments are passed in."""
    hosts_with_access = set()
    return hosts_with_access
@property
def resource_provider_uuid5_namespace(self):
    """Fixed UUID exposed as the resource provider uuid5 namespace."""
    namespace = uuid.UUID(hex='7f0ce65c-1f13-11e9-8921-3c6aa7b21d17')
    return namespace
@property
def agent_type(self):
    """Constant agent type name reported by this driver."""
    type_name = 'test_mechanism_driver_agent'
    return type_name
@property
def supported_vnic_types(self):
    """One-element tuple of the vnic types reported by this driver."""
    vnic_types = ('test_mechanism_driver_vnic_type',)
    return vnic_types
def get_standard_device_mappings(self, agent):
    """Return a new empty dict of device mappings; ``agent`` is ignored."""
    mappings = dict()
    return mappings

View File

@ -0,0 +1,198 @@
# Copyright 2019 Ericsson
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from neutron_lib.agent import constants as agent_const
from oslo_log import log as logging
from neutron.services.placement_report import plugin
from neutron.tests.unit.plugins.ml2.drivers import mechanism_test
from neutron.tests.unit.plugins.ml2 import test_plugin
# Module-level logger of this test module.
LOG = logging.getLogger(__name__)
class PlacementReportPluginTestCases(test_plugin.Ml2PluginV2TestCase):
    """Unit tests of PlacementReportPlugin on top of the ML2 test plugin."""

    def setUp(self):
        super(PlacementReportPluginTestCases, self).setUp()
        self.service_plugin = plugin.PlacementReportPlugin()

    def _patch_list_rps(self, resource_providers):
        """Patch the placement client to list the given resource providers."""
        return mock.patch.object(
            self.service_plugin._placement_client,
            'list_resource_providers',
            return_value={'resource_providers': resource_providers})

    def _call_handler(self, payload, **get_agent_kwargs):
        """Call handle_placement_config() with its collaborators mocked.

        The agent lookup of the core plugin and the plugin's own
        _sync_placement_state() are patched for the time of the call.

        :returns: (agent lookup mock, sync mock) tuple.
        """
        get_agent_patch = mock.patch.object(
            self.service_plugin._core_plugin,
            '_get_agent_by_type_and_host',
            **get_agent_kwargs)
        sync_patch = mock.patch.object(
            self.service_plugin, '_sync_placement_state')
        with get_agent_patch as mock_get_agent, sync_patch as mock_sync:
            self.service_plugin.handle_placement_config(
                mock.ANY, mock.ANY, mock.ANY, payload)
        return mock_get_agent, mock_sync

    def test__get_rp_by_name_found(self):
        with self._patch_list_rps(['fake_rp']):
            rp = self.service_plugin._get_rp_by_name('whatever')
        self.assertEqual('fake_rp', rp)

    def test__get_rp_by_name_not_found(self):
        warning_patch = mock.patch.object(plugin.LOG, 'warning')
        with self._patch_list_rps([]), warning_patch as log_mock:
            rp = self.service_plugin._get_rp_by_name('whatever')
        self.assertEqual(1, log_mock.call_count)
        self.assertEqual({'uuid': None}, rp)

    def test_no_sync_for_unsupported_agent_type(self):
        # looking all good, but agent type not supported
        agent = {
            'agent_type': 'unsupported agent type',
            'configurations': {'resource_provider_bandwidths': {}},
            'host': 'fake host',
        }
        mock_get_agent, mock_sync = self._call_handler(
            mock.Mock(desired_state=agent))
        mock_get_agent.assert_not_called()
        mock_sync.assert_not_called()

    def test_no_sync_without_resource_info(self):
        # looking all good, but 'configurations' has no
        # 'resource_provider_bandwidths'
        agent = {
            'agent_type': 'test_mechanism_driver_agent',
            'configurations': {},
            'host': 'fake host',
        }
        mock_get_agent, mock_sync = self._call_handler(
            mock.Mock(desired_state=agent))
        mock_get_agent.assert_not_called()
        mock_sync.assert_not_called()

    def test_sync_if_agent_is_new(self):
        agent = {
            'agent_type': 'test_mechanism_driver_agent',
            'configurations': {'resource_provider_bandwidths': {}},
            'host': 'fake host',
        }
        payload = mock.Mock(
            desired_state=agent,
            metadata={'status': agent_const.AGENT_NEW})
        mock_get_agent, mock_sync = self._call_handler(payload)
        self.assertEqual(1, mock_get_agent.call_count)
        self.assertEqual(1, mock_sync.call_count)

    def test_sync_if_agent_is_restarted(self):
        agent = {
            'agent_type': 'test_mechanism_driver_agent',
            'configurations': {'resource_provider_bandwidths': {}},
            'host': 'fake host',
            'start_flag': True,
        }
        mock_get_agent, mock_sync = self._call_handler(
            mock.Mock(desired_state=agent))
        self.assertEqual(1, mock_get_agent.call_count)
        self.assertEqual(1, mock_sync.call_count)

    def test_sync_after_transient_error(self):
        agent = {
            'agent_type': 'test_mechanism_driver_agent',
            'configurations': {'resource_provider_bandwidths': {}},
            'host': 'fake host',
        }
        # The looked up agent reports that its last sync did not succeed.
        mock_get_agent, mock_sync = self._call_handler(
            mock.Mock(desired_state=agent),
            return_value={'resources_synced': False})
        self.assertEqual(1, mock_get_agent.call_count)
        self.assertEqual(1, mock_sync.call_count)

    def test__sync_placement_state(self):
        agent_db = mock.Mock()
        agent = {
            'agent_type': 'test_mechanism_driver_agent',
            'configurations': {
                'resource_provider_bandwidths': {},
                'resource_provider_inventory_defaults': {},
            },
            'host': 'fake host',
        }
        queue_patch = mock.patch.object(
            self.service_plugin._batch_notifier, 'queue_event')
        rps_patch = self._patch_list_rps([{'uuid': 'fake uuid'}])
        with queue_patch as mock_queue_event, rps_patch:
            self.service_plugin._sync_placement_state(agent, agent_db)
        self.assertEqual(1, mock_queue_event.call_count)
class PlacementReporterAgentsTestCases(test_plugin.Ml2PluginV2TestCase):
    """Unit tests of PlacementReporterAgents using the ML2 test plugin."""

    def test_supported_agent_types(self):
        self.agents = plugin.PlacementReporterAgents(ml2_plugin=self.plugin)
        self.assertEqual(
            ['test_mechanism_driver_agent'],
            self.agents.supported_agent_types)

    def test_mechanism_driver_by_agent_type_found(self):
        self.agents = plugin.PlacementReporterAgents(ml2_plugin=self.plugin)
        mech_driver = self.agents.mechanism_driver_by_agent_type(
            'test_mechanism_driver_agent')
        # assertTrue() would take the class as the failure message and pass
        # for any truthy value, so check the type of the driver explicitly.
        self.assertIsInstance(mech_driver, mechanism_test.TestMechanismDriver)

    def test_mechanism_driver_by_agent_type_not_found(self):
        self.agents = plugin.PlacementReporterAgents(ml2_plugin=self.plugin)
        self.assertRaises(
            Exception,  # noqa
            self.agents.mechanism_driver_by_agent_type,
            'agent_not_belonging_to_any_mechanism_driver')

View File

@ -74,6 +74,7 @@ neutron.service_plugins =
loki = neutron.services.loki.loki_plugin:LokiPlugin
log = neutron.services.logapi.logging_plugin:LoggingPlugin
port_forwarding = neutron.services.portforwarding.pf_plugin:PortForwardingPlugin
placement = neutron.services.placement_report.plugin:PlacementReportPlugin
neutron.ml2.type_drivers =
flat = neutron.plugins.ml2.drivers.type_flat:FlatTypeDriver
local = neutron.plugins.ml2.drivers.type_local:LocalTypeDriver