Remove unit tests that are no longer run.

A long time ago, quantum/tests/unit became the home for all unit
tests, but these ones in the Cisco plugin directory got left
behind. They have suffered bit-rot and need to be removed.

Also:
- Move the fake Nexus driver to new home.
- Filed new bugs to track the task of improving unit test coverage of
  Cisco plugin code.

Fixes: bug #1174311
Change-Id: I372e24aebbe1804e5b6ce62984bfd76b030a44b1
changes/80/32980/1
HenryVIII 2013-06-13 20:03:20 -04:00
parent a78207e574
commit 5586fc3973
17 changed files with 3 additions and 1606 deletions

View File

@ -11,7 +11,7 @@
#max_networks=65568
#model_class=quantum.plugins.cisco.models.virt_phy_sw_v2.VirtualPhysicalSwitchModelV2
#manager_class=quantum.plugins.cisco.segmentation.l2network_vlan_mgr_v2.L2NetworkVLANMgr
#nexus_driver=quantum.plugins.cisco.tests.unit.v2.nexus.fake_nexus_driver.CiscoNEXUSFakeDriver
#nexus_driver=quantum.plugins.cisco.test.nexus.fake_nexus_driver.CiscoNEXUSFakeDriver
#svi_round_robin=False
# IMPORTANT: Comment out the following two lines for production deployments

0
quantum/plugins/cisco/README Executable file → Normal file
View File

View File

@ -55,7 +55,7 @@ cisco_opts = [
'l2network_vlan_mgr_v2.L2NetworkVLANMgr',
help=_("Manager Class")),
cfg.StrOpt('nexus_driver',
default='quantum.plugins.cisco.tests.unit.v2.nexus.'
default='quantum.plugins.cisco.test.nexus.'
'fake_nexus_driver.CiscoNEXUSFakeDriver',
help=_("Nexus Driver Name")),
]

View File

