Use testtools instead of unittest or unittest2.

As part of the move towards testr and parallel test running, we
start to use testtools and fixtures to make the test suite
resilient and more pedantic.

Part of blueprint grizzly-testtools

Change-Id: I90250de9fe21237db34f6a50b89b15863e270aa5
This commit is contained in:
Monty Taylor 2013-02-06 17:01:30 +11:00 committed by Akihiro MOTOKI
parent b93d9853d2
commit c90977b4f6
89 changed files with 535 additions and 698 deletions

View File

@ -66,9 +66,9 @@ Example::
import random
import StringIO
import time
import unittest
import eventlet
import testtools
import webob.exc
import quantum.api.networks
@ -199,6 +199,12 @@ bug that had no unit test, a new passing unit test should be added. If a
submitted bug fix does have a unit test, be sure to add a new one that fails
without the patch and passes with the patch.
All unittest classes must ultimately inherit from testtools.TestCase.
All setUp and tearDown methods must upcall using the super() method.
tearDown methods should be avoided and addCleanup calls should be preferred.
Never manually create tempfiles. Always use the tempfile fixtures from
the fixture library to ensure that they are cleaned up.
openstack-common
----------------

View File

@ -19,17 +19,4 @@
# The code below enables nosetests to work with i18n _() blocks
import __builtin__
import unittest
setattr(__builtin__, '_', lambda x: x)
class BaseTest(unittest.TestCase):
def setUp(self):
pass
def setUp():
pass

View File

