Add more unit test case more than 80%

Change-Id: I5a4557446264766c5ad974fe6797db6fb92ecff0
Signed-off-by: zhang xiaohan <zhangxiaohan@szzt.com.cn>
Co-Authored-By: tangzhuo <ztang@hnu.edu.cn>
This commit is contained in:
zhangxiaohan 2019-07-02 20:22:13 -07:00
parent 3ba476ec46
commit dd9b42bf6a
13 changed files with 2175 additions and 111 deletions

View File

@ -36,7 +36,7 @@ commands = {posargs}
basepython = python3
commands =
python setup.py testr --coverage --testr-args='{posargs}'
coverage report --fail-under=40 --skip-covered
coverage report --fail-under=80 --skip-covered
[testenv:genconfig]
basepython = python3

View File

@ -2021,37 +2021,6 @@ class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2,
raise
return return_info
@staticmethod
def _safe_create_bottom_floatingip(t_ctx, pod, client, fip_net_id,
fip_address, port_id):
try:
client.create_floatingips(
t_ctx, {'floatingip': {'floating_network_id': fip_net_id,
'floating_ip_address': fip_address,
'port_id': port_id}})
except q_cli_exceptions.IpAddressInUseClient:
fips = client.list_floatingips(t_ctx,
[{'key': 'floating_ip_address',
'comparator': 'eq',
'value': fip_address}])
# NOTE(zhiyuan) if the internal port associated with the existing
# fip is what we expect, just ignore this exception; or if the
# existing fip is not associated with any internal port, update the
# fip to add association
if not fips:
# this is rare case that we got IpAddressInUseClient exception
# a second ago but now the floating ip is missing
raise t_network_exc.BottomPodOperationFailure(
resource='floating ip', region_name=pod['region_name'])
associated_port_id = fips[0].get('port_id')
if associated_port_id == port_id:
pass
elif not associated_port_id:
client.update_floatingips(t_ctx, fips[0]['id'],
{'floatingip': {'port_id': port_id}})
else:
raise
@staticmethod
def _rollback_floatingip_data(context, _id, org_data):
"""Rollback the data of floating ip object to the original one

View File

@ -14,18 +14,21 @@
# under the License.
import six
from oslo_log import log
from neutron.services.trunk import exceptions as trunk_exc
from neutron.services.trunk import plugin as trunk_plugin
from neutron_lib.db import utils as db_utils
from neutron_lib.plugins import directory
import six
import tricircle.common.client as t_client
import tricircle.common.constants as t_constants
import tricircle.common.context as t_context
from tricircle.common import xrpcapi
import tricircle.db.api as db_api
from tricircle.network import central_plugin
from tricircle.network import helper
LOG = log.getLogger(__name__)
@ -191,6 +194,12 @@ class TricircleTrunkDriver(trunk_plugin.TrunkPlugin):
filters, remainder, None)
ret.extend(next_ret)
return ret
else:
# get from top pod
top_ret = self._get_trunks_from_top_with_limit(
context, top_bottom_map, filters, remainder, None)
ret.extend(top_ret)
return ret
def _map_trunks_from_bottom_to_top(self, trunks, bottom_top_map):
trunk_list = []
@ -294,18 +303,21 @@ class TricircleTrunkDriver(trunk_plugin.TrunkPlugin):
except trunk_exc.TrunkNotFound:
return ret
ret = super(TricircleTrunkDriver, self).get_ports(context, filters)
core_plugin = directory.get_plugin()
ret = super(central_plugin.TricirclePlugin, core_plugin).get_ports(
context, filters)
return ret
def update_subports_device_id(self, context,
subports, device_id, device_owner):
if not subports['sub_ports']:
return
core_plugin = directory.get_plugin()
body = {'port': {
'device_id': device_id,
'device_owner': device_owner}}
for subport in subports['sub_ports']:
super(TricircleTrunkDriver, self).update_port(
super(central_plugin.TricirclePlugin, core_plugin).update_port(
context, subport['port_id'], body)
def add_subports(self, context, trunk_id, subports):

View File

@ -35,7 +35,6 @@ import tricircle.common.exceptions as t_exceptions
import tricircle.common.lock_handle as t_lock
from tricircle.common import utils
import tricircle.db.api as db_api
import tricircle.network.exceptions as t_network_exc
# manually define these constants to avoid depending on neutron repos
@ -694,37 +693,6 @@ class NetworkHelper(object):
self.prepare_bottom_element(ctx, project_id, b_pod, t_dhcp_port,
t_constants.RT_PORT, dhcp_port_body)
@staticmethod
def _safe_create_bottom_floatingip(t_ctx, pod, client, fip_net_id,
fip_address, port_id):
try:
client.create_floatingips(
t_ctx, {'floatingip': {'floating_network_id': fip_net_id,
'floating_ip_address': fip_address,
'port_id': port_id}})
except q_cli_exceptions.IpAddressInUseClient:
fips = client.list_floatingips(t_ctx,
[{'key': 'floating_ip_address',
'comparator': 'eq',
'value': fip_address}])
if not fips:
# this is rare case that we got IpAddressInUseClient exception
# a second ago but now the floating ip is missing
raise t_network_exc.BottomPodOperationFailure(
resource='floating ip', region_name=pod['region_name'])
associated_port_id = fips[0].get('port_id')
if associated_port_id == port_id:
# the internal port associated with the existing fip is what
# we expect, just ignore this exception
pass
elif not associated_port_id:
# the existing fip is not associated with any internal port,
# update the fip to add association
client.update_floatingips(t_ctx, fips[0]['id'],
{'floatingip': {'port_id': port_id}})
else:
raise
def _get_top_element(self, t_ctx, q_ctx, _type, _id):
if self.call_obj:
return getattr(self.call_obj, 'get_%s' % _type)(q_ctx, _id)

View File

@ -0,0 +1,63 @@
# Copyright (c) 2018 NEC, Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from mock import patch
import sys
import unittest
from oslo_config import cfg
from oslo_service import service
from tricircle.api import app
from tricircle.cmd import api
from tricircle.cmd import xjob
from tricircle.xjob import xservice
def fake_wait(self):
return
class TestXjobCmd(unittest.TestCase):
def setUp(self):
super(TestXjobCmd, self).setUp()
sys.argv = ['tricircle-cmd']
cfg.CONF.reset()
cfg.CONF.unregister_opts(xservice.common_opts)
cfg.CONF.unregister_opts(app.common_opts)
@patch.object(service.ProcessLauncher, 'wait', new=fake_wait)
@mock.patch('tricircle.xjob.xservice.create_service')
@mock.patch('oslo_service.service.ProcessLauncher.launch_service')
def test_xjob_main(self, launch_service, create_service):
xjob.main()
launch_service.assert_called_once_with(
create_service.return_value, workers=1)
@patch.object(service.ProcessLauncher, 'wait', new=fake_wait)
@mock.patch('tricircle.api.app.setup_app')
@mock.patch('oslo_service.wsgi.Server')
@mock.patch('oslo_service.service.ProcessLauncher.launch_service')
def test_api_main(self, launch_service, wsgi_server, setup_app):
api.main()
wsgi_server.assert_called_once_with(mock.ANY, 'Tricircle Admin_API',
setup_app.return_value,
mock.ANY, mock.ANY)
launch_service.assert_called_once_with(
wsgi_server.return_value, workers=1)
def tearDown(self):
cfg.CONF.reset()
cfg.CONF.unregister_opts(xservice.common_opts)
cfg.CONF.unregister_opts(app.common_opts)

View File

@ -23,6 +23,7 @@ from tricircle.common import httpclient as hclient
from tricircle.db import api
from tricircle.db import core
from tricircle.db import models
def fake_get_pod_service_endpoint(ctx, region_name, st):
@ -87,6 +88,10 @@ class HttpClientTest(unittest.TestCase):
ver = hclient.get_version_from_url(url)
self.assertEqual(ver, '')
url = 'sss/networks'
ver = hclient.get_version_from_url(url)
self.assertEqual(ver, 'sss')
def test_get_bottom_url(self):
b_endpoint = 'http://127.0.0.1:9696/v2.0/networks'
t_url = 'http://127.0.0.1:9696/v2.0/networks'
@ -96,6 +101,28 @@ class HttpClientTest(unittest.TestCase):
self.assertEqual(t_ver, 'v2.0')
self.assertEqual(b_ver, 'v2.0')
t_url_1 = 'http://127.0.0.1:9696/sss/v2.0/networks'
b_url = hclient.get_bottom_url(t_ver, t_url_1, b_ver, b_endpoint)
self.assertEqual(b_url, '')
t_url_1 = 'v2.0/networks'
b_url = hclient.get_bottom_url(t_ver, t_url_1, b_ver, b_endpoint)
self.assertEqual(b_url, 'http://127.0.0.1:9696/v2.0/networks')
b_url = hclient.get_bottom_url(t_ver, t_url, '', b_endpoint)
self.assertEqual(b_url, 'http://127.0.0.1:9696/networks')
t_url_1 = 'http://127.0.0.1:9696/v2.0/networks?qqq=123&sss=456'
b_url = hclient.get_bottom_url(t_ver, t_url_1, b_ver, b_endpoint)
self.assertEqual(b_url,
'http://127.0.0.1:9696/v2.0/networks?qqq=123&sss=456')
t_url_1 = 'http://127.0.0.1:9696/v2.0/networks?' \
'qqq=123&availability_zone=456'
b_url = hclient.get_bottom_url(t_ver, t_url_1, b_ver, b_endpoint)
self.assertEqual(b_url,
'http://127.0.0.1:9696/v2.0/networks?qqq=123')
b_url = hclient.get_bottom_url(t_ver, t_url, b_ver, b_endpoint)
self.assertEqual(b_url,
'http://127.0.0.1:9696/v2.0/networks')
@ -211,5 +238,61 @@ class HttpClientTest(unittest.TestCase):
config_dict['service_type'])
self.assertEqual(endpoint, config_dict['service_url'])
endpoint = hclient.get_pod_service_endpoint(
self.context,
'x_region_name',
config_dict['service_type'])
self.assertEqual(endpoint, '')
def test_get_res_routing_ref(self):
t_url = 'http://127.0.0.1:9696/v2.0/networks'
self.assertIsNone(hclient.get_res_routing_ref(
self.context, 'fake_pod_id', t_url, s_type=cons.ST_NEUTRON))
pod_dict = {
'pod_id': 'fake_pod_id',
'region_name': 'fake_region_name',
'az_name': 'fake_az'
}
api.create_pod(self.context, pod_dict)
routes = [
{
'top_id': 'top_id',
'bottom_id': 'bottom_id',
'pod_id': 'fake_pod_id',
'project_id': 'test_project_id',
'resource_type': 'network'
},
]
with self.context.session.begin():
for route in routes:
core.create_resource(
self.context, models.ResourceRouting, route)
config_dict = {
'service_id': 'fake_service_id',
'pod_id': 'fake_pod_id',
'service_type': cons.ST_NEUTRON,
'service_url': 'http://127.0.0.1:9696/v2.0/networks'
}
api.create_cached_endpoints(self.context, config_dict)
s_ctx = {'t_ver': 'v2.0', 'b_ver': 'v2.0',
't_url': t_url, 'b_url': t_url}
self.assertEqual(s_ctx, hclient.get_res_routing_ref(
self.context, 'top_id', t_url, s_type=cons.ST_NEUTRON))
def test_convert_header(self):
header = {'header1': 'aaa', 'header2': 'bbb'}
self.assertEqual(header,
hclient.convert_header('v1.0', 'v1.0', header))
header = {'header1': 'aaa', 'header2': None}
except_header = {'header1': 'aaa'}
self.assertEqual(except_header,
hclient.convert_header('v1.0', 'v1.0', header))
def tearDown(self):
core.ModelBase.metadata.drop_all(core.get_engine())

View File

@ -15,13 +15,23 @@
# License for the specific language governing permissions and limitations
# under the License.
import mock
import pecan
import unittest
from oslo_config import cfg
from tricircle.common import constants as cons
from tricircle.common import exceptions
from tricircle.common import utils
class TricircleUtilsTestCase(unittest.TestCase):
def test_bool_from_string(self):
self.assertEqual(True, utils.bool_from_string('true'))
self.assertEqual(False, utils.bool_from_string('false'))
self.assertRaises(ValueError, utils.bool_from_string, 'a', strict=True)
self.assertEqual(True, utils.bool_from_string('a', default=True))
def test_check_string_length(self):
self.assertIsNone(utils.check_string_length(
'test', 'name', max_len=255))
@ -34,3 +44,62 @@ class TricircleUtilsTestCase(unittest.TestCase):
self.assertRaises(exceptions.InvalidInput,
utils.check_string_length,
'a' * 256, 'name', max_len=255)
def test_get_id_from_name(self):
output = utils.get_id_from_name(
cons.RT_NETWORK, 'name#77b0babc-f7e4-4c14-b250-1f18835a52c2')
self.assertEqual('77b0babc-f7e4-4c14-b250-1f18835a52c2', output)
output = utils.get_id_from_name(
cons.RT_NETWORK, '77b0babc-f7e4-4c14-b250-1f18835a52c2')
self.assertEqual('77b0babc-f7e4-4c14-b250-1f18835a52c2', output)
output = utils.get_id_from_name(
cons.RT_NETWORK, 'name@not_uuid')
self.assertIsNone(output)
output = utils.get_id_from_name(
cons.RT_PORT, '77b0babc-f7e4-4c14-b250-1f18835a52c2')
self.assertEqual('77b0babc-f7e4-4c14-b250-1f18835a52c2', output)
output = utils.get_id_from_name(
cons.RT_PORT, 'not_uuid')
self.assertIsNone(output)
@mock.patch.object(pecan, 'response')
def test_format_error(self, mock_response):
output = utils.format_error(401, 'this is error', 'MyError')
self.assertEqual({'MyError': {
'message': 'this is error', 'code': 401
}}, output)
output = utils.format_error(400, 'this is error')
self.assertEqual({'badRequest': {
'message': 'this is error', 'code': 400
}}, output)
output = utils.format_error(401, 'this is error')
self.assertEqual({'Error': {
'message': 'this is error', 'code': 401
}}, output)
@mock.patch('tricircle.common.utils.format_error')
def test_format_api_error(self, mock_format_error):
output = utils.format_api_error(400, 'this is error')
self.assertEqual(mock_format_error.return_value, output)
@mock.patch('tricircle.common.utils.format_error')
def test_format_nova_error(self, mock_format_error):
output = utils.format_nova_error(400, 'this is error')
self.assertEqual(mock_format_error.return_value, output)
@mock.patch('tricircle.common.utils.format_error')
def test_format_cinder_error(self, mock_format_error):
output = utils.format_cinder_error(400, 'this is error')
self.assertEqual(mock_format_error.return_value, output)
def test_get_pagination_limit(self):
setattr(cfg.CONF, 'pagination_max_limit', 1024)
self.assertEqual(512, utils.get_pagination_limit(512))
self.assertEqual(1024, utils.get_pagination_limit(2048))
self.assertEqual(1024, utils.get_pagination_limit(-1))

View File

@ -40,6 +40,7 @@ from neutron.db import models_v2
from neutron.db import rbac_db_models as rbac_db
import neutron.objects.base as base_object
from neutron.services.qos.drivers import manager as q_manager
from neutron.services.trunk import plugin as trunk_plugin
from neutron.plugins.ml2 import managers as n_managers
@ -67,6 +68,7 @@ import tricircle.network.central_plugin as plugin
from tricircle.network import central_qos_plugin
from tricircle.network import helper
from tricircle.network import qos_driver
from tricircle.tests.unit.network import test_central_trunk_plugin
from tricircle.tests.unit.network import test_qos
from tricircle.tests.unit.network import test_security_groups
import tricircle.tests.unit.utils as test_utils
@ -105,6 +107,12 @@ BOTTOM2_FIPS = _resource_store.BOTTOM2_FLOATINGIPS
BOTTOM2_ROUTERS = _resource_store.BOTTOM2_ROUTERS
BOTTOM2_POLICIES = _resource_store.BOTTOM2_QOS_POLICIES
BOTTOM2_POLICY_RULES = _resource_store.BOTTOM2_QOS_BANDWIDTH_LIMIT_RULES
TOP_TRUNKS = _resource_store.TOP_TRUNKS
TOP_SUBPORTS = _resource_store.TOP_SUBPORTS
BOTTOM1_TRUNKS = _resource_store.BOTTOM1_TRUNKS
BOTTOM2_TRUNKS = _resource_store.BOTTOM2_TRUNKS
BOTTOM1_SUBPORTS = _resource_store.BOTTOM1_SUBPORTS
BOTTOM2_SUBPORTS = _resource_store.BOTTOM2_SUBPORTS
TEST_TENANT_ID = test_utils.TEST_TENANT_ID
FakeNeutronContext = test_utils.FakeNeutronContext
@ -959,6 +967,8 @@ def fake_get_instance(cls, subnet_pool, context):
def fake_get_plugin(alias=plugin_constants.CORE):
if alias == 'trunk':
return test_central_trunk_plugin.FakePlugin()
return FakePlugin()
@ -1614,6 +1624,71 @@ class PluginTest(unittest.TestCase,
return t_port_id, b_port_id
@staticmethod
def _prepare_trunk_test(project_id, ctx, pod_name, index, t_net_id,
b_net_id, t_subnet_id, b_subnet_id):
t_trunk_id = uuidutils.generate_uuid()
b_trunk_id = uuidutils.generate_uuid()
t_parent_port_id = uuidutils.generate_uuid()
t_sub_port_id = PluginTest._prepare_port_test(
project_id, ctx, pod_name, index, t_net_id,
b_net_id, t_subnet_id, b_subnet_id)
t_subport = {
'segmentation_type': 'vlan',
'port_id': t_sub_port_id,
'segmentation_id': 164,
'trunk_id': t_trunk_id}
t_trunk = {
'id': t_trunk_id,
'name': 'top_trunk_%d' % index,
'status': 'DOWN',
'description': 'created',
'admin_state_up': True,
'port_id': t_parent_port_id,
'tenant_id': project_id,
'project_id': project_id,
'sub_ports': [t_subport]
}
TOP_TRUNKS.append(test_utils.DotDict(t_trunk))
TOP_SUBPORTS.append(test_utils.DotDict(t_subport))
b_subport = {
'segmentation_type': 'vlan',
'port_id': t_sub_port_id,
'segmentation_id': 164,
'trunk_id': b_trunk_id}
b_trunk = {
'id': b_trunk_id,
'name': 'top_trunk_%d' % index,
'status': 'UP',
'description': 'created',
'admin_state_up': True,
'port_id': t_parent_port_id,
'tenant_id': project_id,
'project_id': project_id,
'sub_ports': [b_subport]
}
if pod_name == 'pod_1':
BOTTOM1_SUBPORTS.append(test_utils.DotDict(t_subport))
BOTTOM1_TRUNKS.append(test_utils.DotDict(b_trunk))
else:
BOTTOM2_SUBPORTS.append(test_utils.DotDict(t_subport))
BOTTOM2_TRUNKS.append(test_utils.DotDict(b_trunk))
pod_id = 'pod_id_1' if pod_name == 'pod_1' else 'pod_id_2'
core.create_resource(ctx, models.ResourceRouting,
{'top_id': t_trunk_id,
'bottom_id': b_trunk_id,
'pod_id': pod_id,
'project_id': project_id,
'resource_type': constants.RT_TRUNK})
return t_trunk, b_trunk
@staticmethod
def _prepare_network_subnet(project_id, ctx, region_name, index,
enable_dhcp=True, az_hints=None,
@ -1968,6 +2043,48 @@ class PluginTest(unittest.TestCase,
self.assertEqual(bottom_subnet['enable_dhcp'],
body_copy['subnet']['enable_dhcp'])
@patch.object(ipam_pluggable_backend.IpamPluggableBackend,
'_allocate_ips_for_port', new=fake_allocate_ips_for_port)
@patch.object(directory, 'get_plugin', new=fake_get_plugin)
@patch.object(driver.Pool, 'get_instance', new=fake_get_instance)
@patch.object(db_utils, 'filter_non_model_columns',
new=fake_filter_non_model_columns)
@patch.object(context, 'get_context_from_neutron_context')
def test_create_port(self, mock_context):
project_id = TEST_TENANT_ID
self._basic_pod_route_setup()
(t_net_id, t_subnet_id,
b_net_id, b_subnet_id) = self._prepare_network_subnet(
project_id, self.context, 'pod_1', 1)
neutron_context = FakeNeutronContext()
fake_plugin = FakePlugin()
fake_client = FakeClient()
mock_context.return_value = self.context
t_pod = {'pod_id': 'pod_id_top', 'region_name': 'top-region',
'az_name': ''}
db_api.create_pod(self.context, t_pod)
body_port = {
'port': {
'name': 'interface_top-region_port-1',
'description': 'top_description',
'extra_dhcp_opts': [],
'security_groups': [],
'device_id': 'reserved_gateway_port',
'admin_state_up': True,
'network_id': t_net_id,
'tenant_id': project_id,
'project_id': project_id,
}
}
port = fake_plugin.create_port(neutron_context, body_port)
t_gw_ports = fake_client.list_resources(
'port', None, [{'key': 'name', 'comparator': 'eq',
'value': 'interface_top-region_port-1'}])
self.assertEqual(t_gw_ports[0]['id'], port['id'])
@patch.object(ipam_pluggable_backend.IpamPluggableBackend,
'_update_ips_for_port', new=fake_update_ips_for_port)
@patch.object(directory, 'get_plugin', new=fake_get_plugin)
@ -2056,6 +2173,88 @@ class PluginTest(unittest.TestCase,
self.assertEqual(bottom_port['allowed_address_pairs'][0],
body_copy['port']['allowed_address_pairs'][0])
@patch.object(directory, 'get_plugin', new=fake_get_plugin)
@patch.object(driver.Pool, 'get_instance', new=fake_get_instance)
@patch.object(db_utils, 'filter_non_model_columns',
new=fake_filter_non_model_columns)
@patch.object(trunk_plugin.TrunkPlugin, 'get_trunk')
@patch.object(context, 'get_context_from_neutron_context')
def test_update_port_tunck(self, mock_context, mock_get_trunk):
project_id = TEST_TENANT_ID
self._basic_pod_route_setup()
t_ctx = context.get_db_context()
(t_net_id, t_subnet_id,
b_net_id, b_subnet_id) = self._prepare_network_subnet(
project_id, t_ctx, 'pod_1', 1)
t_port_id, b_port_id = self._prepare_port_test(
project_id, t_ctx, 'pod_1', 1, t_net_id, b_net_id,
t_subnet_id, b_subnet_id)
t_trunk, b_trunk = self._prepare_trunk_test(
project_id, t_ctx, 'pod_1', 2, t_net_id,
b_net_id, t_subnet_id, b_subnet_id)
fake_plugin = FakePlugin()
mock_context.return_value = t_ctx
update_body = {
'port': {
'binding:profile': {
constants.PROFILE_REGION: 'pod_1',
constants.PROFILE_DEVICE: 'compute:new'
},
'trunk_details': {'trunk_id': t_trunk['id'],
'sub_ports': []}
}
}
body_copy = copy.deepcopy(update_body)
q_ctx = test_central_trunk_plugin.FakeNeutronContext()
mock_get_trunk.return_value = t_trunk
top_port = fake_plugin.update_port(q_ctx, t_port_id, update_body)
self.assertEqual(top_port['binding:profile'],
body_copy['port']['binding:profile'])
self.assertEqual(top_port['trunk_details'],
body_copy['port']['trunk_details'])
@patch.object(directory, 'get_plugin', new=fake_get_plugin)
@patch.object(driver.Pool, 'get_instance', new=fake_get_instance)
@patch.object(db_utils, 'filter_non_model_columns',
new=fake_filter_non_model_columns)
@patch.object(context, 'get_context_from_neutron_context')
def test_update_port_mapping(self, mock_context):
project_id = TEST_TENANT_ID
self._basic_pod_route_setup()
neutron_context = FakeNeutronContext()
t_ctx = context.get_db_context()
(t_net_id, t_subnet_id,
b_net_id, b_subnet_id) = self._prepare_network_subnet(
project_id, t_ctx, 'pod_1', 1)
t_port_id, b_port_id = self._prepare_port_test(
project_id, t_ctx, 'pod_1', 1, t_net_id, b_net_id,
t_subnet_id, b_subnet_id)
fake_plugin = FakePlugin()
mock_context.return_value = t_ctx
update_body = {
'port': {
'binding:profile': {
constants.PROFILE_REGION: 'pod_1',
constants.PROFILE_DEVICE: '',
constants.PROFILE_STATUS: 'DOWN'
}
}
}
b_update_body = {'port': {'device_id': None}}
fake_client = FakeClient('pod_1')
fake_client.update_ports(t_ctx, b_port_id, b_update_body)
fake_plugin.update_port(neutron_context, t_port_id, update_body)
routing_resources = core.query_resource(
t_ctx, models.ResourceRouting,
[{'key': 'bottom_id', 'comparator': 'eq', 'value': b_port_id}], [])
self.assertListEqual(routing_resources, [])
@patch.object(directory, 'get_plugin', new=fake_get_plugin)
@patch.object(driver.Pool, 'get_instance', new=fake_get_instance)
@patch.object(db_utils, 'filter_non_model_columns',

View File

@ -0,0 +1,684 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mock import patch
import six
import unittest
from six.moves import xrange
import neutron.conf.common as q_config
from neutron.db import db_base_plugin_v2
from neutron_lib.api.definitions import portbindings
from neutron_lib.plugins import directory
from neutron_lib.plugins import utils
from oslo_config import cfg
from oslo_utils import uuidutils
from tricircle.common import client
from tricircle.common import constants
from tricircle.common import context
import tricircle.db.api as db_api
from tricircle.db import core
from tricircle.db import models
from tricircle.network import central_plugin
import tricircle.network.central_trunk_driver as trunk_driver
from tricircle.network import helper
import tricircle.tests.unit.utils as test_utils
from tricircle.xjob import xmanager
_resource_store = test_utils.get_resource_store()
TOP_TRUNKS = _resource_store.TOP_TRUNKS
TOP_SUBPORTS = _resource_store.TOP_SUBPORTS
TOP_PORTS = _resource_store.TOP_PORTS
BOTTOM1_TRUNKS = _resource_store.BOTTOM1_TRUNKS
BOTTOM2_TRUNKS = _resource_store.BOTTOM2_TRUNKS
BOTTOM1_SUBPORTS = _resource_store.BOTTOM1_SUBPORTS
BOTTOM2_SUBPORTS = _resource_store.BOTTOM2_SUBPORTS
BOTTOM1_PORTS = _resource_store.BOTTOM1_PORTS
BOTTOM2_PORTS = _resource_store.BOTTOM2_PORTS
TEST_TENANT_ID = test_utils.TEST_TENANT_ID
class FakeBaseXManager(xmanager.XManager):
def __init__(self):
self.clients = {constants.TOP: client.Client()}
def _get_client(self, region_name=None):
return FakeClient(region_name)
class FakeXManager(FakeBaseXManager):
def __init__(self, fake_plugin):
super(FakeXManager, self).__init__()
self.xjob_handler = FakeBaseRPCAPI(fake_plugin)
self.helper = helper.NetworkHelper()
class FakeBaseRPCAPI(object):
def __init__(self, fake_plugin):
self.xmanager = FakeBaseXManager()
def sync_trunk(self, ctxt, project_id, trunk_id, pod_id):
combine_id = '%s#%s' % (pod_id, trunk_id)
self.xmanager.sync_trunk(
ctxt, payload={constants.JT_TRUNK_SYNC: combine_id})
def configure_security_group_rules(self, ctxt, project_id):
pass
class FakeRPCAPI(FakeBaseRPCAPI):
def __init__(self, fake_plugin):
self.xmanager = FakeXManager(fake_plugin)
class FakeNeutronClient(test_utils.FakeNeutronClient):
_resource = 'trunk'
trunks_path = ''
class FakeClient(test_utils.FakeClient):
def __init__(self, region_name=None):
super(FakeClient, self).__init__(region_name)
self.client = FakeNeutronClient(self.region_name)
def get_native_client(self, resource, ctx):
return self.client
def get_trunks(self, ctx, trunk_id):
return self.get_resource(constants.RT_TRUNK, ctx, trunk_id)
def update_trunks(self, context, trunk_id, trunk):
self.update_resources(constants.RT_TRUNK, context, trunk_id, trunk)
def delete_trunks(self, context, trunk_id):
self.delete_resources(constants.RT_TRUNK, context, trunk_id)
def action_trunks(self, ctx, action, resource_id, body):
if self.region_name == 'pod_1':
btm_trunks = BOTTOM1_TRUNKS
else:
btm_trunks = BOTTOM2_TRUNKS
for trunk in btm_trunks:
if trunk['id'] == resource_id:
subports = body['sub_ports']
if action == 'add_subports':
for subport in subports:
subport['trunk_id'] = resource_id
trunk['sub_ports'].extend(subports)
return
elif action == 'remove_subports':
for subport in subports:
for b_subport in trunk['sub_ports']:
if subport['port_id'] == b_subport['port_id']:
trunk['sub_ports'].remove(b_subport)
return
def list_trunks(self, ctx, filters=None):
filter_dict = {}
filters = filters or []
for query_filter in filters:
key = query_filter['key']
# when querying trunks, "fields" is passed in the query string to
# ask the server to only return necessary fields, which can reduce
# the data being transferred. In test, we just return all the
# fields since there's no need to optimize
if key != 'fields':
value = query_filter['value']
filter_dict[key] = value
return self.client.get('', filter_dict)['trunks']
def get_ports(self, ctx, port_id):
pass
def list_ports(self, ctx, filters=None):
fake_plugin = FakePlugin()
q_ctx = FakeNeutronContext()
_filters = {}
for f in filters:
_filters[f['key']] = [f['value']]
return fake_plugin.get_trunk_subports(q_ctx, _filters)
def create_ports(self, ctx, body):
if 'ports' in body:
ret = []
for port in body['ports']:
p = self.create_resources('port', ctx, {'port': port})
p['id'] = p['device_id']
ret.append(p)
return ret
return self.create_resources('port', ctx, body)
class FakeNeutronContext(test_utils.FakeNeutronContext):
def session_class(self):
return FakeSession
class FakeSession(test_utils.FakeSession):
def add_hook(self, model_obj, model_dict):
if model_obj.__tablename__ == 'subports':
for top_trunk in TOP_TRUNKS:
if top_trunk['id'] == model_dict['trunk_id']:
top_trunk['sub_ports'].append(model_dict)
def delete_top_subport(self, port_id):
for res_list in self.resource_store.store_map.values():
for res in res_list:
sub_ports = res.get('sub_ports')
if sub_ports:
for sub_port in sub_ports:
if sub_port['port_id'] == port_id:
sub_ports.remove(sub_port)
def delete_hook(self, model_obj):
if model_obj.get('segmentation_type'):
self.delete_top_subport(model_obj['port_id'])
return 'port_id'
class FakePlugin(trunk_driver.TricircleTrunkDriver):
def __init__(self):
self._segmentation_types = {'vlan': utils.is_valid_vlan_tag}
self.xjob_handler = FakeRPCAPI(self)
self.helper = helper.NetworkHelper(self)
def _get_client(self, region_name):
return FakeClient(region_name)
def fake_get_context_from_neutron_context(q_context):
ctx = context.get_db_context()
return ctx
def fake_get_min_search_step(self):
return 2
class FakeCorePlugin(central_plugin.TricirclePlugin):
def __init__(self):
self.type_manager = test_utils.FakeTypeManager()
def get_port(self, context, port_id):
return {portbindings.HOST_ID: None,
'device_id': None}
def get_ports(self, ctx, filters):
top_client = FakeClient()
_filters = []
for key, values in six.iteritems(filters):
for v in values:
_filters.append({'key': key, 'comparator': 'eq', 'value': v})
return top_client.list_resources('port', ctx, _filters)
def update_port(self, context, id, port):
port_body = port['port']
for _port in TOP_PORTS:
if _port['id'] == id:
for key, value in six.iteritems(port_body):
_port[key] = value
class PluginTest(unittest.TestCase):
def setUp(self):
core.initialize()
core.ModelBase.metadata.create_all(core.get_engine())
self.context = context.Context()
cfg.CONF.set_override('tenant_network_types', ['local', 'vlan'],
group='tricircle')
cfg.CONF.set_override('bridge_network_type', 'vlan',
group='tricircle')
setattr(cfg.CONF, 'setproctitle', 'central-trunk-driver')
xmanager.IN_TEST = True
def fake_get_plugin(alias='core'):
if alias == 'trunk':
return FakePlugin()
return FakeCorePlugin()
directory.get_plugin = fake_get_plugin
def _basic_pod_setup(self):
pod1 = {'pod_id': 'pod_id_1',
'region_name': 'pod_1',
'az_name': 'az_name_1'}
pod2 = {'pod_id': 'pod_id_2',
'region_name': 'pod_2',
'az_name': 'az_name_2'}
pod3 = {'pod_id': 'pod_id_0',
'region_name': 'top_pod',
'az_name': ''}
for pod in (pod1, pod2, pod3):
db_api.create_pod(self.context, pod)
def _prepare_port_test(self, tenant_id, ctx, pod_name, index,
device_onwer='compute:None', create_bottom=True):
t_port_id = uuidutils.generate_uuid()
t_subnet_id = uuidutils.generate_uuid()
t_net_id = uuidutils.generate_uuid()
t_port = {
'id': t_port_id,
'name': 'top_port_%d' % index,
'description': 'old_top_description',
'extra_dhcp_opts': [],
'device_owner': device_onwer,
'security_groups': [],
'device_id': '68f46ee4-d66a-4c39-bb34-ac2e5eb85470',
'admin_state_up': True,
'network_id': t_net_id,
'tenant_id': tenant_id,
'mac_address': 'fa:16:3e:cd:76:4%s' % index,
'project_id': 'tenant_id',
'binding:host_id': 'zhiyuan-5',
'status': 'ACTIVE',
'network_id': t_net_id,
'fixed_ips': [{'subnet_id': t_subnet_id}]
}
TOP_PORTS.append(test_utils.DotDict(t_port))
if create_bottom:
b_port = {
'id': t_port_id,
'name': t_port_id,
'description': 'old_bottom_description',
'extra_dhcp_opts': [],
'device_owner': device_onwer,
'security_groups': [],
'device_id': '68f46ee4-d66a-4c39-bb34-ac2e5eb85470',
'admin_state_up': True,
'network_id': t_net_id,
'tenant_id': tenant_id,
'device_owner': 'compute:None',
'extra_dhcp_opts': [],
'mac_address': 'fa:16:3e:cd:76:40',
'project_id': 'tenant_id',
'binding:host_id': 'zhiyuan-5',
'status': 'ACTIVE',
'network_id': t_net_id,
'fixed_ips': [{'subnet_id': t_subnet_id}]
}
if pod_name == 'pod_1':
BOTTOM1_PORTS.append(test_utils.DotDict(b_port))
else:
BOTTOM2_PORTS.append(test_utils.DotDict(b_port))
pod_id = 'pod_id_1' if pod_name == 'pod_1' else 'pod_id_2'
core.create_resource(ctx, models.ResourceRouting,
{'top_id': t_port_id,
'bottom_id': t_port_id,
'pod_id': pod_id,
'project_id': tenant_id,
'resource_type': constants.RT_PORT})
return t_port_id
def _prepare_trunk_test(self, project_id, ctx, pod_name, index,
is_create_bottom, t_uuid=None, b_uuid=None):
t_trunk_id = t_uuid or uuidutils.generate_uuid()
b_trunk_id = b_uuid or uuidutils.generate_uuid()
t_parent_port_id = uuidutils.generate_uuid()
t_sub_port_id = self._prepare_port_test(
project_id, ctx, pod_name, index, create_bottom=is_create_bottom)
t_subport = {
'segmentation_type': 'vlan',
'port_id': t_sub_port_id,
'segmentation_id': 164,
'trunk_id': t_trunk_id}
t_trunk = {
'id': t_trunk_id,
'name': 'top_trunk_%d' % index,
'status': 'DOWN',
'description': 'created',
'admin_state_up': True,
'port_id': t_parent_port_id,
'tenant_id': project_id,
'project_id': project_id,
'sub_ports': [t_subport]
}
TOP_TRUNKS.append(test_utils.DotDict(t_trunk))
TOP_SUBPORTS.append(test_utils.DotDict(t_subport))
b_trunk = None
if is_create_bottom:
b_subport = {
'segmentation_type': 'vlan',
'port_id': t_sub_port_id,
'segmentation_id': 164,
'trunk_id': b_trunk_id}
b_trunk = {
'id': b_trunk_id,
'name': 'top_trunk_%d' % index,
'status': 'UP',
'description': 'created',
'admin_state_up': True,
'port_id': t_parent_port_id,
'tenant_id': project_id,
'project_id': project_id,
'sub_ports': [b_subport]
}
if pod_name == 'pod_1':
BOTTOM1_SUBPORTS.append(test_utils.DotDict(t_subport))
BOTTOM1_TRUNKS.append(test_utils.DotDict(b_trunk))
else:
BOTTOM2_SUBPORTS.append(test_utils.DotDict(t_subport))
BOTTOM2_TRUNKS.append(test_utils.DotDict(b_trunk))
pod_id = 'pod_id_1' if pod_name == 'pod_1' else 'pod_id_2'
core.create_resource(ctx, models.ResourceRouting,
{'top_id': t_trunk_id,
'bottom_id': b_trunk_id,
'pod_id': pod_id,
'project_id': project_id,
'resource_type': constants.RT_TRUNK})
return t_trunk, b_trunk
@patch.object(context, 'get_context_from_neutron_context',
new=fake_get_context_from_neutron_context)
def test_get_trunk(self):
project_id = TEST_TENANT_ID
q_ctx = FakeNeutronContext()
t_ctx = context.get_db_context()
self._basic_pod_setup()
fake_plugin = FakePlugin()
t_trunk, b_trunk = self._prepare_trunk_test(project_id, t_ctx,
'pod_1', 1, True)
res = fake_plugin.get_trunk(q_ctx, t_trunk['id'])
t_trunk['status'] = b_trunk['status']
t_trunk['sub_ports'][0].pop('trunk_id')
six.assertCountEqual(self, t_trunk, res)
@patch.object(context, 'get_context_from_neutron_context',
new=fake_get_context_from_neutron_context)
def test_get_trunks(self):
project_id = TEST_TENANT_ID
q_ctx = FakeNeutronContext()
t_ctx = context.get_db_context()
self._basic_pod_setup()
fake_plugin = FakePlugin()
t_trunk1, _ = self._prepare_trunk_test(project_id, t_ctx,
'pod_1', 1, True)
t_trunk2, _ = self._prepare_trunk_test(project_id, t_ctx,
'pod_1', 2, True)
t_trunk3, _ = self._prepare_trunk_test(project_id, t_ctx,
'pod_2', 3, True)
t_trunk4, _ = self._prepare_trunk_test(project_id, t_ctx,
'pod_2', 4, True)
t_trunk5, _ = self._prepare_trunk_test(project_id, t_ctx,
'pod_1', 5, False)
t_trunk6, _ = self._prepare_trunk_test(project_id, t_ctx,
'pod_1', 6, False)
res = fake_plugin.get_trunks(q_ctx)
self.assertEqual(len(res), 6)
res = fake_plugin.get_trunks(
q_ctx, filters={'id': [t_trunk1['id']]}, limit=3)
t_trunk1['status'] = 'UP'
res[0]['sub_ports'][0]['trunk_id'] = t_trunk1['id']
six.assertCountEqual(self, [t_trunk1], res)
res = fake_plugin.get_trunks(q_ctx, filters={'id': [t_trunk5['id']]})
t_trunk5['sub_ports'][0].pop('trunk_id')
six.assertCountEqual(self, [t_trunk5], res)
trunks = fake_plugin.get_trunks(q_ctx,
filters={'status': ['UP'],
'description': ['created']})
self.assertEqual(len(trunks), 4)
@patch.object(context, 'get_context_from_neutron_context',
new=fake_get_context_from_neutron_context)
@patch.object(FakePlugin, '_get_min_search_step',
new=fake_get_min_search_step)
def test_get_trunks_pagination(self):
project_id = TEST_TENANT_ID
q_ctx = FakeNeutronContext()
t_ctx = context.get_db_context()
self._basic_pod_setup()
fake_plugin = FakePlugin()
t_trunk1, _ = self._prepare_trunk_test(
project_id, t_ctx, 'pod_1', 1, True,
'101779d0-e30e-495a-ba71-6265a1669701',
'1b1779d0-e30e-495a-ba71-6265a1669701')
t_trunk2, _ = self._prepare_trunk_test(
project_id, t_ctx, 'pod_1', 2, True,
'201779d0-e30e-495a-ba71-6265a1669701',
'2b1779d0-e30e-495a-ba71-6265a1669701')
t_trunk3, _ = self._prepare_trunk_test(
project_id, t_ctx, 'pod_2', 3, True,
'301779d0-e30e-495a-ba71-6265a1669701',
'3b1779d0-e30e-495a-ba71-6265a1669701')
t_trunk4, _ = self._prepare_trunk_test(
project_id, t_ctx, 'pod_2', 4, True,
'401779d0-e30e-495a-ba71-6265a1669701',
'4b1779d0-e30e-495a-ba71-6265a1669701')
t_trunk5, _ = self._prepare_trunk_test(
project_id, t_ctx, 'pod_2', 5, False,
'501779d0-e30e-495a-ba71-6265a1669701')
t_trunk6, _ = self._prepare_trunk_test(
project_id, t_ctx, 'pod_2', 6, False,
'601779d0-e30e-495a-ba71-6265a1669701')
t_trunk7, _ = self._prepare_trunk_test(
project_id, t_ctx, 'pod_2', 7, False,
'601779d0-e30e-495a-ba71-6265a1669701')
# limit no marker
res = fake_plugin.get_trunks(q_ctx, limit=3)
res_trunk_ids = [trunk['id'] for trunk in res]
except_trunk_ids = [t_trunk1['id'], t_trunk2['id'], t_trunk3['id']]
self.assertEqual(res_trunk_ids, except_trunk_ids)
# limit and top pod's marker
res = fake_plugin.get_trunks(q_ctx, limit=3, marker=t_trunk5['id'])
res_trunk_ids = [trunk['id'] for trunk in res]
except_trunk_ids = [t_trunk6['id'], t_trunk7['id']]
self.assertEqual(res_trunk_ids, except_trunk_ids)
# limit and bottom pod's marker
res = fake_plugin.get_trunks(q_ctx, limit=6, marker=t_trunk1['id'])
res_trunk_ids = [trunk['id'] for trunk in res]
except_trunk_ids = [t_trunk2['id'], t_trunk3['id'], t_trunk4['id'],
t_trunk5['id'], t_trunk6['id'], t_trunk7['id']]
self.assertEqual(res_trunk_ids, except_trunk_ids)
# limit and bottom pod's marker and filters
res = fake_plugin.get_trunks(q_ctx, limit=6, marker=t_trunk1['id'],
filters={'status': ['UP']})
res_trunk_ids = [trunk['id'] for trunk in res]
except_trunk_ids = [t_trunk2['id'], t_trunk3['id'], t_trunk4['id']]
self.assertEqual(res_trunk_ids, except_trunk_ids)
@patch.object(context, 'get_context_from_neutron_context',
new=fake_get_context_from_neutron_context)
def test_update_trunk(self):
project_id = TEST_TENANT_ID
q_ctx = FakeNeutronContext()
t_ctx = context.get_db_context()
self._basic_pod_setup()
fake_plugin = FakePlugin()
t_trunk, b_trunk = self._prepare_trunk_test(project_id, t_ctx,
'pod_1', 1, True)
update_body = {'trunk': {
'name': 'new_name',
'description': 'updated',
'admin_state_up': False}
}
updated_top_trunk = fake_plugin.update_trunk(q_ctx, t_trunk['id'],
update_body)
self.assertEqual(updated_top_trunk['name'], 'new_name')
self.assertEqual(updated_top_trunk['description'], 'updated')
self.assertFalse(updated_top_trunk['admin_state_up'])
updated_btm_trunk = fake_plugin.get_trunk(q_ctx, t_trunk['id'])
self.assertEqual(updated_btm_trunk['name'], 'new_name')
self.assertEqual(updated_btm_trunk['description'], 'updated')
self.assertFalse(updated_btm_trunk['admin_state_up'])
@patch.object(context, 'get_context_from_neutron_context',
new=fake_get_context_from_neutron_context)
def test_delete_trunk(self):
project_id = TEST_TENANT_ID
q_ctx = FakeNeutronContext()
t_ctx = context.get_db_context()
self._basic_pod_setup()
fake_plugin = FakePlugin()
t_trunk, b_trunk = self._prepare_trunk_test(project_id, t_ctx,
'pod_1', 1, True)
fake_plugin.delete_trunk(q_ctx, t_trunk['id'])
self.assertEqual(len(TOP_TRUNKS), 0)
self.assertEqual(len(BOTTOM1_TRUNKS), 0)
route_filters = [{'key': 'top_id',
'comparator': 'eq',
'value': t_trunk['id']}]
routes = core.query_resource(t_ctx, models.ResourceRouting,
route_filters, [])
self.assertEqual(len(routes), 0)
@patch.object(db_base_plugin_v2.NeutronDbPluginV2, 'get_ports',
new=FakeCorePlugin.get_ports)
@patch.object(db_base_plugin_v2.NeutronDbPluginV2, 'update_port',
new=FakeCorePlugin.update_port)
@patch.object(context, 'get_context_from_neutron_context',
new=fake_get_context_from_neutron_context)
def test_action_subports(self):
project_id = TEST_TENANT_ID
q_ctx = FakeNeutronContext()
t_ctx = context.get_db_context()
self._basic_pod_setup()
fake_plugin = FakePlugin()
t_trunk, b_trunk = self._prepare_trunk_test(project_id, t_ctx,
'pod_1', 1, True)
add_subport_id1 = self._prepare_port_test(project_id, t_ctx, 'pod_1',
1, create_bottom=False)
add_subport_id2 = self._prepare_port_test(project_id, t_ctx, 'pod_1',
2, create_bottom=False)
add_subport_id3 = self._prepare_port_test(project_id, t_ctx, 'pod_1',
3, create_bottom=False)
add_subport_id4 = self._prepare_port_test(project_id, t_ctx, 'pod_1',
4, create_bottom=False)
add_subport_id5 = self._prepare_port_test(project_id, t_ctx, 'pod_1',
5, create_bottom=False)
add_subport_id6 = self._prepare_port_test(project_id, t_ctx, 'pod_1',
6, create_bottom=True)
add_subport_id7 = self._prepare_port_test(project_id, t_ctx, 'pod_1',
7, create_bottom=True)
add_subport_id8 = self._prepare_port_test(project_id, t_ctx, 'pod_1',
8, create_bottom=False)
add_subport_id9 = self._prepare_port_test(project_id, t_ctx, 'pod_1',
9, create_bottom=False)
# Avoid warning: assigned to but never used
ids = [add_subport_id1, add_subport_id2, add_subport_id3,
add_subport_id4, add_subport_id5, add_subport_id6,
add_subport_id7, add_subport_id8, add_subport_id9]
ids.sort()
remove_subports = {'segmentation_type': 'vlan',
'port_id': uuidutils.generate_uuid(),
'segmentation_id': 165}
b_trunk['sub_ports'].append(remove_subports)
add_subports = []
for _id in xrange(1, 10):
port_id = eval("add_subport_id%d" % _id)
subport = {
'segmentation_type': 'vlan',
'port_id': port_id,
'segmentation_id': _id}
add_subports.append(subport)
fake_plugin.add_subports(q_ctx, t_trunk['id'],
{'sub_ports': add_subports})
top_subports = TOP_TRUNKS[0]['sub_ports']
btm_subports = BOTTOM1_TRUNKS[0]['sub_ports']
except_btm_subports = []
for subport in b_trunk['sub_ports']:
if subport['segmentation_id'] == 164:
except_btm_subports.extend([subport])
for subport in add_subports:
subport['trunk_id'] = b_trunk['id']
except_btm_subports.extend(add_subports)
six.assertCountEqual(self, btm_subports, except_btm_subports)
except_top_subports = []
for subport in t_trunk['sub_ports']:
if subport['segmentation_id'] == 164:
except_top_subports.extend([subport])
for subport in add_subports:
subport['trunk_id'] = t_trunk['id']
except_top_subports.extend(add_subports)
except_btm_subports.extend(add_subports)
six.assertCountEqual(self, top_subports, except_top_subports)
self.assertEqual(len(BOTTOM1_PORTS), 10)
map_filters = [{'key': 'resource_type',
'comparator': 'eq',
'value': constants.RT_PORT},
{'key': 'project_id',
'comparator': 'eq',
'value': project_id}]
port_mappings = db_api.list_resource_routings(t_ctx, map_filters)
self.assertEqual(len(port_mappings), 10)
@patch.object(db_base_plugin_v2.NeutronDbPluginV2, 'update_port',
new=FakeCorePlugin.update_port)
@patch.object(context, 'get_context_from_neutron_context',
new=fake_get_context_from_neutron_context)
def test_remove_subports(self):
project_id = TEST_TENANT_ID
q_ctx = FakeNeutronContext()
t_ctx = context.get_db_context()
self._basic_pod_setup()
fake_plugin = FakePlugin()
t_trunk, b_trunk = self._prepare_trunk_test(project_id, t_ctx,
'pod_1', 1, True)
subport_id = t_trunk['sub_ports'][0]['port_id']
remove_subport = {'sub_ports': [{'port_id': subport_id}]}
fake_plugin.remove_subports(q_ctx, t_trunk['id'], remove_subport)
top_subports = TOP_TRUNKS[0]['sub_ports']
btm_subports = BOTTOM1_TRUNKS[0]['sub_ports']
self.assertEqual(len(top_subports), 0)
self.assertEqual(len(btm_subports), 0)
def tearDown(self):
core.ModelBase.metadata.drop_all(core.get_engine())
test_utils.get_resource_store().clean()
cfg.CONF.unregister_opts(q_config.core_opts)
xmanager.IN_TEST = False

View File

@ -17,15 +17,62 @@ from mock import patch
import six
import unittest
import neutronclient.common.exceptions as q_cli_exceptions
from oslo_config import cfg
from oslo_utils import uuidutils
import neutron.conf.common as q_config
from neutron_lib.api.definitions import portbindings
import neutron_lib.constants as q_constants
import neutron_lib.exceptions as q_exceptions
import neutronclient.common.exceptions as q_cli_exceptions
from tricircle.common import context
from tricircle.common import exceptions as t_exceptions
import tricircle.db.api as db_api
from tricircle.db import core
from tricircle.network import helper
import tricircle.tests.unit.utils as test_utils
_resource_store = test_utils.get_resource_store()
TOP_NETS = _resource_store.TOP_NETWORKS
TOP_SUBNETS = _resource_store.TOP_SUBNETS
BOTTOM1_NETS = _resource_store.BOTTOM1_NETWORKS
BOTTOM1_PORTS = _resource_store.BOTTOM1_PORTS
BOTTOM1_ROUTERS = _resource_store.BOTTOM1_ROUTERS
class FakeClient(object):
def get_resource_list(_type, is_top):
pod = 'top' if is_top else 'pod_1'
return _resource_store.pod_store_map[pod][_type]
def get_resource(_type, is_top, resource_id):
for resource in get_resource_list(_type, is_top):
if resource['id'] == resource_id:
return resource
raise q_exceptions.NotFound()
def list_resource(_type, is_top, filters=None):
resource_list = get_resource_list(_type, is_top)
if not filters:
return [resource for resource in get_resource_list(
_type, is_top)]
ret = []
for resource in resource_list:
pick = True
for key, value in six.iteritems(filters):
if resource.get(key) not in value:
pick = False
break
if pick:
ret.append(resource)
return ret
class FakeClient(test_utils.FakeClient):
def __init__(self, region_name=None):
pass
super(FakeClient, self).__init__(region_name)
def create_ports(self, context, body):
for port in body['ports']:
@ -36,10 +83,192 @@ class FakeClient(object):
port['id'] = port['name'].split('_')[-1]
return body['ports']
def list_networks(self, ctx, filters=None):
networks = self.list_resources('network', ctx, filters)
return networks
def delete_networks(self, ctx, net_id):
self.delete_resources('network', ctx, net_id)
def list_subnets(self, ctx, filters=None):
return self.list_resources('subnet', ctx, filters)
def get_subnets(self, ctx, subnet_id):
return self.get_resource('subnet', ctx, subnet_id)
def delete_subnets(self, ctx, subnet_id):
self.delete_resources('subnet', ctx, subnet_id)
def list_routers(self, ctx, filters=None):
return self.list_resources('router', ctx, filters)
def delete_routers(self, ctx, router_id):
self.delete_resources('router', ctx, router_id)
def action_routers(self, ctx, action, *args, **kwargs):
router_id, body = args
if action == 'add_gateway':
port = {
'admin_state_up': True,
'id': uuidutils.generate_uuid(),
'name': '',
'network_id': body['network_id'],
'fixed_ips': '10.0.1.1',
'mac_address': '',
'device_id': router_id,
'device_owner': 'network:router_gateway',
'binding:vif_type': 'ovs',
'binding:host_id': 'host_1'
}
BOTTOM1_PORTS.append(test_utils.DotDict(port))
elif action == 'remove_gateway':
self.delete_routers(ctx, router_id)
class HelperTest(unittest.TestCase):
def setUp(self):
core.initialize()
core.ModelBase.metadata.create_all(core.get_engine())
cfg.CONF.register_opts(q_config.core_opts)
self.helper = helper.NetworkHelper()
self.context = context.Context()
def _prepare_pods(self):
pod1 = {'pod_id': 'pod_id_1',
'region_name': 'pod_1',
'az_name': 'az_name_1'}
pod2 = {'pod_id': 'pod_id_2',
'region_name': 'pod_2',
'az_name': 'az_name_2'}
pod3 = {'pod_id': 'pod_id_0',
'region_name': 'top_pod',
'az_name': ''}
for pod in (pod1, pod2, pod3):
db_api.create_pod(self.context, pod)
def _prepare_top_network(self, project_id,
network_type='vlan', az_hints=None):
t_net_id = uuidutils.generate_uuid()
t_subnet_id = uuidutils.generate_uuid()
t_net = {
'id': t_net_id,
'name': t_net_id,
'tenant_id': project_id,
'project_id': project_id,
'description': 'description',
'admin_state_up': False,
'shared': False,
'provider:network_type': network_type,
'availability_zone_hints': az_hints
}
t_subnet = {
'id': t_subnet_id,
'network_id': t_net_id,
'name': t_subnet_id,
'ip_version': 4,
'cidr': '10.0.1.0/24',
'allocation_pools': [],
'enable_dhcp': True,
'gateway_ip': '10.0.1.1',
'ipv6_address_mode': q_constants.IPV6_SLAAC,
'ipv6_ra_mode': q_constants.IPV6_SLAAC,
'tenant_id': project_id,
'project_id': project_id,
'description': 'description',
'host_routes': [],
'dns_nameservers': [],
'segment_id': 'b85fd910-e483-4ef1-bdf5-b0f747d0b0d5'
}
TOP_NETS.append(test_utils.DotDict(t_net))
TOP_SUBNETS.append(test_utils.DotDict(t_subnet))
return t_net, t_subnet
def _prepare_bottom_network(self, project_id, b_uuid=None,
network_type='vlan', az_hints=None):
b_net_id = b_uuid or uuidutils.generate_uuid()
b_net = {
'id': b_net_id,
'name': b_net_id,
'tenant_id': project_id,
'project_id': project_id,
'description': 'description',
'admin_state_up': False,
'shared': False,
'provider:network_type': network_type,
'availability_zone_hints': az_hints
}
BOTTOM1_NETS.append(test_utils.DotDict(b_net))
return b_net
def _prepare_router(self, project_id, router_az_hints=None):
b_router_id = uuidutils.generate_uuid()
b_router = {
'id': b_router_id,
'name': b_router_id,
'distributed': False,
'tenant_id': project_id,
'attached_ports': test_utils.DotList(),
'extra_attributes': {
'availability_zone_hints': router_az_hints
}
}
BOTTOM1_ROUTERS.append(test_utils.DotDict(b_router))
return b_router_id
def test_is_local_network(self):
net = {
'provider:network_type': 'vlan',
'availability_zone_hints': []
}
self.assertFalse(self.helper.is_local_network(self.context, net))
net = {
'provider:network_type': 'vlan',
'availability_zone_hints': ['pod_1', 'pod_1']
}
self.assertFalse(self.helper.is_local_network(self.context, net))
net = {
'provider:network_type': 'vlan',
'availability_zone_hints': ['pod_1']
}
self._prepare_pods()
self.assertTrue(self.helper.is_local_network(self.context, net))
def test_fill_binding_info(self):
port_body = {
portbindings.PROFILE: 'Open vSwitch agent'
}
self.helper.fill_binding_info(port_body)
self.assertEqual(port_body, {
portbindings.PROFILE: 'Open vSwitch agent',
portbindings.VIF_DETAILS: {'port_filter': True,
'ovs_hybrid_plug': True},
portbindings.VIF_TYPE: portbindings.VIF_TYPE_OVS,
portbindings.VNIC_TYPE: portbindings.VNIC_NORMAL
})
@patch.object(helper.NetworkHelper, '_get_client', new=FakeClient)
@patch.object(context, 'get_context_from_neutron_context')
def test_prepare_top_element(self, mock_context):
mock_context.return_value = self.context
self._prepare_pods()
t_port_id = uuidutils.generate_uuid()
port_body = {
'port': {
'id': t_port_id,
'name': t_port_id,
'fixed_ips': [{'ip_address': '10.0.1.1'}],
'mac_address': 'fa:16:3e:d4:01:01',
'device_id': None
}
}
self.helper.prepare_top_element(
self.context, None, test_utils.TEST_TENANT_ID,
{'pod_id': 'pod_id_0', 'region_name': 'top_pod'},
{'id': t_port_id}, 'port', port_body)
t_ports = list_resource('port', True)
self.assertEqual(t_ports[0]['id'], t_port_id)
def test_get_create_subnet_body(self):
t_net_id = uuidutils.generate_uuid()
@ -123,3 +352,154 @@ class HelperTest(unittest.TestCase):
port_bodys, agents, 5)
req_port_ids = [port['id'] for port in port_bodys]
six.assertCountEqual(self, ret_port_ids, req_port_ids)
@patch.object(helper.NetworkHelper, '_get_client', new=FakeClient)
def test_prepare_shadow_port(self):
self._prepare_pods()
port_body = {
'id': uuidutils.generate_uuid(),
'fixed_ips': [{'ip_address': '10.0.1.1'}],
'mac_address': 'fa:16:3e:d4:01:01',
'binding:host_id': 'host1',
'device_id': None
}
agent = {'type': 'Open vSwitch agent', 'tunnel_ip': '192.168.1.101'}
self.helper.prepare_shadow_port(
self.context, 'project_id',
{'pod_id': 'pod_id_1', 'region_name': 'pod_1'},
'net-id-1', port_body, agent)
sw_ports = list_resource('port', False)
self.assertEqual(len(sw_ports), 1)
@patch.object(helper.NetworkHelper, '_get_client', new=FakeClient)
@patch.object(context, 'get_context_from_neutron_context')
def test_prepare_bottom_router(self, mock_context):
self._prepare_pods()
mock_context.return_value = self.context
net = {
'availability_zone_hints': ['az_name_1'],
'tenant_id': test_utils.TEST_TENANT_ID
}
self.helper.prepare_bottom_router(self.context, net, 'fake_router_1')
b_routers = list_resource('router', False)
self.assertEqual(len(b_routers), 1)
@patch.object(helper.NetworkHelper, '_get_client', new=FakeClient)
@patch.object(context, 'get_context_from_neutron_context')
def test_remove_bottom_router_by_name(self, mock_context):
router_id = self._prepare_router(test_utils.TEST_TENANT_ID,
router_az_hints='az_name_1')
mock_context.return_value = self.context
b_router = get_resource('router', False, router_id)
self.assertIsNotNone(b_router['id'])
self.helper.remove_bottom_router_by_name(
self.context, 'pod_1', router_id)
self.assertRaises(q_exceptions.NotFound, get_resource,
'router', False, router_id)
@patch.object(helper.NetworkHelper, '_get_client', new=FakeClient)
@patch.object(context, 'get_context_from_neutron_context')
def test_prepare_bottom_router_gateway(self, mock_context):
self._prepare_pods()
mock_context.return_value = self.context
self.assertRaises(t_exceptions.NotFound,
self.helper.prepare_bottom_router_gateway,
self.context, 'pod_1', 'fake_router')
router_id = self._prepare_router(test_utils.TEST_TENANT_ID,
router_az_hints='az_name_1')
self.assertRaises(t_exceptions.NotFound,
self.helper.prepare_bottom_router_gateway,
self.context, 'pod_1', router_id)
b_net_id = uuidutils.generate_uuid()
b_net = {
'id': b_net_id,
'name': router_id,
'tenant_id': test_utils.TEST_TENANT_ID,
'project_id': test_utils.TEST_TENANT_ID,
'description': 'description',
'admin_state_up': False,
'shared': False,
'provider:network_type': 'vlan',
'availability_zone_hints': None
}
BOTTOM1_NETS.append(test_utils.DotDict(b_net))
self.helper.prepare_bottom_router_gateway(
self.context, 'pod_1', router_id)
b_gw_ports = list_resource('port', False)
self.assertEqual(b_gw_ports[0]['device_id'], router_id)
@patch.object(helper.NetworkHelper, '_get_client', new=FakeClient)
@patch.object(context, 'get_context_from_neutron_context')
def test_remove_bottom_router_gateway(self, mock_context):
self._prepare_pods()
mock_context.return_value = self.context
self.assertRaises(t_exceptions.NotFound,
self.helper.remove_bottom_router_gateway,
self.context, 'pod_1', 'fake_router')
router_id = self._prepare_router(test_utils.TEST_TENANT_ID,
router_az_hints='az_name_1')
b_routers = list_resource('router', False)
self.assertEqual(b_routers[0]['id'], router_id)
self.helper.remove_bottom_router_gateway(
self.context, 'pod_1', router_id)
self.assertRaises(q_exceptions.NotFound, get_resource,
'router', False, router_id)
@patch.object(helper.NetworkHelper, '_get_client', new=FakeClient)
@patch.object(context, 'get_context_from_neutron_context')
def test_remove_bottom_external_network_by_name(self, mock_context):
mock_context.return_value = self.context
b_net = self._prepare_bottom_network(test_utils.TEST_TENANT_ID,
az_hints='az_name_1')
b_net_req = get_resource('network', False, b_net['id'])
self.assertEqual(b_net_req['id'], b_net['id'])
self.helper.remove_bottom_external_network_by_name(
self.context, 'pod_1', b_net['id'])
self.assertRaises(q_exceptions.NotFound, get_resource,
'network', False, b_net['id'])
@patch.object(helper.NetworkHelper, '_get_client', new=FakeClient)
@patch.object(context, 'get_context_from_neutron_context')
def test_prepare_bottom_external_subnet_by_bottom_name(self, mock_context):
self._prepare_pods()
mock_context.return_value = self.context
t_net, t_subnet = self._prepare_top_network(test_utils.TEST_TENANT_ID)
self.assertRaises(
t_exceptions.InvalidInput,
self.helper.prepare_bottom_external_subnet_by_bottom_name,
self.context, t_subnet, 'pod_1',
'fake_bottom_network_name', t_subnet['id'])
b_net = self._prepare_bottom_network(
test_utils.TEST_TENANT_ID, b_uuid=t_net['id'])
self.helper.prepare_bottom_external_subnet_by_bottom_name(
self.context, t_subnet, 'pod_1',
b_net['name'], t_subnet['id'])
b_subnets = list_resource('subnet', False)
self.assertEqual(len(b_subnets), 1)
@patch.object(helper.NetworkHelper, '_get_client', new=FakeClient)
@patch.object(context, 'get_context_from_neutron_context')
def test_remove_bottom_external_subnet_by_name(self, mock_context):
self._prepare_pods()
mock_context.return_value = self.context
t_net, t_subnet = self._prepare_top_network(test_utils.TEST_TENANT_ID)
b_net = self._prepare_bottom_network(
test_utils.TEST_TENANT_ID, b_uuid=t_net['id'])
self.helper.prepare_bottom_external_subnet_by_bottom_name(
self.context, t_subnet, 'pod_1',
b_net['name'], t_subnet['id'])
b_subnets = list_resource('subnet', False)
self.helper.remove_bottom_external_subnet_by_name(
self.context, 'pod_1', b_subnets[0]['name'])
self.assertRaises(q_exceptions.NotFound, get_resource,
'subnet', False, t_subnet['id'])
def tearDown(self):
core.ModelBase.metadata.drop_all(core.get_engine())
test_utils.get_resource_store().clean()
cfg.CONF.unregister_opts(q_config.core_opts)

View File

@ -23,6 +23,7 @@ from oslo_config import cfg
from oslo_utils import uuidutils
import neutron.conf.common as q_config
import neutron.extensions.securitygroup as ext_sg
from neutron.services.trunk import exceptions as t_exc
from neutron_lib.api.definitions import portbindings
import neutron_lib.constants as q_constants
@ -91,6 +92,13 @@ def list_resource(_type, is_top, filters=None):
return ret
def delete_resource(_type, is_top, resource_id):
for resource in get_resource_list(_type, is_top):
if resource['id'] == resource_id:
return get_resource_list(_type, is_top).remove(resource)
raise q_exceptions.NotFound()
class FakeTypeManager(object):
def __init__(self):
@ -121,6 +129,9 @@ class FakeCorePlugin(object):
def update_subnet(self, context, _id, subnet):
return update_resource('subnet', False, _id, subnet['subnet'])
def delete_subnet(self, context, _id):
return delete_resource('subnet', False, _id)
def get_subnet(self, context, _id, fields=None):
return get_resource('subnet', False, _id)
@ -132,6 +143,13 @@ class FakeCorePlugin(object):
create_resource('port', False, port['port'])
return port['port']
def create_port_bulk(self, context, ports):
ret_ports = []
for port in ports['ports']:
create_resource('port', False, port['port'])
ret_ports.append(port['port'])
return ret_ports
def update_port(self, context, _id, port):
return update_resource('port', False, _id, port['port'])
@ -142,6 +160,9 @@ class FakeCorePlugin(object):
limit=None, marker=None, page_reverse=False):
return list_resource('port', False, filters)
def delete_port(self, context, _id, l3_port_check=False):
delete_resource('port', False, _id)
def create_security_group(self, context, security_group, default_sg=False):
create_resource('security_group', False,
security_group['security_group'])
@ -150,6 +171,11 @@ class FakeCorePlugin(object):
def get_security_group(self, context, _id, fields=None, tenant_id=None):
return get_resource('security_group', False, _id)
def get_security_groups(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False, default_sg=False):
return list_resource('security_group', False, filters)
def get_agents(self, context, filters=None, fields=None):
return list_resource('agent', False, filters)
@ -254,6 +280,9 @@ class FakeNeutronHandle(object):
return [trunk]
return []
def handle_delete(self, context, _type, _id):
delete_resource(_type, True, _id)
class FakePlugin(plugin.TricirclePlugin):
def __init__(self):
@ -391,6 +420,61 @@ class PluginTest(unittest.TestCase):
TOP_PORTS.append(t_port)
return t_port
def test__in_subnet_delete(self):
self.context.request_id = None
self.assertEqual(False, self.plugin._in_subnet_delete(self.context))
def test__adapt_network_body(self):
network = {'provider:network_type': constants.NT_LOCAL}
self.plugin._adapt_network_body(network)
self.assertEqual({}, network)
def test__adapt_port_body_for_call(self):
port = {}
self.plugin._adapt_port_body_for_call(port)
self.assertIsNotNone(port['mac_address'])
self.assertIsNotNone(port['fixed_ips'])
def test__construct_params(self):
filters = {'filter': 'aaa'}
sorts = [['name', True]]
limit = 10
marker = 'bbb'
params = {'filter': 'aaa', 'sort_key': ['name'],
'limit': limit, 'marker': marker}
params.update({'sort_dir': ['desc']})
self.assertEqual(params,
self.plugin._construct_params(
filters, sorts, limit, marker, True))
params.update({'sort_dir': ['asc']})
self.assertEqual(params,
self.plugin._construct_params(
filters, sorts, limit, marker, False))
def test__get_neutron_region(self):
cfg.CONF.set_override('local_region_name', None, 'tricircle')
cfg.CONF.set_override('region_name', 'Pod1', 'nova')
self.assertEqual('Pod1', self.plugin._get_neutron_region())
@patch.object(t_context, 'get_context_from_neutron_context', new=mock.Mock)
def test__ensure_subnet(self):
t_net, t_subnet, t_port, _ = self._prepare_resource()
b_net = copy.deepcopy(t_net)
subnet_ids = self.plugin._ensure_subnet(
self.context, b_net, is_top=False)
self.assertEqual(t_net['subnets'], subnet_ids)
b_net['subnets'] = []
subnet_ids = self.plugin._ensure_subnet(
self.context, b_net, is_top=False)
self.assertEqual(t_net['subnets'], subnet_ids)
t_net['subnets'] = []
self.assertEqual([], self.plugin._ensure_subnet(self.context, t_net))
@patch.object(t_context, 'get_context_from_neutron_context', new=mock.Mock)
def test_get_subnet_no_bottom_network(self):
t_net, t_subnet, t_port, _ = self._prepare_resource()
@ -399,8 +483,9 @@ class PluginTest(unittest.TestCase):
t_subnet, t_port)
self._validate(b_net, b_subnet, b_port, t_net, t_subnet, t_port)
@patch.object(t_context, 'get_context_from_neutron_context', new=mock.Mock)
def test_get_subnet(self):
@patch.object(t_context, 'get_context_from_neutron_context')
def test_get_subnet(self, mock_context):
mock_context.return_value = self.context
t_net, t_subnet, t_port, _ = self._prepare_resource()
self.plugin.get_network(self.context, t_net['id'])
self.plugin.get_subnet(self.context, t_subnet['id'])
@ -408,14 +493,73 @@ class PluginTest(unittest.TestCase):
t_net, t_subnet, t_port)
self._validate(b_net, b_subnet, b_port, t_net, t_subnet, t_port)
def test_create_subnet(self):
_, t_subnet, _, _ = self._prepare_resource()
subnet = {'subnet': t_subnet}
self.plugin.create_subnet(self.context, subnet)
self.assertDictEqual(t_subnet,
get_resource('subnet', False, t_subnet['id']))
delete_resource('subnet', False, t_subnet['id'])
t_subnet['name'] = t_subnet['id']
self.plugin.create_subnet(self.context, subnet)
self.assertDictEqual(t_subnet,
get_resource('subnet', False, t_subnet['id']))
@patch.object(t_context, 'get_context_from_neutron_context', new=mock.Mock)
def test_get_network(self):
def test__create_bottom_network(self):
self.plugin.neutron_handle.handle_get = mock.Mock(return_value=None)
self.assertRaises(q_exceptions.NetworkNotFound,
self.plugin._create_bottom_network,
self.context, 'fake_net_id')
t_net, _, _, _ = self._prepare_resource()
self.plugin.neutron_handle.handle_get = mock.Mock(return_value=t_net)
_, b_net = self.plugin._create_bottom_network(
self.context, t_net['id'])
self.assertDictEqual(b_net,
get_resource('network', False, t_net['id']))
def test_create_network(self):
t_net, t_subnet, t_port, _ = self._prepare_resource()
network = {'network': t_net}
self.plugin.create_network(self.context, network)
b_net = get_resource('network', False, t_net['id'])
self.assertDictEqual(t_net, b_net)
t_net['id'] = uuidutils.generate_uuid()
t_net['name'] = None
self.plugin.create_network(self.context, network)
b_net = get_resource('network', False, t_net['id'])
self.assertDictEqual(t_net, b_net)
t_net['id'] = None
t_net['name'] = uuidutils.generate_uuid()
self.plugin.create_network(self.context, network)
b_net = get_resource('network', False, t_net['id'])
t_net['id'] = t_net['name']
self.assertDictEqual(t_net, b_net)
@patch.object(t_context, 'get_context_from_neutron_context')
def test_get_network(self, mock_context):
t_net, t_subnet, t_port, _ = self._prepare_resource()
self.plugin._start_subnet_delete(self.context)
self.assertRaises(q_exceptions.NotFound,
self.plugin.get_network, self.context, t_net['id'])
self.plugin._end_subnet_delete(self.context)
self.plugin.get_network(self.context, t_net['id'])
b_net, b_subnet, b_port = self._get_bottom_resources_with_net(
t_net, t_subnet, t_port)
self._validate(b_net, b_subnet, b_port, t_net, t_subnet, t_port)
mock_context.return_value = self.context
mock_context.return_value.auth_token = None
self.assertRaises(q_exceptions.NetworkNotFound,
self.plugin.get_network,
self.context, uuidutils.generate_uuid())
@patch.object(t_context, 'get_context_from_neutron_context', new=mock.Mock)
def test_get_network_no_gateway(self):
t_net, t_subnet, t_port, _ = self._prepare_resource()
@ -442,6 +586,21 @@ class PluginTest(unittest.TestCase):
self._validate(b_net1, b_subnet1, b_port1, t_net1, t_subnet1, t_port1)
self._validate(b_net2, b_subnet2, b_port2, t_net2, t_subnet2, t_port2)
except_networks = [{
'id': net['id'],
'name': net['name'],
'project_id': net['tenant_id'],
'provider:network_type': constants.NT_VLAN,
'subnets': net['subnets'],
'tenant_id': net['tenant_id']
} for net in [t_net1, t_net2]]
self.assertListEqual(
except_networks, self.plugin.get_networks(self.context))
self.assertListEqual(
except_networks, self.plugin.get_networks(self.context,
{'id': [t_net1['id'],
t_net2['id']]}))
@patch.object(t_context, 'get_context_from_neutron_context', new=mock.Mock)
@patch.object(client.Client, 'get_admin_token', new=mock.Mock)
def test_get_invaild_networks(self):
@ -454,10 +613,31 @@ class PluginTest(unittest.TestCase):
nets = self.plugin.get_networks(self.context, net_filter)
six.assertCountEqual(self, nets, [])
@patch.object(t_context, 'get_context_from_neutron_context', new=mock.Mock)
@patch.object(t_context, 'get_context_from_neutron_context')
@patch.object(FakeNeutronHandle, 'handle_get')
def test_get_subnet_notfound(self, mock_handle_get, mock_context):
t_net, t_subnet, t_port, _ = self._prepare_resource(
az_hints='fake_region')
self.assertRaises(q_exceptions.SubnetNotFound,
self.plugin.get_subnet,
self.context, t_port['id'])
mock_handle_get.return_value = None
self.assertRaises(q_exceptions.SubnetNotFound,
self.plugin.get_subnet,
self.context, uuidutils.generate_uuid())
mock_context.return_value = self.context
mock_context.return_value.auth_token = None
self.assertRaises(q_exceptions.SubnetNotFound,
self.plugin.get_subnet,
self.context, uuidutils.generate_uuid())
@patch.object(t_context, 'get_context_from_neutron_context')
@patch.object(client.Client, 'get_admin_token', new=mock.Mock)
def test_get_subnets(self):
def test_get_subnets(self, mock_context):
az_hints = ['Pod1', 'Pod2']
mock_context.return_value = self.context
t_net1, t_subnet1, t_port1, _ = self._prepare_resource()
t_net2, t_subnet2, t_port2, _ = self._prepare_resource(az_hints)
cfg.CONF.set_override('region_name', 'Pod1', 'nova')
@ -471,6 +651,20 @@ class PluginTest(unittest.TestCase):
self._validate(b_net1, b_subnet1, b_port1, t_net1, t_subnet1, t_port1)
self._validate(b_net2, b_subnet2, b_port2, t_net2, t_subnet2, t_port2)
delete_resource('subnet', False, t_subnet1['id'])
t_net1, t_subnet1, t_port1, _ = self._prepare_resource()
b_subnets = self.plugin.get_subnets(self.context)
self.assertEqual(len(b_subnets), 1)
b_subnets = self.plugin.get_subnets(self.context, {
'id': [t_subnet1['id'], t_subnet2['id']]})
self.assertEqual(len(b_subnets), 2)
mock_context.return_value.auth_token = None
b_subnets = self.plugin.get_subnets(self.context, {
'id': [t_subnet1['id'], t_subnet2['id'], 'fake_net_id']})
self.assertEqual(len(b_subnets), 2)
@patch.object(t_context, 'get_context_from_neutron_context', new=mock.Mock)
@patch.object(client.Client, 'get_admin_token', new=mock.Mock)
def test_get_invaild_subnets(self):
@ -495,6 +689,42 @@ class PluginTest(unittest.TestCase):
b_port = get_resource('port', False, t_port['id'])
self.assertDictEqual(t_port, b_port)
@patch.object(t_context, 'get_context_from_neutron_context', new=mock.Mock)
def test_create_port_route_snat(self):
t_net, t_subnet, t_port, _ = self._prepare_resource()
port = {'name': 'route_snat',
'fixed_ips': q_constants.ATTR_NOT_SPECIFIED,
'network_id': t_net['id'],
'device_owner': q_constants.DEVICE_OWNER_ROUTER_SNAT}
t_port = self.plugin.create_port(self.context, {'port': port})
b_port = get_resource('port', False, t_port['id'])
self.assertDictEqual(t_port, b_port)
port = {'id': uuidutils.generate_uuid(),
'name': 'route_snat',
'fixed_ips': [{'subnet_id': t_subnet['id'],
'ip_address': '10.0.1.3'}],
'network_id': t_net['id'],
'device_owner': q_constants.DEVICE_OWNER_ROUTER_SNAT}
t_snat_port = {'id': uuidutils.generate_uuid(),
'tenant_id': self.tenant_id,
'admin_state_up': True,
'name': constants.snat_port_name % t_subnet['id'],
'network_id': t_net['id'],
'mac_address': 'fa:16:3e:96:41:03',
'device_owner': q_constants.DEVICE_OWNER_ROUTER_SNAT,
'device_id': 'reserved_snat_port',
'fixed_ips': [{'subnet_id': t_subnet['id'],
'ip_address': '10.0.1.3'}],
'binding:profile': {}}
TOP_PORTS.append(t_snat_port)
t_port = self.plugin.create_port(self.context, {'port': port})
b_port = get_resource('port', False, t_port['id'])
self.assertDictEqual(t_port, b_port)
@patch.object(t_context, 'get_context_from_neutron_context', new=mock.Mock)
def test_create_port_lbaas(self):
t_net, t_subnet, t_port, _ = self._prepare_resource()
@ -582,8 +812,37 @@ class PluginTest(unittest.TestCase):
# "agent" extension and body contains tunnel ip
mock_agent.assert_has_calls([mock.call(self.context, agent_state)])
@patch.object(FakePlugin, '_ensure_trunk', new=mock.Mock)
@patch.object(t_context, 'get_context_from_neutron_context', new=mock.Mock)
def test_create_port_bulk(self):
t_net, t_subnet, t_port, t_sg = self._prepare_resource()
t_ports = []
for i in (1, 2):
t_vm_port = self._prepare_vm_port(t_net, t_subnet, i, [t_sg['id']])
t_ports.append(t_vm_port)
self.plugin.get_ports(self.context,
{'id': [t_ports[0]['id'], t_ports[1]['id'],
'fake_port_id']})
b_ports = []
b_port1 = get_resource('port', False, t_ports[0]['id'])
b_port1['device_owner'] = constants.DEVICE_OWNER_SHADOW
b_port1['name'] = 'shadow_' + b_port1['id']
b_ports.append({'port': b_port1})
b_port2 = get_resource('port', False, t_ports[1]['id'])
b_port2['device_owner'] = constants.DEVICE_OWNER_SUBPORT
b_port2['device_id'] = b_port2['id']
b_ports.append({'port': b_port2})
t_vm_port = self._prepare_vm_port(t_net, t_subnet, 3, [t_sg['id']])
t_vm_port['device_owner'] = None
b_ports.append({'port': t_vm_port})
ret_b_ports = self.plugin.create_port_bulk(
self.context, {'ports': b_ports})
self.assertEqual(len(ret_b_ports), 2)
self.assertListEqual(b_ports, [{'port': b_port2}, {'port': t_vm_port}])
@patch.object(t_context, 'get_context_from_neutron_context', new=mock.Mock)
@patch.object(FakePlugin, '_ensure_trunk', new=mock.Mock)
def test_get_port(self):
t_net, t_subnet, t_port, _ = self._prepare_resource()
@ -592,6 +851,21 @@ class PluginTest(unittest.TestCase):
b_port = get_resource('port', False, t_port['id'])
self.assertDictEqual(t_port, b_port)
@patch.object(FakePlugin, '_ensure_trunk', new=mock.Mock)
@patch.object(t_context, 'get_context_from_neutron_context')
@patch.object(FakeNeutronHandle, 'handle_get')
def test_get_port_notfound(self, mock_handle_get, mock_context):
mock_context.return_value = self.context
mock_context.return_value.auth_token = None
self.assertRaises(q_exceptions.PortNotFound,
self.plugin.get_port, self.context, 'fake_port_id')
mock_context.return_value.auth_token = 'fake_auth_token'
mock_handle_get.return_value = None
self.assertRaises(q_exceptions.PortNotFound,
self.plugin.get_port,
self.context, uuidutils.generate_uuid())
@patch.object(t_context, 'get_context_from_neutron_context', new=mock.Mock)
@patch.object(plugin.TricirclePlugin, '_handle_security_group',
new=mock.Mock)
@ -621,8 +895,10 @@ class PluginTest(unittest.TestCase):
mock_create_trunk.assert_called_once_with(self.context,
{'trunk': t_trunk})
@patch.object(t_context, 'get_context_from_neutron_context', new=mock.Mock)
def test_get_ports(self):
@patch.object(t_context, 'get_context_from_neutron_context')
@patch.object(FakeCorePlugin, 'get_ports')
def test_get_ports(self, mock_get_ports, mock_context):
mock_context.return_value = self.context
t_net, t_subnet, t_port, t_sg = self._prepare_resource()
t_ports = []
for i in (1, 2):
@ -636,6 +912,21 @@ class PluginTest(unittest.TestCase):
b_port.pop('project_id')
self.assertDictEqual(t_ports[i], b_port)
self.plugin.get_ports(self.context)
mock_get_ports.assert_called_with(self.context,
None, None, None, None, None, False)
mock_get_ports.return_value = t_ports
b_ports = self.plugin.get_ports(
self.context, {'id': [t_ports[0]['id'], t_ports[1]['id']]})
self.assertEqual(len(b_ports), 2)
mock_context.return_value.auth_token = None
b_ports = self.plugin.get_ports(
self.context, {'id': [t_ports[0]['id'], t_ports[1]['id'],
'fake_port_id']})
self.assertEqual(len(b_ports), 2)
@patch.object(t_context, 'get_context_from_neutron_context')
@patch.object(FakeNeutronHandle, 'handle_update')
def test_update_port(self, mock_update, mock_context):
@ -784,6 +1075,45 @@ class PluginTest(unittest.TestCase):
# port status is update to active
self.assertEqual(q_constants.PORT_STATUS_ACTIVE, b_port['status'])
@patch.object(t_context, 'get_context_from_neutron_context')
def test_delete_port(self, mock_context):
mock_context.return_value = self.context
t_net, _, t_port, _ = self._prepare_resource()
port = {
'port': {'network_id': t_net['id'],
'fixed_ips': q_constants.ATTR_NOT_SPECIFIED,
'device_owner': q_constants.DEVICE_OWNER_ROUTER_SNAT,
'name': 'test-port',
'security_groups': []}
}
b_port = self.plugin.create_port(self.context, port)
b_port_valid = get_resource('port', False, b_port['id'])
self.assertEqual(b_port_valid['id'], b_port['id'])
self.plugin.delete_port(self.context, b_port['id'])
self.assertRaises(q_exceptions.NotFound,
get_resource, 'port', False, b_port['id'])
port = {
'port': {'network_id': t_net['id'],
'fixed_ips': q_constants.ATTR_NOT_SPECIFIED,
'device_owner': q_constants.DEVICE_OWNER_COMPUTE_PREFIX,
'name': 'test-port',
'security_groups': []}
}
b_port = self.plugin.create_port(self.context, port)
b_port_valid = get_resource('port', False, b_port['id'])
self.assertEqual(b_port_valid['id'], b_port['id'])
t_port = get_resource('port', True, b_port['id'])
self.assertEqual(b_port['id'], t_port['id'])
self.plugin.delete_port(self.context, b_port['id'])
self.assertRaises(q_exceptions.NotFound,
get_resource, 'port', False, b_port['id'])
self.assertRaises(q_exceptions.NotFound,
get_resource, 'port', True, t_port['id'])
@patch.object(t_context, 'get_context_from_neutron_context')
def test_update_subnet(self, mock_context):
_, t_subnet, t_port, _ = self._prepare_resource(enable_dhcp=False)
@ -798,6 +1128,64 @@ class PluginTest(unittest.TestCase):
b_port = get_resource('port', False, port_id)
self.assertEqual(b_port['device_owner'], 'network:dhcp')
@patch.object(t_context, 'get_context_from_neutron_context', new=mock.Mock)
def test_delete_subnet(self):
t_net, t_subnet, t_port, _ = self._prepare_resource(enable_dhcp=False)
self.plugin.get_network(self.context, t_net['id'])
self.plugin.delete_subnet(self.context, t_subnet['id'])
self.assertRaises(q_exceptions.NotFound,
get_resource, 'subnet', False, t_subnet['id'])
@patch.object(t_context, 'get_context_from_neutron_context', new=mock.Mock)
def test__handle_security_group(self):
t_ctx = t_context.get_db_context()
port = {'security_groups': q_constants.ATTR_NOT_SPECIFIED}
self.plugin._handle_security_group(t_ctx, self.context, port)
b_sgs = list_resource('security_group', False)
self.assertListEqual(b_sgs, [])
port = {'security_groups': []}
self.plugin._handle_security_group(t_ctx, self.context, port)
b_sgs = list_resource('security_group', False)
self.assertEqual(b_sgs, [])
@patch.object(t_context, 'get_context_from_neutron_context', new=mock.Mock)
@patch.object(FakeNeutronHandle, 'handle_get')
def test_get_security_group(self, mock_handle_get):
sg_id = uuidutils.generate_uuid()
mock_handle_get.return_value = None
self.assertRaises(ext_sg.SecurityGroupNotFound,
self.plugin.get_security_group,
self.context, sg_id)
@patch.object(t_context, 'get_context_from_neutron_context', new=mock.Mock)
@patch.object(FakeCorePlugin, 'get_security_groups')
def test_get_security_groups_mock(self, mock_get_sgs):
_, _, _, t_sg1 = self._prepare_resource()
_, _, _, t_sg2 = self._prepare_resource()
self.plugin.get_security_groups(self.context)
mock_get_sgs.assert_called_with(self.context,
None, None, None, None, None,
False, False)
@patch.object(t_context, 'get_context_from_neutron_context', new=mock.Mock)
def test_get_security_groups(self):
_, _, _, t_sg1 = self._prepare_resource()
_, _, _, t_sg2 = self._prepare_resource()
self.plugin.get_security_groups(self.context,
{'id': [t_sg1['id'], t_sg2['id'],
'fake_sg_id']})
b_sg = get_resource('security_group', False, t_sg1['id'])
self.assertEqual(b_sg['id'], t_sg1['id'])
b_sg = get_resource('security_group', False, t_sg2['id'])
self.assertEqual(b_sg['id'], t_sg2['id'])
b_sgs = self.plugin.get_security_groups(self.context,
{'id': [t_sg1['id'],
t_sg2['id']]})
self.assertEqual(len(b_sgs), 2)
def tearDown(self):
cfg.CONF.unregister_opts(q_config.core_opts)
test_utils.get_resource_store().clean()

View File

@ -22,6 +22,7 @@ from six.moves import xrange
import unittest
import neutron_lib.constants as q_constants
import neutronclient.common.exceptions as q_cli_exceptions
from oslo_config import cfg
from oslo_utils import uuidutils
@ -30,6 +31,7 @@ from tricircle.common import context
import tricircle.db.api as db_api
from tricircle.db import core
from tricircle.db import models
import tricircle.network.exceptions as t_network_exc
from tricircle.network import helper
from tricircle.xjob import xmanager
from tricircle.xjob import xservice
@ -60,38 +62,72 @@ RES_MAP = {'top': {'network': TOP_NETWORK,
'subnet': TOP_SUBNET,
'router': TOP_ROUTER,
'security_group': TOP_SG,
'floatingips': TOP_FIP},
'floatingip': TOP_FIP},
'pod_1': {'network': BOTTOM1_NETWORK,
'subnet': BOTTOM1_SUBNET,
'port': BOTTOM1_PORT,
'router': BOTTOM1_ROUTER,
'security_group': BOTTOM1_SG,
'floatingips': BOTTOM1_FIP},
'floatingip': BOTTOM1_FIP},
'pod_2': {'network': BOTTOM2_NETWORK,
'subnet': BOTTOM2_SUBNET,
'port': BOTTOM2_PORT,
'router': BOTTOM2_ROUTER,
'security_group': BOTTOM2_SG,
'floatingips': BOTTOM2_FIP}}
'floatingip': BOTTOM2_FIP}}
def fake_get_client(self, region_name=None):
return FakeClient(region_name)
class FakeXManager(xmanager.XManager):
def fake_create_floatingips(self, ctx, filters=None):
raise q_cli_exceptions.IpAddressInUseClient(
message='fake_create_floatingips')
class FakeBaseXManager(xmanager.XManager):
def __init__(self):
self.clients = {'top': FakeClient(),
'pod_1': FakeClient('pod_1'),
'pod_2': FakeClient('pod_2')}
self.helper = helper.NetworkHelper()
self.job_handles = {
constants.JT_CONFIGURE_ROUTE: self.configure_route,
constants.JT_ROUTER_SETUP: self.setup_bottom_router,
constants.JT_SHADOW_PORT_SETUP: self.setup_shadow_ports,
constants.JT_PORT_DELETE: self.delete_server_port}
def _get_client(self, region_name=None):
return FakeClient(region_name)
def setup_bottom_router(self, ctx, payload):
super(FakeBaseXManager, self).setup_bottom_router(ctx, payload=payload)
class FakeXManager(FakeBaseXManager):
def __init__(self):
super(FakeXManager, self).__init__()
self.xjob_handler = FakeXJobAPI()
class FakeXJobAPI(object):
def setup_shadow_ports(self, ctx, pod_id, t_net_id):
def __init__(self):
self.xmanager = FakeBaseXManager()
def configure_route(self, ctxt, project_id, router_id):
pass
def setup_bottom_router(self, ctxt, project_id, net_id, router_id, pod_id):
combine_id = '%s#%s#%s' % (pod_id, router_id, net_id)
self.xmanager.setup_bottom_router(
ctxt, payload={constants.JT_ROUTER_SETUP: combine_id})
def setup_shadow_ports(self, ctxt, pod_id, net_id):
combine_id = '%s#%s' % (pod_id, net_id)
self.xmanager.setup_shadow_ports(
ctxt, payload={constants.JT_SHADOW_PORT_SETUP: combine_id})
class FakeClient(object):
def __init__(self, region_name=None):
@ -100,7 +136,7 @@ class FakeClient(object):
else:
self.region_name = 'top'
def list_resources(self, resource, cxt, filters=None):
def list_resources(self, resource, ctx, filters=None):
res_list = []
filters = filters or []
for res in RES_MAP[self.region_name][resource]:
@ -119,14 +155,14 @@ class FakeClient(object):
res_list.append(copy.copy(res))
return res_list
def create_resources(self, resource, cxt, body):
def create_resources(self, resource, ctx, body):
res = body[resource]
if 'id' not in res:
res['id'] = uuidutils.generate_uuid()
RES_MAP[self.region_name][resource].append(res)
return res
def update_resources(self, resource, cxt, _id, body):
def update_resources(self, resource, ctx, _id, body):
for res in RES_MAP[self.region_name][resource]:
if res['id'] == _id:
res.update(body[resource])
@ -139,57 +175,70 @@ class FakeClient(object):
return ret
return self.create_resources('port', ctx, body)
def list_ports(self, cxt, filters=None):
return self.list_resources('port', cxt, filters)
def list_ports(self, ctx, filters=None):
return self.list_resources('port', ctx, filters)
def get_ports(self, cxt, port_id):
def get_ports(self, ctx, port_id):
return self.list_resources(
'port', cxt,
'port', ctx,
[{'key': 'id', 'comparator': 'eq', 'value': port_id}])[0]
def update_ports(self, cxt, _id, body):
self.update_resources('port', cxt, _id, body)
def update_ports(self, ctx, _id, body):
self.update_resources('port', ctx, _id, body)
def list_subnets(self, cxt, filters=None):
return self.list_resources('subnet', cxt, filters)
def list_subnets(self, ctx, filters=None):
return self.list_resources('subnet', ctx, filters)
def get_subnets(self, cxt, subnet_id):
def get_subnets(self, ctx, subnet_id):
return self.list_resources(
'subnet', cxt,
'subnet', ctx,
[{'key': 'id', 'comparator': 'eq', 'value': subnet_id}])[0]
def update_subnets(self, cxt, subnet_id, body):
def update_subnets(self, ctx, subnet_id, body):
pass
def get_networks(self, cxt, net_id):
def list_networks(self, ctx, filters=None):
return self.list_resources('network', ctx, filters)
def get_networks(self, ctx, net_id):
return self.list_resources(
'network', cxt,
'network', ctx,
[{'key': 'id', 'comparator': 'eq', 'value': net_id}])[0]
def get_routers(self, cxt, router_id):
def get_routers(self, ctx, router_id):
return self.list_resources(
'router', cxt,
'router', ctx,
[{'key': 'id', 'comparator': 'eq', 'value': router_id}])[0]
def update_routers(self, cxt, *args, **kwargs):
def update_routers(self, ctx, *args, **kwargs):
pass
def list_security_groups(self, cxt, filters=None):
return self.list_resources('security_group', cxt, filters)
def list_security_groups(self, ctx, filters=None):
return self.list_resources('security_group', ctx, filters)
def get_security_groups(self, cxt, sg_id):
def get_security_groups(self, ctx, sg_id):
return self.list_resources(
'security_group', cxt,
'security_group', ctx,
[{'key': 'id', 'comparator': 'eq', 'value': sg_id}])[0]
def delete_security_group_rules(self, cxt, sg_id):
def delete_security_group_rules(self, ctx, sg_id):
pass
def create_security_group_rules(self, cxt, *args, **kwargs):
def create_security_group_rules(self, ctx, *args, **kwargs):
pass
def list_floatingips(self, cxt, filters=None):
return self.list_resources('floatingips', cxt, filters)
def list_floatingips(self, ctx, filters=None):
return self.list_resources('floatingip', ctx, filters)
def create_floatingips(self, ctx, body):
fip = self.create_resources('floatingip', ctx, body)
for key in ['fixed_port_id']:
if key not in fip:
fip[key] = None
return fip
def update_floatingips(self, ctx, _id, body):
self.update_resources('floatingip', ctx, _id, body)
class XManagerTest(unittest.TestCase):
@ -364,6 +413,115 @@ class XManagerTest(unittest.TestCase):
six.assertCountEqual(self, expect_routes,
routes_body['router']['routes'])
def _prepare_top_router(self, project_id):
for i in (0, 1, 2):
pod_dict = {'pod_id': 'pod_id_%d' % i,
'region_name': 'pod_%d' % i,
'az_name': 'az_name_%d' % i}
db_api.create_pod(self.context, pod_dict)
t_net = {'id': 'network_1_id'}
t_subnet = {'id': 'subnet_1_id',
'network_id': t_net['id'],
'cidr': '10.0.1.0/24',
'gateway_ip': '10.0.1.1'}
bridge_network = {'id': 'bridge_network_1_id',
'name': constants.bridge_net_name % project_id}
bridge_subnet = {
'id': 'bridge_subnet_1_id',
'name': constants.bridge_subnet_name % project_id,
'network_id': bridge_network['id'],
'cidr': '100.0.1.0/24',
'gateway_ip': '100.0.1.1',
}
RES_MAP['top']['network'].append(t_net)
RES_MAP['top']['subnet'].append(t_subnet)
RES_MAP['top']['network'].append(bridge_network)
RES_MAP['top']['subnet'].append(bridge_subnet)
top_router_id = 'router_id'
t_router = {'id': top_router_id, 'tenant_id': project_id,
'extra_attributes': {
'availability_zone_hints': ['pod_1']}}
TOP_ROUTER.append(t_router)
return t_net, t_subnet, t_router
@patch.object(helper.NetworkHelper, '_get_client', new=fake_get_client)
def test_redo_failed_or_new_job(self):
project_id = uuidutils.generate_uuid()
t_net, _, t_router = self._prepare_top_router(project_id)
resource_id = 'pod_id_1#%s#%s' % (t_router['id'], t_net['id'])
db_api.new_job(self.context, project_id, constants.JT_ROUTER_SETUP,
resource_id)
self.xmanager.redo_failed_or_new_job(self.context)
self.assertEqual(len(RES_MAP['pod_1']['router']), 1)
TOP_ROUTER.remove(t_router)
router = {'id': t_router['id'], 'tenant_id': project_id,
'extra_attributes': {'availability_zone_hints': ['pod_2']}}
TOP_ROUTER.append(router)
jobs = db_api.list_jobs(self.context)
for job in jobs:
db_api.delete_job(self.context, job['id'])
resource_id = 'pod_id_2#%s#%s' % (t_router['id'], t_net['id'])
with self.context.session.begin():
job_dict = {'id': uuidutils.generate_uuid(),
'type': constants.JT_ROUTER_SETUP,
'status': constants.JS_Fail,
'project_id': project_id,
'resource_id': resource_id,
'extra_id': uuidutils.generate_uuid()}
core.create_resource(self.context, models.AsyncJob, job_dict)
self.xmanager.redo_failed_or_new_job(self.context)
self.assertEqual(len(RES_MAP['pod_2']['router']), 1)
@patch.object(helper.NetworkHelper, '_get_client', new=fake_get_client)
def test_setup_bottom_router_not_special(self):
project_id = uuidutils.generate_uuid()
t_net, _, t_router = self._prepare_top_router(project_id)
resource_id = 'pod_id_1#%s#%s' % (t_router['id'], t_net['id'])
db_api.new_job(self.context, project_id, constants.JT_ROUTER_SETUP,
resource_id)
db_api.create_resource_mapping(
self.context, t_net['id'], t_net['id'],
'pod_id_1', project_id, constants.RT_NETWORK)
combine_id = '%s#%s#%s' % (constants.POD_NOT_SPECIFIED,
t_router['id'], t_net['id'])
db_api.new_job(self.context, project_id, constants.JT_ROUTER_SETUP,
combine_id)
self.xmanager.setup_bottom_router(self.context, payload={
constants.JT_ROUTER_SETUP: combine_id
})
self.assertEqual(len(RES_MAP['pod_1']['router']), 1)
@patch.object(helper.NetworkHelper, '_get_client', new=fake_get_client)
@patch.object(FakeClient, 'create_floatingips',
new=fake_create_floatingips)
def test__safe_create_bottom_floatingip(self):
client = FakeClient('pod_1')
pod = {'region_name': 'pod_1'}
self.assertRaises(t_network_exc.BottomPodOperationFailure,
self.xmanager._safe_create_bottom_floatingip,
self.context, pod, client, None, None, None)
fip_net_id = 'fip_net_id_1'
fip_address = '10.0.1.55'
port_id = 'port_id_1'
RES_MAP['pod_1']['floatingip'].append(
{'floating_network_id': fip_net_id,
'floating_ip_address': fip_address,
'port_id': port_id,
'id': uuidutils.generate_uuid()})
self.xmanager._safe_create_bottom_floatingip(
self.context, pod, client, fip_net_id, fip_address, port_id)
self.assertEqual(RES_MAP['pod_1']['floatingip'][0]['port_id'],
port_id)
RES_MAP['pod_1']['floatingip'][0]['port_id'] = None
self.xmanager._safe_create_bottom_floatingip(
self.context, pod, client, fip_net_id, fip_address, 'fake_port_id')
self.assertEqual(RES_MAP['pod_1']['floatingip'][0]['port_id'],
'fake_port_id')
@patch.object(FakeClient, 'update_routers')
def test_configure_extra_routes_with_floating_ips(self, mock_update):
top_router_id = 'router_id'

View File

@ -0,0 +1,91 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mock import patch
from oslo_config import cfg
import oslo_messaging as messaging
import unittest
from tricircle.xjob import xmanager
from tricircle.xjob import xservice
CONF = cfg.CONF
def fake_rpc_start(self, override_pool_size=None):
return
class FakeXManager(xmanager.XManager):
"""Fake xmanager for tests."""
def __init__(self, host=None, service_name=None):
super(FakeXManager, self).__init__(host=host,
service_name=service_name)
def test_method(self):
return 'manager'
class ExtendedXService(xservice.XService):
def test_method(self):
return 'service'
class XServiceTest(unittest.TestCase):
"""Test cases for XServices."""
def setUp(self):
for opt in xservice.common_opts:
if opt.name == 'enable_api_gateway':
CONF.unregister_opt(opt)
CONF.register_opts(xservice.common_opts)
@patch.object(messaging.MessageHandlingServer, 'start',
new=fake_rpc_start)
def test_message_gets_to_manager(self):
t_manager = FakeXManager()
serv = xservice.XService('test', 'test', 'test', t_manager)
serv.start()
self.assertEqual('manager', serv.test_method())
@patch.object(messaging.MessageHandlingServer, 'start',
new=fake_rpc_start)
def test_override_manager_method(self):
t_manager = FakeXManager()
serv = ExtendedXService('test', 'test', 'test', t_manager)
serv.start()
self.assertEqual('service', serv.test_method())
@patch.object(messaging.MessageHandlingServer, 'start',
new=fake_rpc_start)
def test_service_create(self):
t_manager = FakeXManager()
CONF.set_override('host', 'tricircle-foo')
serv = xservice.XService.create(manager=t_manager)
serv.start()
self.assertEqual('manager', serv.test_method())
self.assertEqual('tricircle-foo', serv.host)
@patch.object(messaging.MessageHandlingServer, 'start',
new=fake_rpc_start)
def test_service_create_extend(self):
CONF.set_override('host', 'tricircle-bar')
serv = xservice.create_service()
self.assertEqual('tricircle-bar', serv.host)
def tearDown(self):
CONF.unregister_opts(xservice.common_opts)