Merge pull request #474 from morgabra/port_network_plugin
add port-level network_plugin
This commit is contained in:
@@ -45,7 +45,10 @@ EXTENDED_ATTRIBUTES_2_0 = {
|
||||
"vlan_id": {
|
||||
"allow_post": False,
|
||||
"allow_put": False,
|
||||
"is_visible": True}}}
|
||||
"is_visible": True},
|
||||
"network_plugin": {"allow_post": True, "allow_put": False,
|
||||
"enforce_policy": True,
|
||||
"is_visible": False, "default": ''}}}
|
||||
|
||||
|
||||
class Ports_quark(object):
|
||||
|
||||
@@ -0,0 +1,23 @@
|
||||
"""add_port_network_plugin
|
||||
|
||||
Revision ID: 374c1bdb4480
|
||||
Revises: 4da4444d7706
|
||||
Create Date: 2015-10-20 12:08:24.780056
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '374c1bdb4480'
|
||||
down_revision = '4da4444d7706'
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.add_column('quark_ports', sa.Column('network_plugin',
|
||||
sa.String(length=36), nullable=True))
|
||||
|
||||
|
||||
def downgrade():
|
||||
op.drop_column('quark_ports', 'network_plugin')
|
||||
@@ -1 +1 @@
|
||||
4da4444d7706
|
||||
374c1bdb4480
|
||||
@@ -430,6 +430,8 @@ class Port(BASEV2, models.HasTenant, models.HasId, IsHazTags):
|
||||
bridge = sa.Column(sa.String(255))
|
||||
associations = orm.relationship(PortIpAssociation, backref="port")
|
||||
|
||||
network_plugin = sa.Column(sa.String(36), nullable=True)
|
||||
|
||||
@declarative.declared_attr
|
||||
def ip_addresses(cls):
|
||||
primaryjoin = cls.id == port_ip_association_table.c.port_id
|
||||
|
||||
@@ -18,6 +18,10 @@ from quark.drivers import optimized_nvp_driver as optnvp
|
||||
from quark.drivers.registry_base import DriverRegistryBase
|
||||
from quark.drivers import unmanaged
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DriverRegistry(DriverRegistryBase):
|
||||
def __init__(self):
|
||||
@@ -28,5 +32,45 @@ class DriverRegistry(DriverRegistryBase):
|
||||
optnvp.OptimizedNVPDriver.get_name(): optnvp.OptimizedNVPDriver(),
|
||||
unmanaged.UnmanagedDriver.get_name(): unmanaged.UnmanagedDriver()})
|
||||
|
||||
# You may optionally specify a port-level driver name that will
|
||||
# be used intead of the underlying network driver. This map determines
|
||||
# which drivers are allowed to be used in this way.
|
||||
# example: {"MY_DRIVER": ["MY_OTHER_DRIVER"]}
|
||||
# The above example would allow ports created with "MY_DRIVER"
|
||||
# specified to be used with networks that use "MY_OTHER_DRIVER",
|
||||
# but *not* the inverse.
|
||||
# Note that drivers are automatically compatible with themselves.
|
||||
self.port_driver_compat_map = {}
|
||||
|
||||
def get_driver(self, net_driver, port_driver=None):
|
||||
LOG.info("Selecting driver for net_driver:%s "
|
||||
"port_driver:%s" % (net_driver, port_driver))
|
||||
|
||||
if port_driver:
|
||||
|
||||
# Check port_driver is valid driver
|
||||
if port_driver not in self.drivers:
|
||||
raise Exception("Driver %s is not registered." % port_driver)
|
||||
|
||||
# Net drivers are compatible with themselves
|
||||
if port_driver == net_driver:
|
||||
LOG.info("Selecting port_driver:%s" % (port_driver))
|
||||
return self.drivers[port_driver]
|
||||
|
||||
# Check port_driver is compatible with the given net_driver
|
||||
allowed = self.port_driver_compat_map.get(port_driver, [])
|
||||
if net_driver not in allowed:
|
||||
raise Exception("Port driver %s not allowed for "
|
||||
"underlying network driver %s."
|
||||
% (port_driver, net_driver))
|
||||
|
||||
LOG.info("Selecting port_driver:%s" % (port_driver))
|
||||
return self.drivers[port_driver]
|
||||
|
||||
elif net_driver in self.drivers:
|
||||
LOG.info("Selecting net_driver:%s" % (net_driver))
|
||||
return self.drivers[net_driver]
|
||||
|
||||
raise Exception("Driver %s is not registered." % net_driver)
|
||||
|
||||
DRIVER_REGISTRY = DriverRegistry()
|
||||
|
||||
@@ -43,6 +43,19 @@ def _raise_if_unauthorized(tenant_id, net):
|
||||
raise exceptions.NotAuthorized()
|
||||
|
||||
|
||||
def _get_net_driver(network, port=None):
|
||||
port_driver = None
|
||||
if port and port.get("network_plugin"):
|
||||
port_driver = port.get("network_plugin")
|
||||
|
||||
try:
|
||||
return registry.DRIVER_REGISTRY.get_driver(
|
||||
network["network_plugin"], port_driver=port_driver)
|
||||
except Exception as e:
|
||||
raise exceptions.BadRequest(resource="ports",
|
||||
msg="invalid network_plugin: %s" % e)
|
||||
|
||||
|
||||
def split_and_validate_requested_subnets(context, net_id, segment_id,
|
||||
fixed_ips):
|
||||
subnets = []
|
||||
@@ -90,7 +103,7 @@ def create_port(context, port):
|
||||
port_attrs = port["port"]
|
||||
|
||||
admin_only = ["mac_address", "device_owner", "bridge", "admin_state_up",
|
||||
"use_forbidden_mac_range"]
|
||||
"use_forbidden_mac_range", "network_plugin"]
|
||||
utils.filter_body(context, port_attrs, admin_only=admin_only)
|
||||
|
||||
port_attrs = port["port"]
|
||||
@@ -144,7 +157,7 @@ def create_port(context, port):
|
||||
|
||||
ipam_driver = ipam.IPAM_REGISTRY.get_strategy(net["ipam_strategy"])
|
||||
|
||||
net_driver = registry.DRIVER_REGISTRY.get_driver(net["network_plugin"])
|
||||
net_driver = _get_net_driver(net, port=port_attrs)
|
||||
|
||||
# TODO(anyone): security groups are not currently supported on port create.
|
||||
# Please see JIRA:NCP-801
|
||||
@@ -285,7 +298,7 @@ def update_port(context, id, port):
|
||||
|
||||
admin_only = ["mac_address", "device_owner", "bridge", "admin_state_up",
|
||||
"device_id"]
|
||||
always_filter = ["network_id", "backend_key"]
|
||||
always_filter = ["network_id", "backend_key", "network_plugin"]
|
||||
utils.filter_body(context, port_dict, admin_only=admin_only,
|
||||
always_filter=always_filter)
|
||||
|
||||
@@ -380,8 +393,9 @@ def update_port(context, id, port):
|
||||
port_dict["addresses"] = port_db["ip_addresses"]
|
||||
port_dict["addresses"].extend(addresses)
|
||||
|
||||
net_driver = registry.DRIVER_REGISTRY.get_driver(
|
||||
port_db.network["network_plugin"])
|
||||
# NOTE(morgabra) Updating network_plugin on port objects is explicitly
|
||||
# disallowed in the api, so we use whatever exists in the db.
|
||||
net_driver = _get_net_driver(port_db.network, port=port_db)
|
||||
|
||||
# TODO(anyone): What do we want to have happen here if this fails? Is it
|
||||
# ok to continue to keep the IPs but fail to apply security
|
||||
@@ -520,8 +534,7 @@ def delete_port(context, id):
|
||||
ipam_driver.deallocate_ips_by_port(
|
||||
context, port, ipam_reuse_after=CONF.QUARK.ipam_reuse_after)
|
||||
|
||||
net_driver = registry.DRIVER_REGISTRY.get_driver(
|
||||
port.network["network_plugin"])
|
||||
net_driver = _get_net_driver(port.network, port=port)
|
||||
net_driver.delete_port(context, backend_key, device_id=port["device_id"],
|
||||
mac_address=port["mac_address"])
|
||||
|
||||
@@ -531,8 +544,7 @@ def delete_port(context, id):
|
||||
|
||||
def _diag_port(context, port, fields):
|
||||
p = v._make_port_dict(port)
|
||||
net_driver = registry.DRIVER_REGISTRY.get_driver(
|
||||
port.network["network_plugin"])
|
||||
net_driver = _get_net_driver(port.network, port=port)
|
||||
if 'config' in fields:
|
||||
p.update(net_driver.diag_port(
|
||||
context, port["backend_key"], get_status='status' in fields))
|
||||
|
||||
@@ -1132,11 +1132,49 @@ class TestPortDiagnose(test_quark_plugin.TestQuarkPlugin):
|
||||
self.plugin.diagnose_port(self.context, 1, [])
|
||||
|
||||
|
||||
class TestPortBadNetworkPlugin(test_quark_plugin.TestQuarkPlugin):
|
||||
class TestPortNetworkPlugin(test_quark_plugin.TestQuarkPlugin):
|
||||
@contextlib.contextmanager
|
||||
def _stubs(self, network=None, addr=None, mac=None, compat_map=None):
|
||||
network["ipam_strategy"] = "ANY"
|
||||
|
||||
# Mock out the driver registry
|
||||
foo_driver = mock.Mock()
|
||||
foo_driver.create_port.return_value = {"uuid": 1}
|
||||
bar_driver = mock.Mock()
|
||||
bar_driver.create_port.return_value = {"uuid": 1}
|
||||
drivers = {"FOO": foo_driver,
|
||||
"BAR": bar_driver}
|
||||
compat_map = compat_map or {}
|
||||
|
||||
with contextlib.nested(
|
||||
mock.patch("quark.db.api.port_create"),
|
||||
mock.patch("quark.db.api.network_find"),
|
||||
mock.patch("quark.ipam.QuarkIpam.allocate_ip_address"),
|
||||
mock.patch("quark.ipam.QuarkIpam.allocate_mac_address"),
|
||||
mock.patch("oslo_utils.uuidutils.generate_uuid"),
|
||||
mock.patch("quark.plugin_views._make_port_dict"),
|
||||
mock.patch("quark.db.api.port_count_all"),
|
||||
mock.patch("neutron.quota.QuotaEngine.limit_check"),
|
||||
mock.patch("quark.plugin_modules.ports.registry."
|
||||
"DRIVER_REGISTRY.drivers",
|
||||
new_callable=mock.PropertyMock(return_value=drivers)),
|
||||
mock.patch("quark.plugin_modules.ports.registry."
|
||||
"DRIVER_REGISTRY.port_driver_compat_map",
|
||||
new_callable=mock.PropertyMock(return_value=compat_map))
|
||||
) as (port_create, net_find, alloc_ip, alloc_mac, gen_uuid, make_port,
|
||||
port_count, limit_check, _, _):
|
||||
net_find.return_value = network
|
||||
alloc_ip.return_value = addr
|
||||
alloc_mac.return_value = mac
|
||||
gen_uuid.return_value = 1
|
||||
port_count.return_value = 0
|
||||
yield port_create, alloc_mac, net_find
|
||||
|
||||
def test_create_port_with_bad_network_plugin_fails(self):
|
||||
network_dict = dict(id=1)
|
||||
network_dict = dict(id=1, tenant_id=self.context.tenant_id)
|
||||
port_name = "foobar"
|
||||
mac = dict(address="AA:BB:CC:DD:EE:FF")
|
||||
ip = dict()
|
||||
port = dict(port=dict(mac_address=mac["address"], network_id=1,
|
||||
tenant_id=self.context.tenant_id, device_id=2,
|
||||
name=port_name))
|
||||
@@ -1147,20 +1185,153 @@ class TestPortBadNetworkPlugin(test_quark_plugin.TestQuarkPlugin):
|
||||
port_model.update(port)
|
||||
port_models = port_model
|
||||
|
||||
with contextlib.nested(
|
||||
mock.patch("quark.db.api.port_create"),
|
||||
mock.patch("quark.db.api.network_find"),
|
||||
mock.patch("quark.ipam.QuarkIpam.allocate_ip_address"),
|
||||
mock.patch("quark.ipam.QuarkIpam.allocate_mac_address"),
|
||||
) as (port_create, net_find, alloc_ip, alloc_mac):
|
||||
with self._stubs(network=network, addr=ip,
|
||||
mac=mac) as (port_create, alloc_mac, net_find):
|
||||
port_create.return_value = port_models
|
||||
net_find.return_value = network
|
||||
alloc_ip.return_value = {}
|
||||
alloc_mac.return_value = mac
|
||||
|
||||
with self.assertRaises(Exception): # noqa
|
||||
exc = "Driver FAIL is not registered."
|
||||
with self.assertRaisesRegexp(exceptions.BadRequest, exc):
|
||||
self.plugin.create_port(self.context, port)
|
||||
|
||||
def test_create_port_with_bad_port_network_plugin_fails(self):
|
||||
network_dict = dict(id=1, tenant_id=self.context.tenant_id)
|
||||
port_name = "foobar"
|
||||
mac = dict(address="AA:BB:CC:DD:EE:FF")
|
||||
ip = dict()
|
||||
port = dict(port=dict(mac_address=mac["address"], network_id=1,
|
||||
tenant_id=self.context.tenant_id, device_id=2,
|
||||
name=port_name, network_plugin="FAIL"))
|
||||
network = models.Network()
|
||||
network.update(network_dict)
|
||||
network["network_plugin"] = "FOO"
|
||||
port_model = models.Port()
|
||||
port_model.update(port)
|
||||
port_models = port_model
|
||||
|
||||
with self._stubs(network=network, addr=ip,
|
||||
mac=mac) as (port_create, alloc_mac, net_find):
|
||||
port_create.return_value = port_models
|
||||
|
||||
exc = "Driver FAIL is not registered."
|
||||
admin_ctx = self.context.elevated()
|
||||
with self.assertRaisesRegexp(exceptions.BadRequest, exc):
|
||||
self.plugin.create_port(admin_ctx, port)
|
||||
|
||||
def test_create_port_with_incompatable_port_network_plugin_fails(self):
|
||||
network_dict = dict(id=1, tenant_id=self.context.tenant_id)
|
||||
port_name = "foobar"
|
||||
mac = dict(address="AA:BB:CC:DD:EE:FF")
|
||||
ip = dict()
|
||||
port = dict(port=dict(mac_address=mac["address"], network_id=1,
|
||||
tenant_id=self.context.tenant_id, device_id=2,
|
||||
name=port_name, network_plugin="BAR"))
|
||||
network = models.Network()
|
||||
network.update(network_dict)
|
||||
network["network_plugin"] = "FOO"
|
||||
port_model = models.Port()
|
||||
port_model.update(port)
|
||||
port_models = port_model
|
||||
|
||||
with self._stubs(network=network, addr=ip,
|
||||
mac=mac) as (port_create, alloc_mac, net_find):
|
||||
port_create.return_value = port_models
|
||||
|
||||
exc = ("Port driver BAR not allowed for underlying network "
|
||||
"driver FOO.")
|
||||
admin_ctx = self.context.elevated()
|
||||
with self.assertRaisesRegexp(exceptions.BadRequest, exc):
|
||||
self.plugin.create_port(admin_ctx, port)
|
||||
|
||||
def test_create_port_with_port_network_plugin(self):
|
||||
network = dict(id=1, tenant_id=self.context.tenant_id,
|
||||
network_plugin="FOO")
|
||||
mac = dict(address="AA:BB:CC:DD:EE:FF")
|
||||
port_name = "foobar"
|
||||
ip = dict()
|
||||
|
||||
port = dict(port=dict(mac_address=mac["address"], network_id=1,
|
||||
tenant_id=self.context.tenant_id, device_id=2,
|
||||
name=port_name, device_owner="quark_tests",
|
||||
bridge="quark_bridge", admin_state_up=False))
|
||||
|
||||
expected_mac = "DE:AD:BE:EF:00:00"
|
||||
expected_bridge = "new_bridge"
|
||||
expected_device_owner = "new_device_owner"
|
||||
expected_admin_state = "new_state"
|
||||
expected_network_plugin = "FOO"
|
||||
|
||||
port_create_dict = {}
|
||||
port_create_dict["port"] = port["port"].copy()
|
||||
port_create_dict["port"]["mac_address"] = expected_mac
|
||||
port_create_dict["port"]["device_owner"] = expected_device_owner
|
||||
port_create_dict["port"]["bridge"] = expected_bridge
|
||||
port_create_dict["port"]["admin_state_up"] = expected_admin_state
|
||||
port_create_dict["port"]["network_plugin"] = expected_network_plugin
|
||||
|
||||
admin_ctx = self.context.elevated()
|
||||
with self._stubs(network=network, addr=ip,
|
||||
mac=mac) as (port_create, alloc_mac, net_find):
|
||||
self.plugin.create_port(admin_ctx, port_create_dict)
|
||||
|
||||
alloc_mac.assert_called_once_with(
|
||||
admin_ctx, network["id"], 1,
|
||||
cfg.CONF.QUARK.ipam_reuse_after,
|
||||
mac_address=expected_mac, use_forbidden_mac_range=False)
|
||||
|
||||
port_create.assert_called_once_with(
|
||||
admin_ctx, bridge=expected_bridge, uuid=1, name="foobar",
|
||||
admin_state_up=expected_admin_state, network_id=1,
|
||||
tenant_id="fake", id=1, device_owner=expected_device_owner,
|
||||
mac_address=mac["address"], device_id=2, backend_key=1,
|
||||
security_groups=[], addresses=[],
|
||||
network_plugin=expected_network_plugin)
|
||||
|
||||
def test_create_port_with_compatable_port_network_plugin(self):
|
||||
network = dict(id=1, tenant_id=self.context.tenant_id,
|
||||
network_plugin="FOO")
|
||||
mac = dict(address="AA:BB:CC:DD:EE:FF")
|
||||
port_name = "foobar"
|
||||
ip = dict()
|
||||
|
||||
port = dict(port=dict(mac_address=mac["address"], network_id=1,
|
||||
tenant_id=self.context.tenant_id, device_id=2,
|
||||
name=port_name, device_owner="quark_tests",
|
||||
bridge="quark_bridge", admin_state_up=False))
|
||||
|
||||
expected_mac = "DE:AD:BE:EF:00:00"
|
||||
expected_bridge = "new_bridge"
|
||||
expected_device_owner = "new_device_owner"
|
||||
expected_admin_state = "new_state"
|
||||
expected_network_plugin = "BAR"
|
||||
|
||||
port_create_dict = {}
|
||||
port_create_dict["port"] = port["port"].copy()
|
||||
port_create_dict["port"]["mac_address"] = expected_mac
|
||||
port_create_dict["port"]["device_owner"] = expected_device_owner
|
||||
port_create_dict["port"]["bridge"] = expected_bridge
|
||||
port_create_dict["port"]["admin_state_up"] = expected_admin_state
|
||||
port_create_dict["port"]["network_plugin"] = expected_network_plugin
|
||||
|
||||
compat_map = {"BAR": ["FOO"]}
|
||||
admin_ctx = self.context.elevated()
|
||||
with self._stubs(network=network, addr=ip, mac=mac,
|
||||
compat_map=compat_map) as (port_create, alloc_mac,
|
||||
net_find):
|
||||
self.plugin.create_port(admin_ctx, port_create_dict)
|
||||
|
||||
alloc_mac.assert_called_once_with(
|
||||
admin_ctx, network["id"], 1,
|
||||
cfg.CONF.QUARK.ipam_reuse_after,
|
||||
mac_address=expected_mac, use_forbidden_mac_range=False)
|
||||
|
||||
port_create.assert_called_once_with(
|
||||
admin_ctx, bridge=expected_bridge, uuid=1, name="foobar",
|
||||
admin_state_up=expected_admin_state, network_id=1,
|
||||
tenant_id="fake", id=1, device_owner=expected_device_owner,
|
||||
mac_address=mac["address"], device_id=2, backend_key=1,
|
||||
security_groups=[], addresses=[],
|
||||
network_plugin=expected_network_plugin)
|
||||
|
||||
|
||||
class TestQuarkPortCreateFiltering(test_quark_plugin.TestQuarkPlugin):
|
||||
@contextlib.contextmanager
|
||||
@@ -1202,6 +1373,7 @@ class TestQuarkPortCreateFiltering(test_quark_plugin.TestQuarkPlugin):
|
||||
port_create_dict["port"]["device_owner"] = "ignored"
|
||||
port_create_dict["port"]["bridge"] = "ignored"
|
||||
port_create_dict["port"]["admin_state_up"] = "ignored"
|
||||
port_create_dict["port"]["network_plugin"] = "ignored"
|
||||
|
||||
with self._stubs(network=network, addr=ip,
|
||||
mac=mac) as (port_create, alloc_mac, net_find):
|
||||
@@ -1231,6 +1403,7 @@ class TestQuarkPortCreateFiltering(test_quark_plugin.TestQuarkPlugin):
|
||||
expected_bridge = "new_bridge"
|
||||
expected_device_owner = "new_device_owner"
|
||||
expected_admin_state = "new_state"
|
||||
expected_network_plugin = "BASE"
|
||||
|
||||
port_create_dict = {}
|
||||
port_create_dict["port"] = port["port"].copy()
|
||||
@@ -1238,6 +1411,7 @@ class TestQuarkPortCreateFiltering(test_quark_plugin.TestQuarkPlugin):
|
||||
port_create_dict["port"]["device_owner"] = expected_device_owner
|
||||
port_create_dict["port"]["bridge"] = expected_bridge
|
||||
port_create_dict["port"]["admin_state_up"] = expected_admin_state
|
||||
port_create_dict["port"]["network_plugin"] = expected_network_plugin
|
||||
|
||||
admin_ctx = self.context.elevated()
|
||||
with self._stubs(network=network, addr=ip,
|
||||
@@ -1254,7 +1428,8 @@ class TestQuarkPortCreateFiltering(test_quark_plugin.TestQuarkPlugin):
|
||||
admin_state_up=expected_admin_state, network_id=1,
|
||||
tenant_id="fake", id=1, device_owner=expected_device_owner,
|
||||
mac_address=mac["address"], device_id=2, backend_key=1,
|
||||
security_groups=[], addresses=[])
|
||||
security_groups=[], addresses=[],
|
||||
network_plugin=expected_network_plugin)
|
||||
|
||||
|
||||
class TestQuarkPortUpdateFiltering(test_quark_plugin.TestQuarkPlugin):
|
||||
@@ -1274,7 +1449,8 @@ class TestQuarkPortUpdateFiltering(test_quark_plugin.TestQuarkPlugin):
|
||||
new_port["port"] = {
|
||||
"mac_address": "DD:EE:FF:00:00:00", "device_owner": "new_owner",
|
||||
"bridge": "new_bridge", "admin_state_up": False, "device_id": 3,
|
||||
"network_id": 10, "backend_key": 1234, "name": "new_name"}
|
||||
"network_id": 10, "backend_key": 1234, "name": "new_name",
|
||||
"network_plugin": "BASE"}
|
||||
|
||||
with self._stubs() as (port_find, port_update):
|
||||
self.plugin.update_port(self.context, 1, new_port)
|
||||
@@ -1289,7 +1465,8 @@ class TestQuarkPortUpdateFiltering(test_quark_plugin.TestQuarkPlugin):
|
||||
new_port["port"] = {
|
||||
"mac_address": "DD:EE:FF:00:00:00", "device_owner": "new_owner",
|
||||
"bridge": "new_bridge", "admin_state_up": False, "device_id": 3,
|
||||
"network_id": 10, "backend_key": 1234, "name": "new_name"}
|
||||
"network_id": 10, "backend_key": 1234, "name": "new_name",
|
||||
"network_plugin": "BASE"}
|
||||
|
||||
admin_ctx = self.context.elevated()
|
||||
with self._stubs() as (port_find, port_update):
|
||||
|
||||
98
quark/tests/test_driver_registry.py
Normal file
98
quark/tests/test_driver_registry.py
Normal file
@@ -0,0 +1,98 @@
|
||||
# Copyright 2013 Openstack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from quark.drivers import registry
|
||||
from quark.drivers import registry_base
|
||||
from quark.tests import test_base
|
||||
|
||||
|
||||
class FakeBaseRegistry(registry_base.DriverRegistryBase):
|
||||
|
||||
def __init__(self):
|
||||
self.drivers = {"test_driver_1": 1,
|
||||
"test_driver_2": 2}
|
||||
|
||||
|
||||
class FakeNetDriverRegistry(registry.DriverRegistry):
|
||||
|
||||
def __init__(self):
|
||||
self.drivers = {"test_driver_1": 1,
|
||||
"test_driver_2": 2,
|
||||
"test_driver_3": 3}
|
||||
|
||||
self.port_driver_compat_map = {
|
||||
"test_driver_2": ["test_driver_1"],
|
||||
"test_driver_3": ["test_driver_1",
|
||||
"test_driver_2"]
|
||||
}
|
||||
|
||||
|
||||
class TestRegistryBase(test_base.TestBase):
|
||||
|
||||
def setUp(self):
|
||||
self.registry = FakeBaseRegistry()
|
||||
|
||||
def test_get_valid(self):
|
||||
driver = self.registry.get_driver("test_driver_1")
|
||||
self.assertEqual(driver, 1)
|
||||
|
||||
driver = self.registry.get_driver("test_driver_2")
|
||||
self.assertEqual(driver, 2)
|
||||
|
||||
def test_get_invalid(self):
|
||||
exc = "Driver does_not_exist is not registered."
|
||||
with self.assertRaisesRegexp(Exception, exc):
|
||||
self.registry.get_driver("does_not_exist")
|
||||
|
||||
|
||||
class TestDriverRegistry(TestRegistryBase):
|
||||
|
||||
def setUp(self):
|
||||
self.registry = FakeNetDriverRegistry()
|
||||
|
||||
def test_get_port_driver(self):
|
||||
driver = self.registry.get_driver(
|
||||
"test_driver_1", port_driver="test_driver_1")
|
||||
self.assertEqual(driver, 1)
|
||||
|
||||
driver = self.registry.get_driver(
|
||||
"test_driver_2", port_driver="test_driver_2")
|
||||
self.assertEqual(driver, 2)
|
||||
|
||||
def test_get_invalid_port_driver(self):
|
||||
exc = "Driver does_not_exist is not registered."
|
||||
with self.assertRaisesRegexp(Exception, exc):
|
||||
self.registry.get_driver(
|
||||
"test_driver_1", port_driver="does_not_exist")
|
||||
|
||||
def test_get_compatable_port_driver(self):
|
||||
driver = self.registry.get_driver(
|
||||
"test_driver_1", port_driver="test_driver_2")
|
||||
self.assertEqual(driver, 2)
|
||||
|
||||
driver = self.registry.get_driver(
|
||||
"test_driver_1", port_driver="test_driver_3")
|
||||
self.assertEqual(driver, 3)
|
||||
|
||||
driver = self.registry.get_driver(
|
||||
"test_driver_2", port_driver="test_driver_3")
|
||||
self.assertEqual(driver, 3)
|
||||
|
||||
def test_get_incompatable_port_driver(self):
|
||||
exc = ("Port driver test_driver_2 not allowed for "
|
||||
"underlying network driver test_driver_3.")
|
||||
with self.assertRaisesRegexp(Exception, exc):
|
||||
self.registry.get_driver(
|
||||
"test_driver_3", port_driver="test_driver_2")
|
||||
Reference in New Issue
Block a user