@ -20,9 +20,9 @@
import logging
import os.path
import unittest
import routes
import testtools
import webob
from webtest import TestApp
@ -149,19 +149,14 @@ class ExtensionsTestApp(wsgi.Router):
self._delete_port(net_id, port_id)
self._delete_network(net_id)
def tearDown(self):
""" Tear down """
db.clear_db()
class QosExtensionTest(unittest.TestCase):
class QosExtensionTest(testtools.TestCase):
def setUp(self):
""" Set up function """
super(QosExtensionTest, self).setUp()
parent_resource = dict(member_name="tenant",
collection_name="extensions/csco/tenants")
controller = qos.QosController(QuantumManager.get_plugin())
@ -411,12 +406,13 @@ class QosExtensionTest(unittest.TestCase):
db.clear_db()
class CredentialExtensionTest(unittest.TestCase):
class CredentialExtensionTest(testtools.TestCase):
def setUp(self):
""" Set up function """
super(CredentialExtensionTest, self).setUp()
parent_resource = dict(member_name="tenant",
collection_name="extensions/csco/tenants")
controller = credential.CredentialController(QuantumManager.

View File

@ -21,7 +21,8 @@ that tests the database api method calls
"""
import logging as LOG
import unittest
import testtools
from quantum.openstack.common import log as logging
from quantum.plugins.cisco.common import cisco_constants as const
@ -346,18 +347,16 @@ class QuantumDB(object):
raise Exception("Failed to unplug interface: %s" % str(exc))
class NexusDBTest(unittest.TestCase):
class NexusDBTest(testtools.TestCase):
"""Class conisting of nexus DB unit tests"""
def setUp(self):
super(NexusDBTest, self).setUp()
"""Setup for nexus db tests"""
l2network_db.initialize()
self.addCleanup(db.clear_db)
self.dbtest = NexusDB()
LOG.debug("Setup")
def tearDown(self):
"""Tear Down"""
db.clear_db()
def testa_create_nexusportbinding(self):
"""create nexus port binding"""
binding1 = self.dbtest.create_nexusportbinding("port1", 10)
@ -410,19 +409,17 @@ class NexusDBTest(unittest.TestCase):
self.dbtest.delete_nexusportbinding(vlan_id)
class L2networkDBTest(unittest.TestCase):
class L2networkDBTest(testtools.TestCase):
"""Class conisting of L2network DB unit tests"""
def setUp(self):
"""Setup for tests"""
super(L2networkDBTest, self).setUp()
l2network_db.initialize()
self.dbtest = L2networkDB()
self.quantum = QuantumDB()
self.addCleanup(db.clear_db)
LOG.debug("Setup")
def tearDown(self):
"""Tear Down"""
db.clear_db()
def testa_create_vlanbinding(self):
"""test add vlan binding"""
net1 = self.quantum.create_network("t1", "netid1")
@ -518,19 +515,17 @@ class L2networkDBTest(unittest.TestCase):
self.dbtest.delete_vlan_binding(netid)
class QuantumDBTest(unittest.TestCase):
class QuantumDBTest(testtools.TestCase):
"""Class conisting of Quantum DB unit tests"""
def setUp(self):
"""Setup for tests"""
super(QuantumDBTest, self).setUp()
l2network_db.initialize()
self.addCleanup(db.clear_db)
self.dbtest = QuantumDB()
self.tenant_id = "t1"
LOG.debug("Setup")
def tearDown(self):
"""Tear Down"""
db.clear_db()
def testa_create_network(self):
"""test to create network"""
net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1")

View File

@ -16,17 +16,4 @@
# under the License.
import __builtin__
import unittest
setattr(__builtin__, '_', lambda x: x)
class BaseTest(unittest.TestCase):
def setUp(self):
pass
def setUp():
pass

View File

@ -16,17 +16,4 @@
# under the License.
import __builtin__
import unittest
setattr(__builtin__, '_', lambda x: x)
class BaseTest(unittest.TestCase):
def setUp(self):
pass
def setUp():
pass

View File

@ -40,6 +40,7 @@ def curdir(*p):
class APIv2TestCase(test_api_v2.APIv2TestCase):
def setUp(self):
super(APIv2TestCase, self).setUp()
plugin = 'quantum.plugins.cisco.network_plugin.PluginV2'
# Ensure 'stale' patched copies of the plugin are never returned
QuantumManager._instance = None

View File

@ -20,7 +20,8 @@
import __builtin__
import os
import unittest
import testtools
setattr(__builtin__, '_', lambda x: x)
@ -33,13 +34,3 @@ cfg.CONF.state_path = absdir
# An empty lock path forces lockutils.synchronized to use a temporary
# location for lock files that will be cleaned up automatically.
cfg.CONF.lock_path = ''
class BaseTest(unittest.TestCase):
def setUp(self):
pass
def setUp():
pass

View File

@ -16,7 +16,8 @@
# under the License.
import os
import unittest
import testtools
from quantum.agent.linux import utils
from quantum.openstack.common import log as logging
@ -25,7 +26,7 @@ from quantum.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class RootwrapTestExec(unittest.TestCase):
class RootwrapTestExec(testtools.TestCase):
"""Simple unit test to test the basic rootwrap mechanism
Essentially hello-world. Just run a command as root and check that
@ -39,6 +40,7 @@ class RootwrapTestExec(unittest.TestCase):
"""
def setUp(self):
super(RootwrapTestExec, self).setUp()
self.cwd = os.getcwd() + "/../../.."
# stuff a stupid bash script into /tmp, so that the next
# method can execute it.
@ -77,3 +79,4 @@ come to the aid of their party.")
def tearDown(self):
os.remove(self.test_file)
os.remove(self.conf_file)
super(RootwrapTestExec, self).tearDown()

View File

@ -67,14 +67,11 @@ class BigSwitchProxyPluginV2TestCase(test_plugin.QuantumDbPluginV2TestCase):
self.httpPatch = patch('httplib.HTTPConnection', create=True,
new=HTTPConnectionMock)
self.addCleanup(self.httpPatch.stop)
MockHTTPConnection = self.httpPatch.start()
super(BigSwitchProxyPluginV2TestCase,
self).setUp(self._plugin_name)
def tearDown(self):
super(BigSwitchProxyPluginV2TestCase, self).tearDown()
self.httpPatch.stop()
class TestBigSwitchProxyBasicGet(test_plugin.TestBasicGet,
BigSwitchProxyPluginV2TestCase):

View File

@ -19,18 +19,20 @@
"""
Test vlans alloc/dealloc.
"""
import unittest2 as unittest
import testtools
from quantum.db import api as db
from quantum.openstack.common import context
from quantum.plugins.brocade import vlanbm as vlan_bitmap
class TestVlanBitmap(unittest.TestCase):
class TestVlanBitmap(testtools.TestCase):
"""exercise Vlan bitmap ."""
def setUp(self):
super(TestVlanBitmap, self).setUp()
db.configure_db()
self.addCleanup(db.clear_db)
self.context = context.get_admin_context()
self.context.session = db.get_session()
@ -69,6 +71,3 @@ class TestVlanBitmap(unittest.TestCase):
self.vbm_.release_vlan(4)
vlan_id = self.vbm_.get_next_vlan(None)
self.assertEqual(vlan_id, 4)
def tearDown(self):
db.clear_db()

View File

@ -14,7 +14,7 @@
# limitations under the License.
import mock
import unittest
import testtools
from quantum.db import api as db
from quantum.openstack.common import importutils
@ -34,12 +34,13 @@ NEXUS_DRIVER = ('quantum.plugins.cisco.tests.unit.v2.nexus.'
'fake_nexus_driver.CiscoNEXUSFakeDriver')
class TestCiscoNexusPlugin(unittest.TestCase):
class TestCiscoNexusPlugin(testtools.TestCase):
def setUp(self):
"""
Set up function
"""
super(TestCiscoNexusPlugin, self).setUp()
self.tenant_id = "test_tenant_cisco1"
self.net_name = "test_network_cisco1"
self.net_id = 000007
@ -133,9 +134,3 @@ class TestCiscoNexusPlugin(unittest.TestCase):
)
self.assertEqual(expected_instance_id, INSTANCE)
def tearDown(self):
"""Clear the test environment"""
pass
# Remove database contents
#db.clear_db(network_models_v2.model_base.BASEV2)

View File

@ -24,14 +24,15 @@ import sys
import mock
from oslo.config import cfg
import unittest2 as unittest
import testtools
from quantum.plugins.hyperv.agent import hyperv_quantum_agent
class TestHyperVQuantumAgent(unittest.TestCase):
class TestHyperVQuantumAgent(testtools.TestCase):
def setUp(self):
super(TestHyperVQuantumAgent, self).setUp()
self.addCleanup(cfg.CONF.reset)
# Avoid rpc initialization for unit tests
cfg.CONF.set_override('rpc_backend',
@ -42,9 +43,6 @@ class TestHyperVQuantumAgent(unittest.TestCase):
self.agent.agent_id = mock.Mock()
self.agent._utils = mock.Mock()
def tearDown(self):
cfg.CONF.reset()
def test_port_bound(self):
port = mock.Mock()
net_uuid = 'my-net-uuid'

View File

@ -21,7 +21,7 @@ Unit Tests for hyperv quantum rpc
"""
import mock
import unittest2
import testtools
from quantum.agent import rpc as agent_rpc
from quantum.common import topics
@ -31,7 +31,7 @@ from quantum.plugins.hyperv import agent_notifier_api as ana
from quantum.plugins.hyperv.common import constants
class rpcHyperVApiTestCase(unittest2.TestCase):
class rpcHyperVApiTestCase(testtools.TestCase):
def _test_hyperv_quantum_api(
self, rpcapi, topic, method, rpc_method, **kwargs):

View File

@ -14,13 +14,13 @@
# limitations under the License.
from oslo.config import cfg
import unittest2 as unittest
import testtools
#NOTE this import loads tests required options
from quantum.plugins.linuxbridge.common import config
class ConfigurationTest(unittest.TestCase):
class ConfigurationTest(testtools.TestCase):
def test_defaults(self):
self.assertEqual(-1,

View File

@ -13,7 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest2
import testtools
from testtools import matchers
from quantum.common import exceptions as q_exc
from quantum.db import api as db
@ -29,14 +30,13 @@ UPDATED_VLAN_RANGES = {PHYS_NET: [(VLAN_MIN + 5, VLAN_MAX + 5)],
PHYS_NET_2: [(VLAN_MIN + 20, VLAN_MAX + 20)]}
class NetworkStatesTest(unittest2.TestCase):
class NetworkStatesTest(testtools.TestCase):
def setUp(self):
super(NetworkStatesTest, self).setUp()
lb_db.initialize()
lb_db.sync_network_states(VLAN_RANGES)
self.session = db.get_session()
def tearDown(self):
db.clear_db()
self.addCleanup(db.clear_db)
def test_sync_network_states(self):
self.assertIsNone(lb_db.get_network_state(PHYS_NET,
@ -105,11 +105,11 @@ class NetworkStatesTest(unittest2.TestCase):
for x in xrange(VLAN_MIN, VLAN_MAX + 1):
physical_network, vlan_id = lb_db.reserve_network(self.session)
self.assertEqual(physical_network, PHYS_NET)
self.assertGreaterEqual(vlan_id, VLAN_MIN)
self.assertLessEqual(vlan_id, VLAN_MAX)
self.assertThat(vlan_id, matchers.GreaterThan(VLAN_MIN - 1))
self.assertThat(vlan_id, matchers.LessThan(VLAN_MAX + 1))
vlan_ids.add(vlan_id)
with self.assertRaises(q_exc.NoNetworkAvailable):
with testtools.ExpectedException(q_exc.NoNetworkAvailable):
physical_network, vlan_id = lb_db.reserve_network(self.session)
for vlan_id in vlan_ids:
@ -123,7 +123,7 @@ class NetworkStatesTest(unittest2.TestCase):
self.assertTrue(lb_db.get_network_state(PHYS_NET,
vlan_id).allocated)
with self.assertRaises(q_exc.VlanIdInUse):
with testtools.ExpectedException(q_exc.VlanIdInUse):
lb_db.reserve_specific_network(self.session, PHYS_NET, vlan_id)
lb_db.release_network(self.session, PHYS_NET, vlan_id, VLAN_RANGES)
@ -137,7 +137,7 @@ class NetworkStatesTest(unittest2.TestCase):
self.assertTrue(lb_db.get_network_state(PHYS_NET,
vlan_id).allocated)
with self.assertRaises(q_exc.VlanIdInUse):
with testtools.ExpectedException(q_exc.VlanIdInUse):
lb_db.reserve_specific_network(self.session, PHYS_NET, vlan_id)
lb_db.release_network(self.session, PHYS_NET, vlan_id, VLAN_RANGES)

View File

@ -16,15 +16,16 @@
import mock
from oslo.config import cfg
import unittest2 as unittest
import testtools
from quantum.plugins.linuxbridge.agent import linuxbridge_quantum_agent
from quantum.plugins.linuxbridge.common import constants as lconst
class TestLinuxBridge(unittest.TestCase):
class TestLinuxBridge(testtools.TestCase):
def setUp(self):
super(TestLinuxBridge, self).setUp()
self.addCleanup(cfg.CONF.reset)
interface_mappings = {'physnet1': 'eth1'}
root_helper = cfg.CONF.AGENT.root_helper

View File

@ -37,7 +37,6 @@ class LinuxBridgeSecurityGroupsTestCase(test_sg.SecurityGroupDBTestCase):
_plugin_name = PLUGIN_NAME
def setUp(self, plugin=None):
self.addCleanup(mock.patch.stopall)
notifier_p = mock.patch(NOTIFIER)
notifier_cls = notifier_p.start()
self.notifier = mock.Mock()
@ -48,10 +47,11 @@ class LinuxBridgeSecurityGroupsTestCase(test_sg.SecurityGroupDBTestCase):
RESOURCE_ATTRIBUTE_MAP[item].
copy())
super(LinuxBridgeSecurityGroupsTestCase, self).setUp(PLUGIN_NAME)
self.addCleanup(mock.patch.stopall)
def tearDown(self):
super(LinuxBridgeSecurityGroupsTestCase, self).tearDown()
attributes.RESOURCE_ATTRIBUTE_MAP = self._attribute_map_bk_
super(LinuxBridgeSecurityGroupsTestCase, self).tearDown()
class TestLinuxBridgeSecurityGroups(LinuxBridgeSecurityGroupsTestCase,

View File

@ -19,7 +19,7 @@ Unit Tests for linuxbridge rpc
"""
import stubout
import unittest2
import testtools
from quantum.agent import rpc as agent_rpc
from quantum.common import topics
@ -28,7 +28,7 @@ from quantum.openstack.common import rpc
from quantum.plugins.linuxbridge import lb_quantum_plugin as plb
class rpcApiTestCase(unittest2.TestCase):
class rpcApiTestCase(testtools.TestCase):
def _test_lb_api(self, rpcapi, topic, method, rpc_method, **kwargs):
ctxt = context.RequestContext('fake_user', 'fake_project')

View File

@ -21,7 +21,7 @@ import mock
import mox
from oslo.config import cfg
import stubout
import unittest2 as unittest
import testtools
from quantum import context
from quantum.db import api as db
@ -65,7 +65,7 @@ def setup_metaplugin_conf():
cfg.CONF.set_override('max_dns_nameservers', 10)
class MetaQuantumPluginV2Test(unittest.TestCase):
class MetaQuantumPluginV2Test(testtools.TestCase):
"""Class conisting of MetaQuantumPluginV2 unit tests"""
def setUp(self):
@ -287,7 +287,7 @@ class MetaQuantumPluginV2Test(unittest.TestCase):
self.plugin.delete_router(self.context, router_ret1['id'])
self.plugin.delete_router(self.context, router_ret2['id'])
with self.assertRaises(FlavorNotFound):
with testtools.ExpectedException(FlavorNotFound):
self.plugin.get_router(self.context, router_ret1['id'])
def test_extension_method(self):
@ -310,3 +310,4 @@ class MetaQuantumPluginV2Test(unittest.TestCase):
self.stubs.SmartUnsetAll()
self.mox.VerifyAll()
db.clear_db()
super(MetaQuantumPluginV2Test, self).tearDown()

View File

@ -19,7 +19,7 @@
# @author: Ryu Ishimoto, Midokura Japan KK
# @author: Tomoe Sugihara, Midokura Japan KK
import unittest2 as unittest
import testtools
import uuid
import mock
@ -27,14 +27,12 @@ import mock
from quantum.plugins.midonet import midonet_lib
class MidonetLibTestCase(unittest.TestCase):
class MidonetLibTestCase(testtools.TestCase):
def setUp(self):
super(MidonetLibTestCase, self).setUp()
self.mock_api = mock.Mock()
def tearDown(self):
self.mock_api = None
def _create_mock_chains(self, sg_id, sg_name):
mock_in_chain = mock.Mock()
mock_in_chain.get_name.return_value = "OS_SG_%s_%s_IN" % (sg_id,
@ -69,10 +67,6 @@ class MidonetChainManagerTestCase(MidonetLibTestCase):
super(MidonetChainManagerTestCase, self).setUp()
self.mgr = midonet_lib.ChainManager(self.mock_api)
def tearDown(self):
self.mgr = None
super(MidonetChainManagerTestCase, self).tearDown()
def test_create_for_sg(self):
tenant_id = 'test_tenant'
sg_id = str(uuid.uuid4())
@ -147,10 +141,6 @@ class MidonetPortGroupManagerTestCase(MidonetLibTestCase):
super(MidonetPortGroupManagerTestCase, self).setUp()
self.mgr = midonet_lib.PortGroupManager(self.mock_api)
def tearDown(self):
self.mgr = None
super(MidonetPortGroupManagerTestCase, self).tearDown()
def test_create(self):
tenant_id = 'test_tenant'
sg_id = str(uuid.uuid4())
@ -197,10 +187,6 @@ class MidonetRuleManagerTestCase(MidonetLibTestCase):
self.mgr.chain_manager = mock.Mock()
self.mgr.pg_manager = mock.Mock()
def tearDown(self):
self.mgr = None
super(MidonetRuleManagerTestCase, self).tearDown()
def _create_test_rule(self, tenant_id, sg_id, rule_id, direction="egress",
protocol="tcp", port_min=1, port_max=65535,
src_ip='192.168.1.0/24', src_group_id=None,

View File

@ -15,12 +15,12 @@
# under the License.
# @author: Ryota MIBU
import unittest
import testtools
from quantum.plugins.nec.common import config
class ConfigurationTest(unittest.TestCase):
class ConfigurationTest(testtools.TestCase):
def test_defaults(self):
self.assertEqual(-1, config.CONF.DATABASE.sql_max_retries)

View File

@ -16,7 +16,7 @@
# @author: Ryota MIBU
import random
import unittest
import testtools
from quantum.db import api as db_api
from quantum.openstack.common import uuidutils
@ -25,17 +25,15 @@ from quantum.plugins.nec.db import api as ndb
from quantum.plugins.nec.db import models as nmodels
class NECPluginV2DBTestBase(object):
class NECPluginV2DBTestBase(testtools.TestCase):
"""Class conisting of NECPluginV2 DB unit tests"""
def setUp(self):
"""Setup for tests"""
super(NECPluginV2DBTestBase, self).setUp()
ndb.initialize()
self.session = db_api.get_session()
def tearDown(self):
"""Tear Down"""
ndb.clear_db()
self.addCleanup(ndb.clear_db)
def get_ofc_item_random_params(self):
"""create random parameters for ofc_item test"""
@ -55,8 +53,7 @@ class NECPluginV2DBTestBase(object):
return port_id, datapath_id, port_no, vlan_id, mac, none
class NECPluginV2DBTest(NECPluginV2DBTestBase,
unittest.TestCase):
class NECPluginV2DBTest(NECPluginV2DBTestBase):
def testa_add_ofc_item(self):
"""test add OFC item"""
@ -166,8 +163,7 @@ class NECPluginV2DBTest(NECPluginV2DBTestBase,
self.assertEqual(None, portinfo_none)
class NECPluginV2DBOldMappingTest(NECPluginV2DBTestBase,
unittest.TestCase):
class NECPluginV2DBOldMappingTest(NECPluginV2DBTestBase):
"""Test related to old ID mapping"""
# Mapping Table mode

View File

@ -15,7 +15,7 @@
# under the License.
# @author: Ryota MIBU
import unittest
import testtools
from quantum import context
from quantum.openstack.common import uuidutils
@ -25,19 +25,18 @@ from quantum.plugins.nec.db import models as nmodels
from quantum.plugins.nec import ofc_manager
class OFCManagerTestBase(object):
class OFCManagerTestBase(testtools.TestCase):
"""Class conisting of OFCManager unit tests"""
def setUp(self):
super(OFCManagerTestBase, self).setUp()
driver = "quantum.tests.unit.nec.stub_ofc_driver.StubOFCDriver"
config.CONF.set_override('driver', driver, 'OFC')
ndb.initialize()
self.addCleanup(ndb.clear_db)
self.ofc = ofc_manager.OFCManager()
self.ctx = context.get_admin_context()
def tearDown(self):
ndb.clear_db()
def get_random_params(self):
"""create random parameters for portinfo test"""
tenant = uuidutils.generate_uuid()
@ -48,7 +47,7 @@ class OFCManagerTestBase(object):
return tenant, network, port, _filter, none
class OFCManagerTest(OFCManagerTestBase, unittest.TestCase):
class OFCManagerTest(OFCManagerTestBase):
def testa_create_ofc_tenant(self):
"""test create ofc_tenant"""
t, n, p, f, none = self.get_random_params()
@ -177,7 +176,7 @@ class OFCManagerTest(OFCManagerTestBase, unittest.TestCase):
'ofc_packet_filter', f))
class OFCManagerTestWithOldMapping(OFCManagerTestBase, unittest.TestCase):
class OFCManagerTestWithOldMapping(OFCManagerTestBase):
def test_exists_ofc_tenant(self):
t, n, p, f, none = self.get_random_params()

View File

@ -19,7 +19,7 @@ import random
import string
import mox
import unittest
import testtools
from quantum import context
from quantum.openstack.common import uuidutils
@ -43,17 +43,16 @@ def _ofc(id):
return "ofc-%s" % id
class PFCDriverTestBase():
class PFCDriverTestBase(testtools.TestCase):
driver = 'quantum.plugins.nec.drivers.pfc.PFCDriverBase'
def setUp(self):
super(PFCDriverTestBase, self).setUp()
self.mox = mox.Mox()
self.driver = drivers.get_driver(self.driver)(TestConfig)
self.mox.StubOutWithMock(ofc.OFCClient, 'do_request')
def tearDown(self):
self.mox.UnsetStubs()
self.addCleanup(self.mox.UnsetStubs)
def get_ofc_item_random_params(self):
"""create random parameters for ofc_item test"""
@ -168,11 +167,11 @@ class PFCDriverTestBase():
self.mox.VerifyAll()
class PFCDriverBaseTest(PFCDriverTestBase, unittest.TestCase):
class PFCDriverBaseTest(PFCDriverTestBase):
pass
class PFCV3DriverTest(PFCDriverTestBase, unittest.TestCase):
class PFCV3DriverTest(PFCDriverTestBase):
driver = 'pfc_v3'
def testa_create_tenant(self):
@ -189,20 +188,18 @@ class PFCV3DriverTest(PFCDriverTestBase, unittest.TestCase):
pass
class PFCV4DriverTest(PFCDriverTestBase, unittest.TestCase):
class PFCV4DriverTest(PFCDriverTestBase):
driver = 'pfc_v4'
class PFCDriverStringTest(unittest.TestCase):
class PFCDriverStringTest(testtools.TestCase):
driver = 'quantum.plugins.nec.drivers.pfc.PFCDriverBase'
def setUp(self):
super(PFCDriverStringTest, self).setUp()
self.driver = drivers.get_driver(self.driver)(TestConfig)
def tearDown(self):
pass
def test_generate_pfc_id_uuid(self):
id_str = uuidutils.generate_uuid()
exp_str = (id_str[:14] + id_str[15:]).replace('-', '')[:31]
@ -238,18 +235,17 @@ class PFCDriverStringTest(unittest.TestCase):
self.assertEqual(exp_str, ret_str)
class PFCIdConvertTest(unittest.TestCase):
class PFCIdConvertTest(testtools.TestCase):
driver = 'quantum.plugins.nec.drivers.pfc.PFCDriverBase'
def setUp(self):
super(PFCIdConvertTest, self).setUp()
self.mox = mox.Mox()
self.driver = drivers.get_driver(self.driver)(TestConfig)
self.ctx = self.mox.CreateMock(context.Context)
self.ctx.session = "session"
self.mox.StubOutWithMock(ndb, 'get_ofc_id_lookup_both')
def tearDown(self):
self.mox.UnsetStubs()
self.addCleanup(self.mox.UnsetStubs)
def generate_random_ids(self, count=1):
if count == 1:

View File

@ -16,7 +16,7 @@
# @author: Ryota MIBU
import mox
import unittest
import testtools
from quantum import context
from quantum.openstack.common import uuidutils
@ -32,17 +32,16 @@ class TestConfig(object):
port = 8888
class TremaDriverTestBase():
class TremaDriverTestBase(testtools.TestCase):
driver_name = "trema"
def setUp(self):
super(TremaDriverTestBase, self).setUp()
self.mox = mox.Mox()
self.driver = drivers.get_driver(self.driver_name)(TestConfig)
self.mox.StubOutWithMock(ofc_client.OFCClient, 'do_request')
def tearDown(self):
self.mox.UnsetStubs()
self.addCleanup(self.mox.UnsetStubs)
def get_ofc_item_random_params(self):
"""create random parameters for ofc_item test"""
@ -80,7 +79,7 @@ class TremaDriverNetworkTestBase(TremaDriverTestBase):
self.mox.VerifyAll()
class TremaPortBaseDriverTest(TremaDriverNetworkTestBase, unittest.TestCase):
class TremaPortBaseDriverTest(TremaDriverNetworkTestBase):
driver_name = "trema_port"
@ -111,8 +110,7 @@ class TremaPortBaseDriverTest(TremaDriverNetworkTestBase, unittest.TestCase):
self.mox.VerifyAll()
class TremaPortMACBaseDriverTest(TremaDriverNetworkTestBase,
unittest.TestCase):
class TremaPortMACBaseDriverTest(TremaDriverNetworkTestBase):
driver_name = "trema_portmac"
@ -152,7 +150,7 @@ class TremaPortMACBaseDriverTest(TremaDriverNetworkTestBase,
self.mox.VerifyAll()
class TremaMACBaseDriverTest(TremaDriverNetworkTestBase, unittest.TestCase):
class TremaMACBaseDriverTest(TremaDriverNetworkTestBase):
driver_name = "trema_mac"
@ -180,7 +178,7 @@ class TremaMACBaseDriverTest(TremaDriverNetworkTestBase, unittest.TestCase):
self.mox.VerifyAll()
class TremaFilterDriverTest(TremaDriverTestBase, unittest.TestCase):
class TremaFilterDriverTest(TremaDriverTestBase):
def get_ofc_item_random_params(self):
"""create random parameters for ofc_item test"""
@ -247,16 +245,15 @@ def generate_random_ids(count=1):
return [uuidutils.generate_uuid() for i in xrange(count)]
class TremaIdConvertTest(unittest.TestCase):
class TremaIdConvertTest(testtools.TestCase):
driver_name = 'trema'
def setUp(self):
super(TremaIdConvertTest, self).setUp()
self.driver = drivers.get_driver(self.driver_name)(TestConfig)
self.mox = mox.Mox()
self.ctx = self.mox.CreateMock(context.Context)
def tearDown(self):
self.mox.UnsetStubs()
self.addCleanup(self.mox.UnsetStubs)
def test_convert_tenant_id(self):
ofc_t_id = generate_random_ids(1)
@ -291,16 +288,15 @@ class TremaIdConvertTest(unittest.TestCase):
self.assertEqual(ret, ofc_f_id)
class TremaIdConvertTestBase(object):
class TremaIdConvertTestBase(testtools.TestCase):
def setUp(self):
super(TremaIdConvertTestBase, self).setUp()
self.mox = mox.Mox()
self.driver = drivers.get_driver(self.driver_name)(TestConfig)
self.ctx = self.mox.CreateMock(context.Context)
self.ctx.session = "session"
self.mox.StubOutWithMock(ndb, 'get_ofc_id_lookup_both')
def tearDown(self):
self.mox.UnsetStubs()
self.addCleanup(self.mox.UnsetStubs)
def _test_convert_port_id(self, port_path_template):
t_id, n_id = generate_random_ids(2)
@ -339,7 +335,7 @@ class TremaIdConvertTestBase(object):
self.assertEqual(ret, ofc_p_id)
class TremaIdConvertPortBaseTest(TremaIdConvertTestBase, unittest.TestCase):
class TremaIdConvertPortBaseTest(TremaIdConvertTestBase):
driver_name = "trema_port"
def test_convert_port_id(self):
@ -354,7 +350,7 @@ class TremaIdConvertPortBaseTest(TremaIdConvertTestBase, unittest.TestCase):
'/networs/%(network)s/ports/%(port)s')
class TremaIdConvertPortMACBaseTest(TremaIdConvertTestBase, unittest.TestCase):
class TremaIdConvertPortMACBaseTest(TremaIdConvertTestBase):
driver_name = "trema_portmac"
def test_convert_port_id(self):
@ -370,7 +366,7 @@ class TremaIdConvertPortMACBaseTest(TremaIdConvertTestBase, unittest.TestCase):
'/networs/%(network)s/ports/dummy-%(port)s/attachments/%(port)s')
class TremaIdConvertMACBaseTest(TremaIdConvertTestBase, unittest.TestCase):
class TremaIdConvertMACBaseTest(TremaIdConvertTestBase):
driver_name = "trema_mac"
def test_convert_port_id(self):

View File

@ -13,14 +13,14 @@
# under the License.
#
import unittest2 as unittest
import testtools
from oslo.config import cfg
from quantum.plugins.nicira.nicira_nvp_plugin.common import config
class ConfigurationTest(unittest.TestCase):
class ConfigurationTest(testtools.TestCase):
def test_defaults(self):
self.assertEqual(-1, cfg.CONF.DATABASE.sql_max_retries)

View File

@ -16,7 +16,7 @@
import contextlib
import mock
import unittest2 as unittest
import testtools
from webob import exc
import webtest
@ -53,9 +53,10 @@ class TestExtensionManager(object):
return []
class NetworkGatewayExtensionTestCase(unittest.TestCase):
class NetworkGatewayExtensionTestCase(testtools.TestCase):
def setUp(self):
super(NetworkGatewayExtensionTestCase, self).setUp()
plugin = '%s.%s' % (networkgw.__name__,
networkgw.NetworkGatewayPluginBase.__name__)
self._resource = networkgw.RESOURCE_NAME.replace('-', '_')
@ -71,9 +72,11 @@ class NetworkGatewayExtensionTestCase(unittest.TestCase):
# Update the plugin and extensions path
cfg.CONF.set_override('core_plugin', plugin)
self.addCleanup(cfg.CONF.reset)
self._plugin_patcher = mock.patch(plugin, autospec=True)
self.plugin = self._plugin_patcher.start()
_plugin_patcher = mock.patch(plugin, autospec=True)
self.plugin = _plugin_patcher.start()
self.addCleanup(_plugin_patcher.stop)
# Instantiate mock plugin and enable extensions
manager.QuantumManager.get_plugin().supported_extension_aliases = (
@ -83,12 +86,6 @@ class NetworkGatewayExtensionTestCase(unittest.TestCase):
self.ext_mdw = test_extensions.setup_extensions_middleware(ext_mgr)
self.api = webtest.TestApp(self.ext_mdw)
def tearDown(self):
self._plugin_patcher.stop()
self.api = None
self.plugin = None
cfg.CONF.reset()
def test_network_gateway_create(self):
nw_gw_id = _uuid()
data = {self._resource: {'name': 'nw-gw',

View File

@ -20,6 +20,7 @@ import os
import mock
import netaddr
from oslo.config import cfg
import testtools
import webob.exc
from quantum.common import constants
@ -88,11 +89,8 @@ class NiciraPluginV2TestCase(test_plugin.QuantumDbPluginV2TestCase):
instance.return_value.request.side_effect = _fake_request
super(NiciraPluginV2TestCase, self).setUp(self._plugin_name)
cfg.CONF.set_override('enable_metadata_access_network', False, 'NVP')
def tearDown(self):
self.fc.reset_all()
super(NiciraPluginV2TestCase, self).tearDown()
self.mock_nvpapi.stop()
self.addCleanup(self.fc.reset_all)
self.addCleanup(self.mock_nvpapi.stop)
class TestNiciraBasicGet(test_plugin.TestBasicGet, NiciraPluginV2TestCase):
@ -183,9 +181,10 @@ class TestNiciraNetworksV2(test_plugin.TestNetworksV2,
self._test_create_bridge_network(vlan_id=123)
def test_create_bridge_vlan_network_outofrange_returns_400(self):
with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
with testtools.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
self._test_create_bridge_network(vlan_id=5000)
self.assertEqual(ctx_manager.exception.code, 400)
self.assertEqual(ctx_manager.exception.code, 400)
def test_list_networks_filter_by_id(self):
# We add this unit test to cover some logic specific to the
@ -220,10 +219,7 @@ class NiciraPortSecurityTestCase(psec.PortSecurityDBTestCase):
instance.return_value.request.side_effect = _fake_request
super(NiciraPortSecurityTestCase, self).setUp(self._plugin_name)
def tearDown(self):
super(NiciraPortSecurityTestCase, self).tearDown()
self.mock_nvpapi.stop()
self.addCleanup(self.mock_nvpapi.stop)
class TestNiciraPortSecurity(psec.TestPortSecurity,

View File

@ -7,20 +7,15 @@
# System
import httplib
import unittest
# Third party
import testtools
# Local
import quantum.plugins.nicira.nicira_nvp_plugin.api_client.common as naco
class NvpApiCommonTest(unittest.TestCase):
def setUp(self):
pass
def tearDown(self):
pass
class NvpApiCommonTest(testtools.TestCase):
def test_conn_str(self):
conn = httplib.HTTPSConnection('localhost', 4242, timeout=0)

View File

@ -8,9 +8,10 @@
import eventlet
eventlet.monkey_patch()
import logging
import unittest
import urllib2
import testtools
logging.basicConfig(level=logging.DEBUG)
lg = logging.getLogger("test_nvp_api_request")
@ -21,10 +22,6 @@ def fetch(url):
return urllib2.urlopen(url).read()
class NvpApiRequestTest(unittest.TestCase):
class NvpApiRequestTest(testtools.TestCase):
def setUp(self):
pass
def tearDown(self):
pass
pass

View File

@ -16,12 +16,12 @@ import httplib
import logging
import new
import random
import unittest
import eventlet
from eventlet.green import urllib2
from mock import Mock
from mock import patch
import testtools
from quantum.plugins.nicira.nicira_nvp_plugin.api_client import (
client_eventlet as nace,
@ -40,10 +40,11 @@ def fetch(url):
return urllib2.urlopen(url).read()
class NvpApiRequestEventletTest(unittest.TestCase):
class NvpApiRequestEventletTest(testtools.TestCase):
def setUp(self):
super(NvpApiRequestEventletTest, self).setUp()
self.client = nace.NvpApiClientEventlet(
[("127.0.0.1", 4401, True)], "admin", "admin")
self.url = "/ws.v1/_debug"
@ -52,6 +53,7 @@ class NvpApiRequestEventletTest(unittest.TestCase):
def tearDown(self):
self.client = None
self.req = None
super(NvpApiRequestEventletTest, self).tearDown()
def test_construct_eventlet_api_request(self):
e = nare.NvpApiRequestEventlet(self.client, self.url)

View File

@ -15,10 +15,10 @@
#
# @author: Salvatore Orlando, VMware
import mock
import os
import mock
import unittest2 as unittest
import testtools
from quantum.openstack.common import jsonutils as json
import quantum.plugins.nicira.nicira_nvp_plugin as nvp_plugin
@ -32,7 +32,7 @@ NICIRA_PKG_PATH = nvp_plugin.__name__
_uuid = test_api_v2._uuid
class NvplibTestCase(unittest.TestCase):
class NvplibTestCase(testtools.TestCase):
def setUp(self):
# mock nvp api client
@ -57,10 +57,8 @@ class NvplibTestCase(unittest.TestCase):
self.fake_cluster.retries, self.fake_cluster.redirects)
super(NvplibTestCase, self).setUp()
def tearDown(self):
self.fc.reset_all()
self.mock_nvpapi.stop()
self.addCleanup(self.fc.reset_all)
self.addCleanup(self.mock_nvpapi.stop)
class TestNvplibNatRules(NvplibTestCase):
@ -143,7 +141,7 @@ class NvplibL2GatewayTestCase(NvplibTestCase):
gw_ids.append(self._create_gw_service(_uuid(), name)['uuid'])
results = nvplib.get_l2_gw_services(self.fake_cluster)
self.assertEqual(len(results), 2)
self.assertItemsEqual(gw_ids, [r['uuid'] for r in results])
self.assertEqual(sorted(gw_ids), sorted([r['uuid'] for r in results]))
def test_delete_l2_gw_service(self):
display_name = 'fake-gateway'

View File

@ -13,7 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest2
import testtools
from testtools import matchers
from quantum.common import exceptions as q_exc
from quantum.db import api as db
@ -33,14 +34,13 @@ TUNNEL_RANGES = [(TUN_MIN, TUN_MAX)]
UPDATED_TUNNEL_RANGES = [(TUN_MIN + 5, TUN_MAX + 5)]
class VlanAllocationsTest(unittest2.TestCase):
class VlanAllocationsTest(testtools.TestCase):
def setUp(self):
super(VlanAllocationsTest, self).setUp()
ovs_db_v2.initialize()
ovs_db_v2.sync_vlan_allocations(VLAN_RANGES)
self.session = db.get_session()
def tearDown(self):
db.clear_db()
self.addCleanup(db.clear_db)
def test_sync_vlan_allocations(self):
self.assertIsNone(ovs_db_v2.get_vlan_allocation(PHYS_NET,
@ -117,19 +117,19 @@ class VlanAllocationsTest(unittest2.TestCase):
for x in xrange(VLAN_MIN, VLAN_MAX + 1):
physical_network, vlan_id = ovs_db_v2.reserve_vlan(self.session)
self.assertEqual(physical_network, PHYS_NET)
self.assertGreaterEqual(vlan_id, VLAN_MIN)
self.assertLessEqual(vlan_id, VLAN_MAX)
self.assertThat(vlan_id, matchers.GreaterThan(VLAN_MIN - 1))
self.assertThat(vlan_id, matchers.LessThan(VLAN_MAX + 1))
vlan_ids.add(vlan_id)
with self.assertRaises(q_exc.NoNetworkAvailable):
with testtools.ExpectedException(q_exc.NoNetworkAvailable):
physical_network, vlan_id = ovs_db_v2.reserve_vlan(self.session)
ovs_db_v2.release_vlan(self.session, PHYS_NET, vlan_ids.pop(),
VLAN_RANGES)
physical_network, vlan_id = ovs_db_v2.reserve_vlan(self.session)
self.assertEqual(physical_network, PHYS_NET)
self.assertGreaterEqual(vlan_id, VLAN_MIN)
self.assertLessEqual(vlan_id, VLAN_MAX)
self.assertThat(vlan_id, matchers.GreaterThan(VLAN_MIN - 1))
self.assertThat(vlan_id, matchers.LessThan(VLAN_MAX + 1))
vlan_ids.add(vlan_id)
for vlan_id in vlan_ids:
@ -144,7 +144,7 @@ class VlanAllocationsTest(unittest2.TestCase):
self.assertTrue(ovs_db_v2.get_vlan_allocation(PHYS_NET,
vlan_id).allocated)
with self.assertRaises(q_exc.VlanIdInUse):
with testtools.ExpectedException(q_exc.VlanIdInUse):
ovs_db_v2.reserve_specific_vlan(self.session, PHYS_NET, vlan_id)
ovs_db_v2.release_vlan(self.session, PHYS_NET, vlan_id, VLAN_RANGES)
@ -158,7 +158,7 @@ class VlanAllocationsTest(unittest2.TestCase):
self.assertTrue(ovs_db_v2.get_vlan_allocation(PHYS_NET,
vlan_id).allocated)
with self.assertRaises(q_exc.VlanIdInUse):
with testtools.ExpectedException(q_exc.VlanIdInUse):
ovs_db_v2.reserve_specific_vlan(self.session, PHYS_NET, vlan_id)
ovs_db_v2.release_vlan(self.session, PHYS_NET, vlan_id, VLAN_RANGES)
@ -169,8 +169,8 @@ class VlanAllocationsTest(unittest2.TestCase):
for x in xrange(VLAN_MIN, VLAN_MAX + 1):
physical_network, vlan_id = ovs_db_v2.reserve_vlan(self.session)
self.assertEqual(physical_network, PHYS_NET)
self.assertGreaterEqual(vlan_id, VLAN_MIN)
self.assertLessEqual(vlan_id, VLAN_MAX)
self.assertThat(vlan_id, matchers.GreaterThan(VLAN_MIN - 1))
self.assertThat(vlan_id, matchers.LessThan(VLAN_MAX + 1))
vlan_ids.add(vlan_id)
ovs_db_v2.release_vlan(self.session, PHYS_NET, vlan_ids.pop(),
@ -178,14 +178,13 @@ class VlanAllocationsTest(unittest2.TestCase):
ovs_db_v2.sync_vlan_allocations({})
class TunnelAllocationsTest(unittest2.TestCase):
class TunnelAllocationsTest(testtools.TestCase):
def setUp(self):
super(TunnelAllocationsTest, self).setUp()
ovs_db_v2.initialize()
ovs_db_v2.sync_tunnel_allocations(TUNNEL_RANGES)
self.session = db.get_session()
def tearDown(self):
db.clear_db()
self.addCleanup(db.clear_db)
def test_sync_tunnel_allocations(self):
self.assertIsNone(ovs_db_v2.get_tunnel_allocation(TUN_MIN - 1))
@ -214,17 +213,17 @@ class TunnelAllocationsTest(unittest2.TestCase):
tunnel_ids = set()
for x in xrange(TUN_MIN, TUN_MAX + 1):
tunnel_id = ovs_db_v2.reserve_tunnel(self.session)
self.assertGreaterEqual(tunnel_id, TUN_MIN)
self.assertLessEqual(tunnel_id, TUN_MAX)
self.assertThat(tunnel_id, matchers.GreaterThan(TUN_MIN - 1))
self.assertThat(tunnel_id, matchers.LessThan(TUN_MAX + 1))
tunnel_ids.add(tunnel_id)
with self.assertRaises(q_exc.NoNetworkAvailable):
with testtools.ExpectedException(q_exc.NoNetworkAvailable):
tunnel_id = ovs_db_v2.reserve_tunnel(self.session)
ovs_db_v2.release_tunnel(self.session, tunnel_ids.pop(), TUNNEL_RANGES)
tunnel_id = ovs_db_v2.reserve_tunnel(self.session)
self.assertGreaterEqual(tunnel_id, TUN_MIN)
self.assertLessEqual(tunnel_id, TUN_MAX)
self.assertThat(tunnel_id, matchers.GreaterThan(TUN_MIN - 1))
self.assertThat(tunnel_id, matchers.LessThan(TUN_MAX + 1))
tunnel_ids.add(tunnel_id)
for tunnel_id in tunnel_ids:
@ -236,7 +235,7 @@ class TunnelAllocationsTest(unittest2.TestCase):
ovs_db_v2.reserve_specific_tunnel(self.session, tunnel_id)
self.assertTrue(ovs_db_v2.get_tunnel_allocation(tunnel_id).allocated)
with self.assertRaises(q_exc.TunnelIdInUse):
with testtools.ExpectedException(q_exc.TunnelIdInUse):
ovs_db_v2.reserve_specific_tunnel(self.session, tunnel_id)
ovs_db_v2.release_tunnel(self.session, tunnel_id, TUNNEL_RANGES)
@ -248,7 +247,7 @@ class TunnelAllocationsTest(unittest2.TestCase):
ovs_db_v2.reserve_specific_tunnel(self.session, tunnel_id)
self.assertTrue(ovs_db_v2.get_tunnel_allocation(tunnel_id).allocated)
with self.assertRaises(q_exc.TunnelIdInUse):
with testtools.ExpectedException(q_exc.TunnelIdInUse):
ovs_db_v2.reserve_specific_tunnel(self.session, tunnel_id)
ovs_db_v2.release_tunnel(self.session, tunnel_id, TUNNEL_RANGES)

View File

@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import testtools
from oslo.config import cfg
@ -21,7 +21,7 @@ from oslo.config import cfg
from quantum.plugins.openvswitch.common import config
class ConfigurationTest(unittest.TestCase):
class ConfigurationTest(testtools.TestCase):
def test_defaults(self):
self.assertEqual('br-int', cfg.CONF.OVS.integration_bridge)

View File

@ -16,13 +16,13 @@
# @author: Dan Wendlandt, Nicira, Inc.
import mox
import unittest2 as unittest
import testtools
from quantum.agent.linux import ovs_lib, utils
from quantum.openstack.common import uuidutils
class OVS_Lib_Test(unittest.TestCase):
class OVS_Lib_Test(testtools.TestCase):
"""
A test suite to excercise the OVS libraries shared by Quantum agents.
Note: these tests do not actually execute ovs-* utilities, and thus
@ -30,6 +30,7 @@ class OVS_Lib_Test(unittest.TestCase):
"""
def setUp(self):
super(OVS_Lib_Test, self).setUp()
self.BR_NAME = "br-int"
self.TO = "--timeout=2"
@ -37,9 +38,7 @@ class OVS_Lib_Test(unittest.TestCase):
self.root_helper = 'sudo'
self.br = ovs_lib.OVSBridge(self.BR_NAME, self.root_helper)
self.mox.StubOutWithMock(utils, "execute")
def tearDown(self):
self.mox.UnsetStubs()
self.addCleanup(self.mox.UnsetStubs)
def test_vifport(self):
"""create and stringify vif port, confirm no exceptions"""

View File

@ -16,7 +16,7 @@
import mock
from oslo.config import cfg
import unittest2 as unittest
import testtools
from quantum.plugins.openvswitch.agent import ovs_quantum_agent
@ -25,7 +25,7 @@ NOTIFIER = ('quantum.plugins.openvswitch.'
'ovs_quantum_plugin.AgentNotifierApi')
class CreateAgentConfigMap(unittest.TestCase):
class CreateAgentConfigMap(testtools.TestCase):
def test_create_agent_config_map_succeeds(self):
self.assertTrue(ovs_quantum_agent.create_agent_config_map(cfg.CONF))
@ -34,13 +34,14 @@ class CreateAgentConfigMap(unittest.TestCase):
self.addCleanup(cfg.CONF.reset)
# An ip address is required for tunneling but there is no default
cfg.CONF.set_override('enable_tunneling', True, group='OVS')
with self.assertRaises(ValueError):
with testtools.ExpectedException(ValueError):
ovs_quantum_agent.create_agent_config_map(cfg.CONF)
class TestOvsQuantumAgent(unittest.TestCase):
class TestOvsQuantumAgent(testtools.TestCase):
def setUp(self):
super(TestOvsQuantumAgent, self).setUp()
self.addCleanup(cfg.CONF.reset)
self.addCleanup(mock.patch.stopall)
notifier_p = mock.patch(NOTIFIER)

View File

@ -19,7 +19,7 @@ Unit Tests for openvswitch rpc
"""
import stubout
import unittest2
import testtools
from quantum.agent import rpc as agent_rpc
from quantum.common import topics
@ -29,7 +29,7 @@ from quantum.plugins.openvswitch.common import constants
from quantum.plugins.openvswitch import ovs_quantum_plugin as povs
class rpcApiTestCase(unittest2.TestCase):
class rpcApiTestCase(testtools.TestCase):
def _test_ovs_api(self, rpcapi, topic, method, rpc_method, **kwargs):
ctxt = context.RequestContext('fake_user', 'fake_project')

View File

@ -16,10 +16,9 @@
#
# @author: Dave Lapsley, Nicira Networks, Inc.
import unittest
import mox
from oslo.config import cfg
import testtools
from quantum.agent.linux import ip_lib
from quantum.agent.linux import ovs_lib
@ -60,12 +59,16 @@ class DummyVlanBinding:
self.vlan_id = vlan_id
class TunnelTest(unittest.TestCase):
class TunnelTest(testtools.TestCase):
def setUp(self):
super(TunnelTest, self).setUp()
cfg.CONF.set_override('rpc_backend',
'quantum.openstack.common.rpc.impl_fake')
cfg.CONF.set_override('report_interval', 0, 'AGENT')
self.mox = mox.Mox()
self.addCleanup(self.mox.UnsetStubs)
self.INT_BRIDGE = 'integration_bridge'
self.TUN_BRIDGE = 'tunnel_bridge'
self.MAP_TUN_BRIDGE = 'tunnel_bridge_mapping'
@ -124,9 +127,6 @@ class TunnelTest(unittest.TestCase):
self.mox.StubOutWithMock(utils, 'get_interface_mac')
utils.get_interface_mac(self.INT_BRIDGE).AndReturn('000000000001')
def tearDown(self):
self.mox.UnsetStubs()
def testConstruct(self):
self.mox.ReplayAll()
b = ovs_quantum_agent.OVSQuantumAgent(self.INT_BRIDGE,

View File

@ -16,13 +16,13 @@
# under the License.
from oslo.config import cfg
import unittest2
import testtools
#NOTE this import loads tests required options
from quantum.plugins.ryu.common import config
class ConfigurationTest(unittest2.TestCase):
class ConfigurationTest(testtools.TestCase):
"""Configuration file Tests"""
def test_defaults(self):
self.assertEqual('br-int', cfg.CONF.OVS.integration_bridge)

View File

@ -17,17 +17,18 @@ from contextlib import nested
import httplib
import mock
import unittest2 as unittest
import testtools
from quantum.openstack.common import importutils
from quantum.tests.unit.ryu import fake_ryu
class RyuAgentTestCase(unittest.TestCase):
class RyuAgentTestCase(testtools.TestCase):
_AGENT_NAME = 'quantum.plugins.ryu.agent.ryu_quantum_agent'
def setUp(self):
super(RyuAgentTestCase, self).setUp()
self.addCleanup(mock.patch.stopall)
self.fake_ryu = fake_ryu.patch_fake_ryu_client().start()
self.mod_agent = importutils.import_module(self._AGENT_NAME)

View File

@ -27,6 +27,7 @@ class RyuPluginV2TestCase(test_plugin.QuantumDbPluginV2TestCase):
self.ryu_patcher = fake_ryu.patch_fake_ryu_client()
self.ryu_patcher.start()
super(RyuPluginV2TestCase, self).setUp(self._plugin_name)
self.addCleanup(self.ryu_patcher.stop)
class TestRyuBasicGet(test_plugin.TestBasicGet, RyuPluginV2TestCase):

View File

@ -15,7 +15,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import unittest2
import testtools
from quantum.agent.common import config
@ -25,7 +25,7 @@ def test_setup_conf():
assert conf.state_path.endswith('/var/lib/quantum')
class TestRootHelper(unittest2.TestCase):
class TestRootHelper(testtools.TestCase):
def test_agent_root_helper(self):
conf = config.setup_conf()

View File

@ -15,15 +15,15 @@
# under the License.
# @author: Dan Wendlandt, Nicira, Inc.
import unittest
import mock
import testtools
from quantum.agent.linux import utils
class AgentUtilsExecuteTest(unittest.TestCase):
class AgentUtilsExecuteTest(testtools.TestCase):
def setUp(self):
super(AgentUtilsExecuteTest, self).setUp()
self.root_helper = "echo"
self.test_file = "/tmp/test_execute.tmp"
open(self.test_file, 'w').close()
@ -61,7 +61,7 @@ class AgentUtilsExecuteTest(unittest.TestCase):
self.assertEqual(result, "%s\n" % self.test_file)
class AgentUtilsGetInterfaceMAC(unittest.TestCase):
class AgentUtilsGetInterfaceMAC(testtools.TestCase):
def test_get_interface_mac(self):
expect_val = '01:02:03:04:05:06'
with mock.patch('fcntl.ioctl') as ioctl:

View File

@ -17,20 +17,21 @@
import mock
from oslo.config import cfg
import unittest2 as unittest
import testtools
from quantum.agent import netns_cleanup_util as util
class TestNullDelegate(unittest.TestCase):
class TestNullDelegate(testtools.TestCase):
def test_getattribute(self):
null_delegate = util.NullDelegate()
self.assertIsNone(null_delegate.test())
class TestNetnsCleanup(unittest.TestCase):
def tearDown(self):
cfg.CONF.reset()
class TestNetnsCleanup(testtools.TestCase):
def setUp(self):
super(TestNetnsCleanup, self).setUp()
self.addCleanup(cfg.CONF.reset)
def test_kill_dhcp(self, dhcp_active=True):
conf = mock.Mock()

View File

@ -19,7 +19,7 @@ import contextlib
import itertools
import mock
from oslo.config import cfg
import unittest2 as unittest
import testtools
from quantum.agent.linux import ip_lib
from quantum.agent.linux import ovs_lib
@ -27,9 +27,10 @@ from quantum.agent import ovs_cleanup_util as util
from quantum.openstack.common import uuidutils
class TestOVSCleanup(unittest.TestCase):
def tearDown(self):
cfg.CONF.reset()
class TestOVSCleanup(testtools.TestCase):
def setUp(self):
super(TestOVSCleanup, self).setUp()
self.addCleanup(cfg.CONF.reset)
def test_setup_conf(self):
conf = util.setup_conf()

View File

@ -15,16 +15,15 @@
# License for the specific language governing permissions and limitations
# under the License.
import unittest
import mock
from oslo.config import cfg
import testtools
from quantum.agent import rpc
from quantum.openstack.common import context
class AgentRPCPluginApi(unittest.TestCase):
class AgentRPCPluginApi(testtools.TestCase):
def _test_rpc_call(self, method):
agent = rpc.PluginApi('fake_topic')
ctxt = context.RequestContext('fake_user', 'fake_project')
@ -48,7 +47,7 @@ class AgentRPCPluginApi(unittest.TestCase):
self._test_rpc_call('tunnel_sync')
class AgentRPCMethods(unittest.TestCase):
class AgentRPCMethods(testtools.TestCase):
def test_create_consumers(self):
dispatcher = mock.Mock()
expected = [

View File

@ -18,7 +18,8 @@
# @author: Zhongyue Luo, Intel Corporation.
#
import unittest2
import testtools
from testtools import matchers
from webob import exc
from quantum.api import api_common as common
@ -28,8 +29,9 @@ class FakeController(common.QuantumController):
_resource_name = 'fake'
class APICommonTestCase(unittest2.TestCase):
class APICommonTestCase(testtools.TestCase):
def setUp(self):
super(APICommonTestCase, self).setUp()
self.controller = FakeController(None)
def test_prepare_request_body(self):
@ -56,7 +58,7 @@ class APICommonTestCase(unittest2.TestCase):
}
}
actual = self.controller._prepare_request_body(body, params)
self.assertDictEqual(expect, actual)
self.assertThat(expect, matchers.Equals(actual))
def test_prepare_request_body_none(self):
body = None
@ -71,7 +73,7 @@ class APICommonTestCase(unittest2.TestCase):
}
}
actual = self.controller._prepare_request_body(body, params)
self.assertDictEqual(expect, actual)
self.assertThat(expect, matchers.Equals(actual))
def test_prepare_request_body_keyerror(self):
body = {'t2': {}}

View File

@ -20,7 +20,8 @@ import urlparse
import mock
from oslo.config import cfg
import unittest2 as unittest
import testtools
from testtools import matchers
import webob
from webob import exc
import webtest
@ -67,7 +68,7 @@ def _get_path(resource, id=None, action=None, fmt=None):
return path
class ResourceIndexTestCase(unittest.TestCase):
class ResourceIndexTestCase(testtools.TestCase):
def test_index_json(self):
index = webtest.TestApp(router.Index({'foo': 'bar'}))
res = index.get('')
@ -92,8 +93,10 @@ class ResourceIndexTestCase(unittest.TestCase):
self.assertTrue(link['rel'] == 'self')
class APIv2TestBase(unittest.TestCase):
class APIv2TestBase(testtools.TestCase):
def setUp(self):
super(APIv2TestBase, self).setUp()
plugin = 'quantum.quantum_plugin_base_v2.QuantumPluginBaseV2'
# Ensure 'stale' patched copies of the plugin are never returned
QuantumManager._instance = None
@ -111,15 +114,11 @@ class APIv2TestBase(unittest.TestCase):
instance = self.plugin.return_value
instance._QuantumPluginBaseV2__native_pagination_support = True
instance._QuantumPluginBaseV2__native_sorting_support = True
self.addCleanup(self._plugin_patcher.stop)
self.addCleanup(cfg.CONF.reset)
api = router.APIRouter()
self.api = webtest.TestApp(api)
super(APIv2TestBase, self).setUp()
def tearDown(self):
self._plugin_patcher.stop()
self.api = None
self.plugin = None
cfg.CONF.reset()
class _ArgMatcher(object):
@ -138,10 +137,6 @@ def _list_cmp(l1, l2):
class APIv2TestCase(APIv2TestBase):
# NOTE(jkoelker) This potentially leaks the mock object if the setUp
# raises without being caught. Using unittest2
# or dropping 2.6 support so we can use addCleanup
# will get around this.
def _do_field_list(self, resource, base_fields):
attr_info = attributes.RESOURCE_ATTRIBUTE_MAP[resource]
policy_attrs = [name for (name, info) in attr_info.items()
@ -599,9 +594,9 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
params=params).json
self.assertEqual(len(res['networks']), 2)
self.assertItemsEqual([id1, id2],
[res['networks'][0]['id'],
res['networks'][1]['id']])
self.assertEqual(sorted([id1, id2]),
sorted([res['networks'][0]['id'],
res['networks'][1]['id']]))
self.assertIn('networks_links', res)
next_links = []
@ -1110,8 +1105,10 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
self.assertEqual(res.status_int, 400)
class SubresourceTest(unittest.TestCase):
class SubresourceTest(testtools.TestCase):
def setUp(self):
super(SubresourceTest, self).setUp()
plugin = 'quantum.tests.unit.test_api_v2.TestSubresourcePlugin'
QuantumManager._instance = None
PluginAwareExtensionManager._instance = None
@ -1127,6 +1124,8 @@ class SubresourceTest(unittest.TestCase):
self._plugin_patcher = mock.patch(plugin, autospec=True)
self.plugin = self._plugin_patcher.start()
self.addCleanup(self._plugin_patcher.stop)
self.addCleanup(cfg.CONF.reset)
router.SUB_RESOURCES['dummy'] = {
'collection_name': 'dummies',
@ -1146,13 +1145,10 @@ class SubresourceTest(unittest.TestCase):
self.api = webtest.TestApp(api)
def tearDown(self):
self._plugin_patcher.stop()
self.api = None
self.plugin = None
router.SUB_RESOURCES = {}
cfg.CONF.reset()
# Restore the global RESOURCE_ATTRIBUTE_MAP
attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
super(SubresourceTest, self).tearDown()
def test_index_sub_resource(self):
instance = self.plugin.return_value
@ -1210,7 +1206,7 @@ class XMLV2TestCase(JSONV2TestCase):
fmt = 'xml'
class V2Views(unittest.TestCase):
class V2Views(testtools.TestCase):
def _view(self, keys, collection, resource):
data = dict((key, 'value') for key in keys)
data['fake'] = 'value'
@ -1332,12 +1328,9 @@ class QuotaTest(APIv2TestBase):
self.assertEqual(res.status_int, exc.HTTPCreated.code)
class ExtensionTestCase(unittest.TestCase):
# NOTE(jkoelker) This potentially leaks the mock object if the setUp
# raises without being caught. Using unittest2
# or dropping 2.6 support so we can use addCleanup
# will get around this.
class ExtensionTestCase(testtools.TestCase):
def setUp(self):
super(ExtensionTestCase, self).setUp()
plugin = 'quantum.quantum_plugin_base_v2.QuantumPluginBaseV2'
# Ensure 'stale' patched copies of the plugin are never returned
@ -1369,6 +1362,7 @@ class ExtensionTestCase(unittest.TestCase):
self.api = webtest.TestApp(api)
def tearDown(self):
super(ExtensionTestCase, self).tearDown()
self._plugin_patcher.stop()
self.api = None
self.plugin = None
@ -1424,13 +1418,13 @@ class TestSubresourcePlugin():
return
class ListArgsTestCase(unittest.TestCase):
class ListArgsTestCase(testtools.TestCase):
def test_list_args(self):
path = '/?fields=4&foo=3&fields=2&bar=1'
request = webob.Request.blank(path)
expect_val = ['2', '4']
actual_val = api_common.list_args(request, 'fields')
self.assertItemsEqual(actual_val, expect_val)
self.assertEqual(sorted(actual_val), expect_val)
def test_list_args_with_empty(self):
path = '/?foo=4&bar=3&baz=2&qux=1'
@ -1438,7 +1432,7 @@ class ListArgsTestCase(unittest.TestCase):
self.assertEqual([], api_common.list_args(request, 'fields'))
class FiltersTestCase(unittest.TestCase):
class FiltersTestCase(testtools.TestCase):
def test_all_skip_args(self):
path = '/?fields=4&fields=3&fields=2&fields=1'
request = webob.Request.blank(path)
@ -1486,7 +1480,7 @@ class FiltersTestCase(unittest.TestCase):
self.assertEqual(actual_val, expect_val)
class CreateResourceTestCase(unittest.TestCase):
class CreateResourceTestCase(testtools.TestCase):
def test_resource_creation(self):
resource = base.create_resource('fakes', 'fake', None, {})
self.assertIsInstance(resource, webob.dec.wsgify)

View File

@ -18,9 +18,8 @@
# @author: Zhongyue Luo, Intel Corporation.
#
import unittest2 as unittest
import mock
import testtools
from webob import exc
import webtest
@ -30,8 +29,9 @@ from quantum import context
from quantum import wsgi
class RequestTestCase(unittest.TestCase):
class RequestTestCase(testtools.TestCase):
def setUp(self):
super(RequestTestCase, self).setUp()
self.req = wsgi_resource.Request({'foo': 'bar'})
def test_content_type_missing(self):
@ -99,7 +99,7 @@ class RequestTestCase(unittest.TestCase):
self.assertTrue(self.req.context.is_admin)
class ResourceTestCase(unittest.TestCase):
class ResourceTestCase(testtools.TestCase):
def test_unmapped_quantum_error(self):
controller = mock.MagicMock()
controller.test.side_effect = q_exc.QuantumException()

View File

@ -15,13 +15,13 @@
# License for the specific language governing permissions and limitations
# under the License.
import unittest2
import testtools
from quantum.api.v2 import attributes
from quantum.common import exceptions as q_exc
class TestAttributes(unittest2.TestCase):
class TestAttributes(testtools.TestCase):
def _construct_dict_and_constraints(self):
""" Constructs a test dictionary and a definition of constraints.
@ -487,7 +487,7 @@ class TestAttributes(unittest2.TestCase):
del dictionary['key1']
msg = attributes._validate_dict(dictionary, constraints)
self.assertIn('Expected keys:', msg, 'The error was not detected.')
self.assertIn('Expected keys:', msg)
def test_validate_dict_wrong_values(self):
dictionary, constraints = self._construct_dict_and_constraints()
@ -502,7 +502,7 @@ class TestAttributes(unittest2.TestCase):
del dictionary['key3']['k4']
dictionary['key3']['k5'] = 'a string value'
msg = attributes._validate_dict(dictionary, constraints)
self.assertIn('Expected keys:', msg, 'The error was not detected.')
self.assertIn('Expected keys:', msg)
def test_validate_dict_or_none(self):
dictionary, constraints = self._construct_dict_and_constraints()
@ -537,7 +537,7 @@ class TestAttributes(unittest2.TestCase):
self.assertIsNone(msg)
class TestConvertToBoolean(unittest2.TestCase):
class TestConvertToBoolean(testtools.TestCase):
def test_convert_to_boolean_bool(self):
self.assertIs(attributes.convert_to_boolean(True), True)
@ -562,7 +562,7 @@ class TestConvertToBoolean(unittest2.TestCase):
'7')
class TestConvertToInt(unittest2.TestCase):
class TestConvertToInt(testtools.TestCase):
def test_convert_to_int_int(self):
self.assertEqual(attributes.convert_to_int(-1), -1)
@ -596,7 +596,7 @@ class TestConvertToInt(unittest2.TestCase):
value, attributes.convert_none_to_empty_list(value))
class TestConvertKvp(unittest2.TestCase):
class TestConvertKvp(testtools.TestCase):
def test_convert_kvp_list_to_dict_succeeds_for_missing_values(self):
result = attributes.convert_kvp_list_to_dict(['True'])
@ -612,11 +612,11 @@ class TestConvertKvp(unittest2.TestCase):
self.assertEqual({'a': ['b'], 'c': ['d']}, result)
def test_convert_kvp_str_to_list_fails_for_missing_key(self):
with self.assertRaises(q_exc.InvalidInput):
with testtools.ExpectedException(q_exc.InvalidInput):
attributes.convert_kvp_str_to_list('=a')
def test_convert_kvp_str_to_list_fails_for_missing_equals(self):
with self.assertRaises(q_exc.InvalidInput):
with testtools.ExpectedException(q_exc.InvalidInput):
attributes.convert_kvp_str_to_list('a')
def test_convert_kvp_str_to_list_succeeds_for_one_equals(self):
@ -628,7 +628,7 @@ class TestConvertKvp(unittest2.TestCase):
self.assertEqual(['a', 'a=a'], result)
class TestConvertToList(unittest2.TestCase):
class TestConvertToList(testtools.TestCase):
def test_convert_to_empty_list(self):
for item in (None, [], (), {}):

View File

@ -1,11 +1,10 @@
import unittest
import testtools
import webob
from quantum import auth
class QuantumKeystoneContextTestCase(unittest.TestCase):
class QuantumKeystoneContextTestCase(testtools.TestCase):
def setUp(self):
super(QuantumKeystoneContextTestCase, self).setUp()

View File

@ -12,37 +12,37 @@
# License for the specific language governing permissions and limitations
# under the License.
import unittest2 as unittest
import testtools
from quantum.common import utils
class TestParseMappings(unittest.TestCase):
class TestParseMappings(testtools.TestCase):
def parse(self, mapping_list, unique_values=True):
return utils.parse_mappings(mapping_list, unique_values)
def test_parse_mappings_fails_for_missing_separator(self):
with self.assertRaises(ValueError):
with testtools.ExpectedException(ValueError):
self.parse(['key'])
def test_parse_mappings_fails_for_missing_key(self):
with self.assertRaises(ValueError):
with testtools.ExpectedException(ValueError):
self.parse([':val'])
def test_parse_mappings_fails_for_missing_value(self):
with self.assertRaises(ValueError):
with testtools.ExpectedException(ValueError):
self.parse(['key:'])
def test_parse_mappings_fails_for_extra_separator(self):
with self.assertRaises(ValueError):
with testtools.ExpectedException(ValueError):
self.parse(['key:val:junk'])
def test_parse_mappings_fails_for_duplicate_key(self):
with self.assertRaises(ValueError):
with testtools.ExpectedException(ValueError):
self.parse(['key:val1', 'key:val2'])
def test_parse_mappings_fails_for_duplicate_value(self):
with self.assertRaises(ValueError):
with testtools.ExpectedException(ValueError):
self.parse(['key1:val', 'key2:val'])
def test_parse_mappings_succeeds_for_one_mapping(self):

View File

@ -14,14 +14,14 @@
# limitations under the License.
import os
import unittest
import testtools
from oslo.config import cfg
from quantum.common import config
class ConfigurationTest(unittest.TestCase):
class ConfigurationTest(testtools.TestCase):
def test_defaults(self):
self.assertEqual('0.0.0.0', cfg.CONF.bind_host)

View File

@ -15,21 +15,21 @@
"""Test of DB API"""
import fixtures
import mock
from oslo.config import cfg
import unittest2 as unittest
import testtools
import quantum.db.api as db
class DBTestCase(unittest.TestCase):
class DBTestCase(testtools.TestCase):
def setUp(self):
super(DBTestCase, self).setUp()
cfg.CONF.set_override('sql_max_retries', 1, 'DATABASE')
cfg.CONF.set_override('reconnect_interval', 0, 'DATABASE')
def tearDown(self):
db._ENGINE = None
cfg.CONF.reset()
self.addCleanup(cfg.CONF.reset)
self.useFixture(fixtures.MonkeyPatch('quantum.db.api._ENGINE', None))
def test_db_reconnect(self):
with mock.patch.object(db, 'register_models') as mock_register:

View File

@ -20,13 +20,13 @@
import sys
import mock
import unittest2 as unittest
import testtools
from quantum.db import migration
from quantum.db.migration import cli
class TestDbMigration(unittest.TestCase):
class TestDbMigration(testtools.TestCase):
def test_should_run_plugin_in_list(self):
self.assertTrue(migration.should_run('foo', ['foo', 'bar']))
self.assertFalse(migration.should_run('foo', ['bar']))
@ -35,14 +35,13 @@ class TestDbMigration(unittest.TestCase):
self.assertTrue(migration.should_run('foo', ['*']))
class TestCli(unittest.TestCase):
class TestCli(testtools.TestCase):
def setUp(self):
super(TestCli, self).setUp()
self.do_alembic_cmd_p = mock.patch.object(cli, 'do_alembic_command')
self.do_alembic_cmd = self.do_alembic_cmd_p.start()
def tearDown(self):
self.do_alembic_cmd_p.stop()
cli.CONF.reset()
self.addCleanup(self.do_alembic_cmd_p.stop)
self.addCleanup(cli.CONF.reset)
def _main_test_helper(self, argv, func_name, exp_args=(), exp_kwargs={}):
with mock.patch.object(sys, 'argv', argv):

View File

@ -24,7 +24,8 @@ import random
import mock
from oslo.config import cfg
import sqlalchemy as sa
import unittest2
import testtools
from testtools import matchers
import webob.exc
import quantum
@ -158,7 +159,6 @@ class QuantumDbPluginV2TestCase(testlib_api.WebTestCase):
self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
def tearDown(self):
super(QuantumDbPluginV2TestCase, self).tearDown()
self.api = None
self._deserializers = None
self._skip_native_bulk = None
@ -173,6 +173,7 @@ class QuantumDbPluginV2TestCase(testlib_api.WebTestCase):
cfg.CONF.reset()
# Restore the original attribute map
attributes.RESOURCE_ATTRIBUTE_MAP = self._attribute_map_bk
super(QuantumDbPluginV2TestCase, self).tearDown()
def _req(self, method, resource, data=None, fmt=None, id=None, params=None,
action=None, subresource=None, sub_id=None):
@ -514,8 +515,8 @@ class QuantumDbPluginV2TestCase(testlib_api.WebTestCase):
quantum_context=quantum_context,
query_params=query_params)
resource = resource.replace('-', '_')
self.assertItemsEqual([i['id'] for i in res['%ss' % resource]],
[i[resource]['id'] for i in items])
self.assertEqual(sorted([i['id'] for i in res['%ss' % resource]]),
sorted([i[resource]['id'] for i in items]))
@contextlib.contextmanager
def network(self, name='net1',
@ -587,8 +588,8 @@ class QuantumDbPluginV2TestCase(testlib_api.WebTestCase):
res = self.deserialize(self.fmt, req.get_response(api))
collection = collection.replace('-', '_')
expected_res = [item[collection]['id'] for item in items]
self.assertListEqual([n['id'] for n in res["%ss" % collection]],
expected_res)
self.assertEqual(sorted([n['id'] for n in res["%ss" % collection]]),
sorted(expected_res))
def _test_list_with_pagination(self, collection, items, sort,
limit, expected_page_num, query_params='',
@ -604,7 +605,8 @@ class QuantumDbPluginV2TestCase(testlib_api.WebTestCase):
while req:
page_num = page_num + 1
res = self.deserialize(self.fmt, req.get_response(api))
self.assertLessEqual(len(res["%ss" % collection]), limit)
self.assertThat(len(res["%ss" % collection]),
matchers.LessThan(limit + 1))
items_res = items_res + res["%ss" % collection]
req = None
if '%ss_links' % collection in res:
@ -616,8 +618,9 @@ class QuantumDbPluginV2TestCase(testlib_api.WebTestCase):
self.assertEqual(len(res["%ss" % collection]),
limit)
self.assertEqual(page_num, expected_page_num)
self.assertListEqual([n[verify_key] for n in items_res],
[item[collection][verify_key] for item in items])
self.assertEqual(sorted([n[verify_key] for n in items_res]),
sorted([item[collection][verify_key]
for item in items]))
def _test_list_with_pagination_reverse(self, collection, items, sort,
limit, expected_page_num,
@ -637,7 +640,8 @@ class QuantumDbPluginV2TestCase(testlib_api.WebTestCase):
while req:
page_num = page_num + 1
res = self.deserialize(self.fmt, req.get_response(api))
self.assertLessEqual(len(res["%ss" % collection]), limit)
self.assertThat(len(res["%ss" % collection]),
matchers.LessThan(limit + 1))
res["%ss" % collection].reverse()
item_res = item_res + res["%ss" % collection]
req = None
@ -652,8 +656,8 @@ class QuantumDbPluginV2TestCase(testlib_api.WebTestCase):
self.assertEqual(page_num, expected_page_num)
expected_res = [item[collection]['id'] for item in items]
expected_res.reverse()
self.assertListEqual([n['id'] for n in item_res],
expected_res)
self.assertEqual(sorted([n['id'] for n in item_res]),
sorted(expected_res))
class TestBasicGet(QuantumDbPluginV2TestCase):
@ -1644,9 +1648,9 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
ip_allocation = q.one()
self.assertGreater(
self.assertThat(
ip_allocation.expiration - timeutils.utcnow(),
datetime.timedelta(seconds=10))
matchers.GreaterThan(datetime.timedelta(seconds=10)))
def test_port_delete_holds_ip(self):
plugin = QuantumManager.get_plugin()
@ -1765,13 +1769,14 @@ class TestNetworksV2(QuantumDbPluginV2TestCase):
name = 'public_net'
keys = [('subnets', []), ('name', name), ('admin_state_up', True),
('status', 'ACTIVE'), ('shared', True)]
with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
with testtools.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
with self.network(name=name,
shared=True,
tenant_id="another_tenant",
set_context=True):
pass
self.assertEqual(ctx_manager.exception.code, 403)
self.assertEqual(ctx_manager.exception.code, 403)
def test_update_network(self):
with self.network() as network:
@ -2288,13 +2293,13 @@ class TestSubnetsV2(QuantumDbPluginV2TestCase):
with self.subnet(network=network,
gateway_ip=gateway_ip_1,
cidr=cidr_1):
with self.assertRaises(
with testtools.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
with self.subnet(network=network,
gateway_ip=gateway_ip_2,
cidr=cidr_2):
pass
self.assertEqual(ctx_manager.exception.code, 400)
self.assertEqual(ctx_manager.exception.code, 400)
def test_create_subnet_bad_V4_cidr(self):
with self.network() as network:
@ -2331,8 +2336,8 @@ class TestSubnetsV2(QuantumDbPluginV2TestCase):
cidr_1 = '10.0.0.0/23'
cidr_2 = '10.0.0.0/24'
cfg.CONF.set_override('allow_overlapping_ips', False)
with self.assertRaises(
webob.exc.HTTPClientError) as ctx_manager:
with testtools.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
with contextlib.nested(self.subnet(cidr=cidr_1),
self.subnet(cidr=cidr_2)):
pass
@ -2744,21 +2749,23 @@ class TestSubnetsV2(QuantumDbPluginV2TestCase):
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.1',
'end': '10.0.0.5'}]
with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
with testtools.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(cidr=cidr,
allocation_pools=allocation_pools)
self.assertEqual(ctx_manager.exception.code, 409)
self.assertEqual(ctx_manager.exception.code, 409)
def test_create_subnet_gateway_in_allocation_pool_returns_409(self):
gateway_ip = '10.0.0.50'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.1',
'end': '10.0.0.100'}]
with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
with testtools.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools)
self.assertEqual(ctx_manager.exception.code, 409)
self.assertEqual(ctx_manager.exception.code, 409)
def test_create_subnet_overlapping_allocation_pools_returns_409(self):
gateway_ip = '10.0.0.1'
@ -2767,40 +2774,44 @@ class TestSubnetsV2(QuantumDbPluginV2TestCase):
'end': '10.0.0.150'},
{'start': '10.0.0.140',
'end': '10.0.0.180'}]
with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
with testtools.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools)
self.assertEqual(ctx_manager.exception.code, 409)
self.assertEqual(ctx_manager.exception.code, 409)
def test_create_subnet_invalid_allocation_pool_returns_400(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.0.256'}]
with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
with testtools.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools)
self.assertEqual(ctx_manager.exception.code, 400)
self.assertEqual(ctx_manager.exception.code, 400)
def test_create_subnet_out_of_range_allocation_pool_returns_400(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.1.6'}]
with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
with testtools.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools)
self.assertEqual(ctx_manager.exception.code, 400)
self.assertEqual(ctx_manager.exception.code, 400)
def test_create_subnet_shared_returns_400(self):
cidr = '10.0.0.0/24'
with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
with testtools.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(cidr=cidr,
shared=True)
self.assertEqual(ctx_manager.exception.code, 400)
self.assertEqual(ctx_manager.exception.code, 400)
def test_create_subnet_inconsistent_ipv6_cidrv4(self):
with self.network() as network:
@ -3367,7 +3378,7 @@ class TestSubnetsV2(QuantumDbPluginV2TestCase):
self.assertEqual(res.status_int, 204)
class DbModelTestCase(unittest2.TestCase):
class DbModelTestCase(testtools.TestCase):
""" DB model tests """
def test_repr(self):
""" testing the string representation of 'model' classes """

View File

@ -13,16 +13,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import mock
import testtools
from quantum.db import dhcp_rpc_base
class TestDhcpRpcCallackMixin(unittest.TestCase):
class TestDhcpRpcCallackMixin(testtools.TestCase):
def setUp(self):
super(TestDhcpRpcCallackMixin, self).setUp()
self.plugin_p = mock.patch('quantum.manager.QuantumManager.get_plugin')
get_plugin = self.plugin_p.start()
self.plugin = mock.Mock()
@ -34,6 +34,7 @@ class TestDhcpRpcCallackMixin(unittest.TestCase):
def tearDown(self):
self.log_p.stop()
self.plugin_p.stop()
super(TestDhcpRpcCallackMixin, self).tearDown()
def test_get_active_networks(self):
plugin_retval = [dict(id='a'), dict(id='b')]

View File

@ -19,7 +19,7 @@ import socket
import mock
from oslo.config import cfg
import unittest2 as unittest
import testtools
from quantum.agent.common import config
from quantum.agent.linux import interface
@ -32,8 +32,9 @@ class MyApp(object):
self.stdout = _stdout
class TestDebugCommands(unittest.TestCase):
class TestDebugCommands(testtools.TestCase):
def setUp(self):
super(TestDebugCommands, self).setUp()
cfg.CONF.register_opts(interface.OPTS)
cfg.CONF.register_opts(QuantumDebugAgent.OPTS)
cfg.CONF(args=[], project='quantum')

View File

@ -22,7 +22,7 @@ import uuid
import eventlet
import mock
from oslo.config import cfg
import unittest2 as unittest
import testtools
from quantum.agent.common import config
from quantum.agent import dhcp_agent
@ -108,8 +108,9 @@ fake_down_network = FakeModel('12345678-dddd-dddd-1234567890ab',
ports=[])
class TestDhcpAgent(unittest.TestCase):
class TestDhcpAgent(testtools.TestCase):
def setUp(self):
super(TestDhcpAgent, self).setUp()
cfg.CONF.register_opts(dhcp_agent.DeviceManager.OPTS)
cfg.CONF.register_opts(dhcp_agent.DhcpAgent.OPTS)
cfg.CONF.register_opts(dhcp_agent.DhcpLeaseRelay.OPTS)
@ -124,6 +125,7 @@ class TestDhcpAgent(unittest.TestCase):
def tearDown(self):
self.driver_cls_p.stop()
cfg.CONF.reset()
super(TestDhcpAgent, self).tearDown()
def test_dhcp_agent_manager(self):
state_rpc_str = 'quantum.agent.rpc.PluginReportStateAPI'
@ -314,15 +316,16 @@ class TestDhcpAgent(unittest.TestCase):
dhcp.needs_resync = True
with mock.patch.object(dhcp, 'sync_state') as sync_state:
sync_state.side_effect = RuntimeError
with self.assertRaises(RuntimeError):
with testtools.ExpectedException(RuntimeError):
dhcp._periodic_resync_helper()
sync_state.assert_called_once_with()
sleep.assert_called_once_with(dhcp.conf.resync_interval)
self.assertFalse(dhcp.needs_resync)
class TestDhcpAgentEventHandler(unittest.TestCase):
class TestDhcpAgentEventHandler(testtools.TestCase):
def setUp(self):
super(TestDhcpAgentEventHandler, self).setUp()
cfg.CONF.register_opts(dhcp_agent.DeviceManager.OPTS)
cfg.CONF.register_opts(dhcp_agent.DhcpLeaseRelay.OPTS)
cfg.CONF.set_override('interface_driver',
@ -354,6 +357,7 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
self.call_driver_p.stop()
self.cache_p.stop()
self.plugin_p.stop()
super(TestDhcpAgentEventHandler, self).tearDown()
def test_enable_dhcp_helper(self):
self.plugin.get_network_info.return_value = fake_network
@ -634,8 +638,9 @@ class TestDhcpAgentEventHandler(unittest.TestCase):
self.assertEqual(self.call_driver.call_count, 0)
class TestDhcpPluginApiProxy(unittest.TestCase):
class TestDhcpPluginApiProxy(testtools.TestCase):
def setUp(self):
super(TestDhcpPluginApiProxy, self).setUp()
self.proxy = dhcp_agent.DhcpPluginApi('foo', {})
self.proxy.host = 'foo'
@ -647,6 +652,7 @@ class TestDhcpPluginApiProxy(unittest.TestCase):
def tearDown(self):
self.make_msg_p.stop()
self.call_p.stop()
super(TestDhcpPluginApiProxy, self).tearDown()
def test_get_active_networks(self):
self.proxy.get_active_networks()
@ -701,7 +707,7 @@ class TestDhcpPluginApiProxy(unittest.TestCase):
host='foo')
class TestNetworkCache(unittest.TestCase):
class TestNetworkCache(testtools.TestCase):
def test_put_network(self):
nc = dhcp_agent.NetworkCache()
nc.put(fake_network)
@ -809,8 +815,9 @@ class TestNetworkCache(unittest.TestCase):
self.assertEqual(nc.get_port_by_id(fake_port1.id), fake_port1)
class TestDeviceManager(unittest.TestCase):
class TestDeviceManager(testtools.TestCase):
def setUp(self):
super(TestDeviceManager, self).setUp()
cfg.CONF.register_opts(dhcp_agent.DeviceManager.OPTS)
cfg.CONF.register_opts(dhcp_agent.DhcpAgent.OPTS)
cfg.CONF.set_override('interface_driver',
@ -840,6 +847,7 @@ class TestDeviceManager(unittest.TestCase):
self.device_exists_p.stop()
self.iproute_cls_p.stop()
cfg.CONF.reset()
super(TestDeviceManager, self).tearDown()
def _test_setup_helper(self, device_exists, reuse_existing=False,
metadata_access_network=False,
@ -886,7 +894,7 @@ class TestDeviceManager(unittest.TestCase):
self._test_setup_helper(False)
def test_setup_device_exists(self):
with self.assertRaises(exceptions.PreexistingDeviceFailure):
with testtools.ExpectedException(exceptions.PreexistingDeviceFailure):
self._test_setup_helper(True)
def test_setup_device_exists_reuse(self):
@ -1002,14 +1010,16 @@ class TestDeviceManager(unittest.TestCase):
self.assertEqual(dh.get_device_id(fake_network), expected)
class TestDhcpLeaseRelay(unittest.TestCase):
class TestDhcpLeaseRelay(testtools.TestCase):
def setUp(self):
super(TestDhcpLeaseRelay, self).setUp()
cfg.CONF.register_opts(dhcp_agent.DhcpLeaseRelay.OPTS)
self.unlink_p = mock.patch('os.unlink')
self.unlink = self.unlink_p.start()
def tearDown(self):
self.unlink_p.stop()
super(TestDhcpLeaseRelay, self).tearDown()
def test_init_relay_socket_path_no_prev_socket(self):
with mock.patch('os.path.exists') as exists:
@ -1036,7 +1046,7 @@ class TestDhcpLeaseRelay(unittest.TestCase):
self.unlink.side_effect = OSError
with mock.patch('os.path.exists') as exists:
exists.return_value = True
with self.assertRaises(OSError):
with testtools.ExpectedException(OSError):
relay = dhcp_agent.DhcpLeaseRelay(None)
self.unlink.assert_called_once_with(
@ -1125,7 +1135,7 @@ class TestDhcpLeaseRelay(unittest.TestCase):
relay._handler)])
class TestDictModel(unittest.TestCase):
class TestDictModel(testtools.TestCase):
def test_basic_dict(self):
d = dict(a=1, b=2)

View File

@ -20,7 +20,7 @@ Unit tests for extension extended attribute
"""
from oslo.config import cfg
import unittest2 as unittest
import testtools
import webob.exc as webexc
import quantum
@ -66,8 +66,9 @@ class ExtensionExtendedAttributeTestPlugin(
return self.objh[id]
class ExtensionExtendedAttributeTestCase(unittest.TestCase):
class ExtensionExtendedAttributeTestCase(testtools.TestCase):
def setUp(self):
super(ExtensionExtendedAttributeTestCase, self).setUp()
plugin = (
"quantum.tests.unit.test_extension_extended_attribute."
"ExtensionExtendedAttributeTestPlugin"
@ -92,11 +93,7 @@ class ExtensionExtendedAttributeTestCase(unittest.TestCase):
self._api = extensions.ExtensionMiddleware(app, ext_mgr=ext_mgr)
self._tenant_id = "8c70909f-b081-452d-872b-df48e6c355d1"
def tearDown(self):
# restore original resource attribute map before
self._api = None
cfg.CONF.reset()
self.addCleanup(cfg.CONF.reset)
def _do_request(self, method, path, data=None, params=None, action=None):
content_type = 'application/json'

View File

@ -157,8 +157,8 @@ class ExtraRouteDBTestCase(test_l3.L3NatDBTestCase):
{'router': {'routes': routes}})
body = self._show('routers', r['router']['id'])
self.assertItemsEqual(body['router']['routes'],
routes)
self.assertEqual(sorted(body['router']['routes']),
sorted(routes))
# clean-up
self._update('routers', r['router']['id'],
@ -189,8 +189,8 @@ class ExtraRouteDBTestCase(test_l3.L3NatDBTestCase):
routes_orig}})
body = self._show('routers', r['router']['id'])
self.assertItemsEqual(body['router']['routes'],
routes_orig)
self.assertEqual(sorted(body['router']['routes']),
sorted(routes_orig))
routes_left = [{'destination': '135.207.0.0/16',
'nexthop': '10.0.1.3'},
@ -202,8 +202,8 @@ class ExtraRouteDBTestCase(test_l3.L3NatDBTestCase):
routes_left}})
body = self._show('routers', r['router']['id'])
self.assertItemsEqual(body['router']['routes'],
routes_left)
self.assertEqual(sorted(body['router']['routes']),
sorted(routes_left))
body = self._update('routers', r['router']['id'],
{'router': {'routes': []}})

