OVS agent QoS extension functional test for bandwidth limit rules

This functional test spawns the OVS agent, with bandwidth limit rules in
a policy attached to ports. Then it asserts that the low level OVS
bandwidth limits are set for each port.

To make this possible we refactor and extract the base OVS agent test
framework into neutron.tests.functional.agent.l2.base.

Partially-Implements: blueprint ml2-qos
Change-Id: Ie5424a257b9ca07afa72a39ae6f1551d6ad351e7
changes/12/210012/11
Miguel Angel Ajo 7 years ago committed by Ihar Hrachyshka
parent 088289acd2
commit a034115e61
  1. 0
      neutron/tests/functional/agent/l2/__init__.py
  2. 286
      neutron/tests/functional/agent/l2/base.py
  3. 0
      neutron/tests/functional/agent/l2/extensions/__init__.py
  4. 133
      neutron/tests/functional/agent/l2/extensions/test_ovs_agent_qos_extension.py
  5. 303
      neutron/tests/functional/agent/test_l2_ovs_agent.py

@ -0,0 +1,286 @@
# Copyright (c) 2015 Red Hat, Inc.
# Copyright (c) 2015 SUSE Linux Products GmbH
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import random
import eventlet
import mock
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import uuidutils
from neutron.agent.common import config as agent_config
from neutron.agent.common import ovs_lib
from neutron.agent.l2.extensions import manager as ext_manager
from neutron.agent.linux import interface
from neutron.agent.linux import polling
from neutron.agent.linux import utils as agent_utils
from neutron.common import config as common_config
from neutron.common import constants as n_const
from neutron.common import utils
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers.openvswitch.agent.common import config \
as ovs_config
from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants
from neutron.plugins.ml2.drivers.openvswitch.agent.openflow.ovs_ofctl \
import br_int
from neutron.plugins.ml2.drivers.openvswitch.agent.openflow.ovs_ofctl \
import br_phys
from neutron.plugins.ml2.drivers.openvswitch.agent.openflow.ovs_ofctl \
import br_tun
from neutron.plugins.ml2.drivers.openvswitch.agent import ovs_neutron_agent \
as ovs_agent
from neutron.tests.functional.agent.linux import base
LOG = logging.getLogger(__name__)
class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
def setUp(self):
super(OVSAgentTestFramework, self).setUp()
agent_rpc = ('neutron.plugins.ml2.drivers.openvswitch.agent.'
'ovs_neutron_agent.OVSPluginApi')
mock.patch(agent_rpc).start()
mock.patch('neutron.agent.rpc.PluginReportStateAPI').start()
self.br_int = base.get_rand_name(n_const.DEVICE_NAME_MAX_LEN,
prefix='br-int')
self.br_tun = base.get_rand_name(n_const.DEVICE_NAME_MAX_LEN,
prefix='br-tun')
patch_name_len = n_const.DEVICE_NAME_MAX_LEN - len("-patch-tun")
self.patch_tun = "%s-patch-tun" % self.br_int[patch_name_len:]
self.patch_int = "%s-patch-int" % self.br_tun[patch_name_len:]
self.ovs = ovs_lib.BaseOVS()
self.config = self._configure_agent()
self.driver = interface.OVSInterfaceDriver(self.config)
def _get_config_opts(self):
config = cfg.ConfigOpts()
config.register_opts(common_config.core_opts)
config.register_opts(interface.OPTS)
config.register_opts(ovs_config.ovs_opts, "OVS")
config.register_opts(ovs_config.agent_opts, "AGENT")
agent_config.register_interface_driver_opts_helper(config)
agent_config.register_agent_state_opts_helper(config)
ext_manager.register_opts(config)
return config
def _configure_agent(self):
config = self._get_config_opts()
config.set_override(
'interface_driver',
'neutron.agent.linux.interface.OVSInterfaceDriver')
config.set_override('integration_bridge', self.br_int, "OVS")
config.set_override('ovs_integration_bridge', self.br_int)
config.set_override('tunnel_bridge', self.br_tun, "OVS")
config.set_override('int_peer_patch_port', self.patch_tun, "OVS")
config.set_override('tun_peer_patch_port', self.patch_int, "OVS")
config.set_override('host', 'ovs-agent')
return config
def _bridge_classes(self):
return {
'br_int': br_int.OVSIntegrationBridge,
'br_phys': br_phys.OVSPhysicalBridge,
'br_tun': br_tun.OVSTunnelBridge
}
def create_agent(self, create_tunnels=True):
if create_tunnels:
tunnel_types = [p_const.TYPE_VXLAN]
else:
tunnel_types = None
local_ip = '192.168.10.1'
bridge_mappings = {'physnet': self.br_int}
agent = ovs_agent.OVSNeutronAgent(self._bridge_classes(),
self.br_int, self.br_tun,
local_ip, bridge_mappings,
polling_interval=1,
tunnel_types=tunnel_types,
prevent_arp_spoofing=False,
conf=self.config)
self.addCleanup(self.ovs.delete_bridge, self.br_int)
if tunnel_types:
self.addCleanup(self.ovs.delete_bridge, self.br_tun)
agent.sg_agent = mock.Mock()
return agent
def start_agent(self, agent):
self.setup_agent_rpc_mocks(agent)
polling_manager = polling.InterfacePollingMinimizer()
self.addCleanup(polling_manager.stop)
polling_manager.start()
agent_utils.wait_until_true(
polling_manager._monitor.is_active)
agent.check_ovs_status = mock.Mock(
return_value=constants.OVS_NORMAL)
t = eventlet.spawn(agent.rpc_loop, polling_manager)
def stop_agent(agent, rpc_loop_thread):
agent.run_daemon_loop = False
rpc_loop_thread.wait()
self.addCleanup(stop_agent, agent, t)
def _bind_ports(self, ports, network, agent):
devices = []
for port in ports:
dev = OVSAgentTestFramework._get_device_details(port, network)
vif_name = port.get('vif_name')
vif_id = uuidutils.generate_uuid(),
vif_port = ovs_lib.VifPort(
vif_name, "%s" % vif_id, 'id-%s' % vif_id,
port.get('mac_address'), agent.int_br)
dev['vif_port'] = vif_port
devices.append(dev)
agent._bind_devices(devices)
def _create_test_port_dict(self):
return {'id': uuidutils.generate_uuid(),
'mac_address': utils.get_random_mac(
'fa:16:3e:00:00:00'.split(':')),
'fixed_ips': [{
'ip_address': '10.%d.%d.%d' % (
random.randint(3, 254),
random.randint(3, 254),
random.randint(3, 254))}],
'vif_name': base.get_rand_name(
self.driver.DEV_NAME_LEN, self.driver.DEV_NAME_PREFIX)}
def _create_test_network_dict(self):
return {'id': uuidutils.generate_uuid(),
'tenant_id': uuidutils.generate_uuid()}
def _plug_ports(self, network, ports, agent, ip_len=24):
for port in ports:
self.driver.plug(
network.get('id'), port.get('id'), port.get('vif_name'),
port.get('mac_address'),
agent.int_br.br_name, namespace=None)
ip_cidrs = ["%s/%s" % (port.get('fixed_ips')[0][
'ip_address'], ip_len)]
self.driver.init_l3(port.get('vif_name'), ip_cidrs, namespace=None)
def _get_device_details(self, port, network):
dev = {'device': port['id'],
'port_id': port['id'],
'network_id': network['id'],
'network_type': 'vlan',
'physical_network': 'physnet',
'segmentation_id': 1,
'fixed_ips': port['fixed_ips'],
'device_owner': 'compute',
'admin_state_up': True}
return dev
def assert_bridge(self, br, exists=True):
self.assertEqual(exists, self.ovs.bridge_exists(br))
def assert_patch_ports(self, agent):
def get_peer(port):
return agent.int_br.db_get_val(
'Interface', port, 'options', check_error=True)
agent_utils.wait_until_true(
lambda: get_peer(self.patch_int) == {'peer': self.patch_tun})
agent_utils.wait_until_true(
lambda: get_peer(self.patch_tun) == {'peer': self.patch_int})
def assert_bridge_ports(self):
for port in [self.patch_tun, self.patch_int]:
self.assertTrue(self.ovs.port_exists(port))
def assert_vlan_tags(self, ports, agent):
for port in ports:
res = agent.int_br.db_get_val('Port', port.get('vif_name'), 'tag')
self.assertTrue(res)
def _expected_plugin_rpc_call(self, call, expected_devices, is_up=True):
"""Helper to check expected rpc call are received
:param call: The call to check
:param expected_devices The device for which call is expected
:param is_up True if expected_devices are devices that are set up,
False if expected_devices are devices that are set down
"""
if is_up:
rpc_devices = [
dev for args in call.call_args_list for dev in args[0][1]]
else:
rpc_devices = [
dev for args in call.call_args_list for dev in args[0][2]]
return not (set(expected_devices) - set(rpc_devices))
def create_test_ports(self, amount=3, **kwargs):
ports = []
for x in range(amount):
ports.append(self._create_test_port_dict(**kwargs))
return ports
def _mock_update_device(self, context, devices_up, devices_down, agent_id,
host=None):
dev_up = []
dev_down = []
for port in self.ports:
if devices_up and port['id'] in devices_up:
dev_up.append(port['id'])
if devices_down and port['id'] in devices_down:
dev_down.append({'device': port['id'], 'exists': True})
return {'devices_up': dev_up,
'failed_devices_up': [],
'devices_down': dev_down,
'failed_devices_down': []}
def setup_agent_rpc_mocks(self, agent):
def mock_device_details(context, devices, agent_id, host=None):
details = []
for port in self.ports:
if port['id'] in devices:
dev = self._get_device_details(
port, self.network)
details.append(dev)
return {'devices': details, 'failed_devices': []}
(agent.plugin_rpc.get_devices_details_list_and_failed_devices.
side_effect) = mock_device_details
agent.plugin_rpc.update_device_list.side_effect = (
self._mock_update_device)
def _prepare_resync_trigger(self, agent):
def mock_device_raise_exception(context, devices_up, devices_down,
agent_id, host=None):
agent.plugin_rpc.update_device_list.side_effect = (
self._mock_update_device)
raise Exception('Exception to trigger resync')
self.agent.plugin_rpc.update_device_list.side_effect = (
mock_device_raise_exception)
def wait_until_ports_state(self, ports, up):
port_ids = [p['id'] for p in ports]
agent_utils.wait_until_true(
lambda: self._expected_plugin_rpc_call(
self.agent.plugin_rpc.update_device_list, port_ids, up))
def setup_agent_and_ports(self, port_dicts, trigger_resync=False):
self.agent = self.create_agent()
self.start_agent(self.agent)
self.network = self._create_test_network_dict()
self.ports = port_dicts
if trigger_resync:
self._prepare_resync_trigger(self.agent)
self._plug_ports(self.network, self.ports, self.agent)

