Merge "ovs agent: signal to plugin if tunnel refresh needed"
This commit is contained in:
@@ -114,6 +114,8 @@ class PluginApi(object):
|
||||
get_devices_details_list_and_failed_devices
|
||||
1.6 - Support get_network_details
|
||||
1.7 - Support get_ports_by_vnic_type_and_host
|
||||
1.8 - Rename agent_restarted to refresh_tunnels in
|
||||
update_device_list to reflect its expanded purpose
|
||||
'''
|
||||
|
||||
def __init__(self, topic):
|
||||
@@ -161,8 +163,8 @@ class PluginApi(object):
|
||||
agent_id=agent_id, host=host)
|
||||
|
||||
def update_device_list(self, context, devices_up, devices_down,
|
||||
agent_id, host, agent_restarted=False):
|
||||
cctxt = self.client.prepare(version='1.5')
|
||||
agent_id, host, refresh_tunnels=False):
|
||||
cctxt = self.client.prepare(version='1.8')
|
||||
|
||||
ret_devices_up = []
|
||||
failed_devices_up = []
|
||||
@@ -178,7 +180,7 @@ class PluginApi(object):
|
||||
devices_up=devices_up[i:i + step],
|
||||
devices_down=devices_down[i:i + step],
|
||||
agent_id=agent_id, host=host,
|
||||
agent_restarted=agent_restarted)
|
||||
refresh_tunnels=refresh_tunnels)
|
||||
ret_devices_up.extend(ret.get("devices_up", []))
|
||||
failed_devices_up.extend(ret.get("failed_devices_up", []))
|
||||
ret_devices_down.extend(ret.get("devices_down", []))
|
||||
|
||||
@@ -271,7 +271,7 @@ class L2populationMechanismDriver(api.MechanismDriver):
|
||||
self.L2populationAgentNotify.remove_fdb_entries(
|
||||
self.rpc_ctx, fdb_entries)
|
||||
|
||||
def update_port_up(self, context, agent_restarted=False):
|
||||
def update_port_up(self, context, refresh_tunnels=False):
|
||||
port = context.current
|
||||
agent_host = context.host
|
||||
port_context = context._plugin_context
|
||||
@@ -285,7 +285,8 @@ class L2populationMechanismDriver(api.MechanismDriver):
|
||||
|
||||
agent_active_ports = l2pop_db.get_agent_network_active_port_count(
|
||||
port_context, agent_host, network_id)
|
||||
|
||||
LOG.debug("host: %s, agent_active_ports: %s, refresh_tunnels: %s",
|
||||
agent_host, agent_active_ports, refresh_tunnels)
|
||||
agent_ip = l2pop_db.get_agent_ip(agent)
|
||||
segment = context.bottom_bound_segment
|
||||
if not self._validate_segment(segment, port['id'], agent):
|
||||
@@ -297,7 +298,7 @@ class L2populationMechanismDriver(api.MechanismDriver):
|
||||
# with high concurrency more than 1 port may be activated on an agent
|
||||
# at the same time (like VM port + a DVR port) so checking for 1 or 2
|
||||
is_first_port = agent_active_ports in (1, 2)
|
||||
if is_first_port or agent_restarted:
|
||||
if is_first_port or refresh_tunnels:
|
||||
# First port(s) activated on current agent in this network,
|
||||
# we have to provide it with the whole list of fdb entries
|
||||
agent_fdb_entries = self._create_agent_fdb(port_context,
|
||||
|
||||
@@ -1096,6 +1096,7 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
|
||||
devices_up = []
|
||||
devices_down = []
|
||||
failed_devices = []
|
||||
tunnels_missing = False
|
||||
port_names = [p['vif_port'].port_name for p in need_binding_ports]
|
||||
port_info = self.int_br.get_ports_attributes(
|
||||
"Port", columns=["name", "tag"], ports=port_names, if_exists=True)
|
||||
@@ -1130,6 +1131,10 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
|
||||
if port_detail.get('admin_state_up'):
|
||||
LOG.debug("Setting status for %s to UP", device)
|
||||
devices_up.append(device)
|
||||
if (not tunnels_missing and
|
||||
lvm.network_type in constants.TUNNEL_NETWORK_TYPES and
|
||||
len(lvm.tun_ofports) == 0):
|
||||
tunnels_missing = True
|
||||
else:
|
||||
LOG.debug("Setting status for %s to DOWN", device)
|
||||
devices_down.append(device)
|
||||
@@ -1138,11 +1143,12 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
|
||||
# the initialization work. L2 pop needs this precise knowledge
|
||||
# to notify the agent to refresh the tunnel related flows.
|
||||
# Otherwise, these flows will be cleaned as stale due to the
|
||||
# different cookie id.
|
||||
agent_restarted = self.iter_num == 0
|
||||
# different cookie id. We also set refresh_tunnels if the agent
|
||||
# has not received a notification and is missing tunnels.
|
||||
refresh_tunnels = (self.iter_num == 0) or tunnels_missing
|
||||
devices_set = self.plugin_rpc.update_device_list(
|
||||
self.context, devices_up, devices_down, self.agent_id,
|
||||
self.conf.host, agent_restarted=agent_restarted)
|
||||
self.conf.host, refresh_tunnels=refresh_tunnels)
|
||||
failed_devices = (devices_set.get('failed_devices_up') +
|
||||
devices_set.get('failed_devices_down'))
|
||||
if failed_devices:
|
||||
|
||||
@@ -55,7 +55,10 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
|
||||
# get_devices_details_list_and_failed_devices
|
||||
# 1.6 Support get_network_details
|
||||
# 1.7 Support get_ports_by_vnic_type_and_host
|
||||
target = oslo_messaging.Target(version='1.7')
|
||||
# 1.8 Rename agent_restarted to refresh_tunnels in
|
||||
# update_device_list to reflect its expanded purpose
|
||||
|
||||
target = oslo_messaging.Target(version='1.8')
|
||||
|
||||
def __init__(self, notifier, type_manager):
|
||||
self.setup_tunnel_callback_mixin(notifier, type_manager)
|
||||
@@ -267,7 +270,10 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
|
||||
@profiler.trace("rpc")
|
||||
def update_device_up(self, rpc_context, **kwargs):
|
||||
"""Device is up on agent."""
|
||||
agent_restarted = kwargs.pop('agent_restarted', False)
|
||||
refresh_tunnels = kwargs.pop('refresh_tunnels', False)
|
||||
if not refresh_tunnels:
|
||||
# For backward compatibility with older agents
|
||||
refresh_tunnels = kwargs.pop('agent_restarted', False)
|
||||
agent_id, host, device = self._get_request_details(kwargs)
|
||||
LOG.debug("Device %(device)s up at agent %(agent_id)s",
|
||||
{'device': device, 'agent_id': agent_id})
|
||||
@@ -301,7 +307,7 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
|
||||
self.update_port_status_to_active(port, rpc_context, port_id, host)
|
||||
self.notify_l2pop_port_wiring(port_id, rpc_context,
|
||||
n_const.PORT_STATUS_ACTIVE, host,
|
||||
agent_restarted)
|
||||
refresh_tunnels)
|
||||
|
||||
def update_port_status_to_active(self, port, rpc_context, port_id, host):
|
||||
plugin = directory.get_plugin()
|
||||
@@ -325,7 +331,7 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
|
||||
provisioning_blocks.L2_AGENT_ENTITY)
|
||||
|
||||
def notify_l2pop_port_wiring(self, port_id, rpc_context,
|
||||
status, host, agent_restarted=False):
|
||||
status, host, refresh_tunnels=False):
|
||||
"""Notify the L2pop driver that a port has been wired/unwired.
|
||||
|
||||
The L2pop driver uses this notification to broadcast forwarding
|
||||
@@ -349,7 +355,7 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
|
||||
# handle DVR ports while restart neutron-*-agent, we need to handle
|
||||
# it here.
|
||||
if (port['device_owner'] == n_const.DEVICE_OWNER_DVR_INTERFACE and
|
||||
not agent_restarted):
|
||||
not refresh_tunnels):
|
||||
return
|
||||
port = port_context.current
|
||||
if (port['device_owner'] != n_const.DEVICE_OWNER_DVR_INTERFACE and
|
||||
@@ -365,7 +371,7 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
|
||||
port_context.current['status'] = status
|
||||
port_context.current[portbindings.HOST_ID] = host
|
||||
if status == n_const.PORT_STATUS_ACTIVE:
|
||||
l2pop_driver.obj.update_port_up(port_context, agent_restarted)
|
||||
l2pop_driver.obj.update_port_up(port_context, refresh_tunnels)
|
||||
else:
|
||||
l2pop_driver.obj.update_port_down(port_context)
|
||||
|
||||
|
||||
@@ -336,7 +336,7 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase, OVSOFControllerHelper):
|
||||
return ports
|
||||
|
||||
def _mock_update_device(self, context, devices_up, devices_down, agent_id,
|
||||
host=None, agent_restarted=False):
|
||||
host=None, refresh_tunnels=False):
|
||||
dev_up = []
|
||||
dev_down = []
|
||||
for port in self.ports:
|
||||
@@ -382,7 +382,7 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase, OVSOFControllerHelper):
|
||||
|
||||
def mock_failed_devices_up(context, devices_up, devices_down,
|
||||
agent_id, host=None,
|
||||
agent_restarted=False):
|
||||
refresh_tunnels=False):
|
||||
failed_devices = []
|
||||
devices = list(devices_up)
|
||||
# first port fails
|
||||
@@ -404,7 +404,7 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase, OVSOFControllerHelper):
|
||||
|
||||
def mock_failed_devices_down(context, devices_up, devices_down,
|
||||
agent_id, host=None,
|
||||
agent_restarted=False):
|
||||
refresh_tunnels=False):
|
||||
# first port fails
|
||||
failed_port_id = self.ports[0]['id']
|
||||
failed_devices_down = []
|
||||
|
||||
@@ -373,7 +373,7 @@ class TestL2PopulationRpcTestCase(test_plugin.Ml2PluginV2TestCase):
|
||||
agent_id=HOST_4,
|
||||
device=port_id,
|
||||
host=HOST_4,
|
||||
agent_restarted=True)
|
||||
refresh_tunnels=True)
|
||||
fanout_expected = {port['port']['network_id']: {
|
||||
'network_type': u'vxlan',
|
||||
'ports': {u'20.0.0.4': [('00:00:00:00:00:00', '0.0.0.0')]},
|
||||
|
||||
@@ -774,7 +774,41 @@ class TestOvsNeutronAgent(object):
|
||||
update_devices.assert_called_once_with(mock.ANY, devices_up,
|
||||
devices_down,
|
||||
mock.ANY, mock.ANY,
|
||||
agent_restarted=True)
|
||||
refresh_tunnels=True)
|
||||
|
||||
def _test_bind_devices_sets_refresh_tunnels(self, tun_ofports, expected):
|
||||
self.agent.iter_num = 3
|
||||
self.agent.prevent_arp_spoofing = False
|
||||
self.agent.vlan_manager.add('fake_network', 1,
|
||||
n_const.TYPE_VXLAN, None, 1)
|
||||
ovs_db_list = [{'name': 'fake_device', 'tag': []}]
|
||||
self.agent.vlan_manager.get('fake_network').tun_ofports = tun_ofports
|
||||
vif_port = mock.Mock()
|
||||
vif_port.port_name = 'fake_device'
|
||||
vif_port.ofport = 1
|
||||
need_binding_ports = [{'network_id': 'fake_network',
|
||||
'vif_port': vif_port,
|
||||
'device': 'fake_device',
|
||||
'admin_state_up': True}]
|
||||
with mock.patch.object(
|
||||
self.agent.plugin_rpc, 'update_device_list',
|
||||
return_value={'devices_up': [],
|
||||
'devices_down': [],
|
||||
'failed_devices_up': [],
|
||||
'failed_devices_down': []}) as update_devices, \
|
||||
mock.patch.object(self.agent,
|
||||
'int_br') as int_br:
|
||||
int_br.get_ports_attributes.return_value = ovs_db_list
|
||||
self.agent._bind_devices(need_binding_ports)
|
||||
update_devices.assert_called_once_with(mock.ANY, ['fake_device'],
|
||||
[], mock.ANY, mock.ANY,
|
||||
refresh_tunnels=expected)
|
||||
|
||||
def test_bind_devices_sets_refresh_tunnels_if_tunnels_missing(self):
|
||||
self._test_bind_devices_sets_refresh_tunnels([], True)
|
||||
|
||||
def test_bind_devices_does_not_set_refresh_tunnels_if_tunnels_exist(self):
|
||||
self._test_bind_devices_sets_refresh_tunnels([1, 2, 3], False)
|
||||
|
||||
def _test_arp_spoofing(self, enable_prevent_arp_spoofing):
|
||||
self.agent.prevent_arp_spoofing = enable_prevent_arp_spoofing
|
||||
|
||||
@@ -479,8 +479,8 @@ class RpcApiTestCase(base.BaseTestCase):
|
||||
devices_down=['fake_device3', 'fake_device4'],
|
||||
agent_id='fake_agent_id',
|
||||
host='fake_host',
|
||||
agent_restarted=False,
|
||||
version='1.5')
|
||||
refresh_tunnels=False,
|
||||
version='1.8')
|
||||
|
||||
def test_get_devices_details_list_and_failed_devices(self):
|
||||
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
|
||||
|
||||
Reference in New Issue
Block a user