neutron/quantum/tests/unit/nicira/test_nvpopts.py

165 lines
6.8 KiB
Python

# Copyright 2013 Nicira Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import fixtures
import os
import testtools
from oslo.config import cfg
from quantum.common import config as q_config
from quantum.manager import QuantumManager
from quantum.openstack.common import uuidutils
from quantum.plugins.nicira.common import config # noqa
from quantum.plugins.nicira.common import exceptions
from quantum.plugins.nicira import nvp_cluster
BASE_CONF_PATH = os.path.join(os.path.dirname(__file__),
'../../etc/quantum.conf.test')
NVP_INI_PATH = os.path.join(os.path.dirname(__file__),
'etc/nvp.ini.basic.test')
NVP_INI_FULL_PATH = os.path.join(os.path.dirname(__file__),
'etc/nvp.ini.full.test')
NVP_INI_DEPR_PATH = os.path.join(os.path.dirname(__file__),
'etc/nvp.ini.grizzly.test')
NVP_PLUGIN_PATH = ('quantum.plugins.nicira.nicira_nvp_plugin.'
'QuantumPlugin.NvpPluginV2')
class NVPClusterTest(testtools.TestCase):
cluster_opts = {'default_tz_uuid': uuidutils.generate_uuid(),
'default_l2_gw_service_uuid': uuidutils.generate_uuid(),
'default_l2_gw_service_uuid': uuidutils.generate_uuid(),
'nvp_cluster_uuid': uuidutils.generate_uuid(),
'nvp_user': 'foo',
'nvp_password': 'bar',
'req_timeout': 45,
'http_timeout': 25,
'retries': 7,
'redirects': 23,
'default_interface_name': 'baz',
'nvp_controllers': ['1.1.1.1:443']}
def setUp(self):
super(NVPClusterTest, self).setUp()
self.addCleanup(cfg.CONF.reset)
def test_create_cluster(self):
cluster = nvp_cluster.NVPCluster(**self.cluster_opts)
for (k, v) in self.cluster_opts.iteritems():
self.assertEqual(v, getattr(cluster, k))
def test_create_cluster_default_port(self):
opts = self.cluster_opts.copy()
opts['nvp_controllers'] = ['1.1.1.1']
cluster = nvp_cluster.NVPCluster(**opts)
for (k, v) in self.cluster_opts.iteritems():
self.assertEqual(v, getattr(cluster, k))
def test_create_cluster_missing_required_attribute_raises(self):
opts = self.cluster_opts.copy()
opts.pop('default_tz_uuid')
self.assertRaises(exceptions.NvpInvalidClusterConfiguration,
nvp_cluster.NVPCluster, **opts)
class ConfigurationTest(testtools.TestCase):
def setUp(self):
super(ConfigurationTest, self).setUp()
self.addCleanup(cfg.CONF.reset)
self.useFixture(fixtures.MonkeyPatch(
'quantum.manager.QuantumManager._instance',
None))
def _assert_required_options(self, cluster):
self.assertEqual(cluster.nvp_controllers, ['fake_1:443', 'fake_2:443'])
self.assertEqual(cluster.default_tz_uuid, 'fake_tz_uuid')
self.assertEqual(cluster.nvp_user, 'foo')
self.assertEqual(cluster.nvp_password, 'bar')
def _assert_extra_options(self, cluster):
self.assertEqual(14, cluster.req_timeout)
self.assertEqual(13, cluster.http_timeout)
self.assertEqual(12, cluster.redirects)
self.assertEqual(11, cluster.retries)
self.assertEqual('whatever', cluster.default_l2_gw_service_uuid)
self.assertEqual('whatever', cluster.default_l3_gw_service_uuid)
self.assertEqual('whatever', cluster.default_interface_name)
def test_load_plugin_with_full_options(self):
q_config.parse(['--config-file', BASE_CONF_PATH,
'--config-file', NVP_INI_FULL_PATH])
cfg.CONF.set_override('core_plugin', NVP_PLUGIN_PATH)
plugin = QuantumManager().get_plugin()
cluster = plugin.cluster
self._assert_required_options(cluster)
self._assert_extra_options(cluster)
def test_load_plugin_with_required_options_only(self):
q_config.parse(['--config-file', BASE_CONF_PATH,
'--config-file', NVP_INI_PATH])
cfg.CONF.set_override('core_plugin', NVP_PLUGIN_PATH)
plugin = QuantumManager().get_plugin()
self._assert_required_options(plugin.cluster)
def test_defaults(self):
self.assertEqual(64, cfg.CONF.NVP.max_lp_per_bridged_ls)
self.assertEqual(256, cfg.CONF.NVP.max_lp_per_overlay_ls)
self.assertEqual(5, cfg.CONF.NVP.concurrent_connections)
self.assertIsNone(cfg.CONF.default_tz_uuid)
self.assertIsNone(cfg.CONF.nvp_cluster_uuid)
self.assertEqual('admin', cfg.CONF.nvp_user)
self.assertEqual('admin', cfg.CONF.nvp_password)
self.assertEqual(30, cfg.CONF.req_timeout)
self.assertEqual(10, cfg.CONF.http_timeout)
self.assertEqual(2, cfg.CONF.retries)
self.assertEqual(2, cfg.CONF.redirects)
self.assertIsNone(cfg.CONF.nvp_controllers)
self.assertIsNone(cfg.CONF.default_l3_gw_service_uuid)
self.assertIsNone(cfg.CONF.default_l2_gw_service_uuid)
self.assertEqual('breth0', cfg.CONF.default_interface_name)
class OldConfigurationTest(testtools.TestCase):
def setUp(self):
super(OldConfigurationTest, self).setUp()
self.addCleanup(cfg.CONF.reset)
self.useFixture(fixtures.MonkeyPatch(
'quantum.manager.QuantumManager._instance',
None))
def _assert_required_options(self, cluster):
self.assertEqual(cluster.nvp_controllers, ['fake_1:443', 'fake_2:443'])
self.assertEqual(cluster.default_tz_uuid, 'fake_tz_uuid')
self.assertEqual(cluster.nvp_user, 'foo')
self.assertEqual(cluster.nvp_password, 'bar')
def test_load_plugin_with_deprecated_options(self):
q_config.parse(['--config-file', BASE_CONF_PATH,
'--config-file', NVP_INI_DEPR_PATH])
cfg.CONF.set_override('core_plugin', NVP_PLUGIN_PATH)
plugin = QuantumManager().get_plugin()
cluster = plugin.cluster
self._assert_required_options(cluster)
# Verify nvp_controller_connection has been fully parsed
self.assertEqual(4, cluster.req_timeout)
self.assertEqual(3, cluster.http_timeout)
self.assertEqual(2, cluster.retries)
self.assertEqual(1, cluster.redirects)