@ -0,0 +1,133 @@
# Copyright (c) 2015 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_utils import uuidutils
from neutron.objects.qos import policy
from neutron.objects.qos import rule
from neutron.tests.functional.agent.l2 import base
TEST_POLICY_ID1 = "a2d72369-4246-4f19-bd3c-af51ec8d70cd"
TEST_POLICY_ID2 = "46ebaec0-0570-43ac-82f6-60d2b03168c5"
TEST_BW_LIMIT_RULE_1 = rule.QosBandwidthLimitRule(
context=None,
id="5f126d84-551a-4dcf-bb01-0e9c0df0c793",
max_kbps=1000,
max_burst_kbps=10)
TEST_BW_LIMIT_RULE_2 = rule.QosBandwidthLimitRule(
context=None,
id="fa9128d9-44af-49b2-99bb-96548378ad42",
max_kbps=900,
max_burst_kbps=9)
class OVSAgentQoSExtensionTestFramework(base.OVSAgentTestFramework):
def setUp(self):
super(OVSAgentQoSExtensionTestFramework, self).setUp()
self.config.set_override('extensions', ['qos'], 'agent')
self._set_pull_mock()
def _set_pull_mock(self):
self.qos_policies = {}
def _pull_mock(context, resource_type, resource_id):
return self.qos_policies[resource_id]
self.pull = mock.patch(
'neutron.api.rpc.handlers.resources_rpc.'
'ResourcesPullRpcApi.pull').start()
self.pull.side_effect = _pull_mock
def set_test_qos_rules(self, policy_id, policy_rules):
"""This function sets the policy test rules to be exposed."""
qos_policy = policy.QosPolicy(
context=None,
tenant_id=uuidutils.generate_uuid(),
id=policy_id,
name="Test Policy Name",
description="This is a policy for testing purposes",
shared=False,
rules=policy_rules)
qos_policy.obj_reset_changes()
self.qos_policies[policy_id] = qos_policy
def _create_test_port_dict(self, policy_id=None):
port_dict = super(OVSAgentQoSExtensionTestFramework,
self)._create_test_port_dict()
port_dict['qos_policy_id'] = policy_id
return port_dict
def _get_device_details(self, port, network):
dev = super(OVSAgentQoSExtensionTestFramework,
self)._get_device_details(port, network)
dev['qos_policy_id'] = port['qos_policy_id']
return dev
def _assert_bandwidth_limit_rule_is_set(self, port, rule):
max_rate, burst = (
self.agent.int_br.get_qos_bw_limit_for_port(port['vif_name']))
self.assertEqual(max_rate, rule.max_kbps)
self.assertEqual(burst, rule.max_burst_kbps)
def _assert_bandwidth_limit_rule_not_set(self, port):
max_rate, burst = (
self.agent.int_br.get_qos_bw_limit_for_port(port['vif_name']))
self.assertIsNone(max_rate)
self.assertIsNone(burst)
class TestOVSAgentQosExtension(OVSAgentQoSExtensionTestFramework):
def test_port_creation_with_bandwidth_limit(self):
"""Make sure bandwidth limit rules are set in low level to ports."""
self.set_test_qos_rules(TEST_POLICY_ID1, [TEST_BW_LIMIT_RULE_1])
self.setup_agent_and_ports(
port_dicts=self.create_test_ports(amount=1,
policy_id=TEST_POLICY_ID1))
self.wait_until_ports_state(self.ports, up=True)
for port in self.ports:
self._assert_bandwidth_limit_rule_is_set(
port, TEST_BW_LIMIT_RULE_1)
def test_port_creation_with_different_bandwidth_limits(self):
"""Make sure different types of policies end on the right ports."""
self.set_test_qos_rules(TEST_POLICY_ID1, [TEST_BW_LIMIT_RULE_1])
self.set_test_qos_rules(TEST_POLICY_ID2, [TEST_BW_LIMIT_RULE_2])
port_dicts = self.create_test_ports(amount=3)
port_dicts[0]['qos_policy_id'] = TEST_POLICY_ID1
port_dicts[1]['qos_policy_id'] = TEST_POLICY_ID2
self.setup_agent_and_ports(port_dicts)
self.wait_until_ports_state(self.ports, up=True)
self._assert_bandwidth_limit_rule_is_set(self.ports[0],
TEST_BW_LIMIT_RULE_1)
self._assert_bandwidth_limit_rule_is_set(self.ports[1],
TEST_BW_LIMIT_RULE_2)
self._assert_bandwidth_limit_rule_not_set(self.ports[2])