View File

@ -16,9 +16,9 @@
# under the License.
import os
import unittest
import routes
import testtools
import webob
import webtest
@ -65,7 +65,7 @@ class FakePluginWithExtension(db_base_plugin_v2.QuantumDbPluginV2):
self._log("method_to_support_foxnsox_extension", context)
class ResourceExtensionTest(unittest.TestCase):
class ResourceExtensionTest(testtools.TestCase):
class ResourceExtensionController(wsgi.Controller):
@ -308,7 +308,7 @@ class ResourceExtensionTest(unittest.TestCase):
self.assertEqual(404, response.status_int)
class ActionExtensionTest(unittest.TestCase):
class ActionExtensionTest(testtools.TestCase):
def setUp(self):
super(ActionExtensionTest, self).setUp()
@ -355,7 +355,7 @@ class ActionExtensionTest(unittest.TestCase):
self.assertEqual(404, response.status_int)
class RequestExtensionTest(unittest.TestCase):
class RequestExtensionTest(testtools.TestCase):
def test_headers_can_be_extended(self):
def extend_headers(req, res):
@ -422,7 +422,7 @@ class RequestExtensionTest(unittest.TestCase):
return _setup_extensions_test_app(manager)
class ExtensionManagerTest(unittest.TestCase):
class ExtensionManagerTest(testtools.TestCase):
def test_invalid_extensions_are_not_registered(self):
@ -442,7 +442,7 @@ class ExtensionManagerTest(unittest.TestCase):
self.assertFalse('invalid_extension' in ext_mgr.extensions)
class PluginAwareExtensionManagerTest(unittest.TestCase):
class PluginAwareExtensionManagerTest(testtools.TestCase):
def test_unsupported_extensions_are_not_loaded(self):
stub_plugin = ext_stubs.StubPlugin(supported_extensions=["e1", "e3"])

