rename neutron/tests to tacker/tests

Change-Id: I8a06ad8acde84b6635b87a9679924443550feaa8
This commit is contained in:
Isaku Yamahata 2014-06-26 19:18:51 +09:00
parent 9b8bc393e9
commit 6125f5fcab
49 changed files with 240 additions and 407 deletions

View File

@ -1,8 +0,0 @@
[pipeline:extensions_app_with_filter]
pipeline = extensions extensions_test_app
[filter:extensions]
paste.filter_factory = neutron.common.extensions:plugin_aware_extension_middleware_factory
[app:extensions_test_app]
paste.app_factory = neutron.tests.unit.test_extensions:app_factory

View File

@ -32,16 +32,15 @@ from oslo.config import cfg
from oslo.messaging import conffixture as messaging_conffixture
import testtools
from neutron.common import config
from neutron.common import rpc as n_rpc
from neutron.db import agentschedulers_db
from neutron import manager
from neutron.tests import fake_notifier
from neutron.tests import post_mortem_debug
from tacker.common import config
from tacker.common import rpc as n_rpc
from tacker import manager
from tacker.tests import fake_notifier
from tacker.tests import post_mortem_debug
CONF = cfg.CONF
CONF.import_opt('state_path', 'neutron.common.config')
CONF.import_opt('state_path', 'tacker.common.config')
TRUE_STRING = ['True', '1']
LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s"
@ -65,13 +64,10 @@ class BaseTestCase(testtools.TestCase):
def cleanup_core_plugin(self):
"""Ensure that the core plugin is deallocated."""
nm = manager.NeutronManager
nm = manager.TackerManager
if not nm.has_instance():
return
#TODO(marun) Fix plugins that do not properly initialize notifiers
agentschedulers_db.AgentSchedulerDbMixin.agent_notifiers = {}
# Perform a check for deallocation only if explicitly
# configured to do so since calling gc.collect() after every
# test increases test suite execution time by ~50%.
@ -102,9 +98,9 @@ class BaseTestCase(testtools.TestCase):
@staticmethod
def config_parse(conf=None, args=None):
"""Create the default configurations."""
# neutron.conf.test includes rpc_backend which needs to be cleaned up
# tacker.conf.test includes rpc_backend which needs to be cleaned up
if args is None:
args = ['--config-file', etcdir('neutron.conf.test')]
args = ['--config-file', etcdir('tacker.conf.test')]
if conf is None:
config.init(args=args)
else:
@ -138,7 +134,7 @@ class BaseTestCase(testtools.TestCase):
# suppress all but errors here
self.useFixture(
fixtures.FakeLogger(
name='neutron.api.extensions',
name='tacker.api.extensions',
format=LOG_FORMAT,
level=logging.ERROR,
nuke_handlers=capture_logs,
@ -167,12 +163,12 @@ class BaseTestCase(testtools.TestCase):
stderr = self.useFixture(fixtures.StringStream('stderr')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
self.useFixture(fixtures.MonkeyPatch(
'neutron.common.exceptions.NeutronException.use_fatal_exceptions',
'tacker.common.exceptions.TackerException.use_fatal_exceptions',
fake_use_fatal_exceptions))
# don't actually start RPC listeners when testing
self.useFixture(fixtures.MonkeyPatch(
'neutron.common.rpc_compat.Connection.consume_in_threads',
'tacker.common.rpc_compat.Connection.consume_in_threads',
fake_consume_in_threads))
self.useFixture(fixtures.MonkeyPatch(
@ -184,7 +180,7 @@ class BaseTestCase(testtools.TestCase):
self.useFixture(self.messaging_conf)
self.addCleanup(n_rpc.clear_extra_exmods)
n_rpc.add_extra_exmods('neutron.test')
n_rpc.add_extra_exmods('tacker.test')
self.addCleanup(n_rpc.cleanup)
n_rpc.init(CONF)

View File

@ -0,0 +1,8 @@
[pipeline:extensions_app_with_filter]
pipeline = extensions extensions_test_app
[filter:extensions]
paste.filter_factory = tacker.common.extensions:plugin_aware_extension_middleware_factory
[app:extensions_test_app]
paste.app_factory = tacker.tests.unit.test_extensions:app_factory

View File

@ -1,6 +1,6 @@
# neutron-rootwrap command filters for the unit test
# tacker-rootwrap command filters for the unit test
# this file goes with neutron/tests/unit/_test_rootwrap_exec.py.
# this file goes with tacker/tests/unit/_test_rootwrap_exec.py.
# See the comments there about how to run that unit tests
# format seems to be

View File

@ -9,7 +9,7 @@ debug = False
bind_host = 0.0.0.0
# Port the bind the API server to
bind_port = 9696
bind_port = 8888
# Path to the extensions
api_extensions_path = unit/extensions
@ -18,7 +18,7 @@ api_extensions_path = unit/extensions
api_paste_config = api-paste.ini.test
# The messaging module to use, defaults to kombu.
rpc_backend = neutron.openstack.common.rpc.impl_fake
rpc_backend = tacker.openstack.common.rpc.impl_fake
lock_path = $state_path/lock

View File

@ -19,9 +19,9 @@ import os
import fixtures
from neutron.agent.linux import utils
from neutron.openstack.common import log as logging
from neutron.tests import base
from tacker.agent.linux import utils
from tacker.openstack.common import log as logging
from tacker.tests import base
LOG = logging.getLogger(__name__)
@ -63,10 +63,10 @@ to the aid of their party.\"\n")
"generated by test_rootwrap.py\n")
f.write("[DEFAULT]\n")
f.write("filters_path=" + self.cwd +
"/neutron/tests/etc/rootwrap.d/")
"/tacker/tests/etc/rootwrap.d/")
# now set the root helper to sudo our rootwrap script,
# with the new conf
self.root_helper = "sudo " + self.cwd + "/bin/neutron-rootwrap "
self.root_helper = "sudo " + self.cwd + "/bin/tacker-rootwrap "
self.root_helper += self.conf_file
def runTest(self):

View File

@ -20,9 +20,9 @@ import eventlet.timeout
import mock
import testtools
from neutron.agent.linux import async_process
from neutron.agent.linux import utils
from neutron.tests import base
from tacker.agent.linux import async_process
from tacker.agent.linux import utils
from tacker.tests import base
_marker = ()

View File

@ -19,15 +19,15 @@
"""stubs.py provides interface methods for the database test cases"""
from neutron.db import api as db
from neutron.openstack.common import log as logging
from tacker.db import api as db
from tacker.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class NeutronDB(object):
"""Class conisting of methods to call Neutron db methods."""
class TackerDB(object):
"""Class conisting of methods to call Tacker db methods."""
def get_all_networks(self, tenant_id):
"""Get all networks."""
nets = []

View File

@ -16,8 +16,8 @@
import abc
from neutron.api import extensions
from neutron import wsgi
from tacker.api import extensions
from tacker import wsgi
class StubExtension(object):

View File

@ -16,7 +16,7 @@
#
# @author: Kaiwei Fan, VMware, Inc
from neutron.api import extensions
from tacker.api import extensions
EXTENDED_ATTRIBUTE = 'extended_attribute'
EXTENDED_ATTRIBUTES_2_0 = {

View File

@ -20,10 +20,9 @@
import abc
from neutron.api import extensions
from neutron.api.v2 import base
from neutron import manager
from neutron import quota
from tacker.api import extensions
from tacker.api.v1 import base
from tacker import manager
# Attribute Map
@ -73,13 +72,11 @@ class Extensionattribute(extensions.ExtensionDescriptor):
def get_resources(cls):
"""Returns Ext Resources."""
exts = []
plugin = manager.NeutronManager.get_plugin()
plugin = manager.TackerManager.get_plugin()
resource_name = 'ext_test_resource'
collection_name = resource_name + "s"
params = RESOURCE_ATTRIBUTE_MAP.get(collection_name, dict())
quota.QUOTAS.register_resource_by_name(resource_name)
controller = base.create_resource(collection_name,
resource_name,
plugin, params,

View File

@ -17,9 +17,9 @@
import abc
from neutron.api import extensions
from neutron.openstack.common import jsonutils
from neutron import wsgi
from tacker.api import extensions
from tacker.openstack.common import jsonutils
from tacker import wsgi
class FoxInSocksController(wsgi.Controller):

View File

@ -17,9 +17,9 @@
import mock
from neutron.agent import rpc
from neutron.openstack.common import context
from neutron.tests import base
from tacker.agent import rpc
from tacker.openstack.common import context
from tacker.tests import base
class AgentRPCPluginApi(base.BaseTestCase):
@ -27,7 +27,7 @@ class AgentRPCPluginApi(base.BaseTestCase):
agent = rpc.PluginApi('fake_topic')
ctxt = context.RequestContext('fake_user', 'fake_project')
expect_val = 'foo'
with mock.patch('neutron.common.rpc_compat.RpcProxy.call') as rpc_call:
with mock.patch('tacker.common.rpc_compat.RpcProxy.call') as rpc_call:
rpc_call.return_value = expect_val
func_obj = getattr(agent, method)
if method == 'tunnel_sync':
@ -91,7 +91,7 @@ class AgentRPCMethods(base.BaseTestCase):
mock.call().consume_in_threads()
]
call_to_patch = 'neutron.common.rpc_compat.create_connection'
call_to_patch = 'tacker.common.rpc_compat.create_connection'
with mock.patch(call_to_patch) as create_connection:
rpc.create_consumers(endpoints, 'foo', [('topic', 'op')])
create_connection.assert_has_calls(expected)
@ -107,7 +107,7 @@ class AgentRPCMethods(base.BaseTestCase):
mock.call().consume_in_threads()
]
call_to_patch = 'neutron.common.rpc_compat.create_connection'
call_to_patch = 'tacker.common.rpc_compat.create_connection'
with mock.patch(call_to_patch) as create_connection:
rpc.create_consumers(endpoints, 'foo', [('topic', 'op', 'node1')])
create_connection.assert_has_calls(expected)

View File

@ -21,11 +21,11 @@
from testtools import matchers
from webob import exc
from neutron.api import api_common as common
from neutron.tests import base
from tacker.api import api_common as common
from tacker.tests import base
class FakeController(common.NeutronController):
class FakeController(common.TackerController):
_resource_name = 'fake'

View File

@ -24,22 +24,20 @@ import webob
from webob import exc
import webtest
from neutron.api import api_common
from neutron.api import extensions
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.v2 import attributes
from neutron.api.v2 import base as v2_base
from neutron.api.v2 import router
from neutron.common import exceptions as n_exc
from neutron import context
from neutron import manager
from neutron.openstack.common import policy as common_policy
from neutron.openstack.common import uuidutils
from neutron import policy
from neutron import quota
from neutron.tests import base
from neutron.tests import fake_notifier
from neutron.tests.unit import testlib_api
from tacker.api import api_common
from tacker.api import extensions
from tacker.api.v1 import attributes
from tacker.api.v1 import base as v2_base
from tacker.api.v1 import router
from tacker.common import exceptions as n_exc
from tacker import context
from tacker import manager
from tacker.openstack.common import policy as common_policy
from tacker.openstack.common import uuidutils
from tacker import policy
from tacker.tests import base
from tacker.tests import fake_notifier
from tacker.tests.unit import testlib_api
ROOTDIR = os.path.dirname(os.path.dirname(__file__))
@ -92,7 +90,7 @@ class APIv2TestBase(base.BaseTestCase):
def setUp(self):
super(APIv2TestBase, self).setUp()
plugin = 'neutron.neutron_plugin_base_v2.NeutronPluginBaseV2'
plugin = 'tacker.tacker_plugin_base_v2.TackerPluginBaseV2'
# Ensure existing ExtensionManager is not used
extensions.PluginAwareExtensionManager._instance = None
# Create the default configurations
@ -104,16 +102,12 @@ class APIv2TestBase(base.BaseTestCase):
self._plugin_patcher = mock.patch(plugin, autospec=True)
self.plugin = self._plugin_patcher.start()
instance = self.plugin.return_value
instance._NeutronPluginBaseV2__native_pagination_support = True
instance._NeutronPluginBaseV2__native_sorting_support = True
instance._TackerPluginBaseV2__native_pagination_support = True
instance._TackerPluginBaseV2__native_sorting_support = True
api = router.APIRouter()
self.api = webtest.TestApp(api)
quota.QUOTAS._driver = None
cfg.CONF.set_override('quota_driver', 'neutron.quota.ConfDriver',
group='QUOTAS')
class _ArgMatcher(object):
"""An adapter to assist mock assertions, used to custom compare."""
@ -451,8 +445,8 @@ class APIv2TestCase(APIv2TestBase):
def test_emulated_sort(self):
instance = self.plugin.return_value
instance._NeutronPluginBaseV2__native_pagination_support = False
instance._NeutronPluginBaseV2__native_sorting_support = False
instance._TackerPluginBaseV2__native_pagination_support = False
instance._TackerPluginBaseV2__native_sorting_support = False
instance.get_networks.return_value = []
api = webtest.TestApp(router.APIRouter())
api.get(_get_path('networks'), {'sort_key': ['name', 'status'],
@ -463,8 +457,8 @@ class APIv2TestCase(APIv2TestBase):
def test_emulated_sort_without_sort_field(self):
instance = self.plugin.return_value
instance._NeutronPluginBaseV2__native_pagination_support = False
instance._NeutronPluginBaseV2__native_sorting_support = False
instance._TackerPluginBaseV2__native_pagination_support = False
instance._TackerPluginBaseV2__native_sorting_support = False
instance.get_networks.return_value = []
api = webtest.TestApp(router.APIRouter())
api.get(_get_path('networks'), {'sort_key': ['name', 'status'],
@ -482,7 +476,7 @@ class APIv2TestCase(APIv2TestBase):
def test_emulated_pagination(self):
instance = self.plugin.return_value
instance._NeutronPluginBaseV2__native_pagination_support = False
instance._TackerPluginBaseV2__native_pagination_support = False
instance.get_networks.return_value = []
api = webtest.TestApp(router.APIRouter())
api.get(_get_path('networks'), {'limit': 10,
@ -495,7 +489,7 @@ class APIv2TestCase(APIv2TestBase):
def test_native_pagination_without_native_sorting(self):
instance = self.plugin.return_value
instance._NeutronPluginBaseV2__native_sorting_support = False
instance._TackerPluginBaseV2__native_sorting_support = False
self.assertRaises(n_exc.Invalid, router.APIRouter)
def test_native_pagination_without_allow_sorting(self):
@ -521,7 +515,7 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
def _test_list(self, req_tenant_id, real_tenant_id):
env = {}
if req_tenant_id:
env = {'neutron.context': context.Context('', req_tenant_id)}
env = {'tacker.context': context.Context('', req_tenant_id)}
input_dict = {'id': uuidutils.generate_uuid(),
'name': 'net1',
'admin_state_up': True,
@ -798,7 +792,7 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
def test_create_with_keystone_env(self):
tenant_id = _uuid()
net_id = _uuid()
env = {'neutron.context': context.Context('', tenant_id)}
env = {'tacker.context': context.Context('', tenant_id)}
# tenant_id should be fetched from env
initial_input = {'network': {'name': 'net1'}}
full_input = {'network': {'admin_state_up': True,
@ -824,7 +818,7 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
def test_create_bad_keystone_tenant(self):
tenant_id = _uuid()
data = {'network': {'name': 'net1', 'tenant_id': tenant_id}}
env = {'neutron.context': context.Context('', tenant_id + "bad")}
env = {'tacker.context': context.Context('', tenant_id + "bad")}
res = self.api.post(_get_path('networks', fmt=self.fmt),
self.serialize(data),
content_type='application/' + self.fmt,
@ -985,7 +979,7 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
expect_errors=False):
env = {}
if req_tenant_id:
env = {'neutron.context': context.Context('', req_tenant_id)}
env = {'tacker.context': context.Context('', req_tenant_id)}
instance = self.plugin.return_value
instance.get_network.return_value = {'tenant_id': real_tenant_id,
'shared': False}
@ -1015,10 +1009,10 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
env = {}
shared = False
if req_tenant_id:
env = {'neutron.context': context.Context('', req_tenant_id)}
env = {'tacker.context': context.Context('', req_tenant_id)}
if req_tenant_id.endswith('another'):
shared = True
env['neutron.context'].roles = ['tenant_admin']
env['tacker.context'].roles = ['tenant_admin']
data = {'tenant_id': real_tenant_id, 'shared': shared}
instance = self.plugin.return_value
@ -1065,7 +1059,7 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
expect_errors=False):
env = {}
if req_tenant_id:
env = {'neutron.context': context.Context('', req_tenant_id)}
env = {'tacker.context': context.Context('', req_tenant_id)}
# leave out 'name' field intentionally
data = {'network': {'admin_state_up': True}}
return_value = {'subnets': []}
@ -1121,7 +1115,7 @@ class SubresourceTest(base.BaseTestCase):
def setUp(self):
super(SubresourceTest, self).setUp()
plugin = 'neutron.tests.unit.test_api_v2.TestSubresourcePlugin'
plugin = 'tacker.tests.unit.test_api_v2.TestSubresourcePlugin'
extensions.PluginAwareExtensionManager._instance = None
# Save the global RESOURCE_ATTRIBUTE_MAP
@ -1290,104 +1284,10 @@ class NotificationTest(APIv2TestBase):
self._resource_op_notifier('update', 'network')
class DHCPNotificationTest(APIv2TestBase):
def _test_dhcp_notifier(self, opname, resource, initial_input=None):
instance = self.plugin.return_value
instance.get_networks.return_value = initial_input
instance.get_networks_count.return_value = 0
expected_code = exc.HTTPCreated.code
with mock.patch.object(dhcp_rpc_agent_api.DhcpAgentNotifyAPI,
'notify') as dhcp_notifier:
if opname == 'create':
res = self.api.post_json(
_get_path('networks'),
initial_input)
if opname == 'update':
res = self.api.put_json(
_get_path('networks', id=_uuid()),
initial_input)
expected_code = exc.HTTPOk.code
if opname == 'delete':
res = self.api.delete(_get_path('networks', id=_uuid()))
expected_code = exc.HTTPNoContent.code
expected_item = mock.call(mock.ANY, mock.ANY,
resource + "." + opname + ".end")
if initial_input and resource not in initial_input:
resource += 's'
num = len(initial_input[resource]) if initial_input and isinstance(
initial_input[resource], list) else 1
expected = [expected_item for x in xrange(num)]
self.assertEqual(expected, dhcp_notifier.call_args_list)
self.assertEqual(num, dhcp_notifier.call_count)
self.assertEqual(expected_code, res.status_int)
def test_network_create_dhcp_notifer(self):
input = {'network': {'name': 'net',
'tenant_id': _uuid()}}
self._test_dhcp_notifier('create', 'network', input)
def test_network_delete_dhcp_notifer(self):
self._test_dhcp_notifier('delete', 'network')
def test_network_update_dhcp_notifer(self):
input = {'network': {'name': 'net'}}
self._test_dhcp_notifier('update', 'network', input)
def test_networks_create_bulk_dhcp_notifer(self):
input = {'networks': [{'name': 'net1',
'tenant_id': _uuid()},
{'name': 'net2',
'tenant_id': _uuid()}]}
self._test_dhcp_notifier('create', 'network', input)
class QuotaTest(APIv2TestBase):
def test_create_network_quota(self):
cfg.CONF.set_override('quota_network', 1, group='QUOTAS')
initial_input = {'network': {'name': 'net1', 'tenant_id': _uuid()}}
full_input = {'network': {'admin_state_up': True, 'subnets': []}}
full_input['network'].update(initial_input['network'])
instance = self.plugin.return_value
instance.get_networks_count.return_value = 1
res = self.api.post_json(
_get_path('networks'), initial_input, expect_errors=True)
instance.get_networks_count.assert_called_with(mock.ANY,
filters=mock.ANY)
self.assertIn("Quota exceeded for resources",
res.json['NeutronError']['message'])
def test_create_network_quota_no_counts(self):
cfg.CONF.set_override('quota_network', 1, group='QUOTAS')
initial_input = {'network': {'name': 'net1', 'tenant_id': _uuid()}}
full_input = {'network': {'admin_state_up': True, 'subnets': []}}
full_input['network'].update(initial_input['network'])
instance = self.plugin.return_value
instance.get_networks_count.side_effect = (
NotImplementedError())
instance.get_networks.return_value = ["foo"]
res = self.api.post_json(
_get_path('networks'), initial_input, expect_errors=True)
instance.get_networks_count.assert_called_with(mock.ANY,
filters=mock.ANY)
self.assertIn("Quota exceeded for resources",
res.json['NeutronError']['message'])
def test_create_network_quota_without_limit(self):
cfg.CONF.set_override('quota_network', -1, group='QUOTAS')
initial_input = {'network': {'name': 'net1', 'tenant_id': _uuid()}}
instance = self.plugin.return_value
instance.get_networks_count.return_value = 3
res = self.api.post_json(
_get_path('networks'), initial_input)
self.assertEqual(res.status_int, exc.HTTPCreated.code)
class ExtensionTestCase(base.BaseTestCase):
def setUp(self):
super(ExtensionTestCase, self).setUp()
plugin = 'neutron.neutron_plugin_base_v2.NeutronPluginBaseV2'
plugin = 'tacker.tacker_plugin_base_v2.TackerPluginBaseV2'
# Ensure existing ExtensionManager is not used
extensions.PluginAwareExtensionManager._instance = None
@ -1408,16 +1308,12 @@ class ExtensionTestCase(base.BaseTestCase):
self.plugin = self._plugin_patcher.start()
# Instantiate mock plugin and enable the V2attributes extension
manager.NeutronManager.get_plugin().supported_extension_aliases = (
manager.TackerManager.get_plugin().supported_extension_aliases = (
["v2attrs"])
api = router.APIRouter()
self.api = webtest.TestApp(api)
quota.QUOTAS._driver = None
cfg.CONF.set_override('quota_driver', 'neutron.quota.ConfDriver',
group='QUOTAS')
def tearDown(self):
super(ExtensionTestCase, self).tearDown()
self.api = None

View File

@ -25,12 +25,11 @@ from oslo.config import cfg
from webob import exc
import webtest
from neutron.api import extensions
from neutron.api.v2 import attributes
from neutron import quota
from neutron.tests.unit import test_api_v2
from neutron.tests.unit import test_extensions
from neutron.tests.unit import testlib_api
from tacker.api import extensions
from tacker.api.v1 import attributes
from tacker.tests.unit import test_api_v2
from tacker.tests.unit import test_extensions
from tacker.tests.unit import testlib_api
class ExtensionTestCase(testlib_api.WebTestCase):
@ -43,7 +42,6 @@ class ExtensionTestCase(testlib_api.WebTestCase):
translate_resource_name=False,
allow_pagination=False, allow_sorting=False,
supported_extension_aliases=None,
use_quota=False,
):
self._resource_prefix = resource_prefix
@ -86,10 +84,6 @@ class ExtensionTestCase(testlib_api.WebTestCase):
native_sorting_attr_name = ("_%s__native_sorting_support"
% instance.__class__.__name__)
setattr(instance, native_sorting_attr_name, True)
if use_quota:
quota.QUOTAS._driver = None
cfg.CONF.set_override('quota_driver', 'neutron.quota.ConfDriver',
group='QUOTAS')
class ExtensionTestExtensionManager(object):
def get_resources(self):

View File

@ -22,12 +22,12 @@ import mock
from webob import exc
import webtest
from neutron.api.v2 import resource as wsgi_resource
from neutron.common import exceptions as n_exc
from neutron import context
from neutron.openstack.common import gettextutils
from neutron.tests import base
from neutron import wsgi
from tacker.api.v1 import resource as wsgi_resource
from tacker.common import exceptions as n_exc
from tacker import context
from tacker.openstack.common import gettextutils
from tacker.tests import base
from tacker import wsgi
class RequestTestCase(base.BaseTestCase):
@ -91,12 +91,12 @@ class RequestTestCase(base.BaseTestCase):
result = request.best_match_content_type()
self.assertEqual(result, "application/json")
def test_context_with_neutron_context(self):
def test_context_with_tacker_context(self):
ctxt = context.Context('fake_user', 'fake_tenant')
self.req.environ['neutron.context'] = ctxt
self.req.environ['tacker.context'] = ctxt
self.assertEqual(self.req.context, ctxt)
def test_context_without_neutron_context(self):
def test_context_without_tacker_context(self):
self.assertTrue(self.req.context.is_admin)
def test_best_match_language(self):
@ -124,13 +124,13 @@ class RequestTestCase(base.BaseTestCase):
class ResourceTestCase(base.BaseTestCase):
def test_unmapped_neutron_error_with_json(self):
def test_unmapped_tacker_error_with_json(self):
msg = u'\u7f51\u7edc'
class TestException(n_exc.NeutronException):
class TestException(n_exc.TackerException):
message = msg
expected_res = {'body': {
'NeutronError': {
'TackerError': {
'type': 'TestException',
'message': msg,
'detail': ''}}}
@ -146,13 +146,13 @@ class ResourceTestCase(base.BaseTestCase):
self.assertEqual(wsgi.JSONDeserializer().deserialize(res.body),
expected_res)
def test_unmapped_neutron_error_with_xml(self):
def test_unmapped_tacker_error_with_xml(self):
msg = u'\u7f51\u7edc'
class TestException(n_exc.NeutronException):
class TestException(n_exc.TackerException):
message = msg
expected_res = {'body': {
'NeutronError': {
'TackerError': {
'type': 'TestException',
'message': msg,
'detail': ''}}}
@ -168,14 +168,14 @@ class ResourceTestCase(base.BaseTestCase):
self.assertEqual(wsgi.XMLDeserializer().deserialize(res.body),
expected_res)
@mock.patch('neutron.openstack.common.gettextutils.translate')
def test_unmapped_neutron_error_localized(self, mock_translation):
@mock.patch('tacker.openstack.common.gettextutils.translate')
def test_unmapped_tacker_error_localized(self, mock_translation):
gettextutils.install('blaa', lazy=True)
msg_translation = 'Translated error'
mock_translation.return_value = msg_translation
msg = _('Unmapped error')
class TestException(n_exc.NeutronException):
class TestException(n_exc.TackerException):
message = msg
controller = mock.MagicMock()
@ -190,13 +190,13 @@ class ResourceTestCase(base.BaseTestCase):
self.assertIn(msg_translation,
str(wsgi.JSONDeserializer().deserialize(res.body)))
def test_mapped_neutron_error_with_json(self):
def test_mapped_tacker_error_with_json(self):
msg = u'\u7f51\u7edc'
class TestException(n_exc.NeutronException):
class TestException(n_exc.TackerException):
message = msg
expected_res = {'body': {
'NeutronError': {
'TackerError': {
'type': 'TestException',
'message': msg,
'detail': ''}}}
@ -214,13 +214,13 @@ class ResourceTestCase(base.BaseTestCase):
self.assertEqual(wsgi.JSONDeserializer().deserialize(res.body),
expected_res)
def test_mapped_neutron_error_with_xml(self):
def test_mapped_tacker_error_with_xml(self):
msg = u'\u7f51\u7edc'
class TestException(n_exc.NeutronException):
class TestException(n_exc.TackerException):
message = msg
expected_res = {'body': {
'NeutronError': {
'TackerError': {
'type': 'TestException',
'message': msg,
'detail': ''}}}
@ -238,14 +238,14 @@ class ResourceTestCase(base.BaseTestCase):
self.assertEqual(wsgi.XMLDeserializer().deserialize(res.body),
expected_res)
@mock.patch('neutron.openstack.common.gettextutils.translate')
def test_mapped_neutron_error_localized(self, mock_translation):
@mock.patch('tacker.openstack.common.gettextutils.translate')
def test_mapped_tacker_error_localized(self, mock_translation):
gettextutils.install('blaa', lazy=True)
msg_translation = 'Translated error'
mock_translation.return_value = msg_translation
msg = _('Unmapped error')
class TestException(n_exc.NeutronException):
class TestException(n_exc.TackerException):
message = msg
controller = mock.MagicMock()
@ -273,7 +273,7 @@ class ResourceTestCase(base.BaseTestCase):
self.assertEqual(res.status_int, exc.HTTPGatewayTimeout.code)
def test_unhandled_error_with_json(self):
expected_res = {'body': {'NeutronError':
expected_res = {'body': {'TackerError':
_('Request Failed: internal server error '
'while processing your request.')}}
controller = mock.MagicMock()
@ -289,7 +289,7 @@ class ResourceTestCase(base.BaseTestCase):
expected_res)
def test_unhandled_error_with_xml(self):
expected_res = {'body': {'NeutronError':
expected_res = {'body': {'TackerError':
_('Request Failed: internal server error '
'while processing your request.')}}
controller = mock.MagicMock()
@ -326,7 +326,7 @@ class ResourceTestCase(base.BaseTestCase):
def _test_error_log_level(self, map_webob_exc, expect_log_info=False,
use_fault_map=True):
class TestException(n_exc.NeutronException):
class TestException(n_exc.TackerException):
message = 'Test Exception'
controller = mock.MagicMock()

View File

@ -17,9 +17,9 @@
import testtools
from neutron.api.v2 import attributes
from neutron.common import exceptions as n_exc
from neutron.tests import base
from tacker.api.v1 import attributes
from tacker.common import exceptions as n_exc
from tacker.tests import base
class TestAttributes(base.BaseTestCase):

View File

@ -17,22 +17,22 @@
import webob
from neutron import auth
from neutron.openstack.common.middleware import request_id
from neutron.tests import base
from tacker import auth
from tacker.openstack.common.middleware import request_id
from tacker.tests import base
class NeutronKeystoneContextTestCase(base.BaseTestCase):
class TackerKeystoneContextTestCase(base.BaseTestCase):
def setUp(self):
super(NeutronKeystoneContextTestCase, self).setUp()
super(TackerKeystoneContextTestCase, self).setUp()
@webob.dec.wsgify
def fake_app(req):
self.context = req.environ['neutron.context']
self.context = req.environ['tacker.context']
return webob.Response()
self.context = None
self.middleware = auth.NeutronKeystoneContext(fake_app)
self.middleware = auth.TackerKeystoneContext(fake_app)
self.request = webob.Request.blank('/')
self.request.headers['X_AUTH_TOKEN'] = 'testauthtoken'

View File

@ -14,11 +14,11 @@
import mock
from neutron.common import log as call_log
from neutron.tests import base
from tacker.common import log as call_log
from tacker.tests import base
MODULE_NAME = 'neutron.tests.unit.test_common_log'
MODULE_NAME = 'tacker.tests.unit.test_common_log'
class TargetKlass(object):

View File

@ -15,10 +15,10 @@
import mock
import testtools
from neutron.common import exceptions as n_exc
from neutron.common import utils
from neutron.plugins.common import utils as plugin_utils
from neutron.tests import base
from tacker.common import exceptions as n_exc
from tacker.common import utils
from tacker.plugins.common import utils as plugin_utils
from tacker.tests import base
class TestParseMappings(base.BaseTestCase):

View File

@ -18,15 +18,15 @@ import os
import mock
from oslo.config import cfg
from neutron.common import config # noqa
from neutron.tests import base
from tacker.common import config # noqa
from tacker.tests import base
class ConfigurationTest(base.BaseTestCase):
def test_defaults(self):
self.assertEqual('0.0.0.0', cfg.CONF.bind_host)
self.assertEqual(9696, cfg.CONF.bind_port)
self.assertEqual(8888, cfg.CONF.bind_port)
self.assertEqual('api-paste.ini', cfg.CONF.api_paste_config)
self.assertEqual('', cfg.CONF.api_extensions_path)
self.assertEqual('policy.json', cfg.CONF.policy_file)
@ -44,7 +44,7 @@ class ConfigurationTest(base.BaseTestCase):
self.assertEqual(absolute_dir, cfg.CONF.state_path)
self.assertEqual(86400, cfg.CONF.dhcp_lease_duration)
self.assertFalse(cfg.CONF.allow_overlapping_ips)
self.assertEqual('neutron', cfg.CONF.control_exchange)
self.assertEqual('tacker', cfg.CONF.control_exchange)
def test_load_paste_app_not_found(self):
self.config(api_paste_config='no_such_file.conf')

View File

@ -21,9 +21,9 @@ import sys
import mock
from neutron.db import migration
from neutron.db.migration import cli
from neutron.tests import base
from tacker.db import migration
from tacker.db.migration import cli
from tacker.tests import base
class TestDbMigration(base.BaseTestCase):

View File

@ -19,74 +19,40 @@
Unit tests for extension extended attribute
"""
from oslo.config import cfg
import webob.exc as webexc
import neutron
from neutron.api import extensions
from neutron.api.v2 import attributes
from neutron.common import config
from neutron import manager
from neutron.plugins.common import constants
from neutron.plugins.openvswitch import ovs_neutron_plugin
from neutron import quota
from neutron.tests import base
from neutron.tests.unit.extensions import extendedattribute as extattr
from neutron.tests.unit import test_api_v2
from neutron.tests.unit import testlib_api
from neutron import wsgi
import tacker
from tacker.api import extensions
from tacker.api.v1 import attributes
from tacker.common import config
from tacker import manager
from tacker.tests import base
from tacker.tests.unit.extensions import extendedattribute as extattr
from tacker.tests.unit import test_api_v2
from tacker.tests.unit import testlib_api
from tacker import wsgi
_uuid = test_api_v2._uuid
_get_path = test_api_v2._get_path
extensions_path = ':'.join(neutron.tests.unit.extensions.__path__)
class ExtensionExtendedAttributeTestPlugin(
ovs_neutron_plugin.OVSNeutronPluginV2):
supported_extension_aliases = [
'ext-obj-test', "extended-ext-attr"
]
def __init__(self, configfile=None):
super(ExtensionExtendedAttributeTestPlugin, self)
self.objs = []
self.objh = {}
def create_ext_test_resource(self, context, ext_test_resource):
obj = ext_test_resource['ext_test_resource']
id = _uuid()
obj['id'] = id
self.objs.append(obj)
self.objh.update({id: obj})
return obj
def get_ext_test_resources(self, context, filters=None, fields=None):
return self.objs
def get_ext_test_resource(self, context, id, fields=None):
return self.objh[id]
extensions_path = ':'.join(tacker.tests.unit.extensions.__path__)
class ExtensionExtendedAttributeTestCase(base.BaseTestCase):
def setUp(self):
super(ExtensionExtendedAttributeTestCase, self).setUp()
plugin = (
"neutron.tests.unit.test_extension_extended_attribute."
"tacker.tests.unit.test_extension_extended_attribute."
"ExtensionExtendedAttributeTestPlugin"
)
# point config file to: neutron/tests/etc/neutron.conf.test
# point config file to: tacker/tests/etc/tacker.conf.test
self.config_parse()
self.setup_coreplugin(plugin)
ext_mgr = extensions.PluginAwareExtensionManager(
extensions_path,
{constants.CORE: ExtensionExtendedAttributeTestPlugin}
)
ext_mgr.extend_resources("2.0", {})
extensions.PluginAwareExtensionManager._instance = ext_mgr
ext_mgr = extensions.ExtensionManager(extensions_path)
ext_mgr.extend_resources("1.0", {})
extensions.ExtensionManager._instance = ext_mgr
app = config.load_paste_app('extensions_test_app')
self._api = extensions.ExtensionMiddleware(app, ext_mgr=ext_mgr)
@ -102,13 +68,9 @@ class ExtensionExtendedAttributeTestCase(base.BaseTestCase):
# the global attribute map
attributes.RESOURCE_ATTRIBUTE_MAP.update(
extattr.EXTENDED_ATTRIBUTES_2_0)
self.agentscheduler_dbMinxin = manager.NeutronManager.get_plugin()
self.agentscheduler_dbMinxin = manager.TackerManager.get_plugin()
self.addCleanup(self.restore_attribute_map)
quota.QUOTAS._driver = None
cfg.CONF.set_override('quota_driver', 'neutron.quota.ConfDriver',
group='QUOTAS')
def restore_attribute_map(self):
# Restore the original RESOURCE_ATTRIBUTE_MAP
attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map

View File

@ -22,22 +22,21 @@ import routes
import webob
import webtest
from neutron.api import extensions
from neutron.common import config
from neutron.common import exceptions
from neutron.db import db_base_plugin_v2
from neutron.openstack.common import jsonutils
from neutron.openstack.common import log as logging
from neutron.plugins.common import constants
from neutron.tests import base
from neutron.tests.unit import extension_stubs as ext_stubs
import neutron.tests.unit.extensions
from neutron.tests.unit import testlib_api
from neutron import wsgi
from tacker.api import extensions
from tacker.common import config
from tacker.common import exceptions
from tacker.openstack.common import jsonutils
from tacker.openstack.common import log as logging
from tacker.plugins.common import constants
from tacker.tests import base
from tacker.tests.unit import extension_stubs as ext_stubs
import tacker.tests.unit.extensions
from tacker.tests.unit import testlib_api
from tacker import wsgi
LOG = logging.getLogger(__name__)
extensions_path = ':'.join(neutron.tests.unit.extensions.__path__)
extensions_path = ':'.join(tacker.tests.unit.extensions.__path__)
class ExtensionsTestApp(wsgi.Router):
@ -50,15 +49,6 @@ class ExtensionsTestApp(wsgi.Router):
super(ExtensionsTestApp, self).__init__(mapper)
class FakePluginWithExtension(db_base_plugin_v2.NeutronDbPluginV2):
"""A fake plugin used only for extension testing in this file."""
supported_extension_aliases = ["FOXNSOX"]
def method_to_support_foxnsox_extension(self, context):
self._log("method_to_support_foxnsox_extension", context)
class PluginInterfaceTest(base.BaseTestCase):
def test_issubclass_hook(self):
class A(object):
@ -482,7 +472,7 @@ class PluginAwareExtensionManagerTest(base.BaseTestCase):
def test_unsupported_extensions_are_not_loaded(self):
stub_plugin = ext_stubs.StubPlugin(supported_extensions=["e1", "e3"])
plugin_info = {constants.CORE: stub_plugin}
with mock.patch("neutron.api.extensions.PluginAwareExtensionManager."
with mock.patch("tacker.api.extensions.PluginAwareExtensionManager."
"check_if_plugin_extensions_loaded"):
ext_mgr = extensions.PluginAwareExtensionManager('', plugin_info)
@ -515,7 +505,7 @@ class PluginAwareExtensionManagerTest(base.BaseTestCase):
supported_extension_aliases = ["supported_extension"]
plugin_info = {constants.CORE: PluginWithoutExpectedIface()}
with mock.patch("neutron.api.extensions.PluginAwareExtensionManager."
with mock.patch("tacker.api.extensions.PluginAwareExtensionManager."
"check_if_plugin_extensions_loaded"):
ext_mgr = extensions.PluginAwareExtensionManager('', plugin_info)
ext_mgr.add_extension(ext_stubs.ExtensionExpectingPluginInterface(
@ -533,7 +523,7 @@ class PluginAwareExtensionManagerTest(base.BaseTestCase):
pass
plugin_info = {constants.CORE: PluginWithExpectedInterface()}
with mock.patch("neutron.api.extensions.PluginAwareExtensionManager."
with mock.patch("tacker.api.extensions.PluginAwareExtensionManager."
"check_if_plugin_extensions_loaded"):
ext_mgr = extensions.PluginAwareExtensionManager('', plugin_info)
ext_mgr.add_extension(ext_stubs.ExtensionExpectingPluginInterface(
@ -541,17 +531,17 @@ class PluginAwareExtensionManagerTest(base.BaseTestCase):
self.assertIn("supported_extension", ext_mgr.extensions)
def test_extensions_expecting_neutron_plugin_interface_are_loaded(self):
def test_extensions_expecting_tacker_plugin_interface_are_loaded(self):
class ExtensionForQuamtumPluginInterface(ext_stubs.StubExtension):
"""This Extension does not implement get_plugin_interface method.
This will work with any plugin implementing NeutronPluginBase
This will work with any plugin implementing TackerPluginBase
"""
pass
stub_plugin = ext_stubs.StubPlugin(supported_extensions=["e1"])
plugin_info = {constants.CORE: stub_plugin}
with mock.patch("neutron.api.extensions.PluginAwareExtensionManager."
with mock.patch("tacker.api.extensions.PluginAwareExtensionManager."
"check_if_plugin_extensions_loaded"):
ext_mgr = extensions.PluginAwareExtensionManager('', plugin_info)
ext_mgr.add_extension(ExtensionForQuamtumPluginInterface("e1"))
@ -562,14 +552,14 @@ class PluginAwareExtensionManagerTest(base.BaseTestCase):
class ExtensionWithNoNeedForPluginInterface(ext_stubs.StubExtension):
"""This Extension does not need any plugin interface.
This will work with any plugin implementing NeutronPluginBase
This will work with any plugin implementing TackerPluginBase
"""
def get_plugin_interface(self):
return None
stub_plugin = ext_stubs.StubPlugin(supported_extensions=["e1"])
plugin_info = {constants.CORE: stub_plugin}
with mock.patch("neutron.api.extensions.PluginAwareExtensionManager."
with mock.patch("tacker.api.extensions.PluginAwareExtensionManager."
"check_if_plugin_extensions_loaded"):
ext_mgr = extensions.PluginAwareExtensionManager('', plugin_info)
ext_mgr.add_extension(ExtensionWithNoNeedForPluginInterface("e1"))
@ -583,7 +573,7 @@ class PluginAwareExtensionManagerTest(base.BaseTestCase):
stub_plugin = ext_stubs.StubPlugin(supported_extensions=["e1"])
plugin_info = {constants.DUMMY: stub_plugin}
with mock.patch("neutron.api.extensions.PluginAwareExtensionManager."
with mock.patch("tacker.api.extensions.PluginAwareExtensionManager."
"check_if_plugin_extensions_loaded"):
ext_mgr = extensions.PluginAwareExtensionManager('', plugin_info)
ext_mgr.add_extension(NonCorePluginExtenstion("e1"))
@ -647,9 +637,7 @@ def setup_base_app(test):
def setup_extensions_middleware(extension_manager=None):
extension_manager = (extension_manager or
extensions.PluginAwareExtensionManager(
extensions_path,
{constants.CORE: FakePluginWithExtension()}))
extensions.ExtensionManager(extensions_path))
base.BaseTestCase.config_parse()
app = config.load_paste_app('extensions_test_app')
return extensions.ExtensionMiddleware(app, ext_mgr=extension_manager)

View File

@ -10,8 +10,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron.hacking import checks
from neutron.tests import base
from tacker.hacking import checks
from tacker.tests import base
class HackingTestCase(base.BaseTestCase):

View File

@ -16,20 +16,20 @@
import mock
from testtools import matchers
from neutron import context
from neutron.openstack.common import local
from neutron.tests import base
from tacker import context
from tacker.openstack.common import local
from tacker.tests import base
class TestNeutronContext(base.BaseTestCase):
class TestTackerContext(base.BaseTestCase):
def setUp(self):
super(TestNeutronContext, self).setUp()
db_api = 'neutron.db.api.get_session'
super(TestTackerContext, self).setUp()
db_api = 'tacker.db.api.get_session'
self._db_api_session_patcher = mock.patch(db_api)
self.db_api_session = self._db_api_session_patcher.start()
def test_neutron_context_create(self):
def test_tacker_context_create(self):
ctx = context.Context('user_id', 'tenant_id')
self.assertEqual('user_id', ctx.user_id)
self.assertEqual('tenant_id', ctx.project_id)
@ -40,12 +40,12 @@ class TestNeutronContext(base.BaseTestCase):
self.assertIsNone(ctx.user_name)
self.assertIsNone(ctx.tenant_name)
def test_neutron_context_create_logs_unknown_kwarg(self):
def test_tacker_context_create_logs_unknown_kwarg(self):
with mock.patch.object(context.LOG, 'debug') as mock_log:
context.Context('user_id', 'tenant_id', foo=None)
self.assertEqual(mock_log.call_count, 1)
def test_neutron_context_create_with_name(self):
def test_tacker_context_create_with_name(self):
ctx = context.Context('user_id', 'tenant_id',
tenant_name='tenant_name', user_name='user_name')
# Check name is set
@ -55,11 +55,11 @@ class TestNeutronContext(base.BaseTestCase):
self.assertEqual('user_id', ctx.user)
self.assertEqual('tenant_id', ctx.tenant)
def test_neutron_context_create_with_request_id(self):
def test_tacker_context_create_with_request_id(self):
ctx = context.Context('user_id', 'tenant_id', request_id='req_id_xxx')
self.assertEqual('req_id_xxx', ctx.request_id)
def test_neutron_context_to_dict(self):
def test_tacker_context_to_dict(self):
ctx = context.Context('user_id', 'tenant_id')
ctx_dict = ctx.to_dict()
self.assertEqual('user_id', ctx_dict['user_id'])
@ -71,7 +71,7 @@ class TestNeutronContext(base.BaseTestCase):
self.assertIsNone(ctx_dict['tenant_name'])
self.assertIsNone(ctx_dict['project_name'])
def test_neutron_context_to_dict_with_name(self):
def test_tacker_context_to_dict_with_name(self):
ctx = context.Context('user_id', 'tenant_id',
tenant_name='tenant_name', user_name='user_name')
ctx_dict = ctx.to_dict()
@ -79,7 +79,7 @@ class TestNeutronContext(base.BaseTestCase):
self.assertEqual('tenant_name', ctx_dict['tenant_name'])
self.assertEqual('tenant_name', ctx_dict['project_name'])
def test_neutron_context_admin_to_dict(self):
def test_tacker_context_admin_to_dict(self):
self.db_api_session.return_value = 'fakesession'
ctx = context.get_admin_context()
ctx_dict = ctx.to_dict()
@ -88,22 +88,22 @@ class TestNeutronContext(base.BaseTestCase):
self.assertIsNotNone(ctx.session)
self.assertNotIn('session', ctx_dict)
def test_neutron_context_admin_without_session_to_dict(self):
def test_tacker_context_admin_without_session_to_dict(self):
ctx = context.get_admin_context_without_session()
ctx_dict = ctx.to_dict()
self.assertIsNone(ctx_dict['user_id'])
self.assertIsNone(ctx_dict['tenant_id'])
self.assertFalse(hasattr(ctx, 'session'))
def test_neutron_context_with_load_roles_true(self):
def test_tacker_context_with_load_roles_true(self):
ctx = context.get_admin_context()
self.assertIn('admin', ctx.roles)
def test_neutron_context_with_load_roles_false(self):
def test_tacker_context_with_load_roles_false(self):
ctx = context.get_admin_context(load_admin_roles=False)
self.assertFalse(ctx.roles)
def test_neutron_context_elevated_retains_request_id(self):
def test_tacker_context_elevated_retains_request_id(self):
ctx = context.Context('user_id', 'tenant_id')
self.assertFalse(ctx.is_admin)
req_id_before = ctx.request_id
@ -112,7 +112,7 @@ class TestNeutronContext(base.BaseTestCase):
self.assertTrue(elevated_ctx.is_admin)
self.assertEqual(req_id_before, elevated_ctx.request_id)
def test_neutron_context_overwrite(self):
def test_tacker_context_overwrite(self):
ctx1 = context.Context('user_id', 'tenant_id')
self.assertEqual(ctx1.request_id, local.store.context.request_id)
@ -126,7 +126,7 @@ class TestNeutronContext(base.BaseTestCase):
self.assertNotEqual(ctx3.request_id, ctx2.request_id)
self.assertEqual(ctx2.request_id, local.store.context.request_id)
def test_neutron_context_get_admin_context_not_update_local_store(self):
def test_tacker_context_get_admin_context_not_update_local_store(self):
ctx = context.Context('user_id', 'tenant_id')
req_id_before = local.store.context.request_id
self.assertEqual(ctx.request_id, req_id_before)

View File

@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test of Policy Engine For Neutron"""
"""Test of Policy Engine For Tacker"""
import urllib2
@ -21,16 +21,16 @@ import fixtures
import mock
import six
import neutron
from neutron.api.v2 import attributes
from neutron.common import exceptions
from neutron import context
from neutron import manager
from neutron.openstack.common import importutils
from neutron.openstack.common import jsonutils as json
from neutron.openstack.common import policy as common_policy
from neutron import policy
from neutron.tests import base
import tacker
from tacker.api.v1 import attributes
from tacker.common import exceptions
from tacker import context
from tacker import manager
from tacker.openstack.common import importutils
from tacker.openstack.common import jsonutils as json
from tacker.openstack.common import policy as common_policy
from tacker import policy
from tacker.tests import base
class PolicyFileTestCase(base.BaseTestCase):
@ -46,7 +46,7 @@ class PolicyFileTestCase(base.BaseTestCase):
def fake_find_config_file(_1, _2):
return self.tempdir.join('policy')
with mock.patch.object(neutron.common.utils,
with mock.patch.object(tacker.common.utils,
'find_config_file',
new=fake_find_config_file):
tmpfilename = fake_find_config_file(None, None)
@ -219,10 +219,10 @@ FAKE_RESOURCE = {"%ss" % FAKE_RESOURCE_NAME:
}}}
class NeutronPolicyTestCase(base.BaseTestCase):
class TackerPolicyTestCase(base.BaseTestCase):
def setUp(self):
super(NeutronPolicyTestCase, self).setUp()
super(TackerPolicyTestCase, self).setUp()
policy.reset()
policy.init()
self.addCleanup(policy.reset)
@ -268,15 +268,15 @@ class NeutronPolicyTestCase(base.BaseTestCase):
def remove_fake_resource():
del attributes.RESOURCE_ATTRIBUTE_MAP["%ss" % FAKE_RESOURCE_NAME]
self.patcher = mock.patch.object(neutron.policy,
self.patcher = mock.patch.object(tacker.policy,
'init',
new=fakepolicyinit)
self.patcher.start()
self.addCleanup(remove_fake_resource)
self.context = context.Context('fake', 'fake', roles=['user'])
plugin_klass = importutils.import_class(
"neutron.db.db_base_plugin_v2.NeutronDbPluginV2")
self.manager_patcher = mock.patch('neutron.manager.NeutronManager')
"tacker.db.db_base_plugin_v2.TackerDbPluginV2")
self.manager_patcher = mock.patch('tacker.manager.TackerManager')
fake_manager = self.manager_patcher.start()
fake_manager_instance = fake_manager.return_value
fake_manager_instance.plugin = plugin_klass()
@ -422,7 +422,7 @@ class NeutronPolicyTestCase(base.BaseTestCase):
return {'tenant_id': 'fake'}
action = "create_port:mac"
with mock.patch.object(manager.NeutronManager.get_instance().plugin,
with mock.patch.object(manager.TackerManager.get_instance().plugin,
'get_network', new=fakegetnetwork):
target = {'network_id': 'whatever'}
result = policy.enforce(self.context, action, target)
@ -437,7 +437,7 @@ class NeutronPolicyTestCase(base.BaseTestCase):
# so long that we verify that, if *f* blows up, the behavior of the
# policy engine to propagate the exception is preserved
action = "create_port:mac"
with mock.patch.object(manager.NeutronManager.get_instance().plugin,
with mock.patch.object(manager.TackerManager.get_instance().plugin,
'get_network', new=fakegetnetwork):
target = {'network_id': 'whatever'}
self.assertRaises(NotImplementedError,
@ -455,7 +455,7 @@ class NeutronPolicyTestCase(base.BaseTestCase):
self.rules['admin_or_network_owner'] = common_policy.parse_rule(
"role:admin or tenant_id:%(network_tenant_id)s")
action = "create_port:mac"
with mock.patch.object(manager.NeutronManager.get_instance().plugin,
with mock.patch.object(manager.TackerManager.get_instance().plugin,
'get_network', new=fakegetnetwork):
target = {'network_id': 'whatever'}
result = policy.enforce(self.context, action, target)

View File

@ -20,8 +20,8 @@ import sys
import mock
from six import moves
from neutron.tests import base
from neutron.tests import post_mortem_debug
from tacker.tests import base
from tacker.tests import post_mortem_debug
class TestTesttoolsExceptionHandler(base.BaseTestCase):

View File

@ -25,11 +25,11 @@ import testtools
import webob
import webob.exc
from neutron.api.v2 import attributes
from neutron.common import constants
from neutron.common import exceptions as exception
from neutron.tests import base
from neutron import wsgi
from tacker.api.v1 import attributes
from tacker.common import constants
from tacker.common import exceptions as exception
from tacker.tests import base
from tacker import wsgi
CONF = cfg.CONF
@ -47,7 +47,7 @@ class TestWSGIServer(base.BaseTestCase):
server.stop()
server.wait()
@mock.patch('neutron.openstack.common.service.ProcessLauncher')
@mock.patch('tacker.openstack.common.service.ProcessLauncher')
def test_start_multiple_workers(self, ProcessLauncher):
launcher = ProcessLauncher.return_value

View File

@ -15,9 +15,9 @@
import testtools
from neutron.api.v2 import attributes
from neutron.tests import base
from neutron import wsgi
from tacker.api.v1 import attributes
from tacker.tests import base
from tacker import wsgi
class ExpectedException(testtools.ExpectedException):
@ -45,7 +45,7 @@ def create_request(path, body, content_type, method='GET',
req.headers['Accept'] = content_type
req.body = body
if context:
req.environ['neutron.context'] = context
req.environ['tacker.context'] = context
return req