@ -14,301 +14,34 @@
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
import mock
import random
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import uuidutils
from neutron.tests.functional.agent.l2 import base
from neutron.agent.common import config as agent_config
from neutron.agent.common import ovs_lib
from neutron.agent.linux import interface
from neutron.agent.linux import polling
from neutron.agent.linux import utils as agent_utils
from neutron.common import config as common_config
from neutron.common import constants as n_const
from neutron.common import utils
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers.openvswitch.agent.common import config \
as ovs_config
from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants
from neutron.plugins.ml2.drivers.openvswitch.agent.openflow.ovs_ofctl \
import br_int
from neutron.plugins.ml2.drivers.openvswitch.agent.openflow.ovs_ofctl \
import br_phys
from neutron.plugins.ml2.drivers.openvswitch.agent.openflow.ovs_ofctl \
import br_tun
from neutron.plugins.ml2.drivers.openvswitch.agent import ovs_neutron_agent \
as ovs_agent
from neutron.tests.functional.agent.linux import base
LOG = logging.getLogger(__name__)
class TestOVSAgent(base.OVSAgentTestFramework):
def test_port_creation_and_deletion(self):
self.setup_agent_and_ports(
port_dicts=self.create_test_ports())
self.wait_until_ports_state(self.ports, up=True)
class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
def setUp(self):
super(OVSAgentTestFramework, self).setUp()
agent_rpc = ('neutron.plugins.ml2.drivers.openvswitch.agent.'
'ovs_neutron_agent.OVSPluginApi')
mock.patch(agent_rpc).start()
mock.patch('neutron.agent.rpc.PluginReportStateAPI').start()
self.br_int = base.get_rand_name(n_const.DEVICE_NAME_MAX_LEN,
prefix='br-int')
self.br_tun = base.get_rand_name(n_const.DEVICE_NAME_MAX_LEN,
prefix='br-tun')
patch_name_len = n_const.DEVICE_NAME_MAX_LEN - len("-patch-tun")
self.patch_tun = "%s-patch-tun" % self.br_int[patch_name_len:]
self.patch_int = "%s-patch-int" % self.br_tun[patch_name_len:]
self.ovs = ovs_lib.BaseOVS()
self.config = self._configure_agent()
self.driver = interface.OVSInterfaceDriver(self.config)
def _get_config_opts(self):
config = cfg.ConfigOpts()
config.register_opts(common_config.core_opts)
config.register_opts(interface.OPTS)
config.register_opts(ovs_config.ovs_opts, "OVS")
config.register_opts(ovs_config.agent_opts, "AGENT")
agent_config.register_interface_driver_opts_helper(config)
agent_config.register_agent_state_opts_helper(config)
return config
def _configure_agent(self):
config = self._get_config_opts()
config.set_override(
'interface_driver',
'neutron.agent.linux.interface.OVSInterfaceDriver')
config.set_override('integration_bridge', self.br_int, "OVS")
config.set_override('ovs_integration_bridge', self.br_int)
config.set_override('tunnel_bridge', self.br_tun, "OVS")
config.set_override('int_peer_patch_port', self.patch_tun, "OVS")
config.set_override('tun_peer_patch_port', self.patch_int, "OVS")
config.set_override('host', 'ovs-agent')
return config
def _bridge_classes(self):
return {
'br_int': br_int.OVSIntegrationBridge,
'br_phys': br_phys.OVSPhysicalBridge,
'br_tun': br_tun.OVSTunnelBridge
}
def create_agent(self, create_tunnels=True):
if create_tunnels:
tunnel_types = [p_const.TYPE_VXLAN]
else:
tunnel_types = None
local_ip = '192.168.10.1'
bridge_mappings = {'physnet': self.br_int}
agent = ovs_agent.OVSNeutronAgent(self._bridge_classes(),
self.br_int, self.br_tun,
local_ip, bridge_mappings,
polling_interval=1,
tunnel_types=tunnel_types,
prevent_arp_spoofing=False,
conf=self.config)
self.addCleanup(self.ovs.delete_bridge, self.br_int)
if tunnel_types:
self.addCleanup(self.ovs.delete_bridge, self.br_tun)
agent.sg_agent = mock.Mock()
return agent
def start_agent(self, agent):
polling_manager = polling.InterfacePollingMinimizer()
self.addCleanup(polling_manager.stop)
polling_manager.start()
agent_utils.wait_until_true(
polling_manager._monitor.is_active)
agent.check_ovs_status = mock.Mock(
return_value=constants.OVS_NORMAL)
t = eventlet.spawn(agent.rpc_loop, polling_manager)
def stop_agent(agent, rpc_loop_thread):
agent.run_daemon_loop = False
rpc_loop_thread.wait()
self.addCleanup(stop_agent, agent, t)
def _bind_ports(self, ports, network, agent):
devices = []
for port in ports:
dev = OVSAgentTestFramework._get_device_details(port, network)
vif_name = port.get('vif_name')
vif_id = uuidutils.generate_uuid(),
vif_port = ovs_lib.VifPort(
vif_name, "%s" % vif_id, 'id-%s' % vif_id,
port.get('mac_address'), agent.int_br)
dev['vif_port'] = vif_port
devices.append(dev)
agent._bind_devices(devices)
def _create_test_port_dict(self):
return {'id': uuidutils.generate_uuid(),
'mac_address': utils.get_random_mac(
'fa:16:3e:00:00:00'.split(':')),
'fixed_ips': [{
'ip_address': '10.%d.%d.%d' % (
random.randint(3, 254),
random.randint(3, 254),
random.randint(3, 254))}],
'vif_name': base.get_rand_name(
self.driver.DEV_NAME_LEN, self.driver.DEV_NAME_PREFIX)}
def _create_test_network_dict(self):
return {'id': uuidutils.generate_uuid(),
'tenant_id': uuidutils.generate_uuid()}
def _plug_ports(self, network, ports, agent, ip_len=24):
for port in ports:
self.driver.plug(
network.get('id'), port.get('id'), port.get('vif_name'),
port.get('mac_address'),
agent.int_br.br_name, namespace=None)
ip_cidrs = ["%s/%s" % (port.get('fixed_ips')[0][
'ip_address'], ip_len)]
self.driver.init_l3(port.get('vif_name'), ip_cidrs, namespace=None)
@staticmethod
def _get_device_details(port, network):
dev = {'device': port['id'],
'port_id': port['id'],
'network_id': network['id'],
'network_type': 'vlan',
'physical_network': 'physnet',
'segmentation_id': 1,
'fixed_ips': port['fixed_ips'],
'device_owner': 'compute',
'admin_state_up': True}
return dev
def assert_bridge(self, br, exists=True):
self.assertEqual(exists, self.ovs.bridge_exists(br))
def assert_patch_ports(self, agent):
def get_peer(port):
return agent.int_br.db_get_val(
'Interface', port, 'options', check_error=True)
agent_utils.wait_until_true(
lambda: get_peer(self.patch_int) == {'peer': self.patch_tun})
agent_utils.wait_until_true(
lambda: get_peer(self.patch_tun) == {'peer': self.patch_int})
def assert_bridge_ports(self):
for port in [self.patch_tun, self.patch_int]:
self.assertTrue(self.ovs.port_exists(port))
def assert_vlan_tags(self, ports, agent):
for port in ports:
res = agent.int_br.db_get_val('Port', port.get('vif_name'), 'tag')
self.assertTrue(res)
class TestOVSAgent(OVSAgentTestFramework):
def _expected_plugin_rpc_call(self, call, expected_devices, is_up=True):
"""Helper to check expected rpc call are received
:param call: The call to check
:param expected_devices The device for which call is expected
:param is_up True if expected_devices are devices that are set up,
False if expected_devices are devices that are set down
"""
if is_up:
rpc_devices = [
dev for args in call.call_args_list for dev in args[0][1]]
else:
rpc_devices = [
dev for args in call.call_args_list for dev in args[0][2]]
return not (set(expected_devices) - set(rpc_devices))
def _create_ports(self, network, agent, trigger_resync=False):
ports = []
for x in range(3):
ports.append(self._create_test_port_dict())
def mock_device_raise_exception(context, devices_up, devices_down,
agent_id, host=None):
agent.plugin_rpc.update_device_list.side_effect = (
mock_update_device)
raise Exception('Exception to trigger resync')
def mock_device_details(context, devices, agent_id, host=None):
details = []
for port in ports:
if port['id'] in devices:
dev = OVSAgentTestFramework._get_device_details(
port, network)
details.append(dev)
return {'devices': details, 'failed_devices': []}
def mock_update_device(context, devices_up, devices_down, agent_id,
host=None):
dev_up = []
dev_down = []
for port in ports:
if devices_up and port['id'] in devices_up:
dev_up.append(port['id'])
if devices_down and port['id'] in devices_down:
dev_down.append({'device': port['id'], 'exists': True})
return {'devices_up': dev_up,
'failed_devices_up': [],
'devices_down': dev_down,
'failed_devices_down': []}
(agent.plugin_rpc.get_devices_details_list_and_failed_devices.
side_effect) = mock_device_details
if trigger_resync:
agent.plugin_rpc.update_device_list.side_effect = (
mock_device_raise_exception)
else:
agent.plugin_rpc.update_device_list.side_effect = (
mock_update_device)
return ports
for port in self.ports:
self.agent.int_br.delete_port(port['vif_name'])
def test_port_creation_and_deletion(self):
agent = self.create_agent()
self.start_agent(agent)
network = self._create_test_network_dict()
ports = self._create_ports(network, agent)
self._plug_ports(network, ports, agent)
up_ports_ids = [p['id'] for p in ports]
agent_utils.wait_until_true(
lambda: self._expected_plugin_rpc_call(
agent.plugin_rpc.update_device_list, up_ports_ids))
down_ports_ids = [p['id'] for p in ports]
for port in ports:
agent.int_br.delete_port(port['vif_name'])
agent_utils.wait_until_true(
lambda: self._expected_plugin_rpc_call(
agent.plugin_rpc.update_device_list, down_ports_ids, False))
self.wait_until_ports_state(self.ports, up=False)
def test_resync_devices_set_up_after_exception(self):
agent = self.create_agent()
self.start_agent(agent)
network = self._create_test_network_dict()
ports = self._create_ports(network, agent, True)
self._plug_ports(network, ports, agent)
ports_ids = [p['id'] for p in ports]
agent_utils.wait_until_true(
lambda: self._expected_plugin_rpc_call(
agent.plugin_rpc.update_device_list, ports_ids))
self.setup_agent_and_ports(
port_dicts=self.create_test_ports(),
trigger_resync=True)
self.wait_until_ports_state(self.ports, up=True)
def test_port_vlan_tags(self):
agent = self.create_agent()
self.start_agent(agent)
network = self._create_test_network_dict()
ports = self._create_ports(network, agent)
ports_ids = [p['id'] for p in ports]
self._plug_ports(network, ports, agent)
agent_utils.wait_until_true(
lambda: self._expected_plugin_rpc_call(
agent.plugin_rpc.update_device_list, ports_ids))
self.assert_vlan_tags(ports, agent)
self.setup_agent_and_ports(
port_dicts=self.create_test_ports(),
trigger_resync=True)
self.wait_until_ports_state(self.ports, up=True)
self.assert_vlan_tags(self.ports, self.agent)
def test_assert_bridges_ports_vxlan(self):
agent = self.create_agent()

Loading…
Cancel
Save