View File

@ -17,7 +17,7 @@
import mock
from mock import call
import unittest2 as unittest
import testtools
from quantum.agent.linux.iptables_firewall import IptablesFirewallDriver
from quantum.tests.unit import test_api_v2
@ -29,14 +29,17 @@ FAKE_IP = {'IPv4': '10.0.0.1',
'IPv6': 'fe80::1'}
class IptablesFirewallTestCase(unittest.TestCase):
class IptablesFirewallTestCase(testtools.TestCase):
def setUp(self):
super(IptablesFirewallTestCase, self).setUp()
self.utils_exec_p = mock.patch(
'quantum.agent.linux.utils.execute')
self.utils_exec = self.utils_exec_p.start()
self.addCleanup(self.utils_exec_p.stop)
self.iptables_cls_p = mock.patch(
'quantum.agent.linux.iptables_manager.IptablesManager')
iptables_cls = self.iptables_cls_p.start()
self.addCleanup(self.iptables_cls_p.stop)
self.iptables_inst = mock.Mock()
self.v4filter_inst = mock.Mock()
self.v6filter_inst = mock.Mock()
@ -47,10 +50,6 @@ class IptablesFirewallTestCase(unittest.TestCase):
self.firewall = IptablesFirewallDriver()
self.firewall.iptables = self.iptables_inst
def tearDown(self):
self.iptables_cls_p.stop()
self.utils_exec_p.stop()
def _fake_port(self):
return {'device': 'tapfake_dev',
'mac_address': 'ff:ff:ff:ff',

View File

@ -19,24 +19,23 @@
import inspect
import os
import unittest
import mox
import testtools
from quantum.agent.linux import iptables_manager
class IptablesManagerStateFulTestCase(unittest.TestCase):
class IptablesManagerStateFulTestCase(testtools.TestCase):
def setUp(self):
super(IptablesManagerStateFulTestCase, self).setUp()
self.mox = mox.Mox()
self.root_helper = 'sudo'
self.iptables = (iptables_manager.
IptablesManager(root_helper=self.root_helper))
self.mox.StubOutWithMock(self.iptables, "execute")
def tearDown(self):
self.mox.UnsetStubs()
self.addCleanup(self.mox.UnsetStubs)
def test_binary_name(self):
self.assertEqual(iptables_manager.binary_name,
@ -285,9 +284,10 @@ class IptablesManagerStateFulTestCase(unittest.TestCase):
self.mox.VerifyAll()
class IptablesManagerStateLessTestCase(unittest.TestCase):
class IptablesManagerStateLessTestCase(testtools.TestCase):
def setUp(self):
super(IptablesManagerStateLessTestCase, self).setUp()
self.iptables = (iptables_manager.IptablesManager(state_less=True))
def test_nat_not_found(self):

View File

@ -16,10 +16,10 @@
# under the License.
import copy
import unittest2
import mock
from oslo.config import cfg
import testtools
from quantum.agent.common import config as agent_config
from quantum.agent import l3_agent
@ -33,9 +33,10 @@ _uuid = uuidutils.generate_uuid
HOSTNAME = 'myhost'
class TestBasicRouterOperations(unittest2.TestCase):
class TestBasicRouterOperations(testtools.TestCase):
def setUp(self):
super(TestBasicRouterOperations, self).setUp()
self.conf = cfg.ConfigOpts()
self.conf.register_opts(base_config.core_opts)
self.conf.register_opts(l3_agent.L3NATAgent.OPTS)
@ -82,6 +83,7 @@ class TestBasicRouterOperations(unittest2.TestCase):
self.dvr_cls_p.stop()
self.utils_exec_p.stop()
self.external_process_p.stop()
super(TestBasicRouterOperations, self).tearDown()
def testRouterInfoCreate(self):
id = _uuid()

View File

@ -24,6 +24,7 @@ import itertools
import mock
from oslo.config import cfg
import testtools
from webob import exc
import webtest
@ -72,6 +73,7 @@ class L3NatExtensionTestCase(testlib_api.WebTestCase):
fmt = 'json'
def setUp(self):
super(L3NatExtensionTestCase, self).setUp()
plugin = 'quantum.extensions.l3.RouterPluginBase'
# Ensure 'stale' patched copies of the plugin are never returned
@ -115,6 +117,7 @@ class L3NatExtensionTestCase(testlib_api.WebTestCase):
# Restore the global RESOURCE_ATTRIBUTE_MAP
attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
super(L3NatExtensionTestCase, self).tearDown()
def test_router_create(self):
router_id = _uuid()
@ -1427,7 +1430,8 @@ class L3NatDBTestCase(L3NatTestCaseBase):
def test_create_port_external_network_non_admin_fails(self):
with self.network(router__external=True) as ext_net:
with self.subnet(network=ext_net) as ext_subnet:
with self.assertRaises(exc.HTTPClientError) as ctx_manager:
with testtools.ExpectedException(
exc.HTTPClientError) as ctx_manager:
with self.port(subnet=ext_subnet,
set_context='True',
tenant_id='noadmin'):
@ -1442,7 +1446,7 @@ class L3NatDBTestCase(L3NatTestCaseBase):
ext_net['network']['id'])
def test_create_external_network_non_admin_fails(self):
with self.assertRaises(exc.HTTPClientError) as ctx_manager:
with testtools.ExpectedException(exc.HTTPClientError) as ctx_manager:
with self.network(router__external=True,
set_context='True',
tenant_id='noadmin'):

View File

@ -19,27 +19,26 @@
import os
import mock
import unittest2 as unittest
import testtools
from quantum.agent.linux import daemon
FAKE_FD = 8
class TestPidfile(unittest.TestCase):
class TestPidfile(testtools.TestCase):
def setUp(self):
super(TestPidfile, self).setUp()
self.os_p = mock.patch.object(daemon, 'os')
self.os = self.os_p.start()
self.addCleanup(self.os_p.stop)
self.os.open.return_value = FAKE_FD
self.fcntl_p = mock.patch.object(daemon, 'fcntl')
self.fcntl = self.fcntl_p.start()
self.addCleanup(self.fcntl_p.stop)
self.fcntl.flock.return_value = 0
def tearDown(self):
self.fcntl_p.stop()
self.os_p.stop()
def test_init(self):
self.os.O_CREAT = os.O_CREAT
self.os.O_RDWR = os.O_RDWR
@ -52,7 +51,7 @@ class TestPidfile(unittest.TestCase):
self.os.open.side_effect = IOError
with mock.patch.object(daemon.sys, 'stderr') as stderr:
with self.assertRaises(SystemExit):
with testtools.ExpectedException(SystemExit):
p = daemon.Pidfile('thefile', 'python')
sys.assert_has_calls([
mock.call.stderr.write(mock.ANY),
@ -95,8 +94,9 @@ class TestPidfile(unittest.TestCase):
['cat', '/proc/34/cmdline'], 'sudo')
class TestDaemon(unittest.TestCase):
class TestDaemon(testtools.TestCase):
def setUp(self):
super(TestDaemon, self).setUp()
self.os_p = mock.patch.object(daemon, 'os')
self.os = self.os_p.start()
@ -106,6 +106,7 @@ class TestDaemon(unittest.TestCase):
def tearDown(self):
self.pidfile_p.stop()
self.os_p.stop()
super(TestDaemon, self).tearDown()
def test_init(self):
d = daemon.Daemon('pidfile')
@ -113,7 +114,7 @@ class TestDaemon(unittest.TestCase):
def test_fork_parent(self):
self.os.fork.return_value = 1
with self.assertRaises(SystemExit):
with testtools.ExpectedException(SystemExit):
d = daemon.Daemon('pidfile')
d._fork()
@ -125,7 +126,7 @@ class TestDaemon(unittest.TestCase):
def test_fork_error(self):
self.os.fork.side_effect = lambda: OSError(1)
with mock.patch.object(daemon.sys, 'stderr') as stderr:
with self.assertRaises(SystemExit):
with testtools.ExpectedException(SystemExit):
d = daemon.Daemon('pidfile', 'stdin')
d._fork()
@ -174,6 +175,6 @@ class TestDaemon(unittest.TestCase):
with mock.patch.object(daemon.sys, 'stderr') as stderr:
with mock.patch.object(d, 'daemonize') as daemonize:
with self.assertRaises(SystemExit):
with testtools.ExpectedException(SystemExit):
d.start()
self.assertFalse(daemonize.called)

View File

@ -17,10 +17,10 @@
import os
import socket
import unittest2 as unittest
import mock
from oslo.config import cfg
import testtools
from quantum.agent.common import config
from quantum.agent.linux import dhcp
@ -134,7 +134,7 @@ class FakeV4NoGatewayNetwork:
ports = [FakePort1()]
class TestDhcpBase(unittest.TestCase):
class TestDhcpBase(testtools.TestCase):
def test_base_abc_error(self):
self.assertRaises(TypeError, dhcp.DhcpBase, None)
@ -195,8 +195,9 @@ class LocalChild(dhcp.DhcpLocalProcess):
self.called.append('spawn')
class TestBase(unittest.TestCase):
class TestBase(testtools.TestCase):
def setUp(self):
super(TestBase, self).setUp()
root = os.path.dirname(os.path.dirname(__file__))
args = ['--config-file',
os.path.join(root, 'etc', 'quantum.conf.test')]
@ -213,13 +214,11 @@ class TestBase(unittest.TestCase):
self.replace_p = mock.patch('quantum.agent.linux.dhcp.replace_file')
self.execute_p = mock.patch('quantum.agent.linux.utils.execute')
self.addCleanup(self.execute_p.stop)
self.safe = self.replace_p.start()
self.addCleanup(self.replace_p.stop)
self.execute = self.execute_p.start()
def tearDown(self):
self.execute_p.stop()
self.replace_p.stop()
class TestDhcpLocalProcess(TestBase):
def test_active(self):

View File

@ -17,21 +17,20 @@
# @author: Mark McClain, DreamHost
import mock
import unittest2 as unittest
import testtools
from quantum.agent.linux import external_process as ep
class TestProcessManager(unittest.TestCase):
class TestProcessManager(testtools.TestCase):
def setUp(self):
super(TestProcessManager, self).setUp()
self.execute_p = mock.patch('quantum.agent.linux.utils.execute')
self.execute = self.execute_p.start()
self.addCleanup(self.execute_p.stop)
self.conf = mock.Mock()
self.conf.external_pids = '/var/path'
def tearDown(self):
self.execute_p.stop()
def test_enable_no_namespace(self):
callback = mock.Mock()
callback.return_value = ['the', 'cmd']

View File

@ -15,10 +15,9 @@
# License for the specific language governing permissions and limitations
# under the License.
import unittest
import mock
from oslo.config import cfg
import testtools
from quantum.agent.common import config
from quantum.agent.dhcp_agent import DeviceManager
@ -58,8 +57,9 @@ class FakePort:
network_id = network.id
class TestBase(unittest.TestCase):
class TestBase(testtools.TestCase):
def setUp(self):
super(TestBase, self).setUp()
root_helper_opt = [
cfg.StrOpt('root_helper', default='sudo'),
]
@ -68,19 +68,13 @@ class TestBase(unittest.TestCase):
config.register_root_helper(self.conf)
self.ip_dev_p = mock.patch.object(ip_lib, 'IPDevice')
self.ip_dev = self.ip_dev_p.start()
self.addCleanup(self.ip_dev_p.stop)
self.ip_p = mock.patch.object(ip_lib, 'IPWrapper')
self.ip = self.ip_p.start()
self.addCleanup(self.ip_p.stop)
self.device_exists_p = mock.patch.object(ip_lib, 'device_exists')
self.device_exists = self.device_exists_p.start()
def tearDown(self):
# sometimes a test may turn this off
try:
self.device_exists_p.stop()
except RuntimeError, e:
pass
self.ip_dev_p.stop()
self.ip_p.stop()
self.addCleanup(self.device_exists_p.stop)
class TestABCDriver(TestBase):
@ -357,6 +351,7 @@ class TestMetaInterfaceDriver(TestBase):
self.conf.register_opts(DeviceManager.OPTS)
self.client_cls_p = mock.patch('quantumclient.v2_0.client.Client')
client_cls = self.client_cls_p.start()
self.addCleanup(self.client_cls_p.stop)
self.client_inst = mock.Mock()
client_cls.return_value = self.client_inst
@ -378,10 +373,6 @@ class TestMetaInterfaceDriver(TestBase):
'fake1:quantum.agent.linux.interface.OVSInterfaceDriver,'
'fake2:quantum.agent.linux.interface.BridgeInterfaceDriver')
def tearDown(self):
self.client_cls_p.stop()
super(TestMetaInterfaceDriver, self).tearDown()
def test_get_driver_by_network_id(self):
meta_interface = interface.MetaInterfaceDriver(self.conf)
driver = meta_interface._get_driver_by_network_id('test')

View File

@ -16,7 +16,7 @@
# under the License.
import mock
import unittest2 as unittest
import testtools
from quantum.agent.linux import ip_lib
from quantum.common import exceptions
@ -97,13 +97,12 @@ SUBNET_SAMPLE2 = ("10.0.0.0/24 dev tap1d7888a7-10 scope link src 10.0.0.2\n"
"10.0.0.0/24 dev qr-23380d11-d2 scope link src 10.0.0.1")
class TestSubProcessBase(unittest.TestCase):
class TestSubProcessBase(testtools.TestCase):
def setUp(self):
super(TestSubProcessBase, self).setUp()
self.execute_p = mock.patch('quantum.agent.linux.utils.execute')
self.execute = self.execute_p.start()
def tearDown(self):
self.execute_p.stop()
self.addCleanup(self.execute_p.stop)
def test_execute_wrapper(self):
ip_lib.SubProcessBase._execute('o', 'link', ('list',), 'sudo')
@ -150,13 +149,12 @@ class TestSubProcessBase(unittest.TestCase):
[], 'link', ('list',))
class TestIpWrapper(unittest.TestCase):
class TestIpWrapper(testtools.TestCase):
def setUp(self):
super(TestIpWrapper, self).setUp()
self.execute_p = mock.patch.object(ip_lib.IPWrapper, '_execute')
self.execute = self.execute_p.start()
def tearDown(self):
self.execute_p.stop()
self.addCleanup(self.execute_p.stop)
def test_get_devices(self):
self.execute.return_value = '\n'.join(LINK_SAMPLE)
@ -307,7 +305,7 @@ class TestIpWrapper(unittest.TestCase):
self.assertEqual(dev.mock_calls, [])
class TestIPDevice(unittest.TestCase):
class TestIPDevice(testtools.TestCase):
def test_eq_same_name(self):
dev1 = ip_lib.IPDevice('tap0')
dev2 = ip_lib.IPDevice('tap0')
@ -336,8 +334,9 @@ class TestIPDevice(unittest.TestCase):
self.assertEqual(str(ip_lib.IPDevice('tap0')), 'tap0')
class TestIPCommandBase(unittest.TestCase):
class TestIPCommandBase(testtools.TestCase):
def setUp(self):
super(TestIPCommandBase, self).setUp()
self.ip = mock.Mock()
self.ip.root_helper = 'sudo'
self.ip.namespace = 'namespace'
@ -363,8 +362,9 @@ class TestIPCommandBase(unittest.TestCase):
[mock.call._as_root('o', 'foo', ('link', ), False)])
class TestIPDeviceCommandBase(unittest.TestCase):
class TestIPDeviceCommandBase(testtools.TestCase):
def setUp(self):
super(TestIPDeviceCommandBase, self).setUp()
self.ip_dev = mock.Mock()
self.ip_dev.name = 'eth0'
self.ip_dev.root_helper = 'sudo'
@ -376,8 +376,9 @@ class TestIPDeviceCommandBase(unittest.TestCase):
self.assertEqual(self.ip_cmd.name, 'eth0')
class TestIPCmdBase(unittest.TestCase):
class TestIPCmdBase(testtools.TestCase):
def setUp(self):
super(TestIPCmdBase, self).setUp()
self.parent = mock.Mock()
self.parent.name = 'eth0'
self.parent.root_helper = 'sudo'
@ -653,7 +654,7 @@ class TestIpNetnsCommand(TestIPCmdBase):
root_helper='sudo', check_exit_code=True)
class TestDeviceExists(unittest.TestCase):
class TestDeviceExists(testtools.TestCase):
def test_device_exists(self):
with mock.patch.object(ip_lib.IPDevice, '_execute') as _execute:
_execute.return_value = LINK_SAMPLE[1]

