Browse Source

Merge "Add more unit test case more than 80%"

changes/21/669521/1
Zuul 2 years ago
committed by Gerrit Code Review
parent
commit
a8410092f5
13 changed files with 2175 additions and 111 deletions
  1. +1
    -1
      tox.ini
  2. +1
    -32
      tricircle/network/central_plugin.py
  3. +15
    -3
      tricircle/network/central_trunk_driver.py
  4. +0
    -32
      tricircle/network/helper.py
  5. +63
    -0
      tricircle/tests/unit/cmd/test_cmd.py
  6. +83
    -0
      tricircle/tests/unit/common/test_httpclient.py
  7. +69
    -0
      tricircle/tests/unit/common/test_utils.py
  8. +199
    -0
      tricircle/tests/unit/network/test_central_plugin.py
  9. +684
    -0
      tricircle/tests/unit/network/test_central_trunk_driver.py
  10. +383
    -3
      tricircle/tests/unit/network/test_helper.py
  11. +396
    -8
      tricircle/tests/unit/network/test_local_plugin.py
  12. +190
    -32
      tricircle/tests/unit/xjob/test_xmanager.py
  13. +91
    -0
      tricircle/tests/unit/xjob/test_xservice.py

+ 1
- 1
tox.ini View File

@ -36,7 +36,7 @@ commands = {posargs}
basepython = python3
commands =
python setup.py testr --coverage --testr-args='{posargs}'
coverage report --fail-under=40 --skip-covered
coverage report --fail-under=80 --skip-covered
[testenv:genconfig]
basepython = python3


+ 1
- 32
tricircle/network/central_plugin.py View File

