Adds coverage over drivers and db custom types

Adds more coverage tests for some portions of the driver logic and for
the db custom types. I decided to abstain from implementing some of the
optimized driver coverage tests at this time because the only thing one
can test there is that python and sqlalchemy work correctly. Otherwise,
they're just convenience mthods.
This commit is contained in:
Matt Dietz
2013-07-31 22:48:33 +00:00
parent e628cb8e00
commit ddd61ca084
11 changed files with 273 additions and 27 deletions

View File

@@ -30,9 +30,7 @@ quark_opts = [
"after deallocation.")),
cfg.StrOpt("strategy_driver",
default='quark.network_strategy.JSONStrategy',
help=_("Tree of network assignment strategy")),
cfg.StrOpt('net_driver_cfg', default='/etc/neutron/quark.ini',
help=_("Path to the config for the net driver"))
help=_("Tree of network assignment strategy"))
]

View File

@@ -23,15 +23,15 @@ class BaseDriver(object):
Usable as a replacement for the sample plugin.
"""
def load_config(self, path):
LOG.info("load_config %s" % path)
def load_config(self):
LOG.info("load_config")
def get_connection(self):
LOG.info("get_connection")
def create_network(self, tenant_id, network_name, tags=None,
def create_network(self, context, network_name, tags=None,
network_id=None, **kwargs):
LOG.info("create_network %s %s %s" % (tenant_id, network_name,
LOG.info("create_network %s %s %s" % (context, network_name,
tags))
def delete_network(self, context, network_id):

View File

@@ -68,7 +68,7 @@ class NVPDriver(base.BaseDriver):
'max_rules_per_group': 0,
'max_rules_per_port': 0}
def load_config(self, path):
def load_config(self):
#NOTE(mdietz): What does default_tz actually mean?
# We don't have one default.
default_tz = CONF.NVP.default_tz

View File

@@ -60,9 +60,7 @@ class OptimizedNVPDriver(NVPDriver):
update_port(context, port_id, status=status,
security_groups=security_groups,
allowed_pairs=allowed_pairs)
port = context.session.query(LSwitchPort).\
filter(LSwitchPort.port_id == port_id).\
first()
port = self._lport_select_by_id(context, port_id)
port.update(nvp_port)
def delete_port(self, context, port_id, lswitch_uuid=None):
@@ -85,9 +83,7 @@ class OptimizedNVPDriver(NVPDriver):
def delete_security_group(self, context, group_id):
super(OptimizedNVPDriver, self).\
delete_security_group(context, group_id)
group = context.session.query(SecurityProfile).\
filter(SecurityProfile.id == group_id).\
first()
group = self._query_security_group(context, group_id)
context.session.delete(group)
def _lport_select_by_id(self, context, port_id):
@@ -178,9 +174,9 @@ class OptimizedNVPDriver(NVPDriver):
port = self._lport_select_by_id(context, port_id)
return port.switch.nvp_id
def _get_security_group_id(self, context, group_id):
def _query_security_group(self, context, group_id):
return context.session.query(SecurityProfile).\
filter(SecurityProfile.id == group_id).first().nvp_id
filter(SecurityProfile.id == group_id).first()
def _make_security_rule_dict(self, rule):
res = {"port_range_min": rule.get("port_range_min"),
@@ -201,7 +197,7 @@ class OptimizedNVPDriver(NVPDriver):
for rule in group.rules:
rulelist[rule.direction].append(
self._make_security_rule_dict(rule))
return {'uuid': self._get_security_group_id(context, group_id),
return {'uuid': self._query_security_group(context, group_id).nvp_id,
'logical_port_ingress_rules': rulelist['ingress'],
'logical_port_egress_rules': rulelist['egress']}

View File

@@ -36,7 +36,7 @@ STRATEGY = network_strategy.STRATEGY
ipam_driver = (importutils.import_class(CONF.QUARK.ipam_driver))()
net_driver = (importutils.import_class(CONF.QUARK.net_driver))()
net_driver.load_config(CONF.QUARK.net_driver_cfg)
net_driver.load_config()
def _adapt_provider_nets(context, network):

View File

@@ -31,7 +31,7 @@ LOG = logging.getLogger("neutron.quark")
ipam_driver = (importutils.import_class(CONF.QUARK.ipam_driver))()
net_driver = (importutils.import_class(CONF.QUARK.net_driver))()
net_driver.load_config(CONF.QUARK.net_driver_cfg)
net_driver.load_config()
def create_port(context, port):

View File

@@ -31,7 +31,7 @@ DEFAULT_SG_UUID = "00000000-0000-0000-0000-000000000000"
net_driver = (importutils.import_class(CONF.QUARK.net_driver))()
net_driver.load_config(CONF.QUARK.net_driver_cfg)
net_driver.load_config()
def _validate_security_group_rule(context, rule):

View File

@@ -0,0 +1,68 @@
# Copyright (c) 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from quark.drivers import base
from quark.tests import test_base
class TestBaseDriver(test_base.TestBase):
def setUp(self):
super(TestBaseDriver, self).setUp()
self.driver = base.BaseDriver()
def test_load_config(self):
self.driver.load_config()
def test_get_connection(self):
self.driver.get_connection()
def test_create_network(self):
self.driver.create_network(context=self.context, network_name="public")
def test_delete_network(self):
self.driver.delete_network(context=self.context, network_id=1)
def test_create_port(self):
self.driver.create_port(context=self.context, network_id=1, port_id=2)
def test_update_port(self):
self.driver.update_port(context=self.context, network_id=1, port_id=2)
def test_delete_port(self):
self.driver.delete_port(context=self.context, port_id=2)
def test_create_security_group(self):
self.driver.create_security_group(context=self.context,
group_name="mygroup")
def test_delete_security_group(self):
self.driver.delete_security_group(context=self.context,
group_id=3)
def test_update_security_group(self):
self.driver.update_security_group(context=self.context,
group_id=3)
def test_create_security_group_rule(self):
rule = {'ethertype': 'IPv4', 'direction': 'ingress'}
self.driver.create_security_group_rule(context=self.context,
group_id=3,
rule=rule)
def test_delete_security_group_rule(self):
rule = {'ethertype': 'IPv4', 'direction': 'ingress'}
self.driver.delete_security_group_rule(context=self.context,
group_id=3,
rule=rule)

View File

@@ -0,0 +1,76 @@
# Copyright 2013 Openstack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from quark.db import custom_types
from quark.tests import test_base
from sqlalchemy.dialects import mysql, sqlite
class TestDBCustomTypesINET(test_base.TestBase):
"""Adding for coverage of the custom types."""
def setUp(self):
super(TestDBCustomTypesINET, self).setUp()
self.inet = custom_types.INET()
def test_inet_load_dialect_impl(self):
dialect = self.inet.load_dialect_impl(mysql.dialect())
self.assertEqual(type(dialect), type(custom_types.INET.impl()))
def test_inet_load_dialect_impl_sqlite(self):
dialect = self.inet.load_dialect_impl(sqlite.dialect())
self.assertEqual(type(dialect), sqlite.CHAR)
def test_process_bind_param(self):
bind = self.inet.process_bind_param(None, None)
self.assertIsNone(bind)
def test_process_bind_param_with_value(self):
bind = self.inet.process_bind_param("foo", sqlite.dialect())
self.assertEqual(bind, "foo")
def test_process_bind_param_with_value_not_sqlite(self):
bind = self.inet.process_bind_param("foo", mysql.dialect())
self.assertEqual(bind, "foo")
def test_process_result_value(self):
bind = self.inet.process_result_value(None, mysql.dialect())
self.assertIsNone(bind)
def test_process_result_value_with_value(self):
bind = self.inet.process_result_value(1.0, sqlite.dialect())
self.assertEqual(bind, 1.0)
def test_process_result_value_with_value_not_sqlite(self):
bind = self.inet.process_result_value(1.0, mysql.dialect())
self.assertEqual(bind, 1.0)
class TestDBCustomTypesMACAddress(test_base.TestBase):
"""Adding for coverage of the mac address custom types."""
def setUp(self):
super(TestDBCustomTypesMACAddress, self).setUp()
self.mac = custom_types.MACAddress()
def test_mac_load_dialect_impl(self):
dialect = self.mac.load_dialect_impl(sqlite.dialect())
self.assertEqual(type(dialect), sqlite.CHAR)
def test_mac_load_dialect_impl_not_sqlite(self):
dialect = self.mac.load_dialect_impl(mysql.dialect())
self.assertEqual(type(dialect), type(custom_types.MACAddress.impl()))

View File

@@ -532,14 +532,11 @@ class TestSwitchCopying(TestNVPDriver):
class TestNVPDriverDeletePort(TestNVPDriver):
@contextlib.contextmanager
def _stubs(self, single_switch=True):
def _stubs(self, switch_count=1):
with contextlib.nested(
mock.patch("%s.get_connection" % self.d_pkg),
) as (get_connection,):
if not single_switch:
connection = self._create_connection(switch_count=2)
else:
connection = self._create_connection(switch_count=1)
connection = self._create_connection(switch_count=switch_count)
get_connection.return_value = connection
yield connection
@@ -556,7 +553,12 @@ class TestNVPDriverDeletePort(TestNVPDriver):
self.assertTrue(connection.lswitch_port().delete.called)
def test_delete_port_many_switches(self):
with self._stubs(single_switch=False):
with self._stubs(switch_count=2):
with self.assertRaises(Exception):
self.driver.delete_port(self.context, self.port_id)
def test_delete_port_no_switch_bad_data(self):
with self._stubs(switch_count=0):
with self.assertRaises(Exception):
self.driver.delete_port(self.context, self.port_id)
@@ -729,6 +731,29 @@ class TestNVPDriverCreateSecurityGroupRule(TestNVPDriver):
mock.call.update(),
], any_order=True)
def test_security_rule_create_with_ip_prefix_and_profile(self):
with self._stubs() as connection:
self.driver.create_security_group_rule(
self.context, 1,
{'ethertype': 'IPv4', 'direction': 'ingress',
'remote_ip_prefix': "pre", "remote_group_id": "group",
"protocol": "udp"})
connection.securityprofile.assert_any_calls(self.profile_id)
connection.securityprofile().assert_has_calls([
mock.call.port_ingress_rules([{'ethertype': 'IPv4',
"ip_prefix": "pre",
"profile_uuid": "group",
"protocol": "udp"}]),
mock.call.update(),
], any_order=True)
def test_security_rule_create_invalid_direction(self):
with self._stubs():
with self.assertRaises(AttributeError):
self.driver.create_security_group_rule(
self.context, 1,
{'ethertype': 'IPv4', 'direction': 'instantregret'})
def test_security_rule_create_duplicate(self):
with self._stubs() as connection:
connection.securityprofile().read().update({
@@ -797,3 +822,52 @@ class TestNVPDriverDeleteSecurityGroupRule(TestNVPDriver):
self.driver.delete_security_group_rule(
self.context, 1,
{'ethertype': 'IPv6', 'direction': 'egress'})
class TestNVPDriverLoadConfig(TestNVPDriver):
def test_load_config(self):
controllers = "192.168.221.139:443:admin:admin:30:10:2:2"
cfg.CONF.set_override("controller_connection", [controllers], "NVP")
self.driver.load_config()
conn = self.driver.nvp_connections[0]
self.assertEqual(conn["username"], "admin")
self.assertEqual(conn["retries"], "2")
self.assertEqual(conn["redirects"], "2")
self.assertEqual(conn["http_timeout"], "10")
self.assertEqual(conn["req_timeout"], "30")
self.assertEqual(conn["default_tz"], None)
self.assertEqual(conn["password"], "admin")
self.assertEqual(conn["ip_address"], "192.168.221.139")
self.assertEqual(conn["port"], "443")
cfg.CONF.clear_override("controller_connection", "NVP")
def test_load_config_no_connections(self):
self.driver.load_config()
self.assertEqual(len(self.driver.nvp_connections), 0)
class TestNVPGetConnection(TestNVPDriver):
@contextlib.contextmanager
def _stubs(self, has_conn):
controllers = "192.168.221.139:443:admin:admin:30:10:2:2"
cfg.CONF.set_override("controller_connection", [controllers], "NVP")
if has_conn:
self.driver.nvp_connections.append(dict(connection="foo"))
else:
self.driver.nvp_connections.append(dict(port="443",
ip_address="192.168.0.1",
username="admin",
password="admin"))
with mock.patch("aiclib.nvp.Connection") as (aiclib_conn):
yield aiclib_conn
cfg.CONF.clear_override("controller_connection", "NVP")
def test_get_connection(self):
with self._stubs(has_conn=False) as aiclib_conn:
self.driver.get_connection()
self.assertTrue(aiclib_conn.called)
def test_get_connection_connection_defined(self):
with self._stubs(has_conn=True) as aiclib_conn:
self.driver.get_connection()
self.assertFalse(aiclib_conn.called)

View File

@@ -269,4 +269,38 @@ class TestOptimizedNVPDriverCreatePort(TestOptimizedNVPDriver):
class TestOptimizedNVPDriverUpdatePort(TestOptimizedNVPDriver):
pass
def test_update_port(self):
mod_path = "quark.drivers.%s"
op_path = "optimized_nvp_driver.OptimizedNVPDriver"
lport_path = "%s._lport_select_by_id" % op_path
with contextlib.nested(
mock.patch(mod_path % "nvp_driver.NVPDriver.update_port"),
mock.patch(mod_path % lport_path),
) as (update_port, port_find):
ret_port = quark.drivers.optimized_nvp_driver.LSwitchPort()
port_find.return_value = ret_port
update_port.return_value = dict(switch_id=2)
self.driver.update_port(self.context, 1)
self.assertEqual(ret_port.switch_id, 2)
class TestCreateSecurityGroups(TestOptimizedNVPDriver):
def test_create_security_group(self):
with mock.patch("%s.get_connection" % self.d_pkg):
self.driver.create_security_group(self.context, "newgroup")
self.assertTrue(self.context.session.add.called)
class TestDeleteSecurityGroups(TestOptimizedNVPDriver):
def test_delete_security_group(self):
mod_path = "quark.drivers.nvp_driver.NVPDriver"
with contextlib.nested(
mock.patch("%s.get_connection" % self.d_pkg),
mock.patch("%s._query_security_group" % self.d_pkg),
mock.patch("%s.delete_security_group" % mod_path)):
session_delete = self.context.session.delete
self.context.session.delete = mock.Mock(return_value=None)
self.driver.delete_security_group(self.context, 1)
self.assertTrue(self.context.session.delete.called)
self.context.session.delete = session_delete