Rename/remove Nicira NVP references from VMware NSX unit tests

This patch sweeps for Nicira and NVP references and replace
them with VMware and NSX where possible. Some clean-up is
done along the way to improve the organization of the
unit tests module.

Partial-implements blueprint: nicira-plugin-renaming

Change-Id: I48e4779ccec64ba300b26d4967d3f9fae7cfdc14
This commit is contained in:
armando-migliaccio 2014-02-03 14:19:25 -08:00
parent 77dfe0962b
commit f7fcb809f0
33 changed files with 944 additions and 895 deletions

View File

@ -19,7 +19,7 @@ import os
import neutron.plugins.nicira.api_client.client_eventlet as client
from neutron.plugins.nicira import extensions
import neutron.plugins.nicira.NvpApiClient as nvpapi
import neutron.plugins.nicira.NvpApiClient as nsxapi
from neutron.plugins.nicira import nvplib
from neutron.plugins.nicira.vshield.common import VcnsApiClient as vcnsapi
from neutron.plugins.nicira.vshield import vcns
@ -28,7 +28,7 @@ import neutron.plugins.vmware.plugin as neutron_plugin
plugin = neutron_plugin.NsxPlugin
service_plugin = neutron_plugin.NsxServicePlugin
api_helper = nvpapi.NVPApiHelper
api_helper = nsxapi.NVPApiHelper
api_client = client.NvpApiClientEventlet
vcns_class = vcns.Vcns
vcns_driver = vcnsdriver.VcnsDriver

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 Nicira Networks, Inc.
# Copyright 2012 VMware, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -20,7 +20,7 @@ import six.moves.urllib.parse as urlparse
from neutron.openstack.common import log as logging
from neutron.openstack.common import uuidutils
from neutron.plugins.nicira import NvpApiClient
from neutron.plugins.nicira import NvpApiClient as api_client
LOG = logging.getLogger(__name__)
@ -181,9 +181,9 @@ class FakeClient:
'gateway_ip_address', '0.0.0.0')
else:
fake_lrouter['default_next_hop'] = '0.0.0.0'
# NOTE(salv-orlando): We won't make the Fake NVP API client
# aware of NVP version. The long term plan is to replace it
# with behavioral mocking of NVP API requests
# NOTE(salv-orlando): We won't make the Fake NSX API client
# aware of NSX version. The long term plan is to replace it
# with behavioral mocking of NSX API requests
if 'distributed' not in fake_lrouter:
fake_lrouter['distributed'] = False
distributed_json = ('"distributed": %s,' %
@ -257,7 +257,7 @@ class FakeClient:
try:
fake_lrouter = self._fake_lrouter_dict[lr_uuid]
except KeyError:
raise NvpApiClient.ResourceNotFound()
raise api_client.ResourceNotFound()
fake_lrouter['lport_count'] += 1
fake_lport_status = fake_lport.copy()
fake_lport_status['lr_tenant_id'] = fake_lrouter['tenant_id']
@ -496,7 +496,7 @@ class FakeClient:
for res_uuid in res_dict if res_uuid == target_uuid]
if items:
return json.dumps(items[0])
raise NvpApiClient.ResourceNotFound()
raise api_client.ResourceNotFound()
def handle_get(self, url):
#TODO(salvatore-orlando): handle field selection
@ -505,7 +505,7 @@ class FakeClient:
relations = urlparse.parse_qs(parsedurl.query).get('relations')
response_file = self.FAKE_GET_RESPONSES.get(res_type)
if not response_file:
raise NvpApiClient.NvpApiException()
raise api_client.NvpApiException()
if 'lport' in res_type or 'nat' in res_type:
if len(uuids) > 1:
return self._show(res_type, response_file, uuids[0],
@ -569,7 +569,7 @@ class FakeClient:
try:
resource = res_dict[uuids[-1]]
except KeyError:
raise NvpApiClient.ResourceNotFound()
raise api_client.ResourceNotFound()
if not is_attachment:
edit_resource = getattr(self, '_build_%s' % res_type, None)
if edit_resource:
@ -640,7 +640,7 @@ class FakeClient:
try:
del res_dict[uuids[-1]]
except KeyError:
raise NvpApiClient.ResourceNotFound()
raise api_client.ResourceNotFound()
return ""
def fake_request(self, *args, **kwargs):

View File

@ -21,7 +21,7 @@ import neutron.plugins.nicira.api_client.common as naco
from neutron.tests import base
class NvpApiCommonTest(base.BaseTestCase):
class ApiCommonTest(base.BaseTestCase):
def test_conn_str(self):
conn = httplib.HTTPSConnection('localhost', 4242, timeout=0)

View File

@ -41,11 +41,11 @@ def fetch(url):
return urllib2.urlopen(url).read()
class NvpApiRequestEventletTest(base.BaseTestCase):
class ApiRequestEventletTest(base.BaseTestCase):
def setUp(self):
super(NvpApiRequestEventletTest, self).setUp()
super(ApiRequestEventletTest, self).setUp()
self.client = nace.NvpApiClientEventlet(
[("127.0.0.1", 4401, True)], "admin", "admin")
self.url = "/ws.v1/_debug"
@ -54,7 +54,7 @@ class NvpApiRequestEventletTest(base.BaseTestCase):
def tearDown(self):
self.client = None
self.req = None
super(NvpApiRequestEventletTest, self).tearDown()
super(ApiRequestEventletTest, self).tearDown()
def test_construct_eventlet_api_request(self):
e = nare.NvpApiRequestEventlet(self.client, self.url)

View File

View File

@ -19,15 +19,15 @@ from neutron import context
from neutron.db import api as db
from neutron.db import models_v2
from neutron.openstack.common.db import exception as d_exc
from neutron.plugins.nicira.dbexts import nicira_db
from neutron.plugins.nicira.dbexts import nicira_models
from neutron.plugins.nicira.dbexts import nicira_db as nsx_db
from neutron.plugins.nicira.dbexts import nicira_models as models
from neutron.tests import base
class NiciraDBTestCase(base.BaseTestCase):
class NsxDBTestCase(base.BaseTestCase):
def setUp(self):
super(NiciraDBTestCase, self).setUp()
super(NsxDBTestCase, self).setUp()
db.configure_db()
self.ctx = context.get_admin_context()
self.addCleanup(db.clear_db)
@ -51,12 +51,12 @@ class NiciraDBTestCase(base.BaseTestCase):
nsx_switch_id = 'foo_nsx_switch_id'
self._setup_neutron_network_and_port(neutron_net_id, neutron_port_id)
nicira_db.add_neutron_nsx_port_mapping(
nsx_db.add_neutron_nsx_port_mapping(
self.ctx.session, neutron_port_id, nsx_switch_id, nsx_port_id)
# Call the method twice to trigger a db duplicate constraint error
nicira_db.add_neutron_nsx_port_mapping(
nsx_db.add_neutron_nsx_port_mapping(
self.ctx.session, neutron_port_id, nsx_switch_id, nsx_port_id)
result = (self.ctx.session.query(nicira_models.NeutronNsxPortMapping).
result = (self.ctx.session.query(models.NeutronNsxPortMapping).
filter_by(neutron_id=neutron_port_id).one())
self.assertEqual(nsx_port_id, result.nsx_port_id)
self.assertEqual(neutron_port_id, result.neutron_id)
@ -69,12 +69,12 @@ class NiciraDBTestCase(base.BaseTestCase):
nsx_switch_id = 'foo_nsx_switch_id'
self._setup_neutron_network_and_port(neutron_net_id, neutron_port_id)
nicira_db.add_neutron_nsx_port_mapping(
nsx_db.add_neutron_nsx_port_mapping(
self.ctx.session, neutron_port_id, nsx_switch_id, nsx_port_id_1)
# Call the method twice to trigger a db duplicate constraint error,
# this time with a different nsx port id!
self.assertRaises(d_exc.DBDuplicateEntry,
nicira_db.add_neutron_nsx_port_mapping,
nsx_db.add_neutron_nsx_port_mapping,
self.ctx.session, neutron_port_id,
nsx_switch_id, nsx_port_id_2)
@ -83,6 +83,6 @@ class NiciraDBTestCase(base.BaseTestCase):
nsx_port_id = 'foo_nsx_port_id'
nsx_switch_id = 'foo_nsx_switch_id'
self.assertRaises(d_exc.DBError,
nicira_db.add_neutron_nsx_port_mapping,
nsx_db.add_neutron_nsx_port_mapping,
self.ctx.session, neutron_port_id,
nsx_switch_id, nsx_port_id)

View File

@ -0,0 +1,22 @@
# Copyright (c) 2014 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.tests.unit import test_extension_allowedaddresspairs as ext_pairs
from neutron.tests.unit.vmware import test_nsx_plugin
class TestAllowedAddressPairs(test_nsx_plugin.NsxPluginV2TestCase,
ext_pairs.TestAllowedAddressPairs):
pass

View File

@ -27,7 +27,7 @@ from neutron.extensions import agent
from neutron.plugins.nicira.common import sync
from neutron.plugins.nicira.NvpApiClient import NVPVersion
from neutron.tests.unit import test_db_plugin
from neutron.tests.unit.vmware import fake_nvpapiclient
from neutron.tests.unit.vmware.apiclient import fake
from neutron.tests.unit.vmware import get_fake_conf
from neutron.tests.unit.vmware import NSXAPI_NAME
from neutron.tests.unit.vmware import NSXEXT_PATH
@ -65,23 +65,20 @@ class MacLearningDBTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems():
self.saved_attr_map[resource] = attrs.copy()
ext_mgr = MacLearningExtensionManager()
# mock nvp api client
self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH)
self.mock_nvpapi = mock.patch(NSXAPI_NAME, autospec=True)
instance = self.mock_nvpapi.start()
# mock api client
self.fc = fake.FakeClient(STUBS_PATH)
self.mock_nsx = mock.patch(NSXAPI_NAME, autospec=True)
instance = self.mock_nsx.start()
# Avoid runs of the synchronizer looping call
patch_sync = mock.patch.object(sync, '_start_loopingcall')
patch_sync.start()
def _fake_request(*args, **kwargs):
return self.fc.fake_request(*args, **kwargs)
# Emulate tests against NVP 2.x
# Emulate tests against NSX 2.x
instance.return_value.get_nvp_version.return_value = NVPVersion("3.0")
instance.return_value.request.side_effect = _fake_request
instance.return_value.request.side_effect = self.fc.fake_request
cfg.CONF.set_override('metadata_mode', None, 'NSX')
self.addCleanup(self.fc.reset_all)
self.addCleanup(self.mock_nvpapi.stop)
self.addCleanup(self.mock_nsx.stop)
self.addCleanup(patch_sync.stop)
self.addCleanup(self.restore_resource_attribute_map)
self.addCleanup(cfg.CONF.reset)

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 VMware, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -15,8 +13,8 @@
# under the License.
import contextlib
import mock
from oslo.config import cfg
from webob import exc
import webtest
@ -29,15 +27,19 @@ from neutron import context
from neutron.db import api as db_api
from neutron.db import db_base_plugin_v2
from neutron import manager
from neutron.openstack.common import uuidutils
from neutron.plugins.nicira.dbexts import networkgw_db
from neutron.plugins.nicira.extensions import networkgw
from neutron.plugins.nicira.NeutronPlugin import NVP_EXT_PATH
from neutron.plugins.nicira import nsxlib
from neutron.plugins.nicira import NvpApiClient
from neutron import quota
from neutron.tests import base
from neutron.tests.unit import test_api_v2
from neutron.tests.unit import test_db_plugin
from neutron.tests.unit import test_extensions
from neutron.tests.unit.vmware import NSXEXT_PATH
from neutron.tests.unit.vmware import PLUGIN_NAME
from neutron.tests.unit.vmware.test_nsx_plugin import NsxPluginV2TestCase
_uuid = test_api_v2._uuid
_get_path = test_api_v2._get_path
@ -594,6 +596,106 @@ class NetworkGatewayDbTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
'vlan', 555)
class TestNetworkGateway(NsxPluginV2TestCase,
NetworkGatewayDbTestCase):
def setUp(self, plugin=PLUGIN_NAME, ext_mgr=None):
cfg.CONF.set_override('api_extensions_path', NSXEXT_PATH)
super(TestNetworkGateway,
self).setUp(plugin=plugin, ext_mgr=ext_mgr)
def test_create_network_gateway_name_exceeds_40_chars(self):
name = 'this_is_a_gateway_whose_name_is_longer_than_40_chars'
with self._network_gateway(name=name) as nw_gw:
# Assert Neutron name is not truncated
self.assertEqual(nw_gw[self.resource]['name'], name)
def test_update_network_gateway_with_name_calls_backend(self):
with mock.patch.object(
nsxlib.l2gateway, 'update_l2_gw_service') as mock_update_gw:
with self._network_gateway(name='cavani') as nw_gw:
nw_gw_id = nw_gw[self.resource]['id']
self._update(networkgw.COLLECTION_NAME, nw_gw_id,
{self.resource: {'name': 'higuain'}})
mock_update_gw.assert_called_once_with(
mock.ANY, nw_gw_id, 'higuain')
def test_update_network_gateway_without_name_does_not_call_backend(self):
with mock.patch.object(
nsxlib.l2gateway, 'update_l2_gw_service') as mock_update_gw:
with self._network_gateway(name='something') as nw_gw:
nw_gw_id = nw_gw[self.resource]['id']
self._update(networkgw.COLLECTION_NAME, nw_gw_id,
{self.resource: {}})
self.assertEqual(mock_update_gw.call_count, 0)
def test_update_network_gateway_name_exceeds_40_chars(self):
new_name = 'this_is_a_gateway_whose_name_is_longer_than_40_chars'
with self._network_gateway(name='something') as nw_gw:
nw_gw_id = nw_gw[self.resource]['id']
self._update(networkgw.COLLECTION_NAME, nw_gw_id,
{self.resource: {'name': new_name}})
req = self.new_show_request(networkgw.COLLECTION_NAME,
nw_gw_id)
res = self.deserialize('json', req.get_response(self.ext_api))
# Assert Neutron name is not truncated
self.assertEqual(new_name, res[self.resource]['name'])
# Assert NSX name is truncated
self.assertEqual(
new_name[:40],
self.fc._fake_gatewayservice_dict[nw_gw_id]['display_name'])
def test_create_network_gateway_nsx_error_returns_500(self):
def raise_nsx_api_exc(*args, **kwargs):
raise NvpApiClient.NvpApiException
with mock.patch.object(nsxlib.l2gateway,
'create_l2_gw_service',
new=raise_nsx_api_exc):
res = self._create_network_gateway(
self.fmt, 'xxx', name='yyy',
devices=[{'id': uuidutils.generate_uuid()}])
self.assertEqual(500, res.status_int)
def test_create_network_gateway_nsx_error_returns_409(self):
with mock.patch.object(nsxlib.l2gateway,
'create_l2_gw_service',
side_effect=NvpApiClient.Conflict):
res = self._create_network_gateway(
self.fmt, 'xxx', name='yyy',
devices=[{'id': uuidutils.generate_uuid()}])
self.assertEqual(409, res.status_int)
def test_list_network_gateways(self):
with self._network_gateway(name='test-gw-1') as gw1:
with self._network_gateway(name='test_gw_2') as gw2:
req = self.new_list_request(networkgw.COLLECTION_NAME)
res = self.deserialize('json', req.get_response(self.ext_api))
# We expect the default gateway too
key = self.resource + 's'
self.assertEqual(len(res[key]), 3)
self.assertEqual(res[key][0]['default'],
True)
self.assertEqual(res[key][1]['name'],
gw1[self.resource]['name'])
self.assertEqual(res[key][2]['name'],
gw2[self.resource]['name'])
def test_list_network_gateway_with_multiple_connections(self):
self._test_list_network_gateway_with_multiple_connections(
expected_gateways=2)
def test_delete_network_gateway(self):
# The default gateway must still be there
self._test_delete_network_gateway(1)
def test_show_network_gateway_nsx_error_returns_404(self):
invalid_id = 'b5afd4a9-eb71-4af7-a082-8fc625a35b61'
req = self.new_show_request(networkgw.COLLECTION_NAME, invalid_id)
res = req.get_response(self.ext_api)
self.assertEqual(exc.HTTPNotFound.code, res.status_int)
class TestNetworkGatewayPlugin(db_base_plugin_v2.NeutronDbPluginV2,
networkgw_db.NetworkGatewayMixin):
"""Simple plugin class for testing db support for network gateway ext."""
@ -602,7 +704,7 @@ class TestNetworkGatewayPlugin(db_base_plugin_v2.NeutronDbPluginV2,
def __init__(self, **args):
super(TestNetworkGatewayPlugin, self).__init__(**args)
extensions.append_api_extensions_path([NVP_EXT_PATH])
extensions.append_api_extensions_path([NSXEXT_PATH])
def delete_port(self, context, id, nw_gw_port_check=True):
if nw_gw_port_check:

View File

@ -0,0 +1,49 @@
# Copyright (c) 2014 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron.common import test_lib
from neutron.plugins.nicira.common import sync
from neutron.tests.unit import test_extension_portsecurity as psec
from neutron.tests.unit.vmware.apiclient import fake
from neutron.tests.unit.vmware import get_fake_conf
from neutron.tests.unit.vmware import NSXAPI_NAME
from neutron.tests.unit.vmware import PLUGIN_NAME
from neutron.tests.unit.vmware import STUBS_PATH
class PortSecurityTestCase(psec.PortSecurityDBTestCase):
def setUp(self):
test_lib.test_config['config_files'] = [get_fake_conf('nsx.ini.test')]
# mock api client
self.fc = fake.FakeClient(STUBS_PATH)
self.mock_nsx = mock.patch(NSXAPI_NAME, autospec=True)
instance = self.mock_nsx.start()
instance.return_value.login.return_value = "the_cookie"
# Avoid runs of the synchronizer looping call
patch_sync = mock.patch.object(sync, '_start_loopingcall')
patch_sync.start()
instance.return_value.request.side_effect = self.fc.fake_request
super(PortSecurityTestCase, self).setUp(PLUGIN_NAME)
self.addCleanup(self.fc.reset_all)
self.addCleanup(self.mock_nsx.stop)
self.addCleanup(patch_sync.stop)
class TestPortSecurity(PortSecurityTestCase, psec.TestPortSecurity):
pass

View File

@ -0,0 +1,150 @@
# Copyright (c) 2014 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from neutron.extensions import multiprovidernet as mpnet
from neutron.extensions import providernet as pnet
from neutron.tests.unit.vmware import NSXEXT_PATH
from neutron.tests.unit.vmware.test_nsx_plugin import NsxPluginV2TestCase
class TestProvidernet(NsxPluginV2TestCase):
def test_create_provider_network_default_physical_net(self):
data = {'network': {'name': 'net1',
'admin_state_up': True,
'tenant_id': 'admin',
pnet.NETWORK_TYPE: 'vlan',
pnet.SEGMENTATION_ID: 411}}
network_req = self.new_create_request('networks', data, self.fmt)
net = self.deserialize(self.fmt, network_req.get_response(self.api))
self.assertEqual(net['network'][pnet.NETWORK_TYPE], 'vlan')
self.assertEqual(net['network'][pnet.SEGMENTATION_ID], 411)
def test_create_provider_network(self):
data = {'network': {'name': 'net1',
'admin_state_up': True,
'tenant_id': 'admin',
pnet.NETWORK_TYPE: 'vlan',
pnet.SEGMENTATION_ID: 411,
pnet.PHYSICAL_NETWORK: 'physnet1'}}
network_req = self.new_create_request('networks', data, self.fmt)
net = self.deserialize(self.fmt, network_req.get_response(self.api))
self.assertEqual(net['network'][pnet.NETWORK_TYPE], 'vlan')
self.assertEqual(net['network'][pnet.SEGMENTATION_ID], 411)
self.assertEqual(net['network'][pnet.PHYSICAL_NETWORK], 'physnet1')
class TestMultiProviderNetworks(NsxPluginV2TestCase):
def setUp(self, plugin=None):
cfg.CONF.set_override('api_extensions_path', NSXEXT_PATH)
super(TestMultiProviderNetworks, self).setUp()
def test_create_network_provider(self):
data = {'network': {'name': 'net1',
pnet.NETWORK_TYPE: 'vlan',
pnet.PHYSICAL_NETWORK: 'physnet1',
pnet.SEGMENTATION_ID: 1,
'tenant_id': 'tenant_one'}}
network_req = self.new_create_request('networks', data)
network = self.deserialize(self.fmt,
network_req.get_response(self.api))
self.assertEqual(network['network'][pnet.NETWORK_TYPE], 'vlan')
self.assertEqual(network['network'][pnet.PHYSICAL_NETWORK], 'physnet1')
self.assertEqual(network['network'][pnet.SEGMENTATION_ID], 1)
self.assertNotIn(mpnet.SEGMENTS, network['network'])
def test_create_network_single_multiple_provider(self):
data = {'network': {'name': 'net1',
mpnet.SEGMENTS:
[{pnet.NETWORK_TYPE: 'vlan',
pnet.PHYSICAL_NETWORK: 'physnet1',
pnet.SEGMENTATION_ID: 1}],
'tenant_id': 'tenant_one'}}
net_req = self.new_create_request('networks', data)
network = self.deserialize(self.fmt, net_req.get_response(self.api))
for provider_field in [pnet.NETWORK_TYPE, pnet.PHYSICAL_NETWORK,
pnet.SEGMENTATION_ID]:
self.assertNotIn(provider_field, network['network'])
tz = network['network'][mpnet.SEGMENTS][0]
self.assertEqual(tz[pnet.NETWORK_TYPE], 'vlan')
self.assertEqual(tz[pnet.PHYSICAL_NETWORK], 'physnet1')
self.assertEqual(tz[pnet.SEGMENTATION_ID], 1)
# Tests get_network()
net_req = self.new_show_request('networks', network['network']['id'])
network = self.deserialize(self.fmt, net_req.get_response(self.api))
tz = network['network'][mpnet.SEGMENTS][0]
self.assertEqual(tz[pnet.NETWORK_TYPE], 'vlan')
self.assertEqual(tz[pnet.PHYSICAL_NETWORK], 'physnet1')
self.assertEqual(tz[pnet.SEGMENTATION_ID], 1)
def test_create_network_multprovider(self):
data = {'network': {'name': 'net1',
mpnet.SEGMENTS:
[{pnet.NETWORK_TYPE: 'vlan',
pnet.PHYSICAL_NETWORK: 'physnet1',
pnet.SEGMENTATION_ID: 1},
{pnet.NETWORK_TYPE: 'stt',
pnet.PHYSICAL_NETWORK: 'physnet1'}],
'tenant_id': 'tenant_one'}}
network_req = self.new_create_request('networks', data)
network = self.deserialize(self.fmt,
network_req.get_response(self.api))
tz = network['network'][mpnet.SEGMENTS]
for tz in data['network'][mpnet.SEGMENTS]:
for field in [pnet.NETWORK_TYPE, pnet.PHYSICAL_NETWORK,
pnet.SEGMENTATION_ID]:
self.assertEqual(tz.get(field), tz.get(field))
# Tests get_network()
net_req = self.new_show_request('networks', network['network']['id'])
network = self.deserialize(self.fmt, net_req.get_response(self.api))
tz = network['network'][mpnet.SEGMENTS]
for tz in data['network'][mpnet.SEGMENTS]:
for field in [pnet.NETWORK_TYPE, pnet.PHYSICAL_NETWORK,
pnet.SEGMENTATION_ID]:
self.assertEqual(tz.get(field), tz.get(field))
def test_create_network_with_provider_and_multiprovider_fail(self):
data = {'network': {'name': 'net1',
mpnet.SEGMENTS:
[{pnet.NETWORK_TYPE: 'vlan',
pnet.PHYSICAL_NETWORK: 'physnet1',
pnet.SEGMENTATION_ID: 1}],
pnet.NETWORK_TYPE: 'vlan',
pnet.PHYSICAL_NETWORK: 'physnet1',
pnet.SEGMENTATION_ID: 1,
'tenant_id': 'tenant_one'}}
network_req = self.new_create_request('networks', data)
res = network_req.get_response(self.api)
self.assertEqual(res.status_int, 400)
def test_create_network_duplicate_segments(self):
data = {'network': {'name': 'net1',
mpnet.SEGMENTS:
[{pnet.NETWORK_TYPE: 'vlan',
pnet.PHYSICAL_NETWORK: 'physnet1',
pnet.SEGMENTATION_ID: 1},
{pnet.NETWORK_TYPE: 'vlan',
pnet.PHYSICAL_NETWORK: 'physnet1',
pnet.SEGMENTATION_ID: 1}],
'tenant_id': 'tenant_one'}}
network_req = self.new_create_request('networks', data)
res = network_req.get_response(self.api)
self.assertEqual(res.status_int, 400)

View File

@ -0,0 +1,273 @@
# Copyright (c) 2014 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import mock
from oslo.config import cfg
import webob.exc
from neutron import context
from neutron.plugins.nicira.dbexts import qos_db
from neutron.plugins.nicira.extensions import qos as ext_qos
from neutron.plugins.nicira import nsxlib
from neutron.tests.unit import test_extensions
from neutron.tests.unit.vmware import NSXEXT_PATH
from neutron.tests.unit.vmware.test_nsx_plugin import NsxPluginV2TestCase
class QoSTestExtensionManager(object):
def get_resources(self):
return ext_qos.Qos.get_resources()
def get_actions(self):
return []
def get_request_extensions(self):
return []
class TestQoSQueue(NsxPluginV2TestCase):
def setUp(self, plugin=None):
cfg.CONF.set_override('api_extensions_path', NSXEXT_PATH)
super(TestQoSQueue, self).setUp()
ext_mgr = QoSTestExtensionManager()
self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
def _create_qos_queue(self, fmt, body, **kwargs):
qos_queue = self.new_create_request('qos-queues', body)
if (kwargs.get('set_context') and 'tenant_id' in kwargs):
# create a specific auth context for this request
qos_queue.environ['neutron.context'] = context.Context(
'', kwargs['tenant_id'])
return qos_queue.get_response(self.ext_api)
@contextlib.contextmanager
def qos_queue(self, name='foo', min='0', max='10',
qos_marking=None, dscp='0', default=None, no_delete=False):
body = {'qos_queue': {'tenant_id': 'tenant',
'name': name,
'min': min,
'max': max}}
if qos_marking:
body['qos_queue']['qos_marking'] = qos_marking
if dscp:
body['qos_queue']['dscp'] = dscp
if default:
body['qos_queue']['default'] = default
res = self._create_qos_queue('json', body)
qos_queue = self.deserialize('json', res)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
try:
yield qos_queue
finally:
if not no_delete:
self._delete('qos-queues',
qos_queue['qos_queue']['id'])
def test_create_qos_queue(self):
with self.qos_queue(name='fake_lqueue', min=34, max=44,
qos_marking='untrusted', default=False) as q:
self.assertEqual(q['qos_queue']['name'], 'fake_lqueue')
self.assertEqual(q['qos_queue']['min'], 34)
self.assertEqual(q['qos_queue']['max'], 44)
self.assertEqual(q['qos_queue']['qos_marking'], 'untrusted')
self.assertFalse(q['qos_queue']['default'])
def test_create_trusted_qos_queue(self):
with mock.patch.object(qos_db.LOG, 'info') as log:
with mock.patch.object(nsxlib.queue, 'do_request',
return_value={"uuid": "fake_queue"}):
with self.qos_queue(name='fake_lqueue', min=34, max=44,
qos_marking='trusted', default=False) as q:
self.assertIsNone(q['qos_queue']['dscp'])
self.assertTrue(log.called)
def test_create_qos_queue_name_exceeds_40_chars(self):
name = 'this_is_a_queue_whose_name_is_longer_than_40_chars'
with self.qos_queue(name=name) as queue:
# Assert Neutron name is not truncated
self.assertEqual(queue['qos_queue']['name'], name)
def test_create_qos_queue_default(self):
with self.qos_queue(default=True) as q:
self.assertTrue(q['qos_queue']['default'])
def test_create_qos_queue_two_default_queues_fail(self):
with self.qos_queue(default=True):
body = {'qos_queue': {'tenant_id': 'tenant',
'name': 'second_default_queue',
'default': True}}
res = self._create_qos_queue('json', body)
self.assertEqual(res.status_int, 409)
def test_create_port_with_queue(self):
with self.qos_queue(default=True) as q1:
res = self._create_network('json', 'net1', True,
arg_list=(ext_qos.QUEUE,),
queue_id=q1['qos_queue']['id'])
net1 = self.deserialize('json', res)
self.assertEqual(net1['network'][ext_qos.QUEUE],
q1['qos_queue']['id'])
device_id = "00fff4d0-e4a8-4a3a-8906-4c4cdafb59f1"
with self.port(device_id=device_id, do_delete=False) as p:
self.assertEqual(len(p['port'][ext_qos.QUEUE]), 36)
def test_create_shared_queue_networks(self):
with self.qos_queue(default=True, no_delete=True) as q1:
res = self._create_network('json', 'net1', True,
arg_list=(ext_qos.QUEUE,),
queue_id=q1['qos_queue']['id'])
net1 = self.deserialize('json', res)
self.assertEqual(net1['network'][ext_qos.QUEUE],
q1['qos_queue']['id'])
res = self._create_network('json', 'net2', True,
arg_list=(ext_qos.QUEUE,),
queue_id=q1['qos_queue']['id'])
net2 = self.deserialize('json', res)
self.assertEqual(net1['network'][ext_qos.QUEUE],
q1['qos_queue']['id'])
device_id = "00fff4d0-e4a8-4a3a-8906-4c4cdafb59f1"
res = self._create_port('json', net1['network']['id'],
device_id=device_id)
port1 = self.deserialize('json', res)
res = self._create_port('json', net2['network']['id'],
device_id=device_id)
port2 = self.deserialize('json', res)
self.assertEqual(port1['port'][ext_qos.QUEUE],
port2['port'][ext_qos.QUEUE])
self._delete('ports', port1['port']['id'])
self._delete('ports', port2['port']['id'])
def test_remove_queue_in_use_fail(self):
with self.qos_queue(no_delete=True) as q1:
res = self._create_network('json', 'net1', True,
arg_list=(ext_qos.QUEUE,),
queue_id=q1['qos_queue']['id'])
net1 = self.deserialize('json', res)
device_id = "00fff4d0-e4a8-4a3a-8906-4c4cdafb59f1"
res = self._create_port('json', net1['network']['id'],
device_id=device_id)
port = self.deserialize('json', res)
self._delete('qos-queues', port['port'][ext_qos.QUEUE], 409)
def test_update_network_new_queue(self):
with self.qos_queue() as q1:
res = self._create_network('json', 'net1', True,
arg_list=(ext_qos.QUEUE,),
queue_id=q1['qos_queue']['id'])
net1 = self.deserialize('json', res)
with self.qos_queue() as new_q:
data = {'network': {ext_qos.QUEUE: new_q['qos_queue']['id']}}
req = self.new_update_request('networks', data,
net1['network']['id'])
res = req.get_response(self.api)
net1 = self.deserialize('json', res)
self.assertEqual(net1['network'][ext_qos.QUEUE],
new_q['qos_queue']['id'])
def test_update_port_adding_device_id(self):
with self.qos_queue(no_delete=True) as q1:
res = self._create_network('json', 'net1', True,
arg_list=(ext_qos.QUEUE,),
queue_id=q1['qos_queue']['id'])
net1 = self.deserialize('json', res)
device_id = "00fff4d0-e4a8-4a3a-8906-4c4cdafb59f1"
res = self._create_port('json', net1['network']['id'])
port = self.deserialize('json', res)
self.assertIsNone(port['port'][ext_qos.QUEUE])
data = {'port': {'device_id': device_id}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = req.get_response(self.api)
port = self.deserialize('json', res)
self.assertEqual(len(port['port'][ext_qos.QUEUE]), 36)
def test_get_port_with_qos_not_admin(self):
body = {'qos_queue': {'tenant_id': 'not_admin',
'name': 'foo', 'min': 20, 'max': 20}}
res = self._create_qos_queue('json', body, tenant_id='not_admin')
q1 = self.deserialize('json', res)
res = self._create_network('json', 'net1', True,
arg_list=(ext_qos.QUEUE, 'tenant_id',),
queue_id=q1['qos_queue']['id'],
tenant_id="not_admin")
net1 = self.deserialize('json', res)
self.assertEqual(len(net1['network'][ext_qos.QUEUE]), 36)
res = self._create_port('json', net1['network']['id'],
tenant_id='not_admin', set_context=True)
port = self.deserialize('json', res)
self.assertNotIn(ext_qos.QUEUE, port['port'])
def test_dscp_value_out_of_range(self):
body = {'qos_queue': {'tenant_id': 'admin', 'dscp': '64',
'name': 'foo', 'min': 20, 'max': 20}}
res = self._create_qos_queue('json', body)
self.assertEqual(res.status_int, 400)
def test_non_admin_cannot_create_queue(self):
body = {'qos_queue': {'tenant_id': 'not_admin',
'name': 'foo', 'min': 20, 'max': 20}}
res = self._create_qos_queue('json', body, tenant_id='not_admin',
set_context=True)
self.assertEqual(res.status_int, 403)
def test_update_port_non_admin_does_not_show_queue_id(self):
body = {'qos_queue': {'tenant_id': 'not_admin',
'name': 'foo', 'min': 20, 'max': 20}}
res = self._create_qos_queue('json', body, tenant_id='not_admin')
q1 = self.deserialize('json', res)
res = self._create_network('json', 'net1', True,
arg_list=(ext_qos.QUEUE,),
tenant_id='not_admin',
queue_id=q1['qos_queue']['id'])
net1 = self.deserialize('json', res)
res = self._create_port('json', net1['network']['id'],
tenant_id='not_admin', set_context=True)
port = self.deserialize('json', res)
device_id = "00fff4d0-e4a8-4a3a-8906-4c4cdafb59f1"
data = {'port': {'device_id': device_id}}
neutron_context = context.Context('', 'not_admin')
port = self._update('ports', port['port']['id'], data,
neutron_context=neutron_context)
self.assertNotIn(ext_qos.QUEUE, port['port'])
def test_rxtx_factor(self):
with self.qos_queue(max=10) as q1:
res = self._create_network('json', 'net1', True,
arg_list=(ext_qos.QUEUE,),
queue_id=q1['qos_queue']['id'])
net1 = self.deserialize('json', res)
res = self._create_port('json', net1['network']['id'],
arg_list=(ext_qos.RXTX_FACTOR,),
rxtx_factor=2, device_id='1')
port = self.deserialize('json', res)
req = self.new_show_request('qos-queues',
port['port'][ext_qos.QUEUE])
res = req.get_response(self.ext_api)
queue = self.deserialize('json', res)
self.assertEqual(queue['qos_queue']['max'], 20)

View File

@ -19,12 +19,11 @@
import mock
from neutron.plugins.nicira.common import config # noqa
from neutron.plugins.nicira import nsx_cluster
from neutron.plugins.nicira import NvpApiClient
from neutron.plugins.nicira import nvplib
from neutron.plugins.nicira import nsx_cluster as cluster
from neutron.plugins.nicira import NvpApiClient as api_client
from neutron.tests import base
from neutron.tests.unit import test_api_v2
from neutron.tests.unit.vmware import fake_nvpapiclient
from neutron.tests.unit.vmware.apiclient import fake
from neutron.tests.unit.vmware import NSXAPI_NAME
from neutron.tests.unit.vmware import STUBS_PATH
@ -34,23 +33,19 @@ _uuid = test_api_v2._uuid
class NsxlibTestCase(base.BaseTestCase):
def setUp(self):
# mock nvp api client
self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH)
self.fc = fake.FakeClient(STUBS_PATH)
self.mock_nsxapi = mock.patch(NSXAPI_NAME, autospec=True)
instance = self.mock_nsxapi.start()
instance.return_value.login.return_value = "the_cookie"
fake_version = getattr(self, 'fake_version', "3.0")
instance.return_value.get_nvp_version.return_value = (
NvpApiClient.NVPVersion(fake_version))
api_client.NVPVersion(fake_version))
def _fake_request(*args, **kwargs):
return self.fc.fake_request(*args, **kwargs)
instance.return_value.request.side_effect = _fake_request
self.fake_cluster = nsx_cluster.NSXCluster(
instance.return_value.request.side_effect = self.fc.fake_request
self.fake_cluster = cluster.NSXCluster(
name='fake-cluster', nsx_controllers=['1.1.1.1:999'],
default_tz_uuid=_uuid(), nsx_user='foo', nsx_password='bar')
self.fake_cluster.api_client = NvpApiClient.NVPApiHelper(
self.fake_cluster.api_client = api_client.NVPApiHelper(
('1.1.1.1', '999', True),
self.fake_cluster.nsx_user, self.fake_cluster.nsx_password,
self.fake_cluster.req_timeout, self.fake_cluster.http_timeout,
@ -68,8 +63,7 @@ class NsxlibTestCase(base.BaseTestCase):
class NsxlibNegativeBaseTestCase(base.BaseTestCase):
def setUp(self):
# mock nsx api client
self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH)
self.fc = fake.FakeClient(STUBS_PATH)
self.mock_nsxapi = mock.patch(NSXAPI_NAME, autospec=True)
instance = self.mock_nsxapi.start()
instance.return_value.login.return_value = "the_cookie"
@ -77,16 +71,16 @@ class NsxlibNegativeBaseTestCase(base.BaseTestCase):
# these tests as calls are throwing up errors anyway
fake_version = getattr(self, 'fake_version', "3.0")
instance.return_value.get_nvp_version.return_value = (
NvpApiClient.NVPVersion(fake_version))
api_client.NVPVersion(fake_version))
def _faulty_request(*args, **kwargs):
raise nvplib.NvpApiClient.NvpApiException
raise api_client.NvpApiException
instance.return_value.request.side_effect = _faulty_request
self.fake_cluster = nsx_cluster.NSXCluster(
self.fake_cluster = cluster.NSXCluster(
name='fake-cluster', nsx_controllers=['1.1.1.1:999'],
default_tz_uuid=_uuid(), nsx_user='foo', nsx_password='bar')
self.fake_cluster.api_client = NvpApiClient.NVPApiHelper(
self.fake_cluster.api_client = api_client.NVPApiHelper(
('1.1.1.1', '999', True),
self.fake_cluster.nsx_user, self.fake_cluster.nsx_password,
self.fake_cluster.req_timeout, self.fake_cluster.http_timeout,

View File

@ -16,7 +16,7 @@
from neutron.plugins.nicira.nsxlib import l2gateway as l2gwlib
from neutron.plugins.nicira.nsxlib import switch as switchlib
from neutron.plugins.nicira import nvplib
from neutron.plugins.nicira import NvpApiClient as api_client
from neutron.tests.unit import test_api_v2
from neutron.tests.unit.vmware.nsxlib import base
@ -26,7 +26,7 @@ _uuid = test_api_v2._uuid
class L2GatewayNegativeTestCase(base.NsxlibNegativeBaseTestCase):
def test_create_l2_gw_service_on_failure(self):
self.assertRaises(nvplib.NvpApiClient.NvpApiException,
self.assertRaises(api_client.NvpApiException,
l2gwlib.create_l2_gw_service,
self.fake_cluster,
'fake-tenant',
@ -35,19 +35,19 @@ class L2GatewayNegativeTestCase(base.NsxlibNegativeBaseTestCase):
'interface_name': 'xxx'}])
def test_delete_l2_gw_service_on_failure(self):
self.assertRaises(nvplib.NvpApiClient.NvpApiException,
self.assertRaises(api_client.NvpApiException,
l2gwlib.delete_l2_gw_service,
self.fake_cluster,
'fake-gateway')
def test_get_l2_gw_service_on_failure(self):
self.assertRaises(nvplib.NvpApiClient.NvpApiException,
self.assertRaises(api_client.NvpApiException,
l2gwlib.get_l2_gw_service,
self.fake_cluster,
'fake-gateway')
def test_update_l2_gw_service_on_failure(self):
self.assertRaises(nvplib.NvpApiClient.NvpApiException,
self.assertRaises(api_client.NvpApiException,
l2gwlib.update_l2_gw_service,
self.fake_cluster,
'fake-gateway',
@ -135,12 +135,12 @@ class L2GatewayTestCase(base.NsxlibTestCase):
l2gwlib.plug_l2_gw_service(
self.fake_cluster, lswitch['uuid'],
lport['uuid'], gw_id)
uri = nvplib._build_uri_path(nvplib.LSWITCHPORT_RESOURCE,
lport['uuid'],
lswitch['uuid'],
is_attachment=True)
resp_obj = nvplib.do_request("GET", uri,
cluster=self.fake_cluster)
uri = l2gwlib._build_uri_path(switchlib.LSWITCHPORT_RESOURCE,
lport['uuid'],
lswitch['uuid'],
is_attachment=True)
resp_obj = l2gwlib.do_request("GET", uri,
cluster=self.fake_cluster)
self.assertIn('LogicalPortAttachment', resp_obj)
self.assertEqual(resp_obj['LogicalPortAttachment']['type'],
'L2GatewayAttachment')

View File

@ -17,10 +17,10 @@ import json
import mock
from neutron.common import exceptions
from neutron.plugins.nicira.common import exceptions as nvp_exc
from neutron.plugins.nicira.common import exceptions as nsx_exc
from neutron.plugins.nicira.common import utils
from neutron.plugins.nicira.nsxlib import lsn as lsnlib
from neutron.plugins.nicira import NvpApiClient
from neutron.plugins.nicira import NvpApiClient as api_client
from neutron.tests import base
@ -239,9 +239,9 @@ class LSNTestCase(base.BaseTestCase):
lsn_id = "foo_lsn_id"
lsn_port_id = "foo_lsn_port_id"
lswitch_port_id = "foo_lswitch_port_id"
self.mock_request.side_effect = NvpApiClient.Conflict
self.mock_request.side_effect = api_client.Conflict
self.assertRaises(
nvp_exc.LsnConfigurationConflict,
nsx_exc.LsnConfigurationConflict,
lsnlib.lsn_port_plug_network,
self.cluster, lsn_id, lsn_port_id, lswitch_port_id)

View File

@ -18,7 +18,7 @@ import mock
from neutron.common import exceptions
from neutron.plugins.nicira.nsxlib import queue as queuelib
from neutron.plugins.nicira import NvpApiClient
from neutron.plugins.nicira import NvpApiClient as api_client
from neutron.tests.unit.vmware.nsxlib import base
@ -44,7 +44,7 @@ class TestLogicalQueueLib(base.NsxlibTestCase):
def test_create_lqueue_nsx_error_raises(self):
def raise_nsx_exc(*args, **kwargs):
raise NvpApiClient.NvpApiException()
raise api_client.NvpApiException()
with mock.patch.object(queuelib, 'do_request', new=raise_nsx_exc):
self.assertRaises(

View File

@ -18,12 +18,11 @@ import mock
from neutron.common import exceptions
from neutron.openstack.common import uuidutils
from neutron.plugins.nicira.common import exceptions as nvp_exc
from neutron.plugins.nicira.common import exceptions as nsx_exc
from neutron.plugins.nicira.common import utils
from neutron.plugins.nicira.nsxlib import router as routerlib
from neutron.plugins.nicira.nsxlib import switch as switchlib
from neutron.plugins.nicira import NvpApiClient
from neutron.plugins.nicira import nvplib
from neutron.plugins.nicira import NvpApiClient as api_client
from neutron.tests.unit import test_api_v2
from neutron.tests.unit.vmware.nsxlib import base
@ -35,7 +34,7 @@ class TestNatRules(base.NsxlibTestCase):
def _test_create_lrouter_dnat_rule(self, version):
with mock.patch.object(self.fake_cluster.api_client,
'get_nvp_version',
new=lambda: NvpApiClient.NVPVersion(version)):
new=lambda: api_client.NVPVersion(version)):
tenant_id = 'pippo'
lrouter = routerlib.create_lrouter(self.fake_cluster,
uuidutils.generate_uuid(),
@ -46,10 +45,11 @@ class TestNatRules(base.NsxlibTestCase):
self.fake_cluster, lrouter['uuid'], '10.0.0.99',
match_criteria={'destination_ip_addresses':
'192.168.0.5'})
uri = nvplib._build_uri_path(routerlib.LROUTERNAT_RESOURCE,
nat_rule['uuid'],
lrouter['uuid'])
resp_obj = nvplib.do_request("GET", uri, cluster=self.fake_cluster)
uri = routerlib._build_uri_path(routerlib.LROUTERNAT_RESOURCE,
nat_rule['uuid'],
lrouter['uuid'])
resp_obj = routerlib.do_request(
"GET", uri, cluster=self.fake_cluster)
self.assertEqual('DestinationNatRule', resp_obj['type'])
self.assertEqual('192.168.0.5',
resp_obj['match']['destination_ip_addresses'])
@ -178,14 +178,14 @@ class TestExplicitLRouters(base.NsxlibTestCase):
new_routes = [{"nexthop": "10.0.0.2",
"destination": "169.254.169.0/30"}, ]
nvp_routes = [self._get_single_route(router_id)]
nsx_routes = [self._get_single_route(router_id)]
with mock.patch.object(routerlib, 'get_explicit_routes_lrouter',
return_value=nvp_routes):
return_value=nsx_routes):
with mock.patch.object(routerlib, 'create_explicit_route_lrouter',
return_value='fake_uuid'):
old_routes = routerlib.update_explicit_routes_lrouter(
self.fake_cluster, router_id, new_routes)
self.assertEqual(old_routes, nvp_routes)
self.assertEqual(old_routes, nsx_routes)
def test_update_lrouter_with_no_routes_raise_nsx_exception(self):
router_id = 'fake_router_id'
@ -196,8 +196,8 @@ class TestExplicitLRouters(base.NsxlibTestCase):
with mock.patch.object(routerlib, 'get_explicit_routes_lrouter',
return_value=nsx_routes):
with mock.patch.object(routerlib, 'create_explicit_route_lrouter',
side_effect=NvpApiClient.NvpApiException):
self.assertRaises(NvpApiClient.NvpApiException,
side_effect=api_client.NvpApiException):
self.assertRaises(api_client.NvpApiException,
routerlib.update_explicit_routes_lrouter,
self.fake_cluster, router_id, new_routes)
@ -237,12 +237,12 @@ class TestExplicitLRouters(base.NsxlibTestCase):
with mock.patch.object(routerlib, 'get_explicit_routes_lrouter',
return_value=nsx_routes):
with mock.patch.object(routerlib, 'delete_explicit_route_lrouter',
side_effect=NvpApiClient.NvpApiException):
side_effect=api_client.NvpApiException):
with mock.patch.object(
routerlib, 'create_explicit_route_lrouter',
return_value='fake_uuid'):
self.assertRaises(
NvpApiClient.NvpApiException,
api_client.NvpApiException,
routerlib.update_explicit_routes_lrouter,
self.fake_cluster, router_id, new_routes)
@ -250,7 +250,7 @@ class TestExplicitLRouters(base.NsxlibTestCase):
class RouterNegativeTestCase(base.NsxlibNegativeBaseTestCase):
def test_create_lrouter_on_failure(self):
self.assertRaises(nvplib.NvpApiClient.NvpApiException,
self.assertRaises(api_client.NvpApiException,
routerlib.create_lrouter,
self.fake_cluster,
uuidutils.generate_uuid(),
@ -259,19 +259,19 @@ class RouterNegativeTestCase(base.NsxlibNegativeBaseTestCase):
'my_hop')
def test_delete_lrouter_on_failure(self):
self.assertRaises(nvplib.NvpApiClient.NvpApiException,
self.assertRaises(api_client.NvpApiException,
routerlib.delete_lrouter,
self.fake_cluster,
'fake_router')
def test_get_lrouter_on_failure(self):
self.assertRaises(nvplib.NvpApiClient.NvpApiException,
self.assertRaises(api_client.NvpApiException,
routerlib.get_lrouter,
self.fake_cluster,
'fake_router')
def test_update_lrouter_on_failure(self):
self.assertRaises(nvplib.NvpApiClient.NvpApiException,
self.assertRaises(api_client.NvpApiException,
routerlib.update_lrouter,
self.fake_cluster,
'fake_router',
@ -314,7 +314,7 @@ class TestLogicalRouters(base.NsxlibTestCase):
def _create_lrouter(self, version, neutron_id=None, distributed=None):
with mock.patch.object(
self.fake_cluster.api_client, 'get_nvp_version',
return_value=NvpApiClient.NVPVersion(version)):
return_value=api_client.NVPVersion(version)):
if not neutron_id:
neutron_id = uuidutils.generate_uuid()
lrouter = routerlib.create_lrouter(
@ -375,7 +375,7 @@ class TestLogicalRouters(base.NsxlibTestCase):
with mock.patch.object(self.fake_cluster.api_client,
'get_nvp_version',
return_value=NvpApiClient.NVPVersion(version)):
return_value=api_client.NVPVersion(version)):
with mock.patch.dict(routerlib.ROUTER_FUNC_DICT,
foo_func_dict, clear=True):
return routerlib.update_lrouter(
@ -383,13 +383,13 @@ class TestLogicalRouters(base.NsxlibTestCase):
'foo_nexthop', routes={'foo_destination': 'foo_address'})
def test_version_dependent_update_lrouter_old_versions(self):
self.assertRaises(nvp_exc.NvpInvalidVersion,
self.assertRaises(nsx_exc.NvpInvalidVersion,
self._test_version_dependent_update_lrouter,
"2.9")
self.assertRaises(nvp_exc.NvpInvalidVersion,
self.assertRaises(nsx_exc.NvpInvalidVersion,
self._test_version_dependent_update_lrouter,
"3.0")
self.assertRaises(nvp_exc.NvpInvalidVersion,
self.assertRaises(nsx_exc.NvpInvalidVersion,
self._test_version_dependent_update_lrouter,
"3.1")
@ -657,7 +657,7 @@ class TestLogicalRouters(base.NsxlibTestCase):
def test_update_lrouter_port_ips_nonexistent_router_raises(self):
self.assertRaises(
nvp_exc.NvpPluginException, routerlib.update_lrouter_port_ips,
nsx_exc.NvpPluginException, routerlib.update_lrouter_port_ips,
self.fake_cluster, 'boo-router', 'boo-port', [], [])
def test_update_lrouter_port_ips_nsx_exception_raises(self):
@ -671,11 +671,11 @@ class TestLogicalRouters(base.NsxlibTestCase):
'name', True, ['192.168.0.1'], '00:11:22:33:44:55')
def raise_nsx_exc(*args, **kwargs):
raise NvpApiClient.NvpApiException()
raise api_client.NvpApiException()
with mock.patch.object(routerlib, 'do_request', new=raise_nsx_exc):
self.assertRaises(
nvp_exc.NvpPluginException, routerlib.update_lrouter_port_ips,
nsx_exc.NvpPluginException, routerlib.update_lrouter_port_ips,
self.fake_cluster, lrouter['uuid'],
lrouter_port['uuid'], [], [])
@ -751,7 +751,7 @@ class TestLogicalRouters(base.NsxlibTestCase):
lrouter_port = routerlib.create_router_lport(
self.fake_cluster, lrouter['uuid'], 'pippo', 'neutron_port_id',
'name', True, ['192.168.0.1'], '00:11:22:33:44:55')
self.assertRaises(nvp_exc.NvpInvalidAttachmentType,
self.assertRaises(nsx_exc.NvpInvalidAttachmentType,
routerlib.plug_router_port_attachment,
self.fake_cluster, lrouter['uuid'],
lrouter_port['uuid'], 'gw_att', 'BadType')
@ -764,7 +764,7 @@ class TestLogicalRouters(base.NsxlibTestCase):
'10.0.0.1')
with mock.patch.object(self.fake_cluster.api_client,
'get_nvp_version',
new=lambda: NvpApiClient.NVPVersion(version)):
new=lambda: api_client.NVPVersion(version)):
routerlib.create_lrouter_snat_rule(
self.fake_cluster, lrouter['uuid'],
'10.0.0.2', '10.0.0.2', order=200,
@ -787,7 +787,7 @@ class TestLogicalRouters(base.NsxlibTestCase):
'10.0.0.1')
with mock.patch.object(self.fake_cluster.api_client,
'get_nvp_version',
return_value=NvpApiClient.NVPVersion(version)):
return_value=api_client.NVPVersion(version)):
routerlib.create_lrouter_dnat_rule(
self.fake_cluster, lrouter['uuid'], '192.168.0.2', order=200,
dest_port=dest_port,
@ -833,7 +833,7 @@ class TestLogicalRouters(base.NsxlibTestCase):
'10.0.0.1')
with mock.patch.object(self.fake_cluster.api_client,
'get_nvp_version',
new=lambda: NvpApiClient.NVPVersion(version)):
new=lambda: api_client.NVPVersion(version)):
routerlib.create_lrouter_nosnat_rule(
self.fake_cluster, lrouter['uuid'],
order=100,
@ -858,7 +858,7 @@ class TestLogicalRouters(base.NsxlibTestCase):
# v2 or v3 makes no difference for this test
with mock.patch.object(self.fake_cluster.api_client,
'get_nvp_version',
new=lambda: NvpApiClient.NVPVersion('2.0')):
new=lambda: api_client.NVPVersion('2.0')):
routerlib.create_lrouter_snat_rule(
self.fake_cluster, lrouter['uuid'],
'10.0.0.2', '10.0.0.2', order=220,
@ -911,7 +911,7 @@ class TestLogicalRouters(base.NsxlibTestCase):
rules = routerlib.query_nat_rules(self.fake_cluster, lrouter['uuid'])
self.assertEqual(len(rules), 3)
self.assertRaises(
nvp_exc.NvpNatRuleMismatch,
nsx_exc.NvpNatRuleMismatch,
routerlib.delete_nat_rules_by_match,
self.fake_cluster, lrouter['uuid'],
'SomeWeirdType', 1, 1)

View File

@ -16,7 +16,7 @@
from neutron.common import exceptions
from neutron.plugins.nicira.nsxlib import secgroup as secgrouplib
from neutron.plugins.nicira import nvplib
from neutron.plugins.nicira import nvplib as nsx_utils
from neutron.tests.unit.vmware.nsxlib import base
@ -25,10 +25,10 @@ class SecurityProfileTestCase(base.NsxlibTestCase):
def test_create_and_get_security_profile(self):
sec_prof = secgrouplib.create_security_profile(
self.fake_cluster, 'pippo', {'name': 'test'})
sec_prof_res = nvplib.do_request(
sec_prof_res = secgrouplib.do_request(
secgrouplib.HTTP_GET,
nvplib._build_uri_path('security-profile',
resource_id=sec_prof['uuid']),
nsx_utils._build_uri_path('security-profile',
resource_id=sec_prof['uuid']),
cluster=self.fake_cluster)
self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
# Check for builtin rules
@ -38,10 +38,10 @@ class SecurityProfileTestCase(base.NsxlibTestCase):
def test_create_and_get_default_security_profile(self):
sec_prof = secgrouplib.create_security_profile(
self.fake_cluster, 'pippo', {'name': 'default'})
sec_prof_res = nvplib.do_request(
nvplib.HTTP_GET,
nvplib._build_uri_path('security-profile',
resource_id=sec_prof['uuid']),
sec_prof_res = nsx_utils.do_request(
secgrouplib.HTTP_GET,
nsx_utils._build_uri_path('security-profile',
resource_id=sec_prof['uuid']),
cluster=self.fake_cluster)
self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
# Check for builtin rules
@ -57,10 +57,10 @@ class SecurityProfileTestCase(base.NsxlibTestCase):
'logical_port_ingress_rules': [ingress_rule]}
secgrouplib.update_security_group_rules(
self.fake_cluster, sec_prof['uuid'], new_rules)
sec_prof_res = nvplib.do_request(
nvplib.HTTP_GET,
nvplib._build_uri_path('security-profile',
resource_id=sec_prof['uuid']),
sec_prof_res = nsx_utils.do_request(
secgrouplib.HTTP_GET,
nsx_utils._build_uri_path('security-profile',
resource_id=sec_prof['uuid']),
cluster=self.fake_cluster)
self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
# Check for builtin rules
@ -81,10 +81,10 @@ class SecurityProfileTestCase(base.NsxlibTestCase):
'logical_port_ingress_rules': []}
secgrouplib.update_security_group_rules(
self.fake_cluster, sec_prof['uuid'], new_rules)
sec_prof_res = nvplib.do_request(
nvplib.HTTP_GET,
nvplib._build_uri_path('security-profile',
resource_id=sec_prof['uuid']),
sec_prof_res = nsx_utils.do_request(
nsx_utils.HTTP_GET,
nsx_utils._build_uri_path('security-profile',
resource_id=sec_prof['uuid']),
cluster=self.fake_cluster)
self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
# Check for builtin rules
@ -108,9 +108,9 @@ class SecurityProfileTestCase(base.NsxlibTestCase):
secgrouplib.delete_security_profile(
self.fake_cluster, sec_prof['uuid'])
self.assertRaises(exceptions.NotFound,
nvplib.do_request,
nvplib.HTTP_GET,
nvplib._build_uri_path(
secgrouplib.do_request,
secgrouplib.HTTP_GET,
nsx_utils._build_uri_path(
'security-profile',
resource_id=sec_prof['uuid']),
cluster=self.fake_cluster)

View File

@ -16,42 +16,42 @@
from neutron.plugins.nicira.nsxlib import router as routerlib
from neutron.plugins.nicira.nsxlib import versioning
from neutron.plugins.nicira import NvpApiClient
from neutron.plugins.nicira import NvpApiClient as api_client
from neutron.tests import base
class TestVersioning(base.BaseTestCase):
def test_function_handling_missing_minor(self):
version = NvpApiClient.NVPVersion('2.0')
version = api_client.NVPVersion('2.0')
function = versioning.get_function_by_version(
routerlib.ROUTER_FUNC_DICT, 'create_lrouter', version)
self.assertEqual(routerlib.create_implicit_routing_lrouter,
function)
def test_function_handling_with_both_major_and_minor(self):
version = NvpApiClient.NVPVersion('3.2')
version = api_client.NVPVersion('3.2')
function = versioning.get_function_by_version(
routerlib.ROUTER_FUNC_DICT, 'create_lrouter', version)
self.assertEqual(routerlib.create_explicit_routing_lrouter,
function)
def test_function_handling_with_newer_major(self):
version = NvpApiClient.NVPVersion('5.2')
version = api_client.NVPVersion('5.2')
function = versioning.get_function_by_version(
routerlib.ROUTER_FUNC_DICT, 'create_lrouter', version)
self.assertEqual(routerlib.create_explicit_routing_lrouter,
function)
def test_function_handling_with_obsolete_major(self):
version = NvpApiClient.NVPVersion('1.2')
version = api_client.NVPVersion('1.2')
self.assertRaises(NotImplementedError,
versioning.get_function_by_version,
routerlib.ROUTER_FUNC_DICT,
'create_lrouter', version)
def test_function_handling_with_unknown_version(self):
self.assertRaises(NvpApiClient.ServiceUnavailable,
self.assertRaises(api_client.ServiceUnavailable,
versioning.get_function_by_version,
routerlib.ROUTER_FUNC_DICT,
'create_lrouter', None)

View File

@ -23,37 +23,34 @@ from neutron.common.test_lib import test_config
from neutron.plugins.nicira.common import sync
from neutron.plugins.nicira.dhcp_meta import rpc
from neutron.tests.unit.openvswitch import test_agent_scheduler as test_base
from neutron.tests.unit.vmware import fake_nvpapiclient
from neutron.tests.unit.vmware.apiclient import fake
from neutron.tests.unit.vmware import get_fake_conf
from neutron.tests.unit.vmware import NSXAPI_NAME
from neutron.tests.unit.vmware import PLUGIN_NAME
from neutron.tests.unit.vmware import STUBS_PATH
class NVPDhcpAgentNotifierTestCase(test_base.OvsDhcpAgentNotifierTestCase):
class DhcpAgentNotifierTestCase(test_base.OvsDhcpAgentNotifierTestCase):
plugin_str = PLUGIN_NAME
def setUp(self):
test_config['config_files'] = [get_fake_conf('nsx.ini.full.test')]
# mock nvp api client
self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH)
self.mock_nvpapi = mock.patch(NSXAPI_NAME, autospec=True)
instance = self.mock_nvpapi.start()
# mock api client
self.fc = fake.FakeClient(STUBS_PATH)
self.mock_nsx_api = mock.patch(NSXAPI_NAME, autospec=True)
instance = self.mock_nsx_api.start()
# Avoid runs of the synchronizer looping call
patch_sync = mock.patch.object(sync, '_start_loopingcall')
patch_sync.start()
def _fake_request(*args, **kwargs):
return self.fc.fake_request(*args, **kwargs)
# Emulate tests against NVP 2.x
# Emulate tests against NSX 2.x
instance.return_value.get_nvp_version.return_value = "2.999"
instance.return_value.request.side_effect = _fake_request
super(NVPDhcpAgentNotifierTestCase, self).setUp()
instance.return_value.request.side_effect = self.fc.fake_request
super(DhcpAgentNotifierTestCase, self).setUp()
self.addCleanup(self.fc.reset_all)
self.addCleanup(patch_sync.stop)
self.addCleanup(self.mock_nvpapi.stop)
self.addCleanup(self.mock_nsx_api.stop)
self.addCleanup(cfg.CONF.reset)
def _test_gateway_subnet_notification(self, gateway='10.0.0.1'):

View File

@ -28,7 +28,7 @@ from neutron.plugins.nicira.common import exceptions
from neutron.plugins.nicira.common import sync
from neutron.plugins.nicira import nsx_cluster
from neutron.plugins.nicira.nsxlib import lsn as lsnlib
from neutron.plugins.nicira import NvpApiClient as nvp_client
from neutron.plugins.nicira import NvpApiClient as api_client
from neutron.tests import base
from neutron.tests.unit.vmware import get_fake_conf
from neutron.tests.unit.vmware import PLUGIN_NAME
@ -149,10 +149,10 @@ class ConfigurationTest(base.BaseTestCase):
cfg.CONF.set_override('core_plugin', PLUGIN_NAME)
self.assertEqual(config.AgentModes.AGENTLESS,
cfg.CONF.NSX.agent_mode)
# The version returned from NVP does not really matter here
with mock.patch.object(nvp_client.NVPApiHelper,
# The version returned from NSX does not really matter here
with mock.patch.object(api_client.NVPApiHelper,
'get_nvp_version',
return_value=nvp_client.NVPVersion("9.9")):
return_value=api_client.NVPVersion("9.9")):
with mock.patch.object(lsnlib,
'service_cluster_exists',
return_value=True):
@ -168,9 +168,9 @@ class ConfigurationTest(base.BaseTestCase):
cfg.CONF.set_override('core_plugin', PLUGIN_NAME)
self.assertEqual(config.AgentModes.AGENTLESS,
cfg.CONF.NSX.agent_mode)
with mock.patch.object(nvp_client.NVPApiHelper,
with mock.patch.object(api_client.NVPApiHelper,
'get_nvp_version',
return_value=nvp_client.NVPVersion("3.2")):
return_value=api_client.NVPVersion("3.2")):
self.assertRaises(exceptions.NvpPluginException, NeutronManager)
def test_agentless_extensions_unmet_deps_fail(self):
@ -179,9 +179,9 @@ class ConfigurationTest(base.BaseTestCase):
cfg.CONF.set_override('core_plugin', PLUGIN_NAME)
self.assertEqual(config.AgentModes.AGENTLESS,
cfg.CONF.NSX.agent_mode)
with mock.patch.object(nvp_client.NVPApiHelper,
with mock.patch.object(api_client.NVPApiHelper,
'get_nvp_version',
return_value=nvp_client.NVPVersion("3.2")):
return_value=api_client.NVPVersion("3.2")):
with mock.patch.object(lsnlib,
'service_cluster_exists',
return_value=False):

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Nicira Networks, Inc.
# Copyright 2013 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -29,13 +29,13 @@ from neutron import context
from neutron.openstack.common import jsonutils as json
from neutron.openstack.common import log
from neutron.plugins.nicira.common import sync
from neutron.plugins.nicira import nsx_cluster
from neutron.plugins.nicira import nsx_cluster as cluster
from neutron.plugins.nicira import NvpApiClient
from neutron.plugins.nicira import nvplib
from neutron.plugins.nicira import nvplib as nsx_utils
from neutron.plugins.vmware import plugin
from neutron.tests import base
from neutron.tests.unit import test_api_v2
from neutron.tests.unit.vmware import fake_nvpapiclient
from neutron.tests.unit.vmware.apiclient import fake
from neutron.tests.unit.vmware import get_fake_conf
from neutron.tests.unit.vmware import NSXAPI_NAME
from neutron.tests.unit.vmware import STUBS_PATH
@ -51,60 +51,60 @@ LROUTERS = [{'uuid': _uuid(), 'name': 'lr-1'},
{'uuid': _uuid(), 'name': 'lr-2'}]
class NvpCacheTestCase(base.BaseTestCase):
"""Test suite providing coverage for the NvpCache class."""
class CacheTestCase(base.BaseTestCase):
"""Test suite providing coverage for the Cache class."""
def setUp(self):
self.nvp_cache = sync.NvpCache()
self.nsx_cache = sync.NvpCache()
for lswitch in LSWITCHES:
self.nvp_cache._uuid_dict_mappings[lswitch['uuid']] = (
self.nvp_cache._lswitches)
self.nvp_cache._lswitches[lswitch['uuid']] = (
self.nsx_cache._uuid_dict_mappings[lswitch['uuid']] = (
self.nsx_cache._lswitches)
self.nsx_cache._lswitches[lswitch['uuid']] = (
{'data': lswitch,
'hash': hash(json.dumps(lswitch))})
for lswitchport in LSWITCHPORTS:
self.nvp_cache._uuid_dict_mappings[lswitchport['uuid']] = (
self.nvp_cache._lswitchports)
self.nvp_cache._lswitchports[lswitchport['uuid']] = (
self.nsx_cache._uuid_dict_mappings[lswitchport['uuid']] = (
self.nsx_cache._lswitchports)
self.nsx_cache._lswitchports[lswitchport['uuid']] = (
{'data': lswitchport,
'hash': hash(json.dumps(lswitchport))})
for lrouter in LROUTERS:
self.nvp_cache._uuid_dict_mappings[lrouter['uuid']] = (
self.nvp_cache._lrouters)
self.nvp_cache._lrouters[lrouter['uuid']] = (
self.nsx_cache._uuid_dict_mappings[lrouter['uuid']] = (
self.nsx_cache._lrouters)
self.nsx_cache._lrouters[lrouter['uuid']] = (
{'data': lrouter,
'hash': hash(json.dumps(lrouter))})
super(NvpCacheTestCase, self).setUp()
super(CacheTestCase, self).setUp()
def test_get_lswitches(self):
ls_uuids = self.nvp_cache.get_lswitches()
ls_uuids = self.nsx_cache.get_lswitches()
self.assertEqual(set(ls_uuids),
set([ls['uuid'] for ls in LSWITCHES]))
def test_get_lswitchports(self):
lp_uuids = self.nvp_cache.get_lswitchports()
lp_uuids = self.nsx_cache.get_lswitchports()
self.assertEqual(set(lp_uuids),
set([lp['uuid'] for lp in LSWITCHPORTS]))
def test_get_lrouters(self):
lr_uuids = self.nvp_cache.get_lrouters()
lr_uuids = self.nsx_cache.get_lrouters()
self.assertEqual(set(lr_uuids),
set([lr['uuid'] for lr in LROUTERS]))
def test_get_lswitches_changed_only(self):
ls_uuids = self.nvp_cache.get_lswitches(changed_only=True)
ls_uuids = self.nsx_cache.get_lswitches(changed_only=True)
self.assertEqual(0, len(ls_uuids))
def test_get_lswitchports_changed_only(self):
lp_uuids = self.nvp_cache.get_lswitchports(changed_only=True)
lp_uuids = self.nsx_cache.get_lswitchports(changed_only=True)
self.assertEqual(0, len(lp_uuids))
def test_get_lrouters_changed_only(self):
lr_uuids = self.nvp_cache.get_lrouters(changed_only=True)
lr_uuids = self.nsx_cache.get_lrouters(changed_only=True)
self.assertEqual(0, len(lr_uuids))
def _verify_update(self, new_resource, changed=True, hit=True):
cached_resource = self.nvp_cache[new_resource['uuid']]
cached_resource = self.nsx_cache[new_resource['uuid']]
self.assertEqual(new_resource, cached_resource['data'])
self.assertEqual(hit, cached_resource.get('hit', False))
self.assertEqual(changed,
@ -113,68 +113,68 @@ class NvpCacheTestCase(base.BaseTestCase):
def test_update_lswitch_new_item(self):
new_switch_uuid = _uuid()
new_switch = {'uuid': new_switch_uuid, 'name': 'new_switch'}
self.nvp_cache.update_lswitch(new_switch)
self.assertIn(new_switch_uuid, self.nvp_cache._lswitches.keys())
self.nsx_cache.update_lswitch(new_switch)
self.assertIn(new_switch_uuid, self.nsx_cache._lswitches.keys())
self._verify_update(new_switch)
def test_update_lswitch_existing_item(self):
switch = LSWITCHES[0]
switch['name'] = 'new_name'
self.nvp_cache.update_lswitch(switch)
self.assertIn(switch['uuid'], self.nvp_cache._lswitches.keys())
self.nsx_cache.update_lswitch(switch)
self.assertIn(switch['uuid'], self.nsx_cache._lswitches.keys())
self._verify_update(switch)
def test_update_lswitchport_new_item(self):
new_switchport_uuid = _uuid()
new_switchport = {'uuid': new_switchport_uuid,
'name': 'new_switchport'}
self.nvp_cache.update_lswitchport(new_switchport)
self.nsx_cache.update_lswitchport(new_switchport)
self.assertIn(new_switchport_uuid,
self.nvp_cache._lswitchports.keys())
self.nsx_cache._lswitchports.keys())
self._verify_update(new_switchport)
def test_update_lswitchport_existing_item(self):
switchport = LSWITCHPORTS[0]
switchport['name'] = 'new_name'
self.nvp_cache.update_lswitchport(switchport)
self.nsx_cache.update_lswitchport(switchport)
self.assertIn(switchport['uuid'],
self.nvp_cache._lswitchports.keys())
self.nsx_cache._lswitchports.keys())
self._verify_update(switchport)
def test_update_lrouter_new_item(self):
new_router_uuid = _uuid()
new_router = {'uuid': new_router_uuid,
'name': 'new_router'}
self.nvp_cache.update_lrouter(new_router)
self.nsx_cache.update_lrouter(new_router)
self.assertIn(new_router_uuid,
self.nvp_cache._lrouters.keys())
self.nsx_cache._lrouters.keys())
self._verify_update(new_router)
def test_update_lrouter_existing_item(self):
router = LROUTERS[0]
router['name'] = 'new_name'
self.nvp_cache.update_lrouter(router)
self.nsx_cache.update_lrouter(router)
self.assertIn(router['uuid'],
self.nvp_cache._lrouters.keys())
self.nsx_cache._lrouters.keys())
self._verify_update(router)
def test_process_updates_initial(self):
# Clear cache content to simulate first-time filling
self.nvp_cache._lswitches.clear()
self.nvp_cache._lswitchports.clear()
self.nvp_cache._lrouters.clear()
self.nvp_cache.process_updates(LSWITCHES, LROUTERS, LSWITCHPORTS)
self.nsx_cache._lswitches.clear()
self.nsx_cache._lswitchports.clear()
self.nsx_cache._lrouters.clear()
self.nsx_cache.process_updates(LSWITCHES, LROUTERS, LSWITCHPORTS)
for resource in LSWITCHES + LROUTERS + LSWITCHPORTS:
self._verify_update(resource)
def test_process_updates_no_change(self):
self.nvp_cache.process_updates(LSWITCHES, LROUTERS, LSWITCHPORTS)
self.nsx_cache.process_updates(LSWITCHES, LROUTERS, LSWITCHPORTS)
for resource in LSWITCHES + LROUTERS + LSWITCHPORTS:
self._verify_update(resource, changed=False)
def test_process_updates_with_changes(self):
LSWITCHES[0]['name'] = 'altered'
self.nvp_cache.process_updates(LSWITCHES, LROUTERS, LSWITCHPORTS)
self.nsx_cache.process_updates(LSWITCHES, LROUTERS, LSWITCHPORTS)
for resource in LSWITCHES + LROUTERS + LSWITCHPORTS:
changed = (True if resource['uuid'] == LSWITCHES[0]['uuid']
else False)
@ -183,7 +183,7 @@ class NvpCacheTestCase(base.BaseTestCase):
def _test_process_updates_with_removals(self):
lswitches = LSWITCHES[:]
lswitch = lswitches.pop()
self.nvp_cache.process_updates(lswitches, LROUTERS, LSWITCHPORTS)
self.nsx_cache.process_updates(lswitches, LROUTERS, LSWITCHPORTS)
for resource in LSWITCHES + LROUTERS + LSWITCHPORTS:
hit = (False if resource['uuid'] == lswitch['uuid']
else True)
@ -195,12 +195,12 @@ class NvpCacheTestCase(base.BaseTestCase):
def test_process_updates_cleanup_after_delete(self):
deleted_lswitch, lswitches = self._test_process_updates_with_removals()
self.nvp_cache.process_deletes()
self.nvp_cache.process_updates(lswitches, LROUTERS, LSWITCHPORTS)
self.assertNotIn(deleted_lswitch['uuid'], self.nvp_cache._lswitches)
self.nsx_cache.process_deletes()
self.nsx_cache.process_updates(lswitches, LROUTERS, LSWITCHPORTS)
self.assertNotIn(deleted_lswitch['uuid'], self.nsx_cache._lswitches)
def _verify_delete(self, resource, deleted=True, hit=True):
cached_resource = self.nvp_cache[resource['uuid']]
cached_resource = self.nsx_cache[resource['uuid']]
data_field = 'data_bk' if deleted else 'data'
self.assertEqual(resource, cached_resource[data_field])
self.assertEqual(hit, cached_resource.get('hit', False))
@ -214,23 +214,23 @@ class NvpCacheTestCase(base.BaseTestCase):
def test_process_deletes_no_change(self):
# Mark all resources as hit
self._set_hit(self.nvp_cache._lswitches.values())
self._set_hit(self.nvp_cache._lswitchports.values())
self._set_hit(self.nvp_cache._lrouters.values())
self.nvp_cache.process_deletes()
self._set_hit(self.nsx_cache._lswitches.values())
self._set_hit(self.nsx_cache._lswitchports.values())
self._set_hit(self.nsx_cache._lrouters.values())
self.nsx_cache.process_deletes()
for resource in LSWITCHES + LROUTERS + LSWITCHPORTS:
self._verify_delete(resource, hit=False, deleted=False)
def test_process_deletes_with_removals(self):
# Mark all resources but one as hit
uuid_to_delete = LSWITCHPORTS[0]['uuid']
self._set_hit(self.nvp_cache._lswitches.values(),
self._set_hit(self.nsx_cache._lswitches.values(),
uuid_to_delete)
self._set_hit(self.nvp_cache._lswitchports.values(),
self._set_hit(self.nsx_cache._lswitchports.values(),
uuid_to_delete)
self._set_hit(self.nvp_cache._lrouters.values(),
self._set_hit(self.nsx_cache._lrouters.values(),
uuid_to_delete)
self.nvp_cache.process_deletes()
self.nsx_cache.process_deletes()
for resource in LSWITCHES + LROUTERS + LSWITCHPORTS:
deleted = resource['uuid'] == uuid_to_delete
self._verify_delete(resource, hit=False, deleted=deleted)
@ -253,27 +253,24 @@ class SyncLoopingCallTestCase(base.BaseTestCase):
self.assertTrue(synchronizer._synchronize_state.call_count)
class NvpSyncTestCase(base.BaseTestCase):
class SyncTestCase(base.BaseTestCase):
def setUp(self):
# mock nvp api client
self.fc = fake_nvpapiclient.FakeClient(STUBS_PATH)
mock_nvpapi = mock.patch(NSXAPI_NAME, autospec=True)
# mock api client
self.fc = fake.FakeClient(STUBS_PATH)
mock_api = mock.patch(NSXAPI_NAME, autospec=True)
# Avoid runs of the synchronizer looping call
# These unit tests will excplicitly invoke synchronization
patch_sync = mock.patch.object(sync, '_start_loopingcall')
self.mock_nvpapi = mock_nvpapi.start()
self.mock_api = mock_api.start()
patch_sync.start()
self.mock_nvpapi.return_value.login.return_value = "the_cookie"
# Emulate tests against NVP 3.x
self.mock_nvpapi.return_value.get_nvp_version.return_value = (
self.mock_api.return_value.login.return_value = "the_cookie"
# Emulate tests against NSX 3.x
self.mock_api.return_value.get_nvp_version.return_value = (
NvpApiClient.NVPVersion("3.1"))
def _fake_request(*args, **kwargs):
return self.fc.fake_request(*args, **kwargs)
self.mock_nvpapi.return_value.request.side_effect = _fake_request
self.fake_cluster = nsx_cluster.NSXCluster(
self.mock_api.return_value.request.side_effect = self.fc.fake_request
self.fake_cluster = cluster.NSXCluster(
name='fake-cluster', nsx_controllers=['1.1.1.1:999'],
default_tz_uuid=_uuid(), nsx_user='foo', nsx_password='bar')
self.fake_cluster.api_client = NvpApiClient.NVPApiHelper(
@ -296,16 +293,16 @@ class NvpSyncTestCase(base.BaseTestCase):
self.mock_nm_get_plugin = mock_nm_get_plugin.start()
self.mock_nm_get_plugin.return_value = self._plugin
mock_nm_get_service_plugins.start()
super(NvpSyncTestCase, self).setUp()
super(SyncTestCase, self).setUp()
self.addCleanup(self.fc.reset_all)
self.addCleanup(patch_sync.stop)
self.addCleanup(mock_nvpapi.stop)
self.addCleanup(mock_api.stop)
self.addCleanup(mock_nm_get_plugin.stop)
self.addCleanup(mock_nm_get_service_plugins.stop)
def tearDown(self):
cfg.CONF.reset()
super(NvpSyncTestCase, self).tearDown()
super(SyncTestCase, self).tearDown()
@contextlib.contextmanager
def _populate_data(self, ctx, net_size=2, port_size=2, router_size=2):
@ -456,7 +453,7 @@ class NvpSyncTestCase(base.BaseTestCase):
def _test_sync_with_chunk_larger_maxpagesize(
self, net_size, port_size, router_size, chunk_size, exp_calls):
ctx = context.get_admin_context()
real_func = nvplib.get_single_query_page
real_func = nsx_utils.get_single_query_page
sp = sync.SyncParameters(chunk_size)
with self._populate_data(ctx, net_size=net_size,
port_size=port_size,
@ -465,7 +462,7 @@ class NvpSyncTestCase(base.BaseTestCase):
# The following mock is just for counting calls,
# but we will still run the actual function
with mock.patch.object(
nvplib, 'get_single_query_page',
nsx_utils, 'get_single_query_page',
side_effect=real_func) as mock_get_page:
self._test_sync(
constants.NET_STATUS_ACTIVE,
@ -491,7 +488,7 @@ class NvpSyncTestCase(base.BaseTestCase):
chunk_size=48, exp_calls=6)
def test_sync_multi_chunk(self):
# The fake NVP API client cannot be used for this test
# The fake NSX API client cannot be used for this test
ctx = context.get_admin_context()
# Generate 4 networks, 1 port per network, and 4 routers
with self._populate_data(ctx, net_size=4, port_size=1, router_size=4):
@ -633,8 +630,8 @@ class NvpSyncTestCase(base.BaseTestCase):
q_rtr_data = self._plugin.get_router(ctx, q_rtr_id)
self.assertEqual(constants.NET_STATUS_DOWN, q_rtr_data['status'])
def test_sync_nvp_failure_backoff(self):
self.mock_nvpapi.return_value.request.side_effect = (
def test_sync_nsx_failure_backoff(self):
self.mock_api.return_value.request.side_effect = (
NvpApiClient.RequestTimeout)
# chunk size won't matter here
sp = sync.SyncParameters(999)

View File

@ -19,7 +19,7 @@ import mock
from neutron.db import api as db_api
from neutron.openstack.common import uuidutils
from neutron.plugins.nicira.common import exceptions as nvp_exc
from neutron.plugins.nicira.common import exceptions as nsx_exc
from neutron.plugins.nicira.common import nsx_utils
from neutron.plugins.nicira.common import utils
from neutron.plugins.nicira import NvpApiClient
@ -279,7 +279,7 @@ class ClusterManagementTestCase(nsx_base.NsxlibTestCase):
with mock.patch.object(self.fake_cluster.api_client,
'request',
side_effect=NvpApiClient.ReadOnlyMode):
self.assertRaises(nvp_exc.MaintenanceInProgress,
self.assertRaises(nsx_exc.MaintenanceInProgress,
nvplib.do_request, cluster=self.fake_cluster)
def test_cluster_method_not_implemented(self):

View File

@ -41,7 +41,7 @@ class FakeVcns(object):
self._edge_idx = 0
self._lswitches = {}
self._unique_router_name = unique_router_name
self._fake_nvpapi = None
self._fake_nsx_api = None
self.fake_firewall_dict = {}
self.temp_firewall = {
"firewallRules": {
@ -54,8 +54,8 @@ class FakeVcns(object):
self._fake_app_profiles_dict = {}
self._fake_loadbalancer_config = {}
def set_fake_nvpapi(self, fake_nvpapi):
self._fake_nvpapi = fake_nvpapi
def set_fake_nsx_api(self, fake_nsx_api):
self._fake_nsx_api = fake_nsx_api
def _validate_edge_name(self, name):
for edge_id, edge in self._edges.iteritems():
@ -237,10 +237,10 @@ class FakeVcns(object):
return (header, response)
def create_lswitch(self, lsconfig):
# The lswitch is created via VCNS API so the fake nvpapi wont
# see it. Added to fake nvpapi here.
if self._fake_nvpapi:
lswitch = self._fake_nvpapi._add_lswitch(json.dumps(lsconfig))
# The lswitch is created via VCNS API so the fake nsx_api will not
# see it. Added to fake nsx_api here.
if self._fake_nsx_api:
lswitch = self._fake_nsx_api._add_lswitch(json.dumps(lsconfig))
else:
lswitch = lsconfig
lswitch['uuid'] = uuidutils.generate_uuid()
@ -255,9 +255,9 @@ class FakeVcns(object):
if id not in self._lswitches:
raise Exception(_("Lswitch %s does not exist") % id)
del self._lswitches[id]
if self._fake_nvpapi:
if self._fake_nsx_api:
# TODO(fank): fix the hack
del self._fake_nvpapi._fake_lswitch_dict[id]
del self._fake_nsx_api._fake_lswitch_dict[id]
header = {
'status': 200
}

View File

@ -63,7 +63,7 @@ class ServiceRouterTestExtensionManager(object):
return []
class ServiceRouterTest(test_nsx_plugin.NiciraL3NatTest,
class ServiceRouterTest(test_nsx_plugin.L3NatTest,
test_l3_plugin.L3NatTestCaseMixin):
def vcns_patch(self):
@ -115,7 +115,7 @@ class ServiceRouterTest(test_nsx_plugin.NiciraL3NatTest,
service_plugins=service_plugins,
ext_mgr=ext_mgr)
self.fc2.set_fake_nvpapi(self.fc)
self.fc2.set_fake_nsx_api(self.fc)
self.addCleanup(self.fc2.reset_all)
self.addCleanup(mock.patch.stopall)
@ -158,7 +158,7 @@ class ServiceRouterTest(test_nsx_plugin.NiciraL3NatTest,
class ServiceRouterTestCase(ServiceRouterTest,
test_nsx_plugin.TestNiciraL3NatTestCase):
test_nsx_plugin.TestL3NatTestCase):
def test_router_create(self):
name = 'router1'

View File

@ -14,7 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: linb, VMware
import contextlib
import mock
@ -70,8 +69,7 @@ class VcnsDriverTestCase(test_db_firewall.FirewallPluginDbTestCase,
self.mock_vcns = mock.patch(VCNS_NAME, autospec=True)
self.vcns_firewall_patch()
self.nvp_service_plugin_callback = mock.Mock()
self.driver = vcns_driver.VcnsDriver(self.nvp_service_plugin_callback)
self.driver = vcns_driver.VcnsDriver(mock.Mock())
super(VcnsDriverTestCase, self).setUp()
self.addCleanup(self.fc2.reset_all)

View File

@ -27,7 +27,7 @@ from neutron import manager
from neutron.openstack.common import uuidutils
from neutron.plugins.common import constants as const
from neutron.tests.unit.db.firewall import test_db_firewall
from neutron.tests.unit.vmware import test_edge_router
from neutron.tests.unit.vmware.vshield import test_edge_router
_uuid = uuidutils.generate_uuid

View File

@ -14,7 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: linb, VMware
import contextlib
@ -27,7 +26,7 @@ from neutron.extensions import loadbalancer as lb
from neutron import manager
from neutron.openstack.common import uuidutils
from neutron.tests.unit.db.loadbalancer import test_db_loadbalancer
from neutron.tests.unit.vmware import test_edge_router
from neutron.tests.unit.vmware.vshield import test_edge_router
_uuid = uuidutils.generate_uuid

View File

@ -22,10 +22,8 @@ from neutron.common import config as n_config
from neutron import context
from neutron.openstack.common import uuidutils
from neutron.plugins.nicira.dbexts import vcns_db
from neutron.plugins.nicira.vshield import (
vcns_driver)
from neutron.plugins.nicira.vshield.common import (
exceptions as vcns_exc)
from neutron.plugins.nicira.vshield.common import exceptions as vcns_exc
from neutron.plugins.nicira.vshield import vcns_driver
from neutron.tests.unit.db.loadbalancer import test_db_loadbalancer
from neutron.tests.unit.vmware import get_fake_conf
from neutron.tests.unit.vmware import VCNS_NAME
@ -85,8 +83,7 @@ class VcnsDriverTestCase(test_db_loadbalancer.LoadBalancerPluginDbTestCase):
self.mock_vcns = mock.patch(VCNS_NAME, autospec=True)
self.vcns_loadbalancer_patch()
self.nvp_service_plugin_callback = mock.Mock()
self.driver = vcns_driver.VcnsDriver(self.nvp_service_plugin_callback)
self.driver = vcns_driver.VcnsDriver(mock.Mock())
super(VcnsDriverTestCase, self).setUp()
self.addCleanup(self.fc2.reset_all)

View File

@ -19,8 +19,7 @@ from eventlet import greenthread
import mock
from neutron.common import config as n_config
from neutron.plugins.nicira.vshield.common import (
constants as vcns_const)
from neutron.plugins.nicira.vshield.common import constants as vcns_const
from neutron.plugins.nicira.vshield.common.constants import RouterStatus
from neutron.plugins.nicira.vshield.tasks.constants import TaskState
from neutron.plugins.nicira.vshield.tasks.constants import TaskStatus