neutron/neutron/tests/unit/plugins/ml2/drivers/agent/test__common_agent.py

588 lines
25 KiB
Python

# Copyright (c) 2016 IBM Corp.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron_lib.agent import constants as agent_consts
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import constants
from oslo_config import cfg
import testtools
from neutron.agent.linux import bridge_lib
from neutron.plugins.ml2.drivers.agent import _agent_manager_base as amb
from neutron.plugins.ml2.drivers.agent import _common_agent as ca
from neutron.tests import base
LOCAL_IP = '192.168.0.33'
LOCAL_IPV6 = '2001:db8:1::33'
VXLAN_GROUPV6 = 'ff05::/120'
PORT_1 = 'abcdef01-12ddssdfds-fdsfsd'
DEVICE_1 = 'tapabcdef01-12'
NETWORK_ID = '57653b20-ed5b-4ed0-a31d-06f84e3fd909'
BRIDGE_MAPPING_VALUE = 'br-eth2'
BRIDGE_MAPPINGS = {'physnet0': BRIDGE_MAPPING_VALUE}
INTERFACE_MAPPINGS = {'physnet1': 'eth1'}
FAKE_DEFAULT_DEV = mock.Mock()
FAKE_DEFAULT_DEV.name = 'eth1'
PORT_DATA = {
"port_id": PORT_1,
"device": DEVICE_1
}
class TestCommonAgentLoop(base.BaseTestCase):
def setUp(self):
super(TestCommonAgentLoop, self).setUp()
# disable setting up periodic state reporting
cfg.CONF.set_override('report_interval', 0, 'AGENT')
cfg.CONF.set_default('firewall_driver',
'neutron.agent.firewall.NoopFirewallDriver',
group='SECURITYGROUP')
cfg.CONF.set_override('local_ip', LOCAL_IP, 'VXLAN')
self.get_bridge_names_p = mock.patch.object(bridge_lib,
'get_bridge_names')
self.get_bridge_names = self.get_bridge_names_p.start()
self.get_bridge_names.return_value = ["br-int", "brq1"]
manager = mock.Mock()
manager.get_all_devices.return_value = []
manager.get_agent_configurations.return_value = {}
manager.get_rpc_consumers.return_value = []
with mock.patch.object(ca.CommonAgentLoop, '_validate_manager_class'),\
mock.patch.object(ca.CommonAgentLoop, '_validate_rpc_endpoints'):
self.agent = ca.CommonAgentLoop(manager, 0, 10, 'fake_agent',
'foo-binary')
with mock.patch.object(self.agent, "daemon_loop"):
self.agent.start()
def test_treat_devices_removed_notify(self):
handler = mock.Mock()
registry.subscribe(handler, resources.PORT_DEVICE, events.AFTER_DELETE)
devices = [DEVICE_1]
self.agent.treat_devices_removed(devices)
handler.assert_called_once_with(mock.ANY, mock.ANY, self.agent,
context=mock.ANY, device=DEVICE_1,
port_id=mock.ANY)
def test_treat_devices_added_updated_notify(self):
handler = mock.Mock()
registry.subscribe(handler, resources.PORT_DEVICE, events.AFTER_UPDATE)
agent = self.agent
mock_details = {'device': 'dev123',
'port_id': 'port123',
'network_id': 'net123',
'admin_state_up': True,
'network_type': 'vlan',
'segmentation_id': 100,
'physical_network': 'physnet1',
'device_owner': 'horse'}
agent.plugin_rpc = mock.Mock()
agent.plugin_rpc.get_devices_details_list.return_value = [mock_details]
agent.mgr = mock.Mock()
agent.mgr.plug_interface.return_value = True
agent.treat_devices_added_updated(set(['dev123']))
handler.assert_called_once_with(mock.ANY, mock.ANY, self.agent,
context=mock.ANY,
device_details=mock_details)
def test_treat_devices_removed_with_existed_device(self):
agent = self.agent
agent.mgr.ensure_port_admin_state = mock.Mock()
devices = [DEVICE_1]
agent.network_ports[NETWORK_ID].append(PORT_DATA)
with mock.patch.object(agent.plugin_rpc,
"update_device_down") as fn_udd,\
mock.patch.object(agent.sg_agent,
"remove_devices_filter") as fn_rdf,\
mock.patch.object(agent.ext_manager,
"delete_port") as ext_mgr_delete_port:
fn_udd.return_value = {'device': DEVICE_1,
'exists': True}
with mock.patch.object(ca.LOG, 'info') as log:
resync = agent.treat_devices_removed(devices)
self.assertEqual(2, log.call_count)
self.assertFalse(resync)
self.assertTrue(fn_udd.called)
self.assertTrue(fn_rdf.called)
self.assertTrue(ext_mgr_delete_port.called)
self.assertNotIn(PORT_DATA, agent.network_ports[NETWORK_ID])
def test_treat_devices_removed_with_not_existed_device(self):
agent = self.agent
devices = [DEVICE_1]
agent.network_ports[NETWORK_ID].append(PORT_DATA)
with mock.patch.object(agent.plugin_rpc,
"update_device_down") as fn_udd,\
mock.patch.object(agent.sg_agent,
"remove_devices_filter") as fn_rdf,\
mock.patch.object(agent.ext_manager,
"delete_port") as ext_mgr_delete_port:
fn_udd.return_value = {'device': DEVICE_1,
'exists': False}
with mock.patch.object(ca.LOG, 'debug') as log:
resync = agent.treat_devices_removed(devices)
self.assertEqual(1, log.call_count)
self.assertFalse(resync)
self.assertTrue(fn_udd.called)
self.assertTrue(fn_rdf.called)
self.assertTrue(ext_mgr_delete_port.called)
self.assertNotIn(PORT_DATA, agent.network_ports[NETWORK_ID])
def test_treat_devices_removed_failed(self):
agent = self.agent
devices = [DEVICE_1]
agent.network_ports[NETWORK_ID].append(PORT_DATA)
with mock.patch.object(agent.plugin_rpc,
"update_device_down") as fn_udd,\
mock.patch.object(agent.sg_agent,
"remove_devices_filter") as fn_rdf,\
mock.patch.object(agent.ext_manager,
"delete_port") as ext_mgr_delete_port:
fn_udd.side_effect = Exception()
resync = agent.treat_devices_removed(devices)
self.assertTrue(resync)
self.assertTrue(fn_udd.called)
self.assertTrue(fn_rdf.called)
self.assertTrue(ext_mgr_delete_port.called)
self.assertNotIn(PORT_DATA, agent.network_ports[NETWORK_ID])
def test_treat_devices_removed_failed_extension(self):
agent = self.agent
devices = [DEVICE_1]
agent.network_ports[NETWORK_ID].append(PORT_DATA)
with mock.patch.object(agent.plugin_rpc,
"update_device_down") as fn_udd,\
mock.patch.object(agent.sg_agent,
"remove_devices_filter") as fn_rdf,\
mock.patch.object(agent.ext_manager,
"delete_port") as ext_mgr_delete_port:
ext_mgr_delete_port.side_effect = Exception()
resync = agent.treat_devices_removed(devices)
self.assertTrue(resync)
self.assertTrue(fn_udd.called)
self.assertTrue(fn_rdf.called)
self.assertTrue(ext_mgr_delete_port.called)
self.assertNotIn(PORT_DATA, agent.network_ports[NETWORK_ID])
def test_treat_devices_removed_delete_arp_spoofing(self):
agent = self.agent
agent._ensure_port_admin_state = mock.Mock()
devices = [DEVICE_1]
with mock.patch.object(agent.plugin_rpc,
"update_device_down") as fn_udd,\
mock.patch.object(agent.sg_agent,
"remove_devices_filter"):
fn_udd.return_value = {'device': DEVICE_1,
'exists': True}
with mock.patch.object(agent.mgr,
'delete_arp_spoofing_protection') as de_arp:
agent.treat_devices_removed(devices)
de_arp.assert_called_with(devices)
def test__get_devices_locally_modified(self):
new_ts = {1: 1000, 2: 2000, 3: 3000}
old_ts = {1: 10, 2: 2000, 4: 900}
# 3 and 4 are not returned because 3 is a new device and 4 is a
# removed device
self.assertEqual(
set([1]),
self.agent._get_devices_locally_modified(new_ts, old_ts))
def _test_scan_devices(self, previous, updated,
fake_current, expected, sync,
fake_ts_current=None):
self.agent.mgr = mock.Mock()
self.agent.mgr.get_all_devices.return_value = fake_current
self.agent.mgr.get_devices_modified_timestamps.return_value = (
fake_ts_current or {})
self.agent.rpc_callbacks.get_and_clear_updated_devices.return_value =\
updated
results = self.agent.scan_devices(previous, sync)
self.assertEqual(expected, results)
def test_scan_devices_no_changes(self):
previous = {'current': set([1, 2]),
'updated': set(),
'added': set(),
'removed': set(),
'timestamps': {}}
fake_current = set([1, 2])
updated = set()
expected = {'current': set([1, 2]),
'updated': set(),
'added': set(),
'removed': set(),
'timestamps': {}}
self._test_scan_devices(previous, updated, fake_current, expected,
sync=False)
def test_scan_devices_timestamp_triggers_updated(self):
previous = {'current': set([1, 2]),
'updated': set(),
'added': set(),
'removed': set(),
'timestamps': {2: 600}}
fake_current = set([1, 2])
updated = set()
expected = {'current': set([1, 2]),
'updated': set([2]),
'added': set(),
'removed': set(),
'timestamps': {2: 1000}}
self._test_scan_devices(previous, updated, fake_current, expected,
sync=False, fake_ts_current={2: 1000})
def test_scan_devices_added_removed(self):
previous = {'current': set([1, 2]),
'updated': set(),
'added': set(),
'removed': set(),
'timestamps': {}}
fake_current = set([2, 3])
updated = set()
expected = {'current': set([2, 3]),
'updated': set(),
'added': set([3]),
'removed': set([1]),
'timestamps': {}}
self._test_scan_devices(previous, updated, fake_current, expected,
sync=False)
def test_scan_devices_removed_retried_on_sync(self):
previous = {'current': set([2, 3]),
'updated': set(),
'added': set(),
'removed': set([1]),
'timestamps': {}}
fake_current = set([2, 3])
updated = set()
expected = {'current': set([2, 3]),
'updated': set(),
'added': set([2, 3]),
'removed': set([1]),
'timestamps': {}}
self._test_scan_devices(previous, updated, fake_current, expected,
sync=True)
def test_scan_devices_vanished_removed_on_sync(self):
previous = {'current': set([2, 3]),
'updated': set(),
'added': set(),
'removed': set([1]),
'timestamps': {}}
# Device 2 disappeared.
fake_current = set([3])
updated = set()
# Device 1 should be retried.
expected = {'current': set([3]),
'updated': set(),
'added': set([3]),
'removed': set([1, 2]),
'timestamps': {}}
self._test_scan_devices(previous, updated, fake_current, expected,
sync=True)
def test_scan_devices_updated(self):
previous = {'current': set([1, 2]),
'updated': set(),
'added': set(),
'removed': set(),
'timestamps': {}}
fake_current = set([1, 2])
updated = set([1])
expected = {'current': set([1, 2]),
'updated': set([1]),
'added': set(),
'removed': set(),
'timestamps': {}}
self._test_scan_devices(previous, updated, fake_current, expected,
sync=False)
def test_scan_devices_updated_non_existing(self):
previous = {'current': set([1, 2]),
'updated': set(),
'added': set(),
'removed': set(),
'timestamps': {}}
fake_current = set([1, 2])
updated = set([3])
expected = {'current': set([1, 2]),
'updated': set(),
'added': set(),
'removed': set(),
'timestamps': {}}
self._test_scan_devices(previous, updated, fake_current, expected,
sync=False)
def test_scan_devices_updated_deleted_concurrently(self):
previous = {
'current': set([1, 2]),
'updated': set(),
'added': set(),
'removed': set(),
'timestamps': {}
}
# Device 2 disappeared.
fake_current = set([1])
# Device 2 got an concurrent update via network_update
updated = set([2])
expected = {
'current': set([1]),
'updated': set(),
'added': set(),
'removed': set([2]),
'timestamps': {}
}
self._test_scan_devices(
previous, updated, fake_current, expected, sync=False
)
def test_scan_devices_updated_on_sync(self):
previous = {'current': set([1, 2]),
'updated': set([1]),
'added': set(),
'removed': set(),
'timestamps': {}}
fake_current = set([1, 2])
updated = set([2])
expected = {'current': set([1, 2]),
'updated': set([1, 2]),
'added': set([1, 2]),
'removed': set(),
'timestamps': {}}
self._test_scan_devices(previous, updated, fake_current, expected,
sync=True)
def test_scan_devices_with_delete_arp_protection(self):
previous = None
fake_current = set([1, 2])
updated = set()
expected = {'current': set([1, 2]),
'updated': set(),
'added': set([1, 2]),
'removed': set(),
'timestamps': {}}
self._test_scan_devices(previous, updated, fake_current, expected,
sync=False)
self.agent.mgr.delete_unreferenced_arp_protection.assert_called_with(
fake_current)
def test_process_network_devices(self):
agent = self.agent
device_info = {'current': set(),
'added': set(['tap3', 'tap4']),
'updated': set(['tap2', 'tap3']),
'removed': set(['tap1'])}
agent.sg_agent.setup_port_filters = mock.Mock()
agent.treat_devices_added_updated = mock.Mock(return_value=False)
agent.treat_devices_removed = mock.Mock(return_value=False)
agent.process_network_devices(device_info)
agent.sg_agent.setup_port_filters.assert_called_with(
device_info['added'],
device_info['updated'])
agent.treat_devices_added_updated.assert_called_with(set(['tap2',
'tap3',
'tap4']))
agent.treat_devices_removed.assert_called_with(set(['tap1']))
def test_treat_devices_added_updated_no_local_interface(self):
agent = self.agent
mock_details = {'device': 'dev123',
'port_id': 'port123',
'network_id': 'net123',
'admin_state_up': True,
'network_type': 'vlan',
'segmentation_id': 100,
'physical_network': 'physnet1',
'device_owner': constants.DEVICE_OWNER_NETWORK_PREFIX}
agent.ext_manager = mock.Mock()
agent.plugin_rpc = mock.Mock()
agent.plugin_rpc.get_devices_details_list.return_value = [mock_details]
agent.mgr = mock.Mock()
agent.mgr.plug_interface.return_value = False
agent.mgr.ensure_port_admin_state = mock.Mock()
agent.treat_devices_added_updated(set(['tap1']))
self.assertFalse(agent.mgr.ensure_port_admin_state.called)
def test_treat_devices_added_updated_admin_state_up_true(self):
agent = self.agent
mock_details = {'device': 'dev123',
'port_id': 'port123',
'network_id': 'net123',
'admin_state_up': True,
'network_type': 'vlan',
'segmentation_id': 100,
'physical_network': 'physnet1',
'device_owner': constants.DEVICE_OWNER_NETWORK_PREFIX}
mock_port_data = {
'port_id': mock_details['port_id'],
'device': mock_details['device']
}
agent.ext_manager = mock.Mock()
agent.plugin_rpc = mock.Mock()
agent.plugin_rpc.get_devices_details_list.return_value = [mock_details]
agent.mgr = mock.Mock()
agent.mgr.plug_interface.return_value = True
agent.mgr.ensure_port_admin_state = mock.Mock()
mock_segment = amb.NetworkSegment(mock_details['network_type'],
mock_details['physical_network'],
mock_details['segmentation_id'])
with mock.patch('neutron.plugins.ml2.drivers.agent.'
'_agent_manager_base.NetworkSegment',
return_value=mock_segment):
resync_needed = agent.treat_devices_added_updated(set(['tap1']))
self.assertFalse(resync_needed)
agent.rpc_callbacks.add_network.assert_called_with('net123',
mock_segment)
agent.mgr.plug_interface.assert_called_with(
'net123', mock_segment, 'dev123',
constants.DEVICE_OWNER_NETWORK_PREFIX)
self.assertTrue(agent.plugin_rpc.update_device_up.called)
self.assertTrue(agent.ext_manager.handle_port.called)
self.assertIn(mock_port_data, agent.network_ports[
mock_details['network_id']]
)
def test_treat_devices_added_updated_setup_arp_protection(self):
agent = self.agent
mock_details = {'device': 'dev123',
'port_id': 'port123',
'network_id': 'net123',
'admin_state_up': True,
'network_type': 'vlan',
'segmentation_id': 100,
'physical_network': 'physnet1',
'device_owner': constants.DEVICE_OWNER_NETWORK_PREFIX}
agent.plugin_rpc = mock.Mock()
agent.plugin_rpc.get_devices_details_list.return_value = [mock_details]
agent.mgr = mock.Mock()
agent.mgr.plug_interface.return_value = True
with mock.patch.object(agent.mgr,
'setup_arp_spoofing_protection') as set_arp:
agent.treat_devices_added_updated(set(['tap1']))
set_arp.assert_called_with(mock_details['device'], mock_details)
def test__process_device_if_exists_missing_intf(self):
mock_details = {'device': 'dev123',
'port_id': 'port123',
'network_id': 'net123',
'admin_state_up': True,
'network_type': 'vlan',
'segmentation_id': 100,
'physical_network': 'physnet1',
'device_owner': constants.DEVICE_OWNER_NETWORK_PREFIX}
self.agent.mgr = mock.Mock()
self.agent.mgr.get_all_devices.return_value = []
self.agent.mgr.plug_interface.side_effect = RuntimeError()
self.agent._process_device_if_exists(mock_details)
def test__process_device_if_exists_error(self):
mock_details = {'device': 'dev123',
'port_id': 'port123',
'network_id': 'net123',
'admin_state_up': True,
'network_type': 'vlan',
'segmentation_id': 100,
'physical_network': 'physnet1',
'device_owner': constants.DEVICE_OWNER_NETWORK_PREFIX}
self.agent.mgr = mock.Mock()
self.agent.mgr.get_all_devices.return_value = ['dev123']
self.agent.mgr.plug_interface.side_effect = RuntimeError()
with testtools.ExpectedException(RuntimeError):
# device exists so it should raise
self.agent._process_device_if_exists(mock_details)
def test_set_rpc_timeout(self):
self.agent.stop()
for rpc_client in (self.agent.plugin_rpc.client,
self.agent.sg_plugin_rpc.client,
self.agent.state_rpc.client):
self.assertEqual(cfg.CONF.AGENT.quitting_rpc_timeout,
rpc_client.timeout)
def test_set_rpc_timeout_no_value(self):
self.agent.quitting_rpc_timeout = None
with mock.patch.object(self.agent, 'set_rpc_timeout') as mock_set_rpc:
self.agent.stop()
self.assertFalse(mock_set_rpc.called)
def test_report_state_revived(self):
with mock.patch.object(self.agent.state_rpc,
"report_state") as report_st:
report_st.return_value = agent_consts.AGENT_REVIVED
self.agent._report_state()
self.assertTrue(self.agent.fullsync)
def test_update_network_ports(self):
port_1_data = PORT_DATA
NETWORK_2_ID = 'fake_second_network'
port_2_data = {
'port_id': 'fake_port_2',
'device': 'fake_port_2_device_name'
}
self.agent.network_ports[NETWORK_ID].append(
port_1_data
)
self.agent.network_ports[NETWORK_ID].append(
port_2_data
)
# check update port:
self.agent._update_network_ports(
NETWORK_2_ID, port_2_data['port_id'], port_2_data['device']
)
self.assertNotIn(port_2_data, self.agent.network_ports[NETWORK_ID])
self.assertIn(port_2_data, self.agent.network_ports[NETWORK_2_ID])
def test_clean_network_ports(self):
port_1_data = PORT_DATA
port_2_data = {
'port_id': 'fake_port_2',
'device': 'fake_port_2_device_name'
}
self.agent.network_ports[NETWORK_ID].append(
port_1_data
)
self.agent.network_ports[NETWORK_ID].append(
port_2_data
)
# check removing port from network when other ports are still there:
cleaned_port_id = self.agent._clean_network_ports(DEVICE_1)
self.assertIn(NETWORK_ID, self.agent.network_ports.keys())
self.assertNotIn(port_1_data, self.agent.network_ports[NETWORK_ID])
self.assertIn(port_2_data, self.agent.network_ports[NETWORK_ID])
self.assertEqual(PORT_1, cleaned_port_id)
# and now remove last port from network:
cleaned_port_id = self.agent._clean_network_ports(
port_2_data['device']
)
self.assertNotIn(NETWORK_ID, self.agent.network_ports.keys())
self.assertEqual(port_2_data['port_id'], cleaned_port_id)