Merge "Make port binding attempt after agent is revived"
This commit is contained in:
commit
66f0a4d189
@ -421,6 +421,8 @@ class AgentDbMixin(ext_agent.AgentPluginBase, AgentAvailabilityZoneMixin):
|
||||
status = agent_consts.AGENT_NEW
|
||||
greenthread.sleep(0)
|
||||
|
||||
agent_state['agent_status'] = status
|
||||
agent_state['admin_state_up'] = agent.admin_state_up
|
||||
registry.notify(resources.AGENT, event_type, self, context=context,
|
||||
host=agent_state['host'], plugin=self,
|
||||
agent=agent_state)
|
||||
|
@ -521,3 +521,13 @@ class Port(base.NeutronDbObject):
|
||||
query = query.filter(
|
||||
~models_v2.Port.device_owner.in_(excluded_device_owners))
|
||||
return [port_binding['port_id'] for port_binding in query.all()]
|
||||
|
||||
@classmethod
|
||||
def get_ports_by_binding_type_and_host(cls, context,
|
||||
binding_type, host):
|
||||
query = context.session.query(models_v2.Port).join(
|
||||
ml2_models.PortBinding)
|
||||
query = query.filter(
|
||||
ml2_models.PortBinding.vif_type == binding_type,
|
||||
ml2_models.PortBinding.host == host)
|
||||
return [cls._load_object(context, db_obj) for db_obj in query.all()]
|
||||
|
@ -14,6 +14,7 @@
|
||||
# under the License.
|
||||
|
||||
from eventlet import greenthread
|
||||
from neutron_lib.agent import constants as agent_consts
|
||||
from neutron_lib.agent import topics
|
||||
from neutron_lib.api.definitions import allowedaddresspairs as addr_apidef
|
||||
from neutron_lib.api.definitions import availability_zone as az_def
|
||||
@ -175,6 +176,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
"port-mac-address-regenerate",
|
||||
"binding-extended"]
|
||||
|
||||
# List of agent types for which all binding_failed ports should try to be
|
||||
# rebound when agent revive
|
||||
_rebind_on_revive_agent_types = [const.AGENT_TYPE_OVS]
|
||||
|
||||
@property
|
||||
def supported_extension_aliases(self):
|
||||
if not hasattr(self, '_aliases'):
|
||||
@ -335,6 +340,49 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
new_mac=port['mac_address'])
|
||||
return mac_change
|
||||
|
||||
@registry.receives(resources.AGENT, [events.AFTER_UPDATE])
|
||||
def _retry_binding_revived_agents(self, resource, event, trigger,
|
||||
**kwargs):
|
||||
context = kwargs['context']
|
||||
host = kwargs['host']
|
||||
agent = kwargs.get('agent', {})
|
||||
agent_status = agent.get('agent_status')
|
||||
|
||||
agent_type = agent.get('agent_type')
|
||||
|
||||
if (agent_status != agent_consts.AGENT_REVIVED or
|
||||
not agent.get('admin_state_up') or
|
||||
agent_type not in self._rebind_on_revive_agent_types):
|
||||
return
|
||||
|
||||
ports = ports_obj.Port.get_ports_by_binding_type_and_host(
|
||||
context, portbindings.VIF_TYPE_BINDING_FAILED, host)
|
||||
for port in ports:
|
||||
binding = self._get_binding_for_host(port.bindings, host)
|
||||
if not binding:
|
||||
LOG.debug('No bindings found for port %(port_id)s '
|
||||
'on host %(host)s',
|
||||
{'port_id': port.id, 'host': host})
|
||||
continue
|
||||
port_dict = self._make_port_dict(port.db_obj)
|
||||
network = self.get_network(context, port.network_id)
|
||||
try:
|
||||
levels = db.get_binding_level_objs(
|
||||
context, port.id, binding.host)
|
||||
# TODO(slaweq): use binding OVO instead of binding.db_obj when
|
||||
# ML2 plugin will switch to use Port Binding OVO everywhere
|
||||
mech_context = driver_context.PortContext(
|
||||
self, context, port_dict, network, binding.db_obj, levels)
|
||||
self._bind_port_if_needed(mech_context)
|
||||
except Exception as e:
|
||||
LOG.warning('Attempt to bind port %(port_id)s after agent '
|
||||
'%(agent_type)s on host %(host)s revived failed. '
|
||||
'Error: %(error)s',
|
||||
{'port_id': port.id,
|
||||
'agent_type': agent_type,
|
||||
'host': host,
|
||||
'error': e})
|
||||
|
||||
def _clear_port_binding(self, mech_context, binding, port, original_host):
|
||||
binding.vif_type = portbindings.VIF_TYPE_UNBOUND
|
||||
binding.vif_details = ''
|
||||
|
@ -80,6 +80,20 @@ class BaseFullStackTestCase(testlib_api.MySQLTestCaseMixin,
|
||||
class_name, test_name = self.id().split(".")[-2:]
|
||||
return "%s.%s" % (class_name, test_name)
|
||||
|
||||
def _wait_until_agent_up(self, agent_id):
|
||||
def _agent_up():
|
||||
agent = self.client.show_agent(agent_id)['agent']
|
||||
return agent.get('alive')
|
||||
|
||||
common_utils.wait_until_true(_agent_up)
|
||||
|
||||
def _wait_until_agent_down(self, agent_id):
|
||||
def _agent_down():
|
||||
agent = self.client.show_agent(agent_id)['agent']
|
||||
return not agent.get('alive')
|
||||
|
||||
common_utils.wait_until_true(_agent_down)
|
||||
|
||||
def _assert_ping_during_agents_restart(
|
||||
self, agents, src_namespace, ips, restart_timeout=10,
|
||||
ping_timeout=1, count=10):
|
||||
|
@ -79,13 +79,6 @@ class BaseDhcpAgentTest(base.BaseFullStackTestCase):
|
||||
|
||||
self.vm = self._spawn_vm()
|
||||
|
||||
def _wait_until_agent_down(self, agent_id):
|
||||
def _agent_down():
|
||||
agent = self.client.show_agent(agent_id)['agent']
|
||||
return not agent.get('alive')
|
||||
|
||||
common_utils.wait_until_true(_agent_down)
|
||||
|
||||
|
||||
class TestDhcpAgentNoHA(BaseDhcpAgentTest):
|
||||
|
||||
|
180
neutron/tests/fullstack/test_ports_rebind.py
Normal file
180
neutron/tests/fullstack/test_ports_rebind.py
Normal file
@ -0,0 +1,180 @@
|
||||
# Copyright 2018 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from neutron_lib.api.definitions import portbindings
|
||||
from neutron_lib import constants
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from neutron.common import utils as common_utils
|
||||
from neutron.tests.common.exclusive_resources import ip_network
|
||||
from neutron.tests.fullstack import base
|
||||
from neutron.tests.fullstack.resources import environment
|
||||
from neutron.tests.fullstack.resources import machine
|
||||
from neutron.tests.unit import testlib_api
|
||||
|
||||
load_tests = testlib_api.module_load_tests
|
||||
|
||||
|
||||
class TestPortsRebind(base.BaseFullStackTestCase):
|
||||
|
||||
scenarios = [
|
||||
('Open vSwitch Agent', {'l2_agent_type': constants.AGENT_TYPE_OVS}),
|
||||
('Linux Bridge Agent', {
|
||||
'l2_agent_type': constants.AGENT_TYPE_LINUXBRIDGE})]
|
||||
|
||||
def setUp(self):
|
||||
host_descriptions = [
|
||||
environment.HostDescription(
|
||||
l2_agent_type=self.l2_agent_type,
|
||||
l3_agent=self.use_l3_agent)]
|
||||
env = environment.Environment(
|
||||
environment.EnvironmentDescription(
|
||||
agent_down_time=10),
|
||||
host_descriptions)
|
||||
|
||||
super(TestPortsRebind, self).setUp(env)
|
||||
|
||||
self.l2_agent_process = self.environment.hosts[0].l2_agent
|
||||
self.l2_agent = self.safe_client.client.list_agents(
|
||||
agent_type=self.l2_agent_type)['agents'][0]
|
||||
|
||||
self.tenant_id = uuidutils.generate_uuid()
|
||||
self.network = self.safe_client.create_network(self.tenant_id)
|
||||
self.subnet = self.safe_client.create_subnet(
|
||||
self.tenant_id, self.network['id'], '20.0.0.0/24')
|
||||
|
||||
def _ensure_port_bound(self, port_id):
|
||||
|
||||
def port_bound():
|
||||
port = self.safe_client.client.show_port(port_id)['port']
|
||||
return (
|
||||
port[portbindings.VIF_TYPE] not in
|
||||
[portbindings.VIF_TYPE_UNBOUND,
|
||||
portbindings.VIF_TYPE_BINDING_FAILED])
|
||||
|
||||
common_utils.wait_until_true(port_bound)
|
||||
|
||||
def _ensure_port_binding_failed(self, port_id):
|
||||
|
||||
def port_binding_failed():
|
||||
port = self.safe_client.client.show_port(port_id)['port']
|
||||
return (port[portbindings.VIF_TYPE] ==
|
||||
portbindings.VIF_TYPE_BINDING_FAILED)
|
||||
|
||||
common_utils.wait_until_true(port_binding_failed)
|
||||
|
||||
|
||||
class TestVMPortRebind(TestPortsRebind):
|
||||
|
||||
use_l3_agent = False
|
||||
|
||||
def test_vm_port_rebound_when_L2_agent_revived(self):
|
||||
"""Test scenario
|
||||
|
||||
1. Create port which will be properly bound to host
|
||||
2. Stop L2 agent and wait until it will be DEAD
|
||||
3. Create another port - it should have "binding_failed"
|
||||
4. Turn on L2 agent
|
||||
5. Port from p.3 should be bound properly after L2 agent will be UP
|
||||
"""
|
||||
|
||||
vm_1 = self.useFixture(
|
||||
machine.FakeFullstackMachine(
|
||||
self.environment.hosts[0],
|
||||
self.network['id'],
|
||||
self.tenant_id,
|
||||
self.safe_client))
|
||||
vm_1.block_until_boot()
|
||||
self._ensure_port_bound(vm_1.neutron_port['id'])
|
||||
vm_1_port = self.safe_client.client.show_port(
|
||||
vm_1.neutron_port['id'])['port']
|
||||
|
||||
self.l2_agent_process = self.environment.hosts[0].l2_agent
|
||||
self.l2_agent = self.safe_client.client.list_agents(
|
||||
agent_type=self.l2_agent_type)['agents'][0]
|
||||
|
||||
self.l2_agent_process.stop()
|
||||
self._wait_until_agent_down(self.l2_agent['id'])
|
||||
|
||||
vm_2 = self.useFixture(
|
||||
machine.FakeFullstackMachine(
|
||||
self.environment.hosts[0],
|
||||
self.network['id'],
|
||||
self.tenant_id,
|
||||
self.safe_client))
|
||||
self._ensure_port_binding_failed(vm_2.neutron_port['id'])
|
||||
vm_2_port = self.safe_client.client.show_port(
|
||||
vm_2.neutron_port['id'])['port']
|
||||
|
||||
# check if vm_1 port is still bound as it was before
|
||||
self._ensure_port_bound(vm_1.neutron_port['id'])
|
||||
# and that revision number of vm_1's port wasn't changed
|
||||
self.assertEqual(
|
||||
vm_1_port['revision_number'],
|
||||
self.safe_client.client.show_port(
|
||||
vm_1_port['id'])['port']['revision_number'])
|
||||
|
||||
self.l2_agent_process.start()
|
||||
self._wait_until_agent_up(self.l2_agent['id'])
|
||||
|
||||
self._ensure_port_bound(vm_2_port['id'])
|
||||
|
||||
|
||||
class TestRouterPortRebind(TestPortsRebind):
|
||||
|
||||
use_l3_agent = True
|
||||
|
||||
def setUp(self):
|
||||
super(TestRouterPortRebind, self).setUp()
|
||||
|
||||
self.tenant_id = uuidutils.generate_uuid()
|
||||
self.ext_net = self.safe_client.create_network(
|
||||
self.tenant_id, external=True)
|
||||
ext_cidr = self.useFixture(
|
||||
ip_network.ExclusiveIPNetwork(
|
||||
"240.0.0.0", "240.255.255.255", "24")).network
|
||||
self.safe_client.create_subnet(
|
||||
self.tenant_id, self.ext_net['id'], ext_cidr)
|
||||
self.router = self.safe_client.create_router(
|
||||
self.tenant_id, external_network=self.ext_net['id'])
|
||||
|
||||
def test_vm_port_rebound_when_L2_agent_revived(self):
|
||||
"""Test scenario
|
||||
|
||||
1. Ensure that router gateway port is bound properly
|
||||
2. Stop L2 agent and wait until it will be DEAD
|
||||
3. Create router interface and check that it's port is "binding_failed"
|
||||
4. Turn on L2 agent
|
||||
5. Router's port created in p.3 should be now bound properly
|
||||
"""
|
||||
|
||||
if self.l2_agent_type == constants.AGENT_TYPE_LINUXBRIDGE:
|
||||
self.skipTest("Bug 1798085")
|
||||
|
||||
gw_port = self.safe_client.client.list_ports(
|
||||
device_id=self.router['id'],
|
||||
device_owner=constants.DEVICE_OWNER_ROUTER_GW)['ports'][0]
|
||||
self._ensure_port_bound(gw_port['id'])
|
||||
|
||||
self.l2_agent_process.stop()
|
||||
self._wait_until_agent_down(self.l2_agent['id'])
|
||||
|
||||
router_interface_info = self.safe_client.add_router_interface(
|
||||
self.router['id'], self.subnet['id'])
|
||||
self._ensure_port_binding_failed(router_interface_info['port_id'])
|
||||
|
||||
self.l2_agent_process.start()
|
||||
self._wait_until_agent_up(self.l2_agent['id'])
|
||||
|
||||
self._ensure_port_bound(router_interface_info['port_id'])
|
@ -18,6 +18,7 @@ import functools
|
||||
import fixtures
|
||||
import mock
|
||||
import netaddr
|
||||
from neutron_lib.agent import constants as agent_consts
|
||||
from neutron_lib.api.definitions import availability_zone as az_def
|
||||
from neutron_lib.api.definitions import external_net as extnet_apidef
|
||||
from neutron_lib.api.definitions import multiprovidernet as mpnet_apidef
|
||||
@ -710,6 +711,105 @@ class TestMl2DbOperationBoundsTenantRbac(TestMl2DbOperationBoundsTenant):
|
||||
self.make_port_in_shared_network, 'ports')
|
||||
|
||||
|
||||
class TestMl2RevivedAgentsBindPorts(Ml2PluginV2TestCase):
|
||||
|
||||
_mechanism_drivers = ['openvswitch', 'logger']
|
||||
|
||||
def _test__retry_binding_revived_agents(self, event, agent_status,
|
||||
admin_state_up, agent_type,
|
||||
ports, should_bind_ports):
|
||||
plugin = directory.get_plugin()
|
||||
context = mock.Mock()
|
||||
agent = {
|
||||
'agent_status': agent_status,
|
||||
'admin_state_up': admin_state_up,
|
||||
'agent_type': agent_type}
|
||||
host = "test_host"
|
||||
|
||||
binding = mock.MagicMock(
|
||||
vif_type=portbindings.VIF_TYPE_BINDING_FAILED, host=host)
|
||||
for port in ports:
|
||||
port.bindings = [binding]
|
||||
|
||||
with mock.patch(
|
||||
'neutron.objects.ports.Port.get_ports_by_binding_type_and_host',
|
||||
return_value=ports
|
||||
) as get_ports_by_binding_type_and_host, mock.patch.object(
|
||||
plugin, 'get_network', return_value=mock.Mock()
|
||||
) as get_network, mock.patch(
|
||||
'neutron.plugins.ml2.db.get_binding_level_objs',
|
||||
return_value=None
|
||||
) as get_binding_level_objs, mock.patch(
|
||||
'neutron.plugins.ml2.driver_context.PortContext'
|
||||
) as port_context, mock.patch.object(
|
||||
plugin, '_bind_port_if_needed'
|
||||
) as bind_port_if_needed:
|
||||
|
||||
plugin._retry_binding_revived_agents(
|
||||
resources.AGENT, event, plugin,
|
||||
**{'context': context, 'host': host, 'agent': agent})
|
||||
|
||||
if (agent_status == agent_consts.AGENT_ALIVE or
|
||||
not admin_state_up or
|
||||
agent_type not in plugin._rebind_on_revive_agent_types):
|
||||
get_ports_by_binding_type_and_host.assert_not_called()
|
||||
else:
|
||||
get_ports_by_binding_type_and_host.assert_called_once_with(
|
||||
context, portbindings.VIF_TYPE_BINDING_FAILED, host)
|
||||
|
||||
if should_bind_ports:
|
||||
get_network_expected_calls = [
|
||||
mock.call(context, port.network_id) for port in ports]
|
||||
get_network.assert_has_calls(get_network_expected_calls)
|
||||
get_binding_level_expected_calls = [
|
||||
mock.call(context, port.id, host) for port in ports]
|
||||
get_binding_level_objs.assert_has_calls(
|
||||
get_binding_level_expected_calls)
|
||||
bind_port_if_needed.assert_called_once_with(port_context())
|
||||
else:
|
||||
get_network.assert_not_called()
|
||||
get_binding_level_objs.assert_not_called()
|
||||
bind_port_if_needed.assert_not_called()
|
||||
|
||||
def test__retry_binding_revived_agents(self):
|
||||
port = mock.MagicMock(
|
||||
id=uuidutils.generate_uuid())
|
||||
self._test__retry_binding_revived_agents(
|
||||
events.AFTER_UPDATE, agent_consts.AGENT_REVIVED, True,
|
||||
constants.AGENT_TYPE_OVS, [port],
|
||||
should_bind_ports=True)
|
||||
|
||||
def test__retry_binding_revived_agents_no_binding_failed_ports(self):
|
||||
self._test__retry_binding_revived_agents(
|
||||
events.AFTER_UPDATE, agent_consts.AGENT_REVIVED, True,
|
||||
constants.AGENT_TYPE_OVS, [],
|
||||
should_bind_ports=False)
|
||||
|
||||
def test__retry_binding_revived_agents_alive_agent(self):
|
||||
port = mock.MagicMock(
|
||||
id=uuidutils.generate_uuid())
|
||||
self._test__retry_binding_revived_agents(
|
||||
events.AFTER_UPDATE, agent_consts.AGENT_ALIVE, True,
|
||||
constants.AGENT_TYPE_OVS, [port],
|
||||
should_bind_ports=False)
|
||||
|
||||
def test__retry_binding_revived_agents_not_binding_agent(self):
|
||||
port = mock.MagicMock(
|
||||
id=uuidutils.generate_uuid())
|
||||
self._test__retry_binding_revived_agents(
|
||||
events.AFTER_UPDATE, agent_consts.AGENT_REVIVED, True,
|
||||
"Other agent which don't support binding", [port],
|
||||
should_bind_ports=False)
|
||||
|
||||
def test__retry_binding_revived_agents_agent_admin_state_down(self):
|
||||
port = mock.MagicMock(
|
||||
id=uuidutils.generate_uuid())
|
||||
self._test__retry_binding_revived_agents(
|
||||
events.AFTER_UPDATE, agent_consts.AGENT_REVIVED, False,
|
||||
constants.AGENT_TYPE_OVS, [port],
|
||||
should_bind_ports=False)
|
||||
|
||||
|
||||
class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
|
||||
|
||||
def test__port_provisioned_with_blocks(self):
|
||||
|
Loading…
Reference in New Issue
Block a user