View File

@ -18,7 +18,7 @@ import copy
import mock
from oslo.config import cfg
import unittest2
import testtools
from webob import exc
import webtest
@ -54,7 +54,7 @@ class LoadBalancerExtensionTestCase(testlib_api.WebTestCase):
fmt = 'json'
def setUp(self):
super(LoadBalancerExtensionTestCase, self).setUp()
plugin = 'quantum.extensions.loadbalancer.LoadBalancerPluginBase'
# Ensure 'stale' patched copies of the plugin are never returned
manager.QuantumManager._instance = None
@ -85,6 +85,7 @@ class LoadBalancerExtensionTestCase(testlib_api.WebTestCase):
self.api = None
self.plugin = None
cfg.CONF.reset()
super(LoadBalancerExtensionTestCase, self).tearDown()
def test_vip_create(self):
vip_id = _uuid()

View File

@ -19,7 +19,7 @@
import socket
import mock
import unittest2 as unittest
import testtools
import webob
from quantum.agent.metadata import agent
@ -37,20 +37,19 @@ class FakeConf(object):
metadata_proxy_shared_secret = 'secret'
class TestMetadataProxyHandler(unittest.TestCase):
class TestMetadataProxyHandler(testtools.TestCase):
def setUp(self):
super(TestMetadataProxyHandler, self).setUp()
self.qclient_p = mock.patch('quantumclient.v2_0.client.Client')
self.qclient = self.qclient_p.start()
self.addCleanup(self.qclient_p.stop)
self.log_p = mock.patch.object(agent, 'LOG')
self.log = self.log_p.start()
self.addCleanup(self.log_p.stop)
self.handler = agent.MetadataProxyHandler(FakeConf)
def tearDown(self):
self.log_p.stop()
self.qclient_p.stop()
def test_call(self):
req = mock.Mock()
with mock.patch.object(self.handler, '_get_instance_id') as get_id:
@ -216,7 +215,7 @@ class TestMetadataProxyHandler(unittest.TestCase):
webob.exc.HTTPInternalServerError)
def test_proxy_request_other_code(self):
with self.assertRaises(Exception) as e:
with testtools.ExpectedException(Exception) as e:
self._proxy_request_test_helper(302)
def test_sign_instance_id(self):
@ -226,7 +225,7 @@ class TestMetadataProxyHandler(unittest.TestCase):
)
class TestUnixDomainHttpProtocol(unittest.TestCase):
class TestUnixDomainHttpProtocol(testtools.TestCase):
def test_init_empty_client(self):
u = agent.UnixDomainHttpProtocol(mock.Mock(), '', mock.Mock())
self.assertEqual(u.client_address, ('<local>', 0))
@ -236,15 +235,14 @@ class TestUnixDomainHttpProtocol(unittest.TestCase):
self.assertEqual(u.client_address, 'foo')
class TestUnixDomainWSGIServer(unittest.TestCase):
class TestUnixDomainWSGIServer(testtools.TestCase):
def setUp(self):
super(TestUnixDomainWSGIServer, self).setUp()
self.eventlet_p = mock.patch.object(agent, 'eventlet')
self.eventlet = self.eventlet_p.start()
self.addCleanup(self.eventlet_p.stop)
self.server = agent.UnixDomainWSGIServer('test')
def tearDown(self):
self.eventlet_p.stop()
def test_start(self):
mock_app = mock.Mock()
with mock.patch.object(self.server, 'pool') as pool:
@ -276,15 +274,14 @@ class TestUnixDomainWSGIServer(unittest.TestCase):
self.assertTrue(len(logging.mock_calls))
class TestUnixDomainMetadataProxy(unittest.TestCase):
class TestUnixDomainMetadataProxy(testtools.TestCase):
def setUp(self):
super(TestUnixDomainMetadataProxy, self).setUp()
self.cfg_p = mock.patch.object(agent, 'cfg')
self.cfg = self.cfg_p.start()
self.addCleanup(self.cfg_p.stop)
self.cfg.CONF.metadata_proxy_socket = '/the/path'
def tearDown(self):
self.cfg_p.stop()
def test_init_doesnot_exists(self):
with mock.patch('os.path.isdir') as isdir:
with mock.patch('os.makedirs') as makedirs:
@ -325,7 +322,7 @@ class TestUnixDomainMetadataProxy(unittest.TestCase):
exists.return_value = True
unlink.side_effect = OSError
with self.assertRaises(OSError):
with testtools.ExpectedException(OSError):
p = agent.UnixDomainMetadataProxy(mock.Mock())
isdir.assert_called_once_with('/the')

