Enhanced unittests for admin utils
Run the admin utilities tests after creating ome basic neutron objects, and additng parameters to the calls. Change-Id: Id98c5baaacfa4cde2badeaf9ce7d40b0eb474655
This commit is contained in:
parent
ad03066423
commit
d289824e2f
|
@ -351,7 +351,11 @@ def change_edge_appliance_reservations(properties):
|
||||||
LOG.error(_LE("Please configure resource"))
|
LOG.error(_LE("Please configure resource"))
|
||||||
return
|
return
|
||||||
edge_id = properties.get('edge-id')
|
edge_id = properties.get('edge-id')
|
||||||
|
try:
|
||||||
h, edge = nsxv.get_edge(edge_id)
|
h, edge = nsxv.get_edge(edge_id)
|
||||||
|
except exceptions.NeutronException as e:
|
||||||
|
LOG.error(_LE("%s"), str(e))
|
||||||
|
return
|
||||||
appliances = edge['appliances']['appliances']
|
appliances = edge['appliances']['appliances']
|
||||||
for appliance in appliances:
|
for appliance in appliances:
|
||||||
appliance.update(reservations)
|
appliance.update(reservations)
|
||||||
|
@ -432,7 +436,7 @@ def nsx_update_edge(resource, event, trigger, **kwargs):
|
||||||
"(optional) --property shares=<shares> and/or "
|
"(optional) --property shares=<shares> and/or "
|
||||||
"(optional) --property reservation=<reservation> "
|
"(optional) --property reservation=<reservation> "
|
||||||
"\nFor hostgroup updates, add "
|
"\nFor hostgroup updates, add "
|
||||||
"--property hostgroup=True|False")
|
"--property hostgroup=update/all/clean")
|
||||||
if not kwargs.get('property'):
|
if not kwargs.get('property'):
|
||||||
LOG.error(usage_msg)
|
LOG.error(usage_msg)
|
||||||
return
|
return
|
||||||
|
|
|
@ -272,7 +272,9 @@ class FakeVcns(object):
|
||||||
"featureType": "dhcp_4.0",
|
"featureType": "dhcp_4.0",
|
||||||
"version": 14,
|
"version": 14,
|
||||||
"enabled": True,
|
"enabled": True,
|
||||||
"staticBindings": {"staticBindings": [{}]},
|
"staticBindings": {"staticBindings": [{
|
||||||
|
"macAddress": "fa:16:3e:e6:ad:ce",
|
||||||
|
"bindingId": "binding-1"}]},
|
||||||
"ipPools": {"ipPools": []}
|
"ipPools": {"ipPools": []}
|
||||||
}
|
}
|
||||||
return (header, response)
|
return (header, response)
|
||||||
|
@ -396,13 +398,15 @@ class FakeVcns(object):
|
||||||
|
|
||||||
def get_edge(self, edge_id):
|
def get_edge(self, edge_id):
|
||||||
if edge_id not in self._edges:
|
if edge_id not in self._edges:
|
||||||
raise Exception(_("Edge %s does not exist!") % edge_id)
|
raise exceptions.VcnsGeneralException(
|
||||||
|
_("Edge %s does not exist!") % edge_id)
|
||||||
header = {
|
header = {
|
||||||
'status': 200
|
'status': 200
|
||||||
}
|
}
|
||||||
response = {
|
response = {
|
||||||
'name': 'fake-edge',
|
'name': 'fake-edge',
|
||||||
'id': edge_id
|
'id': edge_id,
|
||||||
|
'appliances': {'appliances': []}
|
||||||
}
|
}
|
||||||
return (header, response)
|
return (header, response)
|
||||||
|
|
||||||
|
@ -424,6 +428,24 @@ class FakeVcns(object):
|
||||||
}
|
}
|
||||||
return (header, response)
|
return (header, response)
|
||||||
|
|
||||||
|
def get_vdn_switch(self, dvs_id):
|
||||||
|
header = {
|
||||||
|
'status': 200
|
||||||
|
}
|
||||||
|
response = {
|
||||||
|
'name': 'fake-switch',
|
||||||
|
'id': dvs_id,
|
||||||
|
'teamingPolicy': 'ETHER_CHANNEL'
|
||||||
|
}
|
||||||
|
return (header, response)
|
||||||
|
|
||||||
|
def update_vdn_switch(self, switch):
|
||||||
|
header = {
|
||||||
|
'status': 200
|
||||||
|
}
|
||||||
|
response = ''
|
||||||
|
return (header, response)
|
||||||
|
|
||||||
def update_routes(self, edge_id, routes):
|
def update_routes(self, edge_id, routes):
|
||||||
header = {
|
header = {
|
||||||
'status': 200
|
'status': 200
|
||||||
|
@ -909,7 +931,7 @@ class FakeVcns(object):
|
||||||
def get_security_group(self, sg_id):
|
def get_security_group(self, sg_id):
|
||||||
sg = self._securitygroups.get(sg_id)
|
sg = self._securitygroups.get(sg_id)
|
||||||
if sg:
|
if sg:
|
||||||
return ('<securitygroup><objectId>"%s"</objectId><name>"%s"'
|
return ('<securitygroup><objectId>%s</objectId><name>"%s"'
|
||||||
'</name></securitygroup>'
|
'</name></securitygroup>'
|
||||||
% (sg_id, sg.get("name")))
|
% (sg_id, sg.get("name")))
|
||||||
|
|
||||||
|
@ -919,6 +941,7 @@ class FakeVcns(object):
|
||||||
for k in self._securitygroups.keys():
|
for k in self._securitygroups.keys():
|
||||||
if k not in ('ids', 'names'):
|
if k not in ('ids', 'names'):
|
||||||
response += self.get_security_group(k)
|
response += self.get_security_group(k)
|
||||||
|
response = "<securitygroups>%s</securitygroups>" % response
|
||||||
return header, response
|
return header, response
|
||||||
|
|
||||||
def create_redirect_section(self, request):
|
def create_redirect_section(self, request):
|
||||||
|
@ -1029,9 +1052,11 @@ class FakeVcns(object):
|
||||||
|
|
||||||
def get_dfw_config(self):
|
def get_dfw_config(self):
|
||||||
response = ""
|
response = ""
|
||||||
for sec_id in range(0, self._sections['section_ids']):
|
for sec_id in self._sections.keys():
|
||||||
|
if sec_id.isdigit():
|
||||||
h, r = self._get_section(str(sec_id))
|
h, r = self._get_section(str(sec_id))
|
||||||
response += r
|
response += r
|
||||||
|
response = "<sections>%s</sections>" % response
|
||||||
headers = {'status': 200}
|
headers = {'status': 200}
|
||||||
return (headers, response)
|
return (headers, response)
|
||||||
|
|
||||||
|
@ -1096,7 +1121,11 @@ class FakeVcns(object):
|
||||||
self._spoofguard_policies[int(policy_id)] = {}
|
self._spoofguard_policies[int(policy_id)] = {}
|
||||||
|
|
||||||
def get_spoofguard_policy(self, policy_id):
|
def get_spoofguard_policy(self, policy_id):
|
||||||
|
try:
|
||||||
return None, self._spoofguard_policies[int(policy_id)]
|
return None, self._spoofguard_policies[int(policy_id)]
|
||||||
|
except IndexError:
|
||||||
|
raise exceptions.VcnsGeneralException(
|
||||||
|
_("Spoofguard policy not found"))
|
||||||
|
|
||||||
def get_spoofguard_policies(self):
|
def get_spoofguard_policies(self):
|
||||||
return None, {'policies': self._spoofguard_policies}
|
return None, {'policies': self._spoofguard_policies}
|
||||||
|
@ -1204,6 +1233,9 @@ class FakeVcns(object):
|
||||||
return (header, response)
|
return (header, response)
|
||||||
|
|
||||||
def update_edge_syslog(self, edge_id, config):
|
def update_edge_syslog(self, edge_id, config):
|
||||||
|
if edge_id not in self._edges:
|
||||||
|
raise exceptions.VcnsGeneralException(
|
||||||
|
_("edge not found"))
|
||||||
self._edges[edge_id]['syslog'] = config
|
self._edges[edge_id]['syslog'] = config
|
||||||
header = {
|
header = {
|
||||||
'status': 204
|
'status': 204
|
||||||
|
|
|
@ -27,13 +27,16 @@ from neutron.common import config as neutron_config
|
||||||
from neutron.db import servicetype_db # noqa
|
from neutron.db import servicetype_db # noqa
|
||||||
from neutron.quota import resource_registry
|
from neutron.quota import resource_registry
|
||||||
from neutron.tests import base
|
from neutron.tests import base
|
||||||
from neutron.tests.unit.db import test_db_base_plugin_v2 as test_n_plugin
|
from neutron.tests.unit.api import test_extensions
|
||||||
|
|
||||||
from vmware_nsx._i18n import _
|
from vmware_nsx._i18n import _
|
||||||
from vmware_nsx.common import config # noqa
|
from vmware_nsx.common import config # noqa
|
||||||
|
from vmware_nsx.db import nsxv_db
|
||||||
|
from vmware_nsx.dvs import dvs_utils
|
||||||
|
import vmware_nsx.shell.admin.plugins.nsxv.resources.utils as utils
|
||||||
from vmware_nsx.shell import resources
|
from vmware_nsx.shell import resources
|
||||||
from vmware_nsx.tests import unit as vmware
|
from vmware_nsx.tests import unit as vmware
|
||||||
from vmware_nsx.tests.unit.nsx_v.vshield import fake_vcns
|
from vmware_nsx.tests.unit.nsx_v import test_plugin as test_v_plugin
|
||||||
from vmware_nsx.tests.unit.nsx_v3 import test_plugin as test_v3_plugin
|
from vmware_nsx.tests.unit.nsx_v3 import test_plugin as test_v3_plugin
|
||||||
from vmware_nsxlib.v3 import resources as nsx_v3_resources
|
from vmware_nsxlib.v3 import resources as nsx_v3_resources
|
||||||
|
|
||||||
|
@ -96,29 +99,91 @@ class AbstractTestAdminUtils(base.BaseTestCase):
|
||||||
for op in res_dict[res].supported_ops:
|
for op in res_dict[res].supported_ops:
|
||||||
self._test_resource(res_name, op)
|
self._test_resource(res_name, op)
|
||||||
|
|
||||||
|
def _test_resources_with_args(self, res_dict, func_args):
|
||||||
|
for res in res_dict.keys():
|
||||||
|
res_name = res_dict[res].name
|
||||||
|
for op in res_dict[res].supported_ops:
|
||||||
|
args = {'property': func_args}
|
||||||
|
self._test_resource(res_name, op, **args)
|
||||||
|
|
||||||
|
|
||||||
class TestNsxvAdminUtils(AbstractTestAdminUtils,
|
class TestNsxvAdminUtils(AbstractTestAdminUtils,
|
||||||
test_n_plugin.NeutronDbPluginV2TestCase):
|
test_v_plugin.NsxVPluginV2TestCase):
|
||||||
|
|
||||||
def _init_mock_plugin(self):
|
|
||||||
mock_vcns = mock.patch(vmware.VCNS_NAME, autospec=True)
|
|
||||||
mock_vcns_instance = mock_vcns.start()
|
|
||||||
self.fc = fake_vcns.FakeVcns()
|
|
||||||
mock_vcns_instance.return_value = self.fc
|
|
||||||
|
|
||||||
self.addCleanup(self.fc.reset_all)
|
|
||||||
super(TestNsxvAdminUtils, self)._init_mock_plugin()
|
|
||||||
|
|
||||||
def _get_plugin_name(self):
|
def _get_plugin_name(self):
|
||||||
return 'nsxv'
|
return 'nsxv'
|
||||||
|
|
||||||
|
def _init_mock_plugin(self, *mocks):
|
||||||
|
super(TestNsxvAdminUtils, self)._init_mock_plugin()
|
||||||
|
|
||||||
|
# support the dvs manager:
|
||||||
|
mock.patch.object(dvs_utils, 'dvs_create_session').start()
|
||||||
|
# override metadata get-object
|
||||||
|
dummy_lb = {
|
||||||
|
'enabled': True,
|
||||||
|
'enableServiceInsertion': True,
|
||||||
|
'accelerationEnabled': True,
|
||||||
|
'virtualServer': [],
|
||||||
|
'applicationProfile': [],
|
||||||
|
'pool': [],
|
||||||
|
'applicationRule': []
|
||||||
|
}
|
||||||
|
mock.patch('vmware_nsx.plugins.nsx_v.vshield.nsxv_edge_cfg_obj.'
|
||||||
|
'NsxvEdgeCfgObj.get_object',
|
||||||
|
return_value=dummy_lb).start()
|
||||||
|
|
||||||
|
# Create a router to make sure we have deployed an edge
|
||||||
|
self.create_router()
|
||||||
|
self.edge_id = self.get_edge_id()
|
||||||
|
|
||||||
def test_nsxv_resources(self):
|
def test_nsxv_resources(self):
|
||||||
self._test_resources(resources.nsxv_resources)
|
self._test_resources(resources.nsxv_resources)
|
||||||
|
|
||||||
# This is an example how to test a specific utility with arguments
|
def _test_edge_nsx_update(self, edge_id, params):
|
||||||
def test_with_args(self):
|
args = {'property': ["edge-id=%s" % edge_id]}
|
||||||
args = {'property': ["xxx=yyy"]}
|
args['property'].extend(params)
|
||||||
self._test_resource('security-groups', 'fix-mismatch', **args)
|
self._test_resource('edges', 'nsx-update', **args)
|
||||||
|
|
||||||
|
def create_router(self):
|
||||||
|
# Global configuration to support router creation without messing up
|
||||||
|
# the plugin wrapper
|
||||||
|
cfg.CONF.set_override('track_quota_usage', False,
|
||||||
|
group='QUOTAS')
|
||||||
|
ext_mgr = test_v_plugin.TestL3ExtensionManager()
|
||||||
|
ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
|
||||||
|
|
||||||
|
# Create an exclusive router (with an edge)
|
||||||
|
tenant_id = uuidutils.generate_uuid()
|
||||||
|
data = {'router': {'tenant_id': tenant_id}}
|
||||||
|
data['router']['name'] = 'dummy'
|
||||||
|
data['router']['admin_state_up'] = True
|
||||||
|
data['router']['router_type'] = 'exclusive'
|
||||||
|
router_req = self.new_create_request('routers', data, self.fmt)
|
||||||
|
res = router_req.get_response(ext_api)
|
||||||
|
r = self.deserialize(self.fmt, res)
|
||||||
|
return r
|
||||||
|
|
||||||
|
def get_edge_id(self):
|
||||||
|
edgeapi = utils.NeutronDbClient()
|
||||||
|
bindings = nsxv_db.get_nsxv_router_bindings(edgeapi.context.session)
|
||||||
|
for binding in bindings:
|
||||||
|
if binding.edge_id:
|
||||||
|
return binding.edge_id
|
||||||
|
# use a dummy edge
|
||||||
|
return "edge-1"
|
||||||
|
|
||||||
|
def test_edge_nsx_updates(self):
|
||||||
|
"""Test eges/nsx-update utility with different inputs."""
|
||||||
|
self._test_edge_nsx_update(self.edge_id, ["appliances=true"])
|
||||||
|
self._test_edge_nsx_update(self.edge_id, ["size=compact"])
|
||||||
|
self._test_edge_nsx_update(self.edge_id, ["hostgroup=update"])
|
||||||
|
self._test_edge_nsx_update(self.edge_id, ["hostgroup=all"])
|
||||||
|
self._test_edge_nsx_update(self.edge_id, ["hostgroup=clean"])
|
||||||
|
self._test_edge_nsx_update(self.edge_id, ["highavailability=True"])
|
||||||
|
self._test_edge_nsx_update(self.edge_id, ["resource=cpu", "limit=100"])
|
||||||
|
self._test_edge_nsx_update(self.edge_id, ["syslog-server=1.1.1.1",
|
||||||
|
"syslog-proto=tcp",
|
||||||
|
"log-level=debug"])
|
||||||
|
|
||||||
def test_bad_args(self):
|
def test_bad_args(self):
|
||||||
args = {'property': ["xxx"]}
|
args = {'property': ["xxx"]}
|
||||||
|
@ -126,6 +191,25 @@ class TestNsxvAdminUtils(AbstractTestAdminUtils,
|
||||||
'networks', 'nsx-update', **args)
|
'networks', 'nsx-update', **args)
|
||||||
self.assertEqual(1, len(errors))
|
self.assertEqual(1, len(errors))
|
||||||
|
|
||||||
|
def test_resources_with_common_args(self):
|
||||||
|
"""Run all nsxv admin utilities with some common arguments
|
||||||
|
|
||||||
|
Using arguments like edge-id which many apis need
|
||||||
|
This improves the test coverage
|
||||||
|
"""
|
||||||
|
args = ["edge-id=%s" % self.edge_id,
|
||||||
|
"router-id=e5b9b249-0034-4729-8ab6-fe4dacaa3a12",
|
||||||
|
"policy-id=1",
|
||||||
|
"network_id=net-1",
|
||||||
|
"net-id=net-1",
|
||||||
|
"security-group-id=sg-1",
|
||||||
|
"dvs-id=dvs-1",
|
||||||
|
"moref=virtualwire-1",
|
||||||
|
"teamingpolicy=LACP_ACTIVE"
|
||||||
|
]
|
||||||
|
self._test_resources_with_args(
|
||||||
|
resources.nsxv_resources, args)
|
||||||
|
|
||||||
|
|
||||||
class TestNsxv3AdminUtils(AbstractTestAdminUtils,
|
class TestNsxv3AdminUtils(AbstractTestAdminUtils,
|
||||||
test_v3_plugin.NsxV3PluginTestCaseMixin):
|
test_v3_plugin.NsxV3PluginTestCaseMixin):
|
||||||
|
@ -161,3 +245,20 @@ class TestNsxv3AdminUtils(AbstractTestAdminUtils,
|
||||||
|
|
||||||
def test_nsxv3_resources(self):
|
def test_nsxv3_resources(self):
|
||||||
self._test_resources(resources.nsxv3_resources)
|
self._test_resources(resources.nsxv3_resources)
|
||||||
|
|
||||||
|
def test_resources_with_common_args(self):
|
||||||
|
"""Run all nsxv3 admin utilities with some common arguments
|
||||||
|
|
||||||
|
Using arguments like dhcp_profile_uuid which many apis need
|
||||||
|
This improves the test coverage
|
||||||
|
"""
|
||||||
|
args = ["dhcp_profile_uuid=e5b9b249-0034-4729-8ab6-fe4dacaa3a12",
|
||||||
|
"metadata_proxy_uuid=e5b9b249-0034-4729-8ab6-fe4dacaa3a12",
|
||||||
|
]
|
||||||
|
# Create some neutron objects for the utilities to run on
|
||||||
|
with self._create_l3_ext_network() as network:
|
||||||
|
with self.subnet(network=network) as subnet:
|
||||||
|
with self.port(subnet=subnet):
|
||||||
|
# Run all utilities with backend objects
|
||||||
|
self._test_resources_with_args(
|
||||||
|
resources.nsxv3_resources, args)
|
||||||
|
|
Loading…
Reference in New Issue