@ -940,7 +940,7 @@ class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2,
request_body_policy_id = \
request_body.get('qos_policy_id', None)
if request_body_policy_id:
request_body.pop('qos_policy_id')
request_body.pop('qos_policy_id')
if request_body:
try:
@ -2021,37 +2021,6 @@ class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2,
raise
return return_info
@staticmethod
def _safe_create_bottom_floatingip(t_ctx, pod, client, fip_net_id,
fip_address, port_id):
try:
client.create_floatingips(
t_ctx, {'floatingip': {'floating_network_id': fip_net_id,
'floating_ip_address': fip_address,
'port_id': port_id}})
except q_cli_exceptions.IpAddressInUseClient:
fips = client.list_floatingips(t_ctx,
[{'key': 'floating_ip_address',
'comparator': 'eq',
'value': fip_address}])
# NOTE(zhiyuan) if the internal port associated with the existing
# fip is what we expect, just ignore this exception; or if the
# existing fip is not associated with any internal port, update the
# fip to add association
if not fips:
# this is rare case that we got IpAddressInUseClient exception
# a second ago but now the floating ip is missing
raise t_network_exc.BottomPodOperationFailure(
resource='floating ip', region_name=pod['region_name'])
associated_port_id = fips[0].get('port_id')
if associated_port_id == port_id:
pass
elif not associated_port_id:
client.update_floatingips(t_ctx, fips[0]['id'],
{'floatingip': {'port_id': port_id}})
else:
raise
@staticmethod
def _rollback_floatingip_data(context, _id, org_data):
"""Rollback the data of floating ip object to the original one


+ 15
- 3
tricircle/network/central_trunk_driver.py View File

@ -14,18 +14,21 @@
# under the License.
import six
from oslo_log import log
from neutron.services.trunk import exceptions as trunk_exc
from neutron.services.trunk import plugin as trunk_plugin
from neutron_lib.db import utils as db_utils
from neutron_lib.plugins import directory
import six
import tricircle.common.client as t_client
import tricircle.common.constants as t_constants
import tricircle.common.context as t_context
from tricircle.common import xrpcapi
import tricircle.db.api as db_api
from tricircle.network import central_plugin
from tricircle.network import helper
LOG = log.getLogger(__name__)
@ -191,6 +194,12 @@ class TricircleTrunkDriver(trunk_plugin.TrunkPlugin):
filters, remainder, None)
ret.extend(next_ret)
return ret
else:
# get from top pod
top_ret = self._get_trunks_from_top_with_limit(
context, top_bottom_map, filters, remainder, None)
ret.extend(top_ret)
return ret
def _map_trunks_from_bottom_to_top(self, trunks, bottom_top_map):
trunk_list = []
@ -294,18 +303,21 @@ class TricircleTrunkDriver(trunk_plugin.TrunkPlugin):
except trunk_exc.TrunkNotFound:
return ret
ret = super(TricircleTrunkDriver, self).get_ports(context, filters)
core_plugin = directory.get_plugin()
ret = super(central_plugin.TricirclePlugin, core_plugin).get_ports(
context, filters)
return ret
def update_subports_device_id(self, context,
subports, device_id, device_owner):
if not subports['sub_ports']:
return
core_plugin = directory.get_plugin()
body = {'port': {
'device_id': device_id,
'device_owner': device_owner}}
for subport in subports['sub_ports']:
super(TricircleTrunkDriver, self).update_port(
super(central_plugin.TricirclePlugin, core_plugin).update_port(
context, subport['port_id'], body)
def add_subports(self, context, trunk_id, subports):


+ 0
- 32
tricircle/network/helper.py View File

@ -35,7 +35,6 @@ import tricircle.common.exceptions as t_exceptions
import tricircle.common.lock_handle as t_lock
from tricircle.common import utils
import tricircle.db.api as db_api
import tricircle.network.exceptions as t_network_exc
# manually define these constants to avoid depending on neutron repos
@ -694,37 +693,6 @@ class NetworkHelper(object):
self.prepare_bottom_element(ctx, project_id, b_pod, t_dhcp_port,
t_constants.RT_PORT, dhcp_port_body)
@staticmethod
def _safe_create_bottom_floatingip(t_ctx, pod, client, fip_net_id,
fip_address, port_id):
try:
client.create_floatingips(
t_ctx, {'floatingip': {'floating_network_id': fip_net_id,
'floating_ip_address': fip_address,
'port_id': port_id}})
except q_cli_exceptions.IpAddressInUseClient:
fips = client.list_floatingips(t_ctx,
[{'key': 'floating_ip_address',
'comparator': 'eq',
'value': fip_address}])
if not fips:
# this is rare case that we got IpAddressInUseClient exception
# a second ago but now the floating ip is missing
raise t_network_exc.BottomPodOperationFailure(
resource='floating ip', region_name=pod['region_name'])
associated_port_id = fips[0].get('port_id')
if associated_port_id == port_id:
# the internal port associated with the existing fip is what
# we expect, just ignore this exception
pass
elif not associated_port_id:
# the existing fip is not associated with any internal port,
# update the fip to add association
client.update_floatingips(t_ctx, fips[0]['id'],
{'floatingip': {'port_id': port_id}})
else:
raise
def _get_top_element(self, t_ctx, q_ctx, _type, _id):
if self.call_obj:
return getattr(self.call_obj, 'get_%s' % _type)(q_ctx, _id)


+ 63
- 0
tricircle/tests/unit/cmd/test_cmd.py View File

@ -0,0 +1,63 @@
# Copyright (c) 2018 NEC, Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from mock import patch
import sys
import unittest
from oslo_config import cfg
from oslo_service import service
from tricircle.api import app
from tricircle.cmd import api
from tricircle.cmd import xjob
from tricircle.xjob import xservice
def fake_wait(self):
return
class TestXjobCmd(unittest.TestCase):
def setUp(self):
super(TestXjobCmd, self).setUp()
sys.argv = ['tricircle-cmd']
cfg.CONF.reset()
cfg.CONF.unregister_opts(xservice.common_opts)
cfg.CONF.unregister_opts(app.common_opts)
@patch.object(service.ProcessLauncher, 'wait', new=fake_wait)
@mock.patch('tricircle.xjob.xservice.create_service')
@mock.patch('oslo_service.service.ProcessLauncher.launch_service')
def test_xjob_main(self, launch_service, create_service):
xjob.main()
launch_service.assert_called_once_with(
create_service.return_value, workers=1)
@patch.object(service.ProcessLauncher, 'wait', new=fake_wait)
@mock.patch('tricircle.api.app.setup_app')
@mock.patch('oslo_service.wsgi.Server')
@mock.patch('oslo_service.service.ProcessLauncher.launch_service')
def test_api_main(self, launch_service, wsgi_server, setup_app):
api.main()
wsgi_server.assert_called_once_with(mock.ANY, 'Tricircle Admin_API',
setup_app.return_value,
mock.ANY, mock.ANY)
launch_service.assert_called_once_with(
wsgi_server.return_value, workers=1)
def tearDown(self):
cfg.CONF.reset()
cfg.CONF.unregister_opts(xservice.common_opts)
cfg.CONF.unregister_opts(app.common_opts)

+ 83
- 0
tricircle/tests/unit/common/test_httpclient.py View File

@ -23,6 +23,7 @@ from tricircle.common import httpclient as hclient
from tricircle.db import api
from tricircle.db import core
from tricircle.db import models
def fake_get_pod_service_endpoint(ctx, region_name, st):
@ -87,6 +88,10 @@ class HttpClientTest(unittest.TestCase):
ver = hclient.get_version_from_url(url)
self.assertEqual(ver, '')
url = 'sss/networks'
ver = hclient.get_version_from_url(url)
self.assertEqual(ver, 'sss')
def test_get_bottom_url(self):
b_endpoint = 'http://127.0.0.1:9696/v2.0/networks'
t_url = 'http://127.0.0.1:9696/v2.0/networks'
@ -96,6 +101,28 @@ class HttpClientTest(unittest.TestCase):
self.assertEqual(t_ver, 'v2.0')
self.assertEqual(b_ver, 'v2.0')
t_url_1 = 'http://127.0.0.1:9696/sss/v2.0/networks'
b_url = hclient.get_bottom_url(t_ver, t_url_1, b_ver, b_endpoint)
self.assertEqual(b_url, '')
t_url_1 = 'v2.0/networks'
b_url = hclient.get_bottom_url(t_ver, t_url_1, b_ver, b_endpoint)
self.assertEqual(b_url, 'http://127.0.0.1:9696/v2.0/networks')
b_url = hclient.get_bottom_url(t_ver, t_url, '', b_endpoint)
self.assertEqual(b_url, 'http://127.0.0.1:9696/networks')
t_url_1 = 'http://127.0.0.1:9696/v2.0/networks?qqq=123&sss=456'
b_url = hclient.get_bottom_url(t_ver, t_url_1, b_ver, b_endpoint)
self.assertEqual(b_url,
'http://127.0.0.1:9696/v2.0/networks?qqq=123&sss=456')
t_url_1 = 'http://127.0.0.1:9696/v2.0/networks?' \
'qqq=123&availability_zone=456'
b_url = hclient.get_bottom_url(t_ver, t_url_1, b_ver, b_endpoint)
self.assertEqual(b_url,
'http://127.0.0.1:9696/v2.0/networks?qqq=123')
b_url = hclient.get_bottom_url(t_ver, t_url, b_ver, b_endpoint)
self.assertEqual(b_url,
'http://127.0.0.1:9696/v2.0/networks')
@ -211,5 +238,61 @@ class HttpClientTest(unittest.TestCase):
config_dict['service_type'])
self.assertEqual(endpoint, config_dict['service_url'])
endpoint = hclient.get_pod_service_endpoint(
self.context,
'x_region_name',
config_dict['service_type'])
self.assertEqual(endpoint, '')
def test_get_res_routing_ref(self):
t_url = 'http://127.0.0.1:9696/v2.0/networks'
self.assertIsNone(hclient.get_res_routing_ref(
self.context, 'fake_pod_id', t_url, s_type=cons.ST_NEUTRON))
pod_dict = {
'pod_id': 'fake_pod_id',
'region_name': 'fake_region_name',
'az_name': 'fake_az'
}
api.create_pod(self.context, pod_dict)
routes = [
{
'top_id': 'top_id',
'bottom_id': 'bottom_id',
'pod_id': 'fake_pod_id',
'project_id': 'test_project_id',
'resource_type': 'network'
},
]
with self.context.session.begin():
for route in routes:
core.create_resource(
self.context, models.ResourceRouting, route)
config_dict = {
'service_id': 'fake_service_id',
'pod_id': 'fake_pod_id',
'service_type': cons.ST_NEUTRON,
'service_url': 'http://127.0.0.1:9696/v2.0/networks'
}
api.create_cached_endpoints(self.context, config_dict)
s_ctx = {'t_ver': 'v2.0', 'b_ver': 'v2.0',
't_url': t_url, 'b_url': t_url}
self.assertEqual(s_ctx, hclient.get_res_routing_ref(
self.context, 'top_id', t_url, s_type=cons.ST_NEUTRON))
def test_convert_header(self):
header = {'header1': 'aaa', 'header2': 'bbb'}
self.assertEqual(header,
hclient.convert_header('v1.0', 'v1.0', header))
header = {'header1': 'aaa', 'header2': None}
except_header = {'header1': 'aaa'}
self.assertEqual(except_header,
hclient.convert_header('v1.0', 'v1.0', header))
def tearDown(self):
core.ModelBase.metadata.drop_all(core.get_engine())

+ 69
- 0
tricircle/tests/unit/common/test_utils.py View File

@ -15,13 +15,23 @@
# License for the specific language governing permissions and limitations
# under the License.
import mock
import pecan
import unittest
from oslo_config import cfg
from tricircle.common import constants as cons
from tricircle.common import exceptions
from tricircle.common import utils
class TricircleUtilsTestCase(unittest.TestCase):
def test_bool_from_string(self):
self.assertEqual(True, utils.bool_from_string('true'))
self.assertEqual(False, utils.bool_from_string('false'))
self.assertRaises(ValueError, utils.bool_from_string, 'a', strict=True)
self.assertEqual(True, utils.bool_from_string('a', default=True))
def test_check_string_length(self):
self.assertIsNone(utils.check_string_length(
'test', 'name', max_len=255))
@ -34,3 +44,62 @@ class TricircleUtilsTestCase(unittest.TestCase):
self.assertRaises(exceptions.InvalidInput,
utils.check_string_length,
'a' * 256, 'name', max_len=255)
def test_get_id_from_name(self):
output = utils.get_id_from_name(
cons.RT_NETWORK, 'name#77b0babc-f7e4-4c14-b250-1f18835a52c2')
self.assertEqual('77b0babc-f7e4-4c14-b250-1f18835a52c2', output)
output = utils.get_id_from_name(
cons.RT_NETWORK, '77b0babc-f7e4-4c14-b250-1f18835a52c2')
self.assertEqual('77b0babc-f7e4-4c14-b250-1f18835a52c2', output)
output = utils.get_id_from_name(
cons.RT_NETWORK, 'name@not_uuid')
self.assertIsNone(output)
output = utils.get_id_from_name(
cons.RT_PORT, '77b0babc-f7e4-4c14-b250-1f18835a52c2')
self.assertEqual('77b0babc-f7e4-4c14-b250-1f18835a52c2', output)
output = utils.get_id_from_name(
cons.RT_PORT, 'not_uuid')
self.assertIsNone(output)
@mock.patch.object(pecan, 'response')
def test_format_error(self, mock_response):
output = utils.format_error(401, 'this is error', 'MyError')
self.assertEqual({'MyError': {
'message': 'this is error', 'code': 401
}}, output)
output = utils.format_error(400, 'this is error')
self.assertEqual({'badRequest': {
'message': 'this is error', 'code': 400
}}, output)
output = utils.format_error(401, 'this is error')
self.assertEqual({'Error': {
'message': 'this is error', 'code': 401
}}, output)
@mock.patch('tricircle.common.utils.format_error')
def test_format_api_error(self, mock_format_error):
output = utils.format_api_error(400, 'this is error')
self.assertEqual(mock_format_error.return_value, output)
@mock.patch('tricircle.common.utils.format_error')
def test_format_nova_error(self, mock_format_error):
output = utils.format_nova_error(400, 'this is error')
self.assertEqual(mock_format_error.return_value, output)
@mock.patch('tricircle.common.utils.format_error')
def test_format_cinder_error(self, mock_format_error):
output = utils.format_cinder_error(400, 'this is error')
self.assertEqual(mock_format_error.return_value, output)
def test_get_pagination_limit(self):
setattr(cfg.CONF, 'pagination_max_limit', 1024)
self.assertEqual(512, utils.get_pagination_limit(512))
self.assertEqual(1024, utils.get_pagination_limit(2048))
self.assertEqual(1024, utils.get_pagination_limit(-1))

+ 199
- 0
tricircle/tests/unit/network/test_central_plugin.py View File

@ -40,6 +40,7 @@ from neutron.db import models_v2
from neutron.db import rbac_db_models as rbac_db
import neutron.objects.base as base_object
from neutron.services.qos.drivers import manager as q_manager
from neutron.services.trunk import plugin as trunk_plugin
from neutron.plugins.ml2 import managers as n_managers
@ -67,6 +68,7 @@ import tricircle.network.central_plugin as plugin
from tricircle.network import central_qos_plugin
from tricircle.network import helper
from tricircle.network import qos_driver
from tricircle.tests.unit.network import test_central_trunk_plugin
from tricircle.tests.unit.network import test_qos
from tricircle.tests.unit.network import test_security_groups
import tricircle.tests.unit.utils as test_utils
@ -105,6 +107,12 @@ BOTTOM2_FIPS = _resource_store.BOTTOM2_FLOATINGIPS
BOTTOM2_ROUTERS = _resource_store.BOTTOM2_ROUTERS
BOTTOM2_POLICIES = _resource_store.BOTTOM2_QOS_POLICIES
BOTTOM2_POLICY_RULES = _resource_store.BOTTOM2_QOS_BANDWIDTH_LIMIT_RULES
TOP_TRUNKS = _resource_store.TOP_TRUNKS
TOP_SUBPORTS = _resource_store.TOP_SUBPORTS
BOTTOM1_TRUNKS = _resource_store.BOTTOM1_TRUNKS
BOTTOM2_TRUNKS = _resource_store.BOTTOM2_TRUNKS
BOTTOM1_SUBPORTS = _resource_store.BOTTOM1_SUBPORTS
BOTTOM2_SUBPORTS = _resource_store.BOTTOM2_SUBPORTS
TEST_TENANT_ID = test_utils.TEST_TENANT_ID
FakeNeutronContext = test_utils.FakeNeutronContext
@ -959,6 +967,8 @@ def fake_get_instance(cls, subnet_pool, context):
def fake_get_plugin(alias=plugin_constants.CORE):
if alias == 'trunk':
return test_central_trunk_plugin.FakePlugin()
return FakePlugin()
@ -1614,6 +1624,71 @@ class PluginTest(unittest.TestCase,
return t_port_id, b_port_id
@staticmethod
def _prepare_trunk_test(project_id, ctx, pod_name, index, t_net_id,
b_net_id, t_subnet_id, b_subnet_id):
t_trunk_id = uuidutils.generate_uuid()
b_trunk_id = uuidutils.generate_uuid()
t_parent_port_id = uuidutils.generate_uuid()
t_sub_port_id = PluginTest._prepare_port_test(
project_id, ctx, pod_name, index, t_net_id,
b_net_id, t_subnet_id, b_subnet_id)
t_subport = {
'segmentation_type': 'vlan',
'port_id': t_sub_port_id,
'segmentation_id': 164,
'trunk_id': t_trunk_id}
t_trunk = {
'id': t_trunk_id,
'name': 'top_trunk_%d' % index,
'status': 'DOWN',
'description': 'created',
'admin_state_up': True,
'port_id': t_parent_port_id,
'tenant_id': project_id,
'project_id': project_id,
'sub_ports': [t_subport]
}
TOP_TRUNKS.append(test_utils.DotDict(t_trunk))
TOP_SUBPORTS.append(test_utils.DotDict(t_subport))
b_subport = {
'segmentation_type': 'vlan',
'port_id': t_sub_port_id,
'segmentation_id': 164,
'trunk_id': b_trunk_id}
b_trunk = {
'id': b_trunk_id,
'name': 'top_trunk_%d' % index,
'status': 'UP',
'description': 'created',
'admin_state_up': True,
'port_id': t_parent_port_id,
'tenant_id': project_id,
'project_id': project_id,
'sub_ports': [b_subport]
}
if pod_name == 'pod_1':
BOTTOM1_SUBPORTS.append(test_utils.DotDict(t_subport))
BOTTOM1_TRUNKS.append(test_utils.DotDict(b_trunk))
else:
BOTTOM2_SUBPORTS.append(test_utils.DotDict(t_subport))
BOTTOM2_TRUNKS.append(test_utils.DotDict(b_trunk))
pod_id = 'pod_id_1' if pod_name == 'pod_1' else 'pod_id_2'
core.create_resource(ctx, models.ResourceRouting,
{'top_id': t_trunk_id,
'bottom_id': b_trunk_id,
'pod_id': pod_id,
'project_id': project_id,
'resource_type': constants.RT_TRUNK})
return t_trunk, b_trunk
@staticmethod
def _prepare_network_subnet(project_id, ctx, region_name, index,
enable_dhcp=True, az_hints=None,
@ -1968,6 +2043,48 @@ class PluginTest(unittest.TestCase,
self.assertEqual(bottom_subnet['enable_dhcp'],
body_copy['subnet']['enable_dhcp'])
@patch.object(ipam_pluggable_backend.IpamPluggableBackend,
'_allocate_ips_for_port', new=fake_allocate_ips_for_port)
@patch.object(directory, 'get_plugin', new=fake_get_plugin)
@patch.object(driver.Pool, 'get_instance', new=fake_get_instance)
@patch.object(db_utils, 'filter_non_model_columns',
new=fake_filter_non_model_columns)
@patch.object(context, 'get_context_from_neutron_context')
def test_create_port(self, mock_context):
project_id = TEST_TENANT_ID
self._basic_pod_route_setup()
(t_net_id, t_subnet_id,
b_net_id, b_subnet_id) = self._prepare_network_subnet(
project_id, self.context, 'pod_1', 1)
neutron_context = FakeNeutronContext()
fake_plugin = FakePlugin()
fake_client = FakeClient()
mock_context.return_value = self.context
t_pod = {'pod_id': 'pod_id_top', 'region_name': 'top-region',
'az_name': ''}
db_api.create_pod(self.context, t_pod)
body_port = {
'port': {
'name': 'interface_top-region_port-1',
'description': 'top_description',
'extra_dhcp_opts': [],
'security_groups': [],
'device_id': 'reserved_gateway_port',
'admin_state_up': True,
'network_id': t_net_id,
'tenant_id': project_id,
'project_id': project_id,
}
}
port = fake_plugin.create_port(neutron_context, body_port)
t_gw_ports = fake_client.list_resources(
'port', None, [{'key': 'name', 'comparator': 'eq',
'value': 'interface_top-region_port-1'}])
self.assertEqual(t_gw_ports[0]['id'], port['id'])
@patch.object(ipam_pluggable_backend.IpamPluggableBackend,
'_update_ips_for_port', new=fake_update_ips_for_port)
@patch.object(directory, 'get_plugin', new=fake_get_plugin)
@ -2056,6 +2173,88 @@ class PluginTest(unittest.TestCase,
self.assertEqual(bottom_port['allowed_address_pairs'][0],
body_copy['port']['allowed_address_pairs'][0])
@patch.object(directory, 'get_plugin', new=fake_get_plugin)
@patch.object(driver.Pool, 'get_instance', new=fake_get_instance)
@patch.object(db_utils, 'filter_non_model_columns',
new=fake_filter_non_model_columns)
@patch.object(trunk_plugin.TrunkPlugin, 'get_trunk')
@patch.object(context, 'get_context_from_neutron_context')
def test_update_port_tunck(self, mock_context, mock_get_trunk):
project_id = TEST_TENANT_ID
self._basic_pod_route_setup()
t_ctx = context.get_db_context()
(t_net_id, t_subnet_id,
b_net_id, b_subnet_id) = self._prepare_network_subnet(
project_id, t_ctx, 'pod_1', 1)
t_port_id, b_port_id = self._prepare_port_test(
project_id, t_ctx, 'pod_1', 1, t_net_id, b_net_id,
t_subnet_id, b_subnet_id)
t_trunk, b_trunk = self._prepare_trunk_test(
project_id, t_ctx, 'pod_1', 2, t_net_id,
b_net_id, t_subnet_id, b_subnet_id)
fake_plugin = FakePlugin()
mock_context.return_value = t_ctx
update_body = {
'port': {
'binding:profile': {
constants.PROFILE_REGION: 'pod_1',
constants.PROFILE_DEVICE: 'compute:new'
},
'trunk_details': {'trunk_id': t_trunk['id'],
'sub_ports': []}
}
}
body_copy = copy.deepcopy(update_body)
q_ctx = test_central_trunk_plugin.FakeNeutronContext()
mock_get_trunk.return_value = t_trunk
top_port = fake_plugin.update_port(q_ctx, t_port_id, update_body)
self.assertEqual(top_port['binding:profile'],
body_copy['port']['binding:profile'])
self.assertEqual(top_port['trunk_details'],
body_copy['port']['trunk_details'])
@patch.object(directory, 'get_plugin', new=fake_get_plugin)
@patch.object(driver.Pool, 'get_instance', new=fake_get_instance)
@patch.object(db_utils, 'filter_non_model_columns',
new=fake_filter_non_model_columns)
@patch.object(context, 'get_context_from_neutron_context')
def test_update_port_mapping(self, mock_context):
project_id = TEST_TENANT_ID
self._basic_pod_route_setup()
neutron_context = FakeNeutronContext()
t_ctx = context.get_db_context()
(t_net_id, t_subnet_id,
b_net_id, b_subnet_id) = self._prepare_network_subnet(
project_id, t_ctx, 'pod_1', 1)
t_port_id, b_port_id = self._prepare_port_test(
project_id, t_ctx, 'pod_1', 1, t_net_id, b_net_id,
t_subnet_id, b_subnet_id)
fake_plugin = FakePlugin()
mock_context.return_value = t_ctx
update_body = {
'port': {
'binding:profile': {
constants.PROFILE_REGION: 'pod_1',
constants.PROFILE_DEVICE: '',
constants.PROFILE_STATUS: 'DOWN'
}
}
}
b_update_body = {'port': {'device_id': None}}
fake_client = FakeClient('pod_1')
fake_client.update_ports(t_ctx, b_port_id, b_update_body)
fake_plugin.update_port(neutron_context, t_port_id, update_body)
routing_resources = core.query_resource(
t_ctx, models.ResourceRouting,
[{'key': 'bottom_id', 'comparator': 'eq', 'value': b_port_id}], [])
self.assertListEqual(routing_resources, [])
@patch.object(directory, 'get_plugin', new=fake_get_plugin)
@patch.object(driver.Pool, 'get_instance', new=fake_get_instance)
@patch.object(db_utils, 'filter_non_model_columns',


+ 684
- 0
tricircle/tests/unit/network/test_central_trunk_driver.py View File

@ -0,0 +1,684 @@
# Copyright 2015 Huawei Technologies Co., Ltd.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mock import patch
import six
import unittest
from six.moves import xrange
import neutron.conf.common as q_config
from neutron.db import db_base_plugin_v2
from neutron_lib.api.definitions import portbindings
from neutron_lib.plugins import directory
from neutron_lib.plugins import utils
from oslo_config import cfg
from oslo_utils import uuidutils
from tricircle.common import client
from tricircle.common import constants
from tricircle.common import context
import tricircle.db.api as db_api
from tricircle.db import core
from tricircle.db import models
from tricircle.network import central_plugin
import tricircle.network.central_trunk_driver as trunk_driver
from tricircle.network import helper
import tricircle.tests.unit.utils as test_utils
from tricircle.xjob import xmanager
_resource_store = test_utils.get_resource_store()
TOP_TRUNKS = _resource_store.TOP_TRUNKS
TOP_SUBPORTS = _resource_store.TOP_SUBPORTS
TOP_PORTS = _resource_store.TOP_PORTS
BOTTOM1_TRUNKS = _resource_store.BOTTOM1_TRUNKS
BOTTOM2_TRUNKS = _resource_store.BOTTOM2_TRUNKS
BOTTOM1_SUBPORTS = _resource_store.BOTTOM1_SUBPORTS
BOTTOM2_SUBPORTS = _resource_store.BOTTOM2_SUBPORTS
BOTTOM1_PORTS = _resource_store.BOTTOM1_PORTS
BOTTOM2_PORTS = _resource_store.BOTTOM2_PORTS
TEST_TENANT_ID = test_utils.TEST_TENANT_ID
class FakeBaseXManager(xmanager.XManager):
def __init__(self):
self.clients = {constants.TOP: client.Client()}
def _get_client(self, region_name=None):
return FakeClient(region_name)
class FakeXManager(FakeBaseXManager):
def __init__(self, fake_plugin):
super(FakeXManager, self).__init__()
self.xjob_handler = FakeBaseRPCAPI(fake_plugin)
self.helper = helper.NetworkHelper()
class FakeBaseRPCAPI(object):
def __init__(self, fake_plugin):
self.xmanager = FakeBaseXManager()
def sync_trunk(self, ctxt, project_id, trunk_id, pod_id):
combine_id = '%s#%s' % (pod_id, trunk_id)
self.xmanager.sync_trunk(
ctxt, payload={constants.JT_TRUNK_SYNC: combine_id})
def configure_security_group_rules(self, ctxt, project_id):
pass
class FakeRPCAPI(FakeBaseRPCAPI):
def __init__(self, fake_plugin):
self.xmanager = FakeXManager(fake_plugin)
class FakeNeutronClient(test_utils.FakeNeutronClient):
_resource = 'trunk'
trunks_path = ''
class FakeClient(test_utils.FakeClient):
def __init__(self, region_name=None):
super(FakeClient, self).__init__(region_name)
self.client = FakeNeutronClient(self.region_name)
def get_native_client(self, resource, ctx):
return self.client
def get_trunks(self, ctx, trunk_id):
return self.get_resource(constants.RT_TRUNK, ctx, trunk_id)
def update_trunks(self, context, trunk_id, trunk):
self.update_resources(constants.RT_TRUNK, context, trunk_id, trunk)
def delete_trunks(self, context, trunk_id):
self.delete_resources(constants.RT_TRUNK, context, trunk_id)
def action_trunks(self, ctx, action, resource_id, body):
if self.region_name == 'pod_1':
btm_trunks = BOTTOM1_TRUNKS
else:
btm_trunks = BOTTOM2_TRUNKS
for trunk in btm_trunks:
if trunk['id'] == resource_id:
subports = body['sub_ports']
if action == 'add_subports':
for subport in subports:
subport['trunk_id'] = resource_id
trunk['sub_ports'].extend(subports)
return
elif action == 'remove_subports':
for subport in subports:
for b_subport in trunk['sub_ports']:
if subport['port_id'] == b_subport['port_id']:
trunk['sub_ports'].remove(b_subport)
return
def list_trunks(self, ctx, filters=None):
filter_dict = {}
filters = filters or []
for query_filter in filters:
key = query_filter['key']
# when querying trunks, "fields" is passed in the query string to
# ask the server to only return necessary fields, which can reduce
# the data being transferred. In test, we just return all the
# fields since there's no need to optimize
if key != 'fields':
value = query_filter['value']
filter_dict[key] = value
return self.client.get('', filter_dict)['trunks']
def get_ports(self, ctx, port_id):
pass
def list_ports(self, ctx, filters=None):
fake_plugin = FakePlugin()
q_ctx = FakeNeutronContext()
_filters = {}
for f in filters:
_filters[f['key']] = [f['value']]
return fake_plugin.get_trunk_subports(q_ctx, _filters)
def create_ports(self, ctx, body):
if 'ports' in body:
ret = []
for port in body['ports']:
p = self.create_resources('port', ctx, {'port': port})
p['id'] = p['device_id']
ret.append(p)
return ret
return self.create_resources('port', ctx, body)
class FakeNeutronContext(test_utils.FakeNeutronContext):
def session_class(self):
return FakeSession
class FakeSession(test_utils.FakeSession):
def add_hook(self, model_obj, model_dict):
if model_obj.__tablename__ == 'subports':
for top_trunk in TOP_TRUNKS:
if top_trunk['id'] == model_dict['trunk_id']:
top_trunk['sub_ports'].append(model_dict)
def delete_top_subport(self, port_id):
for res_list in self.resource_store.store_map.values():
for res in res_list:
sub_ports = res.get('sub_ports')
if sub_ports:
for sub_port in sub_ports:
if sub_port['port_id'] == port_id:
sub_ports.remove(sub_port)
def delete_hook(self, model_obj):
if model_obj.get('segmentation_type'):
self.delete_top_subport(model_obj['port_id'])
return 'port_id'
class FakePlugin(trunk_driver.TricircleTrunkDriver):
def __init__(self):
self._segmentation_types = {'vlan': utils.is_valid_vlan_tag}
self.xjob_handler = FakeRPCAPI(self)
self.helper = helper.NetworkHelper(self)
def _get_client(self, region_name):
return FakeClient(region_name)
def fake_get_context_from_neutron_context(q_context):
ctx = context.get_db_context()
return ctx
def fake_get_min_search_step(self):
return 2
class FakeCorePlugin(central_plugin.TricirclePlugin):
def __init__(self):
self.type_manager = test_utils.FakeTypeManager()
def get_port(self, context, port_id):
return {portbindings.HOST_ID: None,
'device_id': None}
def get_ports(self, ctx, filters):
top_client = FakeClient()
_filters = []
for key, values in six.iteritems(filters):
for v in values:
_filters.append({'key': key, 'comparator': 'eq', 'value': v})
return top_client.list_resources('port', ctx, _filters)
def update_port(self, context, id, port):
port_body = port['port']
for _port in TOP_PORTS:
if _port['id'] == id:
for key, value in six.iteritems(port_body):
_port[key] = value
class PluginTest(unittest.TestCase):
def setUp(self):
core.initialize()
core.ModelBase.metadata.create_all(core.get_engine())
self.context = context.Context()
cfg.CONF.set_override('tenant_network_types', ['local', 'vlan'],
group='tricircle')
cfg.CONF.set_override('bridge_network_type', 'vlan',
group='tricircle')
setattr(cfg.CONF, 'setproctitle', 'central-trunk-driver')
xmanager.IN_TEST = True
def fake_get_plugin(alias='core'):
if alias == 'trunk':
return FakePlugin()
return FakeCorePlugin()
directory.get_plugin = fake_get_plugin
def _basic_pod_setup(self):
pod1 = {'pod_id': 'pod_id_1',
'region_name': 'pod_1',
'az_name': 'az_name_1'}
pod2 = {'pod_id': 'pod_id_2',
'region_name': 'pod_2',
'az_name': 'az_name_2'}
pod3 = {'pod_id': 'pod_id_0',
'region_name': 'top_pod',
'az_name': ''}
for pod in (pod1, pod2, pod3):
db_api.create_pod(self.context, pod)
def _prepare_port_test(self, tenant_id, ctx, pod_name, index,
device_onwer='compute:None', create_bottom=True):
t_port_id = uuidutils.generate_uuid()
t_subnet_id = uuidutils.generate_uuid()
t_net_id = uuidutils.generate_uuid()
t_port = {
'id': t_port_id,
'name': 'top_port_%d' % index,
'description': 'old_top_description',
'extra_dhcp_opts': [],
'device_owner': device_onwer,
'security_groups': [],
'device_id': '68f46ee4-d66a-4c39-bb34-ac2e5eb85470',
'admin_state_up': True,
'network_id': t_net_id,
'tenant_id': tenant_id,
'mac_address': 'fa:16:3e:cd:76:4%s' % index,
'project_id': 'tenant_id',
'binding:host_id': 'zhiyuan-5',
'status': 'ACTIVE',
'network_id': t_net_id,
'fixed_ips': [{'subnet_id': t_subnet_id}]
}
TOP_PORTS.append(test_utils.DotDict(t_port))
if create_bottom:
b_port = {
'id': t_port_id,
'name': t_port_id,
'description': 'old_bottom_description',
'extra_dhcp_opts': [],
'device_owner': device_onwer,
'security_groups': [],
'device_id': '68f46ee4-d66a-4c39-bb34-ac2e5eb85470',
'admin_state_up': True,
'network_id': t_net_id,
'tenant_id': tenant_id,
'device_owner': 'compute:None',
'extra_dhcp_opts': [],
'mac_address': 'fa:16:3e:cd:76:40',
'project_id': 'tenant_id',
'binding:host_id': 'zhiyuan-5',
'status': 'ACTIVE',
'network_id': t_net_id,
'fixed_ips': [{'subnet_id': t_subnet_id}]
}
if pod_name == 'pod_1':
BOTTOM1_PORTS.append(test_utils.DotDict(b_port))
else:
BOTTOM2_PORTS.append(test_utils.DotDict(b_port))
pod_id = 'pod_id_1' if pod_name == 'pod_1' else 'pod_id_2'
core.create_resource(ctx, models.ResourceRouting,
{'top_id': t_port_id,
'bottom_id': t_port_id,
'pod_id': pod_id,
'project_id': tenant_id,
'resource_type': constants.RT_PORT})
return t_port_id
def _prepare_trunk_test(self, project_id, ctx, pod_name, index,
is_create_bottom, t_uuid=None, b_uuid=None):
t_trunk_id = t_uuid or uuidutils.generate_uuid()
b_trunk_id = b_uuid or uuidutils.generate_uuid()
t_parent_port_id = uuidutils.generate_uuid()
t_sub_port_id = self._prepare_port_test(
project_id, ctx, pod_name, index, create_bottom=is_create_bottom)
t_subport = {
'segmentation_type': 'vlan',
'port_id': t_sub_port_id,
'segmentation_id': 164,
'trunk_id': t_trunk_id}
t_trunk = {
'id': t_trunk_id,
'name': 'top_trunk_%d' % index,
'status': 'DOWN',
'description': 'created',
'admin_state_up': True,
'port_id': t_parent_port_id,
'tenant_id': project_id,
'project_id': project_id,
'sub_ports': [t_subport]
}
TOP_TRUNKS.append(test_utils.DotDict(t_trunk))
TOP_SUBPORTS.append(test_utils.DotDict(t_subport))
b_trunk = None
if is_create_bottom:
b_subport = {
'segmentation_type': 'vlan',
'port_id': t_sub_port_id,
'segmentation_id': 164,
'trunk_id': b_trunk_id}
b_trunk = {
'id': b_trunk_id,
'name': 'top_trunk_%d' % index,
'status': 'UP',
'description': 'created',
'admin_state_up': True,
'port_id': t_parent_port_id,
'tenant_id': project_id,
'project_id': project_id,
'sub_ports': [b_subport]
}
if pod_name == 'pod_1':
BOTTOM1_SUBPORTS.append(test_utils.DotDict(t_subport))
BOTTOM1_TRUNKS.append(test_utils.DotDict(b_trunk))
else:
BOTTOM2_SUBPORTS.append(test_utils.DotDict(t_subport))
BOTTOM2_TRUNKS.append(test_utils.DotDict(b_trunk))
pod_id = 'pod_id_1' if pod_name == 'pod_1' else 'pod_id_2'
core.create_resource(ctx, models.ResourceRouting,
{'top_id': t_trunk_id,
'bottom_id': b_trunk_id,
'pod_id': pod_id,
'project_id': project_id,
'resource_type': constants.RT_TRUNK})
return t_trunk, b_trunk
@patch.object(context, 'get_context_from_neutron_context',
new=fake_get_context_from_neutron_context)
def test_get_trunk(self):
project_id = TEST_TENANT_ID
q_ctx = FakeNeutronContext()
t_ctx = context.get_db_context()
self._basic_pod_setup()
fake_plugin = FakePlugin()
t_trunk, b_trunk = self._prepare_trunk_test(project_id, t_ctx,
'pod_1', 1, True)
res = fake_plugin.get_trunk(q_ctx, t_trunk['id'])
t_trunk['status'] = b_trunk['status']
t_trunk['sub_ports'][0].pop('trunk_id')
six.assertCountEqual(self, t_trunk, res)
@patch.object(context, 'get_context_from_neutron_context',
new=fake_get_context_from_neutron_context)
def test_get_trunks(self):
project_id = TEST_TENANT_ID
q_ctx = FakeNeutronContext()
t_ctx = context.get_db_context()
self._basic_pod_setup()
fake_plugin = FakePlugin()
t_trunk1, _ = self._prepare_trunk_test(project_id, t_ctx,
'pod_1', 1, True)
t_trunk2, _ = self._prepare_trunk_test(project_id, t_ctx,
'pod_1', 2, True)
t_trunk3, _ = self._prepare_trunk_test(project_id, t_ctx,
'pod_2', 3, True)
t_trunk4, _ = self._prepare_trunk_test(project_id, t_ctx,
'pod_2', 4, True)
t_trunk5, _ = self._prepare_trunk_test(project_id, t_ctx,
'pod_1', 5, False)
t_trunk6, _ = self._prepare_trunk_test(project_id, t_ctx,
'pod_1', 6, False)
res = fake_plugin.get_trunks(q_ctx)
self.assertEqual(len(res), 6)
res = fake_plugin.get_trunks(
q_ctx, filters={'id': [t_trunk1['id']]}, limit=3)
t_trunk1['status'] = 'UP'
res[0]['sub_ports'][0]['trunk_id'] = t_trunk1['id']
six.assertCountEqual(self, [t_trunk1], res)
res = fake_plugin.get_trunks(q_ctx, filters={'id': [t_trunk5['id']]})
t_trunk5['sub_ports'][0].pop('trunk_id')
six.assertCountEqual(self, [t_trunk5], res)
trunks = fake_plugin.get_trunks(q_ctx,
filters={'status': ['UP'],
'description': ['created']})
self.assertEqual(len(trunks), 4)
@patch.object(context, 'get_context_from_neutron_context',
new=fake_get_context_from_neutron_context)
@patch.object(FakePlugin, '_get_min_search_step',
new=fake_get_min_search_step)
def test_get_trunks_pagination(self):
project_id = TEST_TENANT_ID
q_ctx = FakeNeutronContext()
t_ctx = context.get_db_context()
self._basic_pod_setup()
fake_plugin = FakePlugin()
t_trunk1, _ = self._prepare_trunk_test(
project_id, t_ctx, 'pod_1', 1, True,
'101779d0-e30e-495a-ba71-6265a1669701',
'1b1779d0-e30e-495a-ba71-6265a1669701')
t_trunk2, _ = self._prepare_trunk_test(
project_id, t_ctx, 'pod_1', 2, True,
'201779d0-e30e-495a-ba71-6265a1669701',
'2b1779d0-e30e-495a-ba71-6265a1669701')
t_trunk3, _ = self._prepare_trunk_test(
project_id, t_ctx, 'pod_2', 3, True,
'301779d0-e30e-495a-ba71-6265a1669701',
'3b1779d0-e30e-495a-ba71-6265a1669701')
t_trunk4, _ = self._prepare_trunk_test(
project_id, t_ctx, 'pod_2', 4, True,
'401779d0-e30e-495a-ba71-6265a1669701',
'4b1779d0-e30e-495a-ba71-6265a1669701')
t_trunk5, _ = self._prepare_trunk_test(
project_id, t_ctx, 'pod_2', 5, False,
'501779d0-e30e-495a-ba71-6265a1669701')
t_trunk6, _ = self._prepare_trunk_test(
project_id, t_ctx, 'pod_2', 6, False,
'601779d0-e30e-495a-ba71-6265a1669701')
t_trunk7, _ = self._prepare_trunk_test(
project_id, t_ctx, 'pod_2', 7, False,
'601779d0-e30e-495a-ba71-6265a1669701')
# limit no marker
res = fake_plugin.get_trunks(q_ctx, limit=3)
res_trunk_ids = [trunk['id'] for trunk in res]
except_trunk_ids = [t_trunk1['id'], t_trunk2['id'], t_trunk3['id']]
self.assertEqual(res_trunk_ids, except_trunk_ids)
# limit and top pod's marker
res = fake_plugin.get_trunks(q_ctx, limit=3, marker=t_trunk5['id'])
res_trunk_ids = [trunk['id'] for trunk in res]
except_trunk_ids = [t_trunk6['id'], t_trunk7['id']]
self.assertEqual(res_trunk_ids, except_trunk_ids)
# limit and bottom pod's marker
res = fake_plugin.get_trunks(q_ctx, limit=6, marker=t_trunk1['id'])
res_trunk_ids = [trunk['id'] for trunk in res]
except_trunk_ids = [t_trunk2['id'], t_trunk3['id'], t_trunk4['id'],
t_trunk5['id'], t_trunk6['id'], t_trunk7['id']]
self.assertEqual(res_trunk_ids, except_trunk_ids)
# limit and bottom pod's marker and filters
res = fake_plugin.get_trunks(q_ctx, limit=6, marker=t_trunk1['id'],
filters={'status': ['UP']})
res_trunk_ids = [trunk['id'] for trunk in res]
except_trunk_ids = [t_trunk2['id'], t_trunk3['id'], t_trunk4['id']]
self.assertEqual(res_trunk_ids, except_trunk_ids)
@patch.object(context, 'get_context_from_neutron_context',
new=fake_get_context_from_neutron_context)
def test_update_trunk(self):
project_id = TEST_TENANT_ID
q_ctx = FakeNeutronContext()
t_ctx = context.get_db_context()
self._basic_pod_setup()
fake_plugin = FakePlugin()
t_trunk, b_trunk = self._prepare_trunk_test(project_id, t_ctx,
'pod_1', 1, True)
update_body = {'trunk': {
'name': 'new_name',
'description': 'updated',
'admin_state_up': False}
}
updated_top_trunk = fake_plugin.update_trunk(q_ctx, t_trunk['id'],
update_body)
self.assertEqual(updated_top_trunk['name'], 'new_name')
self.assertEqual(updated_top_trunk['description'], 'updated')
self.assertFalse(updated_top_trunk['admin_state_up'])
updated_btm_trunk = fake_plugin.get_trunk(q_ctx, t_trunk['id'])
self.assertEqual(updated_btm_trunk['name'], 'new_name')
self.assertEqual(updated_btm_trunk['description'], 'updated')
self.assertFalse(updated_btm_trunk['admin_state_up'])
@patch.object(context, 'get_context_from_neutron_context',
new=fake_get_context_from_neutron_context)
def test_delete_trunk(self):
project_id = TEST_TENANT_ID
q_ctx = FakeNeutronContext()
t_ctx = context.get_db_context()
self._basic_pod_setup()
fake_plugin = FakePlugin()
t_trunk, b_trunk = self._prepare_trunk_test(project_id, t_ctx,
'pod_1', 1, True)
fake_plugin.delete_trunk(q_ctx, t_trunk['id'])
self.assertEqual(len(TOP_TRUNKS), 0)
self.assertEqual(len(BOTTOM1_TRUNKS), 0)
route_filters = [{'key': 'top_id',
'comparator': 'eq',
'value': t_trunk['id']}]
routes = core.query_resource(t_ctx, models.ResourceRouting,
route_filters, [])
self.assertEqual(len(routes), 0)
@patch.object(db_base_plugin_v2.NeutronDbPluginV2, 'get_ports',
new=FakeCorePlugin.get_ports)
@patch.object(db_base_plugin_v2.NeutronDbPluginV2, 'update_port',
new=FakeCorePlugin.update_port)
@patch.object(context, 'get_context_from_neutron_context',
new=fake_get_context_from_neutron_context)
def test_action_subports(self):
project_id = TEST_TENANT_ID
q_ctx = FakeNeutronContext()
t_ctx = context.get_db_context()
self._basic_pod_setup()
fake_plugin = FakePlugin()
t_trunk, b_trunk = self._prepare_trunk_test(project_id, t_ctx,
'pod_1', 1, True)
add_subport_id1 = self._prepare_port_test(project_id, t_ctx, 'pod_1',
1, create_bottom=False)
add_subport_id2 = self._prepare_port_test(project_id, t_ctx, 'pod_1',
2, create_bottom=False)
add_subport_id3 = self._prepare_port_test(project_id, t_ctx, 'pod_1',
3, create_bottom=False)
add_subport_id4 = self._prepare_port_test(project_id, t_ctx, 'pod_1',
4, create_bottom=False)
add_subport_id5 = self._prepare_port_test(project_id, t_ctx, 'pod_1',
5, create_bottom=False)
add_subport_id6 = self._prepare_port_test(project_id, t_ctx, 'pod_1',
6, create_bottom=True)
add_subport_id7 = self._prepare_port_test(project_id, t_ctx, 'pod_1',
7, create_bottom=True)
add_subport_id8 = self._prepare_port_test(project_id, t_ctx, 'pod_1',
8, create_bottom=False)
add_subport_id9 = self._prepare_port_test(project_id, t_ctx, 'pod_1',
9, create_bottom=False)
# Avoid warning: assigned to but never used
ids = [add_subport_id1, add_subport_id2, add_subport_id3,
add_subport_id4, add_subport_id5, add_subport_id6,
add_subport_id7, add_subport_id8, add_subport_id9]
ids.sort()
remove_subports = {'segmentation_type': 'vlan',
'port_id': uuidutils.generate_uuid(),
'segmentation_id': 165}
b_trunk['sub_ports'].append(remove_subports)
add_subports = []
for _id in xrange(1, 10):
port_id = eval("add_subport_id%d" % _id)
subport = {
'segmentation_type': 'vlan',
'port_id': port_id,
'segmentation_id': _id}
add_subports.append(subport)
fake_plugin.add_subports(q_ctx, t_trunk['id'],
{'sub_ports': add_subports})
top_subports = TOP_TRUNKS[0]['sub_ports']
btm_subports = BOTTOM1_TRUNKS[0]['sub_ports']
except_btm_subports = []
for subport in b_trunk['sub_ports']:
if subport['segmentation_id'] == 164:
except_btm_subports.extend([subport])
for subport in add_subports:
subport['trunk_id'] = b_trunk['id']
except_btm_subports.extend(add_subports)
six.assertCountEqual(self, btm_subports, except_btm_subports)
except_top_subports = []
for subport in t_trunk['sub_ports']:
if subport['segmentation_id'] == 164:
except_top_subports.extend([subport])
for subport in add_subports:
subport['trunk_id'] = t_trunk['id']
except_top_subports.extend(add_subports)
except_btm_subports.extend(add_subports)
six.assertCountEqual(self, top_subports, except_top_subports)
self.assertEqual(len(BOTTOM1_PORTS), 10)
map_filters = [{'key': 'resource_type',
'comparator': 'eq',
'value': constants.RT_PORT},
{'key': 'project_id',
'comparator': 'eq',
'value': project_id}]
port_mappings = db_api.list_resource_routings(t_ctx, map_filters)
self.assertEqual(len(port_mappings), 10)
@patch.object(db_base_plugin_v2.NeutronDbPluginV2, 'update_port',
new=FakeCorePlugin.update_port)
@patch.object(context, 'get_context_from_neutron_context',
new=fake_get_context_from_neutron_context)
def test_remove_subports(self):
project_id = TEST_TENANT_ID
q_ctx = FakeNeutronContext()
t_ctx = context.get_db_context()
self._basic_pod_setup()
fake_plugin = FakePlugin()
t_trunk, b_trunk = self._prepare_trunk_test(project_id, t_ctx,
'pod_1', 1, True)
subport_id = t_trunk['sub_ports'][0]['port_id']
remove_subport = {'sub_ports': [{'port_id': subport_id}]}
fake_plugin.remove_subports(q_ctx, t_trunk['id'], remove_subport)
top_subports = TOP_TRUNKS[0]['sub_ports']
btm_subports = BOTTOM1_TRUNKS[0]['sub_ports']
self.assertEqual(len(top_subports), 0)
self.assertEqual(len(btm_subports), 0)
def tearDown(self):
core.ModelBase.metadata.drop_all(core.get_engine())
test_utils.get_resource_store().clean()
cfg.CONF.unregister_opts(q_config.core_opts)
xmanager.IN_TEST = False

+ 383
- 3
tricircle/tests/unit/network/test_helper.py View File

@ -17,15 +17,62 @@ from mock import patch
import six
import unittest
import neutronclient.common.exceptions as q_cli_exceptions
from oslo_config import cfg
from oslo_utils import uuidutils
import neutron.conf.common as q_config
from neutron_lib.api.definitions import portbindings
import neutron_lib.constants as q_constants
import neutron_lib.exceptions as q_exceptions
import neutronclient.common.exceptions as q_cli_exceptions
from tricircle.common import context
from tricircle.common import exceptions as t_exceptions
import tricircle.db.api as db_api
from tricircle.db import core
from tricircle.network import helper
import tricircle.tests.unit.utils as test_utils
_resource_store = test_utils.get_resource_store()
TOP_NETS = _resource_store.TOP_NETWORKS
TOP_SUBNETS = _resource_store.TOP_SUBNETS
BOTTOM1_NETS = _resource_store.BOTTOM1_NETWORKS
BOTTOM1_PORTS = _resource_store.BOTTOM1_PORTS
BOTTOM1_ROUTERS = _resource_store.BOTTOM1_ROUTERS
def get_resource_list(_type, is_top):
pod = 'top' if is_top else 'pod_1'
return _resource_store.pod_store_map[pod][_type]
def get_resource(_type, is_top, resource_id):
for resource in get_resource_list(_type, is_top):
if resource['id'] == resource_id:
return resource
raise q_exceptions.NotFound()
def list_resource(_type, is_top, filters=None):
resource_list = get_resource_list(_type, is_top)
if not filters:
return [resource for resource in get_resource_list(
_type, is_top)]
ret = []
for resource in resource_list:
pick = True
for key, value in six.iteritems(filters):
if resource.get(key) not in value:
pick = False
break
if pick:
ret.append(resource)
return ret
class FakeClient(object):
class FakeClient(test_utils.FakeClient):
def __init__(self, region_name=None):
pass
super(FakeClient, self).__init__(region_name)
def create_ports(self, context, body):
for port in body['ports']:
@ -36,10 +83,192 @@ class FakeClient(object):
port['id'] = port['name'].split('_')[-1]
return body['ports']
def list_networks(self, ctx, filters=None):
networks = self.list_resources('network', ctx, filters)
return networks
def delete_networks(self, ctx, net_id):
self.delete_resources('network', ctx, net_id)
def list_subnets(self, ctx, filters=None):
return self.list_resources('subnet', ctx, filters)
def get_subnets(self, ctx, subnet_id):
return self.get_resource('subnet', ctx, subnet_id)
def delete_subnets(self, ctx, subnet_id):
self.delete_resources('subnet', ctx, subnet_id)
def list_routers(self, ctx, filters=None):
return self.list_resources('router', ctx, filters)
def delete_routers(self, ctx, router_id):
self.delete_resources('router', ctx, router_id)
def action_routers(self, ctx, action, *args, **kwargs):
router_id, body = args
if action == 'add_gateway':
port = {
'admin_state_up': True,
'id': uuidutils.generate_uuid(),
'name': '',
'network_id': body['network_id'],
'fixed_ips': '10.0.1.1',
'mac_address': '',
'device_id': router_id,
'device_owner': 'network:router_gateway',
'binding:vif_type': 'ovs',
'binding:host_id': 'host_1'
}
BOTTOM1_PORTS.append(test_utils.DotDict(port))
elif action == 'remove_gateway':
self.delete_routers(ctx, router_id)
class HelperTest(unittest.TestCase):
def setUp(self):
core.initialize()