View File

@ -19,7 +19,7 @@
import socket
import mock
import unittest2 as unittest
import testtools
import webob
from quantum.agent.metadata import namespace_proxy as ns_proxy
@ -37,7 +37,7 @@ class FakeConf(object):
metadata_proxy_shared_secret = 'secret'
class TestUnixDomainHttpConnection(unittest.TestCase):
class TestUnixDomainHttpConnection(testtools.TestCase):
def test_connect(self):
with mock.patch.object(ns_proxy, 'cfg') as cfg:
cfg.CONF.metadata_proxy_socket = '/the/path'
@ -55,16 +55,15 @@ class TestUnixDomainHttpConnection(unittest.TestCase):
self.assertEqual(conn.timeout, 3)
class TestNetworkMetadataProxyHandler(unittest.TestCase):
class TestNetworkMetadataProxyHandler(testtools.TestCase):
def setUp(self):
super(TestNetworkMetadataProxyHandler, self).setUp()
self.log_p = mock.patch.object(ns_proxy, 'LOG')
self.log = self.log_p.start()
self.addCleanup(self.log_p.stop)
self.handler = ns_proxy.NetworkMetadataProxyHandler('router_id')
def tearDown(self):
self.log_p.stop()
def test_call(self):
req = mock.Mock(headers={})
with mock.patch.object(self.handler, '_proxy_request') as proxy_req:
@ -77,7 +76,7 @@ class TestNetworkMetadataProxyHandler(unittest.TestCase):
req.query_string)
def test_no_argument_passed_to_init(self):
with self.assertRaises(ValueError):
with testtools.ExpectedException(ValueError):
ns_proxy.NetworkMetadataProxyHandler()
def test_call_internal_server_error(self):
@ -192,7 +191,7 @@ class TestNetworkMetadataProxyHandler(unittest.TestCase):
with mock.patch('httplib2.Http') as mock_http:
mock_http.return_value.request.return_value = (resp, '')
with self.assertRaises(Exception):
with testtools.ExpectedException(Exception):
self.handler._proxy_request('192.168.1.1',
'/latest/meta-data',
'')
@ -215,7 +214,7 @@ class TestNetworkMetadataProxyHandler(unittest.TestCase):
with mock.patch('httplib2.Http') as mock_http:
mock_http.return_value.request.side_effect = Exception
with self.assertRaises(Exception):
with testtools.ExpectedException(Exception):
self.handler._proxy_request('192.168.1.1',
'/latest/meta-data',
'')
@ -232,7 +231,7 @@ class TestNetworkMetadataProxyHandler(unittest.TestCase):
)
class TestProxyDaemon(unittest.TestCase):
class TestProxyDaemon(testtools.TestCase):
def test_init(self):
with mock.patch('quantum.agent.linux.daemon.Pidfile') as pf:
pd = ns_proxy.ProxyDaemon('pidfile', 9697, 'net_id', 'router_id')