@ -1,51 +0,0 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Unittest runner for quantum Cisco plugin
export PLUGIN_DIR=quantum/plugins/cisco
./run_tests.sh -N
"""
import os
import sys
from nose import config
sys.path.append(os.getcwd())
sys.path.append(os.path.dirname(__file__))
from quantum.common.test_lib import run_tests, test_config
def main():
test_config['plugin_name'] = "l2network_plugin.L2Network"
cwd = os.getcwd()
os.chdir(cwd)
working_dir = os.path.abspath("quantum/plugins/cisco")
c = config.Config(stream=sys.stdout,
env=os.environ,
verbosity=3,
workingDir=working_dir)
sys.exit(run_tests(c))
if __name__ == '__main__':
main()

View File

@ -1,22 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# See http://code.google.com/p/python-nose/issues/detail?id=373
# The code below enables nosetests to work with i18n _() blocks
import __builtin__
setattr(__builtin__, '_', lambda x: x)

View File

@ -1,8 +0,0 @@
[pipeline:extensions_app_with_filter]
pipeline = extensions extensions_test_app
[filter:extensions]
paste.filter_factory = quantum.common.extensions:plugin_aware_extension_middleware_factory
[app:extensions_test_app]
paste.app_factory = quantum.plugins.cisco.tests.unit.test_cisco_extension:app_factory

View File

@ -1,20 +0,0 @@
[DEFAULT]
# Show more verbose log output (sets INFO log level output)
verbose = True
# Show debugging output in logs (sets DEBUG log level output)
debug = False
# Address to bind the API server
bind_host = 0.0.0.0
# Port the bind the API server to
bind_port = 9696
# Path to the extensions
api_extensions_path = ../../../../extensions
# Paste configuration file
api_paste_config = api-paste.ini.cisco.test
core_plugin = quantum.plugins.cisco.l2network_plugin.L2Network

View File

@ -1,673 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @authors: Shweta Padubidri, Cisco Systems, Inc.
# Peter Strunk , Cisco Systems, Inc.
# Shubhangi Satras , Cisco Systems, Inc.
import logging
import os.path
import routes
import webob
from webtest import TestApp
from quantum.api import extensions
from quantum.api.extensions import (
ExtensionMiddleware,
PluginAwareExtensionManager,
)
from quantum.common import config
from quantum.extensions import (
credential,
qos,
)
from quantum.manager import QuantumManager
from quantum.openstack.common import jsonutils
from quantum.plugins.cisco.db import api as db
from quantum.plugins.cisco import l2network_plugin
from quantum.plugins.cisco.l2network_plugin import L2Network
from quantum.tests import base
from quantum.tests.unit.extension_stubs import StubBaseAppController
from quantum import wsgi
LOG = logging.getLogger('quantum.plugins.cisco.tests.test_cisco_extensions')
EXTENSIONS_PATH = os.path.join(os.path.dirname(__file__), os.pardir, os.pardir,
os.pardir, os.pardir, "extensions")
ROOTDIR = os.path.dirname(os.path.dirname(__file__))
UNITDIR = os.path.join(ROOTDIR, 'unit')
def testsdir(*p):
return os.path.join(UNITDIR, *p)
config_file = 'quantum.conf.cisco.test'
args = ['--config-file', testsdir(config_file)]
config.parse(args=args)
class ExtensionsTestApp(wsgi.Router):
def __init__(self, options=None):
options = options or {}
mapper = routes.Mapper()
controller = StubBaseAppController()
mapper.resource("dummy_resource", "/dummy_resources",
controller=controller)
super(ExtensionsTestApp, self).__init__(mapper)
def create_request(self, path, body, content_type, method='GET'):
"""Test create request."""
LOG.debug("test_create_request - START")
req = webob.Request.blank(path)
req.method = method
req.headers = {}
req.headers['Accept'] = content_type
req.body = body
LOG.debug("test_create_request - END")
return req
def _create_network(self, name=None):
"""Test create network."""
LOG.debug("Creating network - START")
if name:
net_name = name
else:
net_name = self.network_name
net_path = "/tenants/tt/networks"
net_data = {'network': {'name': '%s' % net_name}}
req_body = wsgi.Serializer().serialize(net_data, self.contenttype)
network_req = self.create_request(net_path, req_body,
self.contenttype, 'POST')
network_res = network_req.get_response(self.api)
network_data = wsgi.Serializer().deserialize(network_res.body,
self.contenttype)
LOG.debug("Creating network - END")
return network_data['network']['id']
def _create_port(self, network_id, port_state):
"""Test create port."""
LOG.debug("Creating port for network %s - START", network_id)
port_path = "/tenants/tt/networks/%s/ports" % network_id
port_req_data = {'port': {'state': '%s' % port_state}}
req_body = wsgi.Serializer().serialize(port_req_data,
self.contenttype)
port_req = self.create_request(port_path, req_body,
self.contenttype, 'POST')
port_res = port_req.get_response(self.api)
port_data = wsgi.Serializer().deserialize(port_res.body,
self.contenttype)
LOG.debug("Creating port for network - END")
return port_data['port']['id']
def _delete_port(self, network_id, port_id):
"""Delete port."""
LOG.debug("Deleting port for network %s - START", network_id)
data = {'network_id': network_id, 'port_id': port_id}
port_path = ("/tenants/tt/networks/%(network_id)s/ports/%(port_id)s" %
data)
port_req = self.create_request(port_path, None,
self.contenttype, 'DELETE')
port_req.get_response(self.api)
LOG.debug("Deleting port for network - END")
def _delete_network(self, network_id):
"""Delete network."""
LOG.debug("Deleting network %s - START", network_id)
network_path = "/tenants/tt/networks/%s" % network_id
network_req = self.create_request(network_path, None,
self.contenttype, 'DELETE')
network_req.get_response(self.api)
LOG.debug("Deleting network - END")
def tear_down_port_network(self, net_id, port_id):
"""Tear down port and network."""
self._delete_port(net_id, port_id)
self._delete_network(net_id)
class QosExtensionTest(base.BaseTestCase):
def setUp(self):
"""Set up function."""
super(QosExtensionTest, self).setUp()
parent_resource = dict(member_name="tenant",
collection_name="extensions/csco/tenants")
controller = qos.QosController(QuantumManager.get_plugin())
res_ext = extensions.ResourceExtension('qos', controller,
parent=parent_resource)
self.test_app = setup_extensions_test_app(
SimpleExtensionManager(res_ext))
self.contenttype = 'application/json'
self.qos_path = '/extensions/csco/tenants/tt/qos'
self.qos_second_path = '/extensions/csco/tenants/tt/qos/'
self.test_qos_data = {
'qos': {
'qos_name': 'cisco_test_qos',
'qos_desc': {
'PPS': 50,
'TTL': 5,
},
},
}
self._l2network_plugin = l2network_plugin.L2Network()
def test_create_qos(self):
"""Test create qos."""
LOG.debug("test_create_qos - START")
req_body = jsonutils.dumps(self.test_qos_data)
index_response = self.test_app.post(self.qos_path,
req_body,
content_type=self.contenttype)
self.assertEqual(200, index_response.status_int)
# Clean Up - Delete the qos
resp_body = wsgi.Serializer().deserialize(index_response.body,
self.contenttype)
qos_path_temp = self.qos_second_path + resp_body['qoss']['qos']['id']
qos_path = str(qos_path_temp)
self.tearDownQos(qos_path)
LOG.debug("test_create_qos - END")
def test_create_qosBADRequest(self):
"""Test create qos bad request."""
LOG.debug("test_create_qosBADRequest - START")
index_response = self.test_app.post(self.qos_path,
'BAD_REQUEST',
content_type=self.contenttype,
status='*')
self.assertEqual(400, index_response.status_int)
LOG.debug("test_create_qosBADRequest - END")
def test_list_qoss(self):
"""Test list qoss."""
LOG.debug("test_list_qoss - START")
req_body1 = jsonutils.dumps(self.test_qos_data)
create_resp1 = self.test_app.post(self.qos_path, req_body1,
content_type=self.contenttype)
req_body2 = jsonutils.dumps({
'qos': {
'qos_name': 'cisco_test_qos2',
'qos_desc': {
'PPS': 50,
'TTL': 5,
},
},
})
create_resp2 = self.test_app.post(self.qos_path, req_body2,
content_type=self.contenttype)
index_response = self.test_app.get(self.qos_path)
index_resp_body = wsgi.Serializer().deserialize(index_response.body,
self.contenttype)
self.assertEqual(200, index_response.status_int)
# Clean Up - Delete the qos's
resp_body1 = wsgi.Serializer().deserialize(create_resp1.body,
self.contenttype)
qos_path1_temp = self.qos_second_path + resp_body1['qoss']['qos']['id']
qos_path1 = str(qos_path1_temp)
resp_body2 = wsgi.Serializer().deserialize(create_resp2.body,
self.contenttype)
list_all_qos = [resp_body1['qoss']['qos'], resp_body2['qoss']['qos']]
self.assertTrue(index_resp_body['qoss'][0] in list_all_qos)
self.assertTrue(index_resp_body['qoss'][1] in list_all_qos)
qos_path2_temp = self.qos_second_path + resp_body2['qoss']['qos']['id']
qos_path2 = str(qos_path2_temp)
self.tearDownQos(qos_path1)
self.tearDownQos(qos_path2)
LOG.debug("test_list_qoss - END")
def test_show_qos(self):
"""Test show qos."""
LOG.debug("test_show_qos - START")
req_body = jsonutils.dumps(self.test_qos_data)
index_response = self.test_app.post(self.qos_path, req_body,
content_type=self.contenttype)
resp_body = wsgi.Serializer().deserialize(index_response.body,
self.contenttype)
show_path_temp = self.qos_second_path + resp_body['qoss']['qos']['id']
show_qos_path = str(show_path_temp)
show_response = self.test_app.get(show_qos_path)
show_resp_dict = wsgi.Serializer().deserialize(show_response.body,
self.contenttype)
self.assertEqual(show_resp_dict['qoss']['qos']['name'],
self.test_qos_data['qos']['qos_name'])
self.assertEqual(200, show_response.status_int)
# Clean Up - Delete the qos
self.tearDownQos(show_qos_path)
LOG.debug("test_show_qos - END")
def test_show_qosDNE(self, qos_id='100'):
"""Test show qos does not exist."""
LOG.debug("test_show_qosDNE - START")
show_path_temp = self.qos_second_path + qos_id
show_qos_path = str(show_path_temp)
show_response = self.test_app.get(show_qos_path, status='*')
self.assertEqual(452, show_response.status_int)
LOG.debug("test_show_qosDNE - END")
def test_update_qos(self):
"""Test update qos."""
LOG.debug("test_update_qos - START")
req_body = jsonutils.dumps(self.test_qos_data)
index_response = self.test_app.post(self.qos_path, req_body,
content_type=self.contenttype)
resp_body = wsgi.Serializer().deserialize(index_response.body,
self.contenttype)
rename_req_body = jsonutils.dumps({
'qos': {
'qos_name': 'cisco_rename_qos',
'qos_desc': {
'PPS': 50,
'TTL': 5,
},
},
})
rename_path_temp = (self.qos_second_path +
resp_body['qoss']['qos']['id'])
rename_path = str(rename_path_temp)
rename_response = self.test_app.put(rename_path, rename_req_body,
content_type=self.contenttype)
self.assertEqual(200, rename_response.status_int)
rename_resp_dict = wsgi.Serializer().deserialize(rename_response.body,
self.contenttype)
self.assertEqual(rename_resp_dict['qoss']['qos']['name'],
'cisco_rename_qos')
self.tearDownQos(rename_path)
LOG.debug("test_update_qos - END")
def test_update_qosDNE(self, qos_id='100'):
"""Test update qos does not exist."""
LOG.debug("test_update_qosDNE - START")
rename_req_body = jsonutils.dumps({
'qos': {
'qos_name': 'cisco_rename_qos',
'qos_desc': {
'PPS': 50,
'TTL': 5,
},
},
})
rename_path_temp = self.qos_second_path + qos_id
rename_path = str(rename_path_temp)
rename_response = self.test_app.put(rename_path, rename_req_body,
content_type=self.contenttype,
status='*')
self.assertEqual(452, rename_response.status_int)
LOG.debug("test_update_qosDNE - END")
def test_update_qosBADRequest(self):
"""Test update qos bad request."""
LOG.debug("test_update_qosBADRequest - START")
req_body = jsonutils.dumps(self.test_qos_data)
index_response = self.test_app.post(self.qos_path, req_body,
content_type=self.contenttype)
resp_body = wsgi.Serializer().deserialize(index_response.body,
self.contenttype)
rename_path_temp = (self.qos_second_path +
resp_body['qoss']['qos']['id'])
rename_path = str(rename_path_temp)
rename_response = self.test_app.put(rename_path, 'BAD_REQUEST',
status="*")
self.assertEqual(400, rename_response.status_int)
# Clean Up - Delete the Port Profile
self.tearDownQos(rename_path)
LOG.debug("test_update_qosBADRequest - END")
def test_delete_qos(self):
"""Test delte qos."""
LOG.debug("test_delete_qos - START")
req_body = jsonutils.dumps({
'qos': {
'qos_name': 'cisco_test_qos',
'qos_desc': {
'PPS': 50,
'TTL': 5,
},
},
})
index_response = self.test_app.post(self.qos_path, req_body,
content_type=self.contenttype)
resp_body = wsgi.Serializer().deserialize(index_response.body,
self.contenttype)
delete_path_temp = (self.qos_second_path +
resp_body['qoss']['qos']['id'])
delete_path = str(delete_path_temp)
delete_response = self.test_app.delete(delete_path)
self.assertEqual(200, delete_response.status_int)
LOG.debug("test_delete_qos - END")
def test_delete_qosDNE(self, qos_id='100'):
"""Test delte qos does not exist."""
LOG.debug("test_delete_qosDNE - START")
delete_path_temp = self.qos_second_path + qos_id
delete_path = str(delete_path_temp)
delete_response = self.test_app.delete(delete_path, status='*')
self.assertEqual(452, delete_response.status_int)
LOG.debug("test_delete_qosDNE - END")
def tearDownQos(self, delete_profile_path):
"""Tear Down Qos."""
self.test_app.delete(delete_profile_path)
def tearDown(self):
db.clear_db()
class CredentialExtensionTest(base.BaseTestCase):
def setUp(self):
"""Set up function."""
super(CredentialExtensionTest, self).setUp()
parent_resource = dict(member_name="tenant",
collection_name="extensions/csco/tenants")
controller = credential.CredentialController(QuantumManager.
get_plugin())
res_ext = extensions.ResourceExtension('credentials', controller,
parent=parent_resource)
self.test_app = setup_extensions_test_app(SimpleExtensionManager(
res_ext))
self.contenttype = 'application/json'
self.credential_path = '/extensions/csco/tenants/tt/credentials'
self.cred_second_path = '/extensions/csco/tenants/tt/credentials/'
self.test_credential_data = {
'credential': {
'credential_name': 'cred8',
'user_name': 'newUser2',
'password': 'newPasswd1',
},
}
self._l2network_plugin = l2network_plugin.L2Network()
def test_list_credentials(self):
"""Test list credentials."""
#Create Credential before listing
LOG.debug("test_list_credentials - START")
req_body1 = jsonutils.dumps(self.test_credential_data)
create_response1 = self.test_app.post(
self.credential_path, req_body1,
content_type=self.contenttype)
req_body2 = jsonutils.dumps({
'credential': {
'credential_name': 'cred9',
'user_name': 'newUser2',
'password': 'newPasswd2',
},
})
create_response2 = self.test_app.post(
self.credential_path, req_body2,
content_type=self.contenttype)
index_response = self.test_app.get(self.credential_path)
index_resp_body = wsgi.Serializer().deserialize(index_response.body,
self.contenttype)
self.assertEqual(200, index_response.status_int)
#CLean Up - Deletion of the Credentials
resp_body1 = wsgi.Serializer().deserialize(create_response1.body,
self.contenttype)
delete_path1_temp = (self.cred_second_path +
resp_body1['credentials']['credential']['id'])
delete_path1 = str(delete_path1_temp)
resp_body2 = wsgi.Serializer().deserialize(create_response2.body,
self.contenttype)
list_all_credential = [resp_body1['credentials']['credential'],
resp_body2['credentials']['credential']]
self.assertTrue(
index_resp_body['credentials'][0] in list_all_credential)
self.assertTrue(
index_resp_body['credentials'][1] in list_all_credential)
delete_path2_temp = (self.cred_second_path +
resp_body2['credentials']['credential']['id'])
delete_path2 = str(delete_path2_temp)
self.tearDownCredential(delete_path1)
self.tearDownCredential(delete_path2)
LOG.debug("test_list_credentials - END")
def test_create_credential(self):
"""Test create credential."""
LOG.debug("test_create_credential - START")
req_body = jsonutils.dumps(self.test_credential_data)
index_response = self.test_app.post(
self.credential_path, req_body,
content_type=self.contenttype)
self.assertEqual(200, index_response.status_int)
#CLean Up - Deletion of the Credentials
resp_body = wsgi.Serializer().deserialize(
index_response.body, self.contenttype)
delete_path_temp = (self.cred_second_path +
resp_body['credentials']['credential']['id'])
delete_path = str(delete_path_temp)
self.tearDownCredential(delete_path)
LOG.debug("test_create_credential - END")
def test_create_credentialBADRequest(self):
"""Test create credential bad request."""
LOG.debug("test_create_credentialBADRequest - START")
index_response = self.test_app.post(
self.credential_path, 'BAD_REQUEST',
content_type=self.contenttype, status='*')
self.assertEqual(400, index_response.status_int)
LOG.debug("test_create_credentialBADRequest - END")
def test_show_credential(self):
"""Test show credential."""
LOG.debug("test_show_credential - START")
req_body = jsonutils.dumps(self.test_credential_data)
index_response = self.test_app.post(
self.credential_path, req_body,
content_type=self.contenttype)
resp_body = wsgi.Serializer().deserialize(index_response.body,
self.contenttype)
show_path_temp = (self.cred_second_path +
resp_body['credentials']['credential']['id'])
show_cred_path = str(show_path_temp)
show_response = self.test_app.get(show_cred_path)
show_resp_dict = wsgi.Serializer().deserialize(show_response.body,
self.contenttype)
self.assertEqual(show_resp_dict['credentials']['credential']['name'],
self.test_credential_data['credential']['user_name'])
self.assertEqual(
show_resp_dict['credentials']['credential']['password'],
self.test_credential_data['credential']['password'])
self.assertEqual(200, show_response.status_int)
LOG.debug("test_show_credential - END")
def test_show_credentialDNE(self, credential_id='100'):
"""Test show credential does not exist."""
LOG.debug("test_show_credentialDNE - START")
show_path_temp = self.cred_second_path + credential_id
show_cred_path = str(show_path_temp)
show_response = self.test_app.get(show_cred_path, status='*')
self.assertEqual(451, show_response.status_int)
LOG.debug("test_show_credentialDNE - END")
def test_update_credential(self):
"""Test update credential."""
LOG.debug("test_update_credential - START")
req_body = jsonutils.dumps(self.test_credential_data)
index_response = self.test_app.post(
self.credential_path, req_body,
content_type=self.contenttype)
resp_body = wsgi.Serializer().deserialize(
index_response.body, self.contenttype)
rename_req_body = jsonutils.dumps({
'credential': {
'credential_name': 'cred3',
'user_name': 'RenamedUser',
'password': 'Renamedpassword',
},
})
rename_path_temp = (self.cred_second_path +
resp_body['credentials']['credential']['id'])
rename_path = str(rename_path_temp)
rename_response = self.test_app.put(rename_path, rename_req_body,
content_type=self.contenttype)
rename_resp_dict = wsgi.Serializer().deserialize(rename_response.body,
self.contenttype)
self.assertEqual(rename_resp_dict['credentials']['credential']['name'],
'cred3')
self.assertEqual(
rename_resp_dict['credentials']['credential']['password'],
self.test_credential_data['credential']['password'])
self.assertEqual(200, rename_response.status_int)
# Clean Up - Delete the Credentials
self.tearDownCredential(rename_path)
LOG.debug("test_update_credential - END")
def test_update_credBADReq(self):
"""Test update credential bad request."""
LOG.debug("test_update_credBADReq - START")
req_body = jsonutils.dumps(self.test_credential_data)
index_response = self.test_app.post(
self.credential_path, req_body,
content_type=self.contenttype)
resp_body = wsgi.Serializer().deserialize(
index_response.body, self.contenttype)
rename_path_temp = (self.cred_second_path +
resp_body['credentials']['credential']['id'])
rename_path = str(rename_path_temp)
rename_response = self.test_app.put(rename_path, 'BAD_REQUEST',
status='*')
self.assertEqual(400, rename_response.status_int)
LOG.debug("test_update_credBADReq - END")
def test_update_credentialDNE(self, credential_id='100'):
"""Test update credential does not exist."""
LOG.debug("test_update_credentialDNE - START")
rename_req_body = jsonutils.dumps({
'credential': {
'credential_name': 'cred3',
'user_name': 'RenamedUser',
'password': 'Renamedpassword',
},
})
rename_path_temp = self.cred_second_path + credential_id
rename_path = str(rename_path_temp)
rename_response = self.test_app.put(rename_path, rename_req_body,
content_type=self.contenttype,
status='*')
self.assertEqual(451, rename_response.status_int)
LOG.debug("test_update_credentialDNE - END")
def test_delete_credential(self):
"""Test delete credential."""
LOG.debug("test_delete_credential - START")
req_body = jsonutils.dumps(self.test_credential_data)
index_response = self.test_app.post(
self.credential_path, req_body,
content_type=self.contenttype)
resp_body = wsgi.Serializer().deserialize(
index_response.body, self.contenttype)
delete_path_temp = (self.cred_second_path +
resp_body['credentials']['credential']['id'])
delete_path = str(delete_path_temp)
delete_response = self.test_app.delete(delete_path)
self.assertEqual(200, delete_response.status_int)
LOG.debug("test_delete_credential - END")
def test_delete_credentialDNE(self, credential_id='100'):
"""Test delete credential does not exist."""
LOG.debug("test_delete_credentialDNE - START")
delete_path_temp = self.cred_second_path + credential_id
delete_path = str(delete_path_temp)
delete_response = self.test_app.delete(delete_path, status='*')
self.assertEqual(451, delete_response.status_int)
LOG.debug("test_delete_credentialDNE - END")
def tearDownCredential(self, delete_path):
self.test_app.delete(delete_path)
def tearDown(self):
db.clear_db()
def app_factory(global_conf, **local_conf):
conf = global_conf.copy()
conf.update(local_conf)
return ExtensionsTestApp(conf)
def setup_extensions_middleware(extension_manager=None):
extension_manager = (extension_manager or
PluginAwareExtensionManager(EXTENSIONS_PATH,
L2Network()))
app = config.load_paste_app('extensions_test_app')
return ExtensionMiddleware(app, ext_mgr=extension_manager)
def setup_extensions_test_app(extension_manager=None):
return TestApp(setup_extensions_middleware(extension_manager))
class SimpleExtensionManager(object):
def __init__(self, resource_ext=None, action_ext=None, request_ext=None):
self.resource_ext = resource_ext
self.action_ext = action_ext
self.request_ext = request_ext
def get_resources(self):
resource_exts = []
if self.resource_ext:
resource_exts.append(self.resource_ext)
return resource_exts
def get_actions(self):
action_exts = []
if self.action_ext:
action_exts.append(self.action_ext)
return action_exts
def get_request_extensions(self):
request_extensions = []
if self.request_ext:
request_extensions.append(self.request_ext)
return request_extensions

View File

@ -1,692 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011, Cisco Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# @author: Rohit Agarwalla, Cisco Systems, Inc.
"""
test_database.py is an independent test suite
that tests the database api method calls
"""
import mock
import sqlalchemy
from quantum.openstack.common import log as logging
import quantum.plugins.cisco.common.cisco_exceptions as c_exc
import quantum.plugins.cisco.db.api as db
import quantum.plugins.cisco.db.l2network_db as l2network_db
import quantum.plugins.cisco.db.nexus_db_v2 as nexus_db
from quantum.tests import base
LOG = logging.getLogger(__name__)
class NexusDB(object):
"""Class consisting of methods to call nexus db methods."""
def get_all_nexusportbindings(self):
"""Get all nexus port bindings."""
bindings = []
try:
for bind in nexus_db.get_all_nexusport_bindings():
LOG.debug("Getting nexus port binding : %s" % bind.port_id)
bind_dict = {}
bind_dict["port-id"] = str(bind.port_id)
bind_dict["vlan-id"] = str(bind.vlan_id)
bindings.append(bind_dict)
except Exception as exc:
LOG.error("Failed to get all bindings: %s" % str(exc))
return bindings
def get_nexusportbinding(self, vlan_id):
"""Get nexus port binding."""
binding = []
try:
for bind in nexus_db.get_nexusport_binding(vlan_id):
LOG.debug("Getting nexus port binding : %s" % bind.port_id)
bind_dict = {}
bind_dict["port-id"] = str(bind.port_id)
bind_dict["vlan-id"] = str(bind.vlan_id)
binding.append(bind_dict)
except Exception as exc:
LOG.error("Failed to get all bindings: %s" % str(exc))
return binding
def create_nexusportbinding(self, port_id, vlan_id):
"""Create nexus port binding."""
bind_dict = {}
try:
res = nexus_db.add_nexusport_binding(port_id, vlan_id)
LOG.debug("Created nexus port binding : %s" % res.port_id)
bind_dict["port-id"] = str(res.port_id)
bind_dict["vlan-id"] = str(res.vlan_id)
return bind_dict
except Exception as exc:
LOG.error("Failed to create nexus binding: %s" % str(exc))
def delete_nexusportbinding(self, vlan_id):
"""Delete nexus port binding."""
bindings = []
try:
bind = nexus_db.remove_nexusport_binding(vlan_id)
for res in bind:
LOG.debug("Deleted nexus port binding: %s" % res.vlan_id)
bind_dict = {}
bind_dict["port-id"] = res.port_id
bindings.append(bind_dict)
return bindings
except Exception as exc:
raise Exception("Failed to delete nexus port binding: %s"
% str(exc))
def update_nexusport_binding(self, port_id, new_vlan_id):
"""Update nexus port binding."""
try:
res = nexus_db.update_nexusport_binding(port_id, new_vlan_id)
LOG.debug("Updating nexus port binding : %s" % res.port_id)
bind_dict = {}
bind_dict["port-id"] = str(res.port_id)
bind_dict["vlan-id"] = str(res.vlan_id)
return bind_dict
except Exception as exc:
raise Exception("Failed to update nexus port binding vnic: %s"
% str(exc))
class L2networkDB(object):
"""Class conisting of methods to call L2network db methods."""
def get_all_vlan_bindings(self):
"""Get all vlan binding into a list of dict."""
vlans = []
try:
for vlan_bind in l2network_db.get_all_vlan_bindings():
LOG.debug("Getting vlan bindings for vlan: %s" %
vlan_bind.vlan_id)
vlan_dict = {}
vlan_dict["vlan-id"] = str(vlan_bind.vlan_id)
vlan_dict["vlan-name"] = vlan_bind.vlan_name
vlan_dict["net-id"] = str(vlan_bind.network_id)
vlans.append(vlan_dict)
except Exception as exc:
LOG.error("Failed to get all vlan bindings: %s" % str(exc))
return vlans
def get_vlan_binding(self, network_id):
"""Get a vlan binding."""
vlan = []
try:
for vlan_bind in l2network_db.get_vlan_binding(network_id):
LOG.debug("Getting vlan binding for vlan: %s" %
vlan_bind.vlan_id)
vlan_dict = {}
vlan_dict["vlan-id"] = str(vlan_bind.vlan_id)
vlan_dict["vlan-name"] = vlan_bind.vlan_name
vlan_dict["net-id"] = str(vlan_bind.network_id)
vlan.append(vlan_dict)
except Exception as exc:
LOG.error("Failed to get vlan binding: %s" % str(exc))
return vlan
def create_vlan_binding(self, vlan_id, vlan_name, network_id):
"""Create a vlan binding."""
vlan_dict = {}
try:
res = l2network_db.add_vlan_binding(vlan_id, vlan_name, network_id)
LOG.debug("Created vlan binding for vlan: %s" % res.vlan_id)
vlan_dict["vlan-id"] = str(res.vlan_id)
vlan_dict["vlan-name"] = res.vlan_name
vlan_dict["net-id"] = str(res.network_id)
return vlan_dict
except Exception as exc:
LOG.error("Failed to create vlan binding: %s" % str(exc))
def delete_vlan_binding(self, network_id):
"""Delete a vlan binding."""
try:
res = l2network_db.remove_vlan_binding(network_id)
LOG.debug("Deleted vlan binding for vlan: %s" % res.vlan_id)
vlan_dict = {}
vlan_dict["vlan-id"] = str(res.vlan_id)
return vlan_dict
except Exception as exc:
raise Exception("Failed to delete vlan binding: %s" % str(exc))
def update_vlan_binding(self, network_id, vlan_id, vlan_name):
"""Update a vlan binding."""
try:
res = l2network_db.update_vlan_binding(network_id, vlan_id,
vlan_name)
LOG.debug("Updating vlan binding for vlan: %s" % res.vlan_id)
vlan_dict = {}
vlan_dict["vlan-id"] = str(res.vlan_id)
vlan_dict["vlan-name"] = res.vlan_name
vlan_dict["net-id"] = str(res.network_id)
return vlan_dict
except Exception as exc:
raise Exception("Failed to update vlan binding: %s" % str(exc))
class QuantumDB(object):
"""Class conisting of methods to call Quantum db methods."""
def get_all_networks(self, tenant_id):
"""Get all networks."""
nets = []
try:
for net in db.network_list(tenant_id):
LOG.debug("Getting network: %s" % net.uuid)
net_dict = {}
net_dict["tenant-id"] = net.tenant_id
net_dict["net-id"] = str(net.uuid)
net_dict["net-name"] = net.name
nets.append(net_dict)
except Exception as exc:
LOG.error("Failed to get all networks: %s" % str(exc))
return nets
def get_network(self, network_id):
"""Get a network."""
net = []
try:
for net in db.network_get(network_id):
LOG.debug("Getting network: %s" % net.uuid)
net_dict = {}
net_dict["tenant-id"] = net.tenant_id
net_dict["net-id"] = str(net.uuid)
net_dict["net-name"] = net.name
net.append(net_dict)
except Exception as exc:
LOG.error("Failed to get network: %s" % str(exc))
return net
def create_network(self, tenant_id, net_name):
"""Create a network."""
net_dict = {}
try:
res = db.network_create(tenant_id, net_name)
LOG.debug("Created network: %s" % res.uuid)
net_dict["tenant-id"] = res.tenant_id
net_dict["net-id"] = str(res.uuid)
net_dict["net-name"] = res.name
return net_dict
except Exception as exc:
LOG.error("Failed to create network: %s" % str(exc))
def delete_network(self, net_id):
"""Delete a network."""
try:
net = db.network_destroy(net_id)
LOG.debug("Deleted network: %s" % net.uuid)
net_dict = {}
net_dict["net-id"] = str(net.uuid)
return net_dict
except Exception as exc:
raise Exception("Failed to delete port: %s" % str(exc))
def update_network(self, tenant_id, net_id, **kwargs):
"""Update a network."""
try:
net = db.network_update(net_id, tenant_id, **kwargs)
LOG.debug("Updated network: %s" % net.uuid)
net_dict = {}
net_dict["net-id"] = str(net.uuid)
net_dict["net-name"] = net.name
return net_dict
except Exception as exc:
raise Exception("Failed to update network: %s" % str(exc))
def get_all_ports(self, net_id):
"""Get all ports."""
ports = []
try:
for port in db.port_list(net_id):
LOG.debug("Getting port: %s" % port.uuid)
port_dict = {}
port_dict["port-id"] = str(port.uuid)
port_dict["net-id"] = str(port.network_id)
port_dict["int-id"] = port.interface_id
port_dict["state"] = port.state
port_dict["net"] = port.network
ports.append(port_dict)
return ports
except Exception as exc:
LOG.error("Failed to get all ports: %s" % str(exc))
def get_port(self, net_id, port_id):
"""Get a port."""
port_list = []
port = db.port_get(net_id, port_id)
try:
LOG.debug("Getting port: %s" % port.uuid)
port_dict = {}
port_dict["port-id"] = str(port.uuid)
port_dict["net-id"] = str(port.network_id)
port_dict["int-id"] = port.interface_id
port_dict["state"] = port.state
port_list.append(port_dict)
return port_list
except Exception as exc:
LOG.error("Failed to get port: %s" % str(exc))
def create_port(self, net_id):
"""Add a port."""
port_dict = {}
try:
port = db.port_create(net_id)
LOG.debug("Creating port %s" % port.uuid)
port_dict["port-id"] = str(port.uuid)
port_dict["net-id"] = str(port.network_id)
port_dict["int-id"] = port.interface_id
port_dict["state"] = port.state
return port_dict
except Exception as exc:
LOG.error("Failed to create port: %s" % str(exc))
def delete_port(self, net_id, port_id):
"""Delete a port."""
try:
port = db.port_destroy(net_id, port_id)
LOG.debug("Deleted port %s" % port.uuid)
port_dict = {}
port_dict["port-id"] = str(port.uuid)
return port_dict
except Exception as exc:
raise Exception("Failed to delete port: %s" % str(exc))
def update_port(self, net_id, port_id, port_state):
"""Update a port."""
try:
port = db.port_set_state(net_id, port_id, port_state)
LOG.debug("Updated port %s" % port.uuid)
port_dict = {}
port_dict["port-id"] = str(port.uuid)
port_dict["net-id"] = str(port.network_id)
port_dict["int-id"] = port.interface_id
port_dict["state"] = port.state
return port_dict
except Exception as exc:
raise Exception("Failed to update port state: %s" % str(exc))
def plug_interface(self, net_id, port_id, int_id):
"""Plug interface to a port."""
try:
port = db.port_set_attachment(net_id, port_id, int_id)
LOG.debug("Attached interface to port %s" % port.uuid)
port_dict = {}
port_dict["port-id"] = str(port.uuid)
port_dict["net-id"] = str(port.network_id)
port_dict["int-id"] = port.interface_id
port_dict["state"] = port.state
return port_dict
except Exception as exc:
raise Exception("Failed to plug interface: %s" % str(exc))
def unplug_interface(self, net_id, port_id):
"""Unplug interface to a port."""
try:
port = db.port_unset_attachment(net_id, port_id)
LOG.debug("Detached interface from port %s" % port.uuid)
port_dict = {}
port_dict["port-id"] = str(port.uuid)
port_dict["net-id"] = str(port.network_id)
port_dict["int-id"] = port.interface_id
port_dict["state"] = port.state
return port_dict
except Exception as exc:
raise Exception("Failed to unplug interface: %s" % str(exc))
class NexusDBTest(base.BaseTestCase):
"""Class conisting of nexus DB unit tests."""
def setUp(self):
super(NexusDBTest, self).setUp()
"""Setup for nexus db tests."""
l2network_db.initialize()
self.addCleanup(db.clear_db)
self.dbtest = NexusDB()
LOG.debug("Setup")
def testa_create_nexusportbinding(self):
"""Create nexus port binding."""
binding1 = self.dbtest.create_nexusportbinding("port1", 10)
self.assertTrue(binding1["port-id"] == "port1")
self.tearDown_nexusportbinding()
def testb_getall_nexusportbindings(self):
"""Get all nexus port bindings."""
self.dbtest.create_nexusportbinding("port1", 10)
self.dbtest.create_nexusportbinding("port2", 10)
bindings = self.dbtest.get_all_nexusportbindings()
count = 0
for bind in bindings:
if "port" in bind["port-id"]:
count += 1
self.assertTrue(count == 2)
self.tearDown_nexusportbinding()
def testc_delete_nexusportbinding(self):
"""Delete nexus port binding."""
self.dbtest.create_nexusportbinding("port1", 10)
self.dbtest.delete_nexusportbinding(10)
bindings = self.dbtest.get_all_nexusportbindings()
count = 0
for bind in bindings:
if "port " in bind["port-id"]:
count += 1
self.assertTrue(count == 0)
self.tearDown_nexusportbinding()
def testd_update_nexusportbinding(self):
"""Update nexus port binding."""
binding1 = self.dbtest.create_nexusportbinding("port1", 10)
binding1 = self.dbtest.update_nexusport_binding(binding1["port-id"],
20)
bindings = self.dbtest.get_all_nexusportbindings()
count = 0
for bind in bindings:
if "20" in str(bind["vlan-id"]):
count += 1
self.assertTrue(count == 1)
self.tearDown_nexusportbinding()
def test_get_nexusport_binding_no_result_found_handling(self):
with mock.patch('sqlalchemy.orm.Query.all') as mock_all:
mock_all.return_value = []
with self.assertRaises(c_exc.NexusPortBindingNotFound):
nexus_db.get_nexusport_binding(port_id=10,
vlan_id=20,
switch_ip='10.0.0.1',
instance_id=1)
def test_get_nexusvlan_binding_no_result_found_handling(self):
with mock.patch('sqlalchemy.orm.Query.all') as mock_all:
mock_all.return_value = []
with self.assertRaises(c_exc.NexusPortBindingNotFound):
nexus_db.get_nexusvlan_binding(vlan_id=10,
switch_ip='10.0.0.1')
def test_update_nexusport_binding_no_result_found_handling(self):
with mock.patch('sqlalchemy.orm.Query.one') as mock_one:
mock_one.side_effect = sqlalchemy.orm.exc.NoResultFound
with self.assertRaises(c_exc.NexusPortBindingNotFound):
nexus_db.update_nexusport_binding(port_id=10,
vlan_id=20,
switch_ip='10.0.0.1',
instance_id=1)
def test_get_nexusvm_binding_no_result_found_handling(self):
with mock.patch('sqlalchemy.orm.Query.first') as mock_first:
mock_first.return_value = None
with self.assertRaises(c_exc.NexusPortBindingNotFound):
nexus_db.get_nexusvm_binding(port_id=10,
vlan_id=20,
switch_ip='10.0.0.1')
def test_nexusport_binding_not_found_exception_message_formatting(self):
try:
raise c_exc.NexusPortBindingNotFound(a=1, b='test')
except c_exc.NexusPortBindingNotFound as e:
self.assertIn('(a=1,b=test)', str(e))
def tearDown_nexusportbinding(self):
"""Tear down nexus port binding table."""
LOG.debug("Tearing Down Nexus port Bindings")
binds = self.dbtest.get_all_nexusportbindings()
for bind in binds:
vlan_id = bind["vlan-id"]
self.dbtest.delete_nexusportbinding(vlan_id)
class L2networkDBTest(base.BaseTestCase):
"""Class conisting of L2network DB unit tests."""
def setUp(self):
"""Setup for tests."""
super(L2networkDBTest, self).setUp()
l2network_db.initialize()
self.dbtest = L2networkDB()
self.quantum = QuantumDB()
self.addCleanup(db.clear_db)
LOG.debug("Setup")
def testa_create_vlanbinding(self):
"""Test add vlan binding."""
net1 = self.quantum.create_network("t1", "netid1")
vlan1 = self.dbtest.create_vlan_binding(10, "vlan1", net1["net-id"])
self.assertTrue(vlan1["vlan-id"] == "10")
self.teardown_vlanbinding()
self.teardown_network()
def testb_getall_vlanbindings(self):
"""Test get all vlan bindings."""
net1 = self.quantum.create_network("t1", "netid1")
net2 = self.quantum.create_network("t1", "netid2")
vlan1 = self.dbtest.create_vlan_binding(10, "vlan1", net1["net-id"])
self.assertTrue(vlan1["vlan-id"] == "10")
vlan2 = self.dbtest.create_vlan_binding(20, "vlan2", net2["net-id"])
self.assertTrue(vlan2["vlan-id"] == "20")
vlans = self.dbtest.get_all_vlan_bindings()
count = 0
for vlan in vlans:
if "vlan" in vlan["vlan-name"]:
count += 1
self.assertTrue(count == 2)
self.teardown_vlanbinding()
self.teardown_network()
def testc_delete_vlanbinding(self):
"""Test delete vlan binding."""
net1 = self.quantum.create_network("t1", "netid1")
vlan1 = self.dbtest.create_vlan_binding(10, "vlan1", net1["net-id"])
self.assertTrue(vlan1["vlan-id"] == "10")
self.dbtest.delete_vlan_binding(net1["net-id"])
vlans = self.dbtest.get_all_vlan_bindings()
count = 0
for vlan in vlans:
if "vlan " in vlan["vlan-name"]:
count += 1
self.assertTrue(count == 0)
self.teardown_vlanbinding()
self.teardown_network()
def testd_update_vlanbinding(self):
"""Test update vlan binding."""
net1 = self.quantum.create_network("t1", "netid1")
vlan1 = self.dbtest.create_vlan_binding(10, "vlan1", net1["net-id"])
self.assertTrue(vlan1["vlan-id"] == "10")
vlan1 = self.dbtest.update_vlan_binding(net1["net-id"], 11, "newvlan1")
vlans = self.dbtest.get_all_vlan_bindings()
count = 0
for vlan in vlans:
if "new" in vlan["vlan-name"]:
count += 1
self.assertTrue(count == 1)
self.teardown_vlanbinding()
self.teardown_network()
def testm_test_vlanids(self):
"""Test vlanid methods."""
l2network_db.create_vlanids()
vlanids = l2network_db.get_all_vlanids()
self.assertTrue(len(vlanids) > 0)
vlanid = l2network_db.reserve_vlanid()
used = l2network_db.is_vlanid_used(vlanid)
self.assertTrue(used)
used = l2network_db.release_vlanid(vlanid)
self.assertFalse(used)
#counting on default teardown here to clear db
def teardown_network(self):
"""tearDown Network table."""
LOG.debug("Tearing Down Network")
nets = self.quantum.get_all_networks("t1")
for net in nets:
netid = net["net-id"]
self.quantum.delete_network(netid)
def teardown_port(self):
"""tearDown Port table."""
LOG.debug("Tearing Down Port")
nets = self.quantum.get_all_networks("t1")
for net in nets:
netid = net["net-id"]
ports = self.quantum.get_all_ports(netid)
for port in ports:
portid = port["port-id"]
self.quantum.delete_port(netid, portid)
def teardown_vlanbinding(self):
"""tearDown VlanBinding table."""
LOG.debug("Tearing Down Vlan Binding")
vlans = self.dbtest.get_all_vlan_bindings()
for vlan in vlans:
netid = vlan["net-id"]
self.dbtest.delete_vlan_binding(netid)
class QuantumDBTest(base.BaseTestCase):
"""Class conisting of Quantum DB unit tests."""
def setUp(self):
"""Setup for tests."""
super(QuantumDBTest, self).setUp()
l2network_db.initialize()
self.addCleanup(db.clear_db)
self.dbtest = QuantumDB()
self.tenant_id = "t1"
LOG.debug("Setup")
def testa_create_network(self):
"""Test to create network."""
net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1")
self.assertTrue(net1["net-name"] == "plugin_test1")
self.teardown_network_port()
def testb_get_networks(self):
"""Test to get all networks."""
net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1")
self.assertTrue(net1["net-name"] == "plugin_test1")
net2 = self.dbtest.create_network(self.tenant_id, "plugin_test2")
self.assertTrue(net2["net-name"] == "plugin_test2")
nets = self.dbtest