View File

@ -20,10 +20,10 @@ import os.path
import shutil
import StringIO
import tempfile
import unittest2 as unittest
import urllib2
import mock
import testtools
import quantum
from quantum.common import exceptions
@ -33,17 +33,14 @@ from quantum.openstack.common import policy as common_policy
from quantum import policy
class PolicyFileTestCase(unittest.TestCase):
class PolicyFileTestCase(testtools.TestCase):
def setUp(self):
super(PolicyFileTestCase, self).setUp()
policy.reset()
self.addCleanup(policy.reset)
self.context = context.Context('fake', 'fake')
self.target = {}
def tearDown(self):
super(PolicyFileTestCase, self).tearDown()
policy.reset()
@contextlib.contextmanager
def _tempdir(self, **kwargs):
tmpdir = tempfile.mkdtemp(**kwargs)
@ -81,10 +78,11 @@ class PolicyFileTestCase(unittest.TestCase):
self.target)
class PolicyTestCase(unittest.TestCase):
class PolicyTestCase(testtools.TestCase):
def setUp(self):
super(PolicyTestCase, self).setUp()
policy.reset()
self.addCleanup(policy.reset)
# NOTE(vish): preload rules to circumvent reloading from file
policy.init()
rules = {
@ -105,10 +103,6 @@ class PolicyTestCase(unittest.TestCase):
self.context = context.Context('fake', 'fake', roles=['member'])
self.target = {}
def tearDown(self):
policy.reset()
super(PolicyTestCase, self).tearDown()
def test_enforce_nonexistent_action_throws(self):
action = "example:noexist"
self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
@ -178,12 +172,13 @@ class PolicyTestCase(unittest.TestCase):
policy.enforce(admin_context, uppercase_action, self.target)
class DefaultPolicyTestCase(unittest.TestCase):
class DefaultPolicyTestCase(testtools.TestCase):
def setUp(self):
super(DefaultPolicyTestCase, self).setUp()
policy.reset()
policy.init()
self.addCleanup(policy.reset)
self.rules = {
"default": '',
@ -200,10 +195,6 @@ class DefaultPolicyTestCase(unittest.TestCase):
for k, v in self.rules.items()), default_rule)
common_policy.set_rules(rules)
def tearDown(self):
super(DefaultPolicyTestCase, self).tearDown()
policy.reset()
def test_policy_called(self):
self.assertRaises(exceptions.PolicyNotAuthorized, policy.enforce,
self.context, "example:exist", {})
@ -217,12 +208,13 @@ class DefaultPolicyTestCase(unittest.TestCase):
self.context, "example:noexist", {})
class QuantumPolicyTestCase(unittest.TestCase):
class QuantumPolicyTestCase(testtools.TestCase):
def setUp(self):
super(QuantumPolicyTestCase, self).setUp()
policy.reset()
policy.init()
self.addCleanup(policy.reset)
self.rules = dict((k, common_policy.parse_rule(v)) for k, v in {
"admin_or_network_owner": "role:admin or "
"tenant_id:%(network_tenant_id)s",
@ -251,15 +243,12 @@ class QuantumPolicyTestCase(unittest.TestCase):
'init',
new=fakepolicyinit)
self.patcher.start()
self.addCleanup(self.patcher.stop)
self.context = context.Context('fake', 'fake', roles=['user'])
plugin_klass = importutils.import_class(
"quantum.db.db_base_plugin_v2.QuantumDbPluginV2")
self.plugin = plugin_klass()
def tearDown(self):
self.patcher.stop()
policy.reset()
def _test_action_on_attr(self, context, action, attr, value,
exception=None):
action = "%s_network" % action

View File

@ -15,22 +15,20 @@
# License for the specific language governing permissions and limitations
# under the License.
import unittest2 as unittest
import mock
import testtools
from quantum import context
class TestQuantumContext(unittest.TestCase):
class TestQuantumContext(testtools.TestCase):
def setUp(self):
super(TestQuantumContext, self).setUp()
db_api = 'quantum.db.api.get_session'
self._db_api_session_patcher = mock.patch(db_api)
self.db_api_session = self._db_api_session_patcher.start()
def tearDown(self):
self._db_api_session_patcher.stop()
self.addCleanup(self._db_api_session_patcher.stop)
def testQuantumContextCreate(self):
cxt = context.Context('user_id', 'tenant_id')

View File

@ -17,7 +17,9 @@
import os
import types
import unittest2
import fixtures
import testtools
from oslo.config import cfg
@ -39,18 +41,16 @@ def etcdir(*p):
return os.path.join(ETCDIR, *p)
class QuantumManagerTestCase(unittest2.TestCase):
class QuantumManagerTestCase(testtools.TestCase):
def setUp(self):
super(QuantumManagerTestCase, self).setUp()
args = ['--config-file', etcdir('quantum.conf.test')]
# If test_config specifies some config-file, use it, as well
config.parse(args=args)
def tearDown(self):
unittest2.TestCase.tearDown(self)
cfg.CONF.reset()
QuantumManager._instance = None
self.addCleanup(cfg.CONF.reset)
self.useFixture(
fixtures.MonkeyPatch('quantum.manager.QuantumManager._instance'))
def test_service_plugin_is_loaded(self):
cfg.CONF.set_override("core_plugin",
@ -59,7 +59,6 @@ class QuantumManagerTestCase(unittest2.TestCase):
cfg.CONF.set_override("service_plugins",
["quantum.tests.unit.dummy_plugin."
"DummyServicePlugin"])
QuantumManager._instance = None
mgr = QuantumManager.get_instance()
plugin = mgr.get_service_plugins()[constants.DUMMY]
@ -74,7 +73,6 @@ class QuantumManagerTestCase(unittest2.TestCase):
"QuantumDummyPlugin",
"quantum.tests.unit.dummy_plugin."
"QuantumDummyPlugin"])
QuantumManager._instance = None
try:
QuantumManager.get_instance().get_service_plugins()

View File

@ -1,8 +1,7 @@
import unittest2 as unittest
import webtest
import mock
from oslo.config import cfg
import testtools
import webtest
from quantum.api import extensions
from quantum.api.v2 import attributes
@ -27,6 +26,7 @@ class QuotaExtensionTestCase(testlib_api.WebTestCase):
fmt = 'json'
def setUp(self):
super(QuotaExtensionTestCase, self).setUp()
db._ENGINE = None
db._MAKER = None
# Ensure 'stale' patched copies of the plugin are never returned
@ -79,6 +79,7 @@ class QuotaExtensionTestCase(testlib_api.WebTestCase):
# Restore the global RESOURCE_ATTRIBUTE_MAP
attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
super(QuotaExtensionTestCase, self).tearDown()
def test_quotas_loaded_right(self):
res = self.api.get(_get_path('quotas', fmt=self.fmt))
@ -182,14 +183,14 @@ class QuotaExtensionTestCase(testlib_api.WebTestCase):
fmt=self.fmt),
self.serialize(quotas), extra_environ=env)
self.assertEqual(200, res.status_int)
with self.assertRaises(exceptions.OverQuota):
with testtools.ExpectedException(exceptions.OverQuota):
quota.QUOTAS.limit_check(context.Context('', tenant_id),
tenant_id,
network=6)
def test_quotas_limit_check_with_invalid_quota_value(self):
tenant_id = 'tenant_id1'
with self.assertRaises(exceptions.InvalidQuotaValue):
with testtools.ExpectedException(exceptions.InvalidQuotaValue):
quota.QUOTAS.limit_check(context.Context('', tenant_id),
tenant_id,
network=-1)

View File

@ -17,14 +17,14 @@
import os
import mock
import unittest2 as unittest
import testtools
from quantum.common import utils
from quantum.rootwrap import filters
from quantum.rootwrap import wrapper
class RootwrapTestCase(unittest.TestCase):
class RootwrapTestCase(testtools.TestCase):
def setUp(self):
super(RootwrapTestCase, self).setUp()

View File

@ -14,7 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import unittest2 as unittest
import testtools
import webob.exc as webexc
import quantum
@ -151,8 +151,9 @@ class RouterServiceInsertionTestPlugin(
pass
class RouterServiceInsertionTestCase(unittest.TestCase):
class RouterServiceInsertionTestCase(testtools.TestCase):
def setUp(self):
super(RouterServiceInsertionTestCase, self).setUp()
plugin = (
"quantum.tests.unit.test_routerserviceinsertion."
"RouterServiceInsertionTestPlugin"
@ -166,6 +167,7 @@ class RouterServiceInsertionTestCase(unittest.TestCase):
cfg.CONF.set_override('core_plugin', plugin)
cfg.CONF.set_override('service_plugins', [plugin])
cfg.CONF.set_override('quota_router', -1, group='QUOTAS')
self.addCleanup(cfg.CONF.reset)
# Ensure 'stale' patched copies of the plugin are never returned
quantum.manager.QuantumManager._instance = None
@ -186,10 +188,6 @@ class RouterServiceInsertionTestCase(unittest.TestCase):
res = self._do_request('GET', _get_path('service-types'))
self._service_type_id = res['service_types'][0]['id']
def tearDown(self):
self._api = None
cfg.CONF.reset()
def _do_request(self, method, path, data=None, params=None, action=None):
content_type = 'application/json'
body = None
@ -245,10 +243,11 @@ class RouterServiceInsertionTestCase(unittest.TestCase):
}
if update_service_type_id:
data["router"]["service_type_id"] = _uuid()
with self.assertRaises(webexc.HTTPClientError) as ctx_manager:
with testtools.ExpectedException(
webexc.HTTPClientError) as ctx_manager:
res = self._do_request(
'PUT', _get_path('routers/{0}'.format(router_id)), data)
self.assertEqual(ctx_manager.exception.code, 400)
self.assertEqual(ctx_manager.exception.code, 400)
else:
res = self._do_request(
'PUT', _get_path('routers/{0}'.format(router_id)), data)
@ -375,11 +374,12 @@ class RouterServiceInsertionTestCase(unittest.TestCase):
data = {res: uattrs}
if update_router_id:
uattrs['router_id'] = self._router_id
with self.assertRaises(webexc.HTTPClientError) as ctx_manager:
with testtools.ExpectedException(
webexc.HTTPClientError) as ctx_manager:
newobj = self._do_request(
'PUT',
_get_path('lb/{0}s/{1}'.format(res, obj['id'])), data)
self.assertEqual(ctx_manager.exception.code, 400)
self.assertEqual(ctx_manager.exception.code, 400)
else:
newobj = self._do_request(
'PUT',

View File

@ -16,12 +16,12 @@
# under the License.
from contextlib import nested
import mock
from mock import call
import unittest2 as unittest
import mox
from oslo.config import cfg
import testtools
from quantum.agent import firewall as firewall_base
from quantum.agent.linux import iptables_manager
@ -370,8 +370,9 @@ class SGServerRpcCallBackMixinTestCaseXML(SGServerRpcCallBackMixinTestCase):
fmt = 'xml'
class SGAgentRpcCallBackMixinTestCase(unittest.TestCase):
class SGAgentRpcCallBackMixinTestCase(testtools.TestCase):
def setUp(self):
super(SGAgentRpcCallBackMixinTestCase, self).setUp()
self.rpc = sg_rpc.SecurityGroupAgentRpcCallbackMixin()
self.rpc.sg_agent = mock.Mock()
@ -393,8 +394,9 @@ class SGAgentRpcCallBackMixinTestCase(unittest.TestCase):
[call.security_groups_provider_updated()])
class SecurityGroupAgentRpcTestCase(unittest.TestCase):
class SecurityGroupAgentRpcTestCase(testtools.TestCase):
def setUp(self):
super(SecurityGroupAgentRpcTestCase, self).setUp()
self.agent = sg_rpc.SecurityGroupAgentRpcMixin()
self.agent.context = None
self.addCleanup(mock.patch.stopall)
@ -477,8 +479,9 @@ class FakeSGRpcApi(agent_rpc.PluginApi,
pass
class SecurityGroupServerRpcApiTestCase(unittest.TestCase):
class SecurityGroupServerRpcApiTestCase(testtools.TestCase):
def setUp(self):
super(SecurityGroupServerRpcApiTestCase, self).setUp()
self.rpc = FakeSGRpcApi('fake_topic')
self.rpc.call = mock.Mock()
@ -499,8 +502,9 @@ class FakeSGNotifierAPI(proxy.RpcProxy,
pass
class SecurityGroupAgentRpcApiTestCase(unittest.TestCase):
class SecurityGroupAgentRpcApiTestCase(testtools.TestCase):
def setUp(self):
super(SecurityGroupAgentRpcApiTestCase, self).setUp()
self.notifier = FakeSGNotifierAPI(topic='fake',
default_version='1.0')
self.notifier.fanout_cast = mock.Mock()
@ -941,12 +945,13 @@ IPTABLES_FILTER_V6_EMPTY = """:%(bn)s-(%(chains)s) - [0:0]
FIREWALL_BASE_PACKAGE = 'quantum.agent.linux.iptables_firewall.'
class TestSecurityGroupAgentWithIptables(unittest.TestCase):
class TestSecurityGroupAgentWithIptables(testtools.TestCase):
FIREWALL_DRIVER = FIREWALL_BASE_PACKAGE + 'IptablesFirewallDriver'
PHYSDEV_INGRESS = 'physdev-out'
PHYSDEV_EGRESS = 'physdev-in'
def setUp(self):
super(TestSecurityGroupAgentWithIptables, self).setUp()
self.mox = mox.Mox()
agent_opts = [
cfg.StrOpt('root_helper', default='sudo'),

View File

@ -19,10 +19,10 @@
import contextlib
import logging
import unittest2 as unittest
import mock
from oslo.config import cfg
import testtools
import webob.exc as webexc
import webtest
@ -73,6 +73,7 @@ class ServiceTypeTestCaseBase(testlib_api.WebTestCase):
cfg.CONF.set_override('service_plugins',
["%s.%s" % (dp.__name__,
dp.DummyServicePlugin.__name__)])
self.addCleanup(cfg.CONF.reset)
# Make sure at each test a new instance of the plugin is returned
manager.QuantumManager._instance = None
# Ensure existing ExtensionManager is not used
@ -83,10 +84,6 @@ class ServiceTypeTestCaseBase(testlib_api.WebTestCase):
self.resource_name = servicetype.RESOURCE_NAME.replace('-', '_')
super(ServiceTypeTestCaseBase, self).setUp()
def tearDown(self):
self.api = None
cfg.CONF.reset()
class ServiceTypeExtensionTestCase(ServiceTypeTestCaseBase):
@ -95,14 +92,11 @@ class ServiceTypeExtensionTestCase(ServiceTypeTestCaseBase):
"%s.%s" % (servicetype_db.__name__,
servicetype_db.ServiceTypeManager.__name__),
autospec=True)
self.addCleanup(self._patcher.stop)
self.mock_mgr = self._patcher.start()
self.mock_mgr.get_instance.return_value = self.mock_mgr.return_value
super(ServiceTypeExtensionTestCase, self).setUp()
def tearDown(self):
self._patcher.stop()
super(ServiceTypeExtensionTestCase, self).tearDown()
def _test_service_type_create(self, env=None,
expected_status=webexc.HTTPCreated.code):
tenant_id = 'fake'
@ -256,12 +250,9 @@ class ServiceTypeManagerTestCase(ServiceTypeTestCaseBase):
plugin_name = "%s.%s" % (dp.__name__, dp.DummyServicePlugin.__name__)
cfg.CONF.set_override('service_definition', ['dummy:%s' % plugin_name],
group='DEFAULT_SERVICETYPE')
self.addCleanup(db_api.clear_db)
super(ServiceTypeManagerTestCase, self).setUp()
def tearDown(self):
super(ServiceTypeManagerTestCase, self).tearDown()
db_api.clear_db()
@contextlib.contextmanager
def service_type(self, name='svc_type',
default=True,

View File

@ -1,57 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from tempfile import mkstemp
import unittest
from quantum.openstack.common.setup import canonicalize_emails
from quantum.openstack.common.setup import parse_mailmap
class SetupTest(unittest.TestCase):
def setUp(self):
(fd, self.mailmap) = mkstemp(prefix='openstack', suffix='.mailmap')
def test_str_dict_replace(self):
string = 'Johnnie T. Hozer'
mapping = {'T.': 'The'}
self.assertEqual('Johnnie The Hozer',
canonicalize_emails(string, mapping))
def test_mailmap_with_fullname(self):
with open(self.mailmap, 'w') as mm_fh:
mm_fh.write("Foo Bar <email@foo.com> Foo Bar <email@bar.com>\n")
self.assertEqual({'<email@bar.com>': '<email@foo.com>'},
parse_mailmap(self.mailmap))
def test_mailmap_with_firstname(self):
with open(self.mailmap, 'w') as mm_fh:
mm_fh.write("Foo <email@foo.com> Foo <email@bar.com>\n")
self.assertEqual({'<email@bar.com>': '<email@foo.com>'},
parse_mailmap(self.mailmap))
def test_mailmap_with_noname(self):
with open(self.mailmap, 'w') as mm_fh:
mm_fh.write("<email@foo.com> <email@bar.com>\n")
self.assertEqual({'<email@bar.com>': '<email@foo.com>'},
parse_mailmap(self.mailmap))
def tearDown(self):
if os.path.exists(self.mailmap):
os.remove(self.mailmap)

View File

@ -18,7 +18,7 @@
import socket
import mock
import unittest2 as unittest
import testtools
from quantum.api.v2 import attributes
from quantum.common import constants
@ -26,7 +26,7 @@ from quantum.common import exceptions as exception
from quantum import wsgi
class TestWSGIServer(unittest.TestCase):
class TestWSGIServer(testtools.TestCase):
"""WSGI server tests."""
def test_start_random_port(self):
@ -82,7 +82,7 @@ class TestWSGIServer(unittest.TestCase):
])
class SerializerTest(unittest.TestCase):
class SerializerTest(testtools.TestCase):
def test_serialize_unknown_content_type(self):
"""
Test serialize verifies that exception InvalidContentType is raised
@ -108,7 +108,7 @@ class SerializerTest(unittest.TestCase):
serializer.get_deserialize_handler, content_type)
class RequestDeserializerTest(unittest.TestCase):
class RequestDeserializerTest(testtools.TestCase):
def test_get_body_deserializer_unknown_content_type(self):
"""
Test get body deserializer verifies
@ -121,8 +121,10 @@ class RequestDeserializerTest(unittest.TestCase):
deserializer.get_body_deserializer, content_type)
class ResponseSerializerTest(unittest.TestCase):
class ResponseSerializerTest(testtools.TestCase):
def setUp(self):
super(ResponseSerializerTest, self).setUp()
class JSONSerializer(object):
def serialize(self, data, action='default'):
return 'pew_json'
@ -162,7 +164,7 @@ class ResponseSerializerTest(unittest.TestCase):
self.serializer.get_body_serializer, 'application/unknown')
class XMLDeserializerTest(unittest.TestCase):
class XMLDeserializerTest(testtools.TestCase):
def test_default_raise_Maiformed_Exception(self):
"""
Test verifies that exception MalformedRequestBody is raised
@ -174,7 +176,7 @@ class XMLDeserializerTest(unittest.TestCase):
exception.MalformedRequestBody, deserializer.default, data_string)
class JSONDeserializerTest(unittest.TestCase):
class JSONDeserializerTest(testtools.TestCase):
def test_default_raise_Maiformed_Exception(self):
"""
Test verifies JsonDeserializer.default
@ -187,7 +189,7 @@ class JSONDeserializerTest(unittest.TestCase):
exception.MalformedRequestBody, deserializer.default, data_string)
class ResourceTest(unittest.TestCase):
class ResourceTest(testtools.TestCase):
def test_dispatch_unknown_controller_action(self):
class Controller(object):
def index(self, request, pants=None):
@ -260,7 +262,7 @@ class ResourceTest(unittest.TestCase):
self.assertEqual(400, result.status_int)
class XMLDictSerializerTest(unittest.TestCase):
class XMLDictSerializerTest(testtools.TestCase):
def test_xml(self):
NETWORK = {'network': {'test': None,
'tenant_id': 'test-tenant',

View File

@ -13,7 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import unittest2 as unittest
import testtools
from quantum.api.v2 import attributes
from quantum import wsgi
@ -35,10 +35,11 @@ def create_request(path, body, content_type, method='GET',
return req
class WebTestCase(unittest.TestCase):
class WebTestCase(testtools.TestCase):
fmt = 'json'
def setUp(self):
super(WebTestCase, self).setUp()
json_deserializer = wsgi.JSONDeserializer()
xml_deserializer = wsgi.XMLDeserializer(
attributes.get_attr_metadata())
@ -46,7 +47,6 @@ class WebTestCase(unittest.TestCase):
'application/json': json_deserializer,
'application/xml': xml_deserializer,
}
super(WebTestCase, self).setUp()
def deserialize(self, response):
ctype = 'application/%s' % self.fmt

View File

@ -43,7 +43,6 @@ To run a single functional test module::
import gettext
import os
import unittest
import sys
from quantum.common.test_lib import run_tests

View File

@ -9,7 +9,6 @@ source-dir = doc/source
# tissue http://pypi.python.org/pypi/tissue (pep8 checker)
# openstack-nose https://github.com/jkoelker/openstack-nose
verbosity=2
detailed-errors=1
cover-package = quantum
cover-html = true
cover-erase = true

View File

@ -2,6 +2,7 @@ Babel>=0.9.6
cliff
coverage
distribute>=0.6.24
fixtures>=0.3.12
mock>=1.0b1
mox==0.5.3
nose
@ -10,7 +11,7 @@ nosexcover
openstack.nose_plugin
pep8
sphinx>=1.1.2
unittest2
testtools>=0.9.27
webtest==1.3.3
# Packages for the Cisco Plugin
###############################