# neutron/neutron/tests/unit/extensions/test_tag.py
#
# 407 lines
# 16 KiB
# Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.api import attributes
from neutron_lib import context
from oslo_utils import uuidutils
import testscenarios
from neutron.api import extensions
from neutron.common import config
import neutron.extensions
from neutron.objects.qos import policy
from neutron.objects import trunk
from neutron.services.tag import tag_plugin
from neutron.tests import fake_notifier
from neutron.tests.unit.extensions import test_l3
from neutron.tests.unit.extensions import test_securitygroup
# Dotted path of the core plugin class; passed as ``plugin=`` to the parent
# ``setUp`` by the test cases in this module.
DB_PLUGIN_KLASS = 'neutron.tests.unit.extensions.test_tag.TestTagPlugin'
# Module-level ``load_tests`` hook: lets testscenarios expand each test
# class's ``scenarios`` list when the module is loaded.
load_tests = testscenarios.load_tests_apply_scenarios
# Colon-separated directories of the neutron.extensions package; handed to
# the extension manager built in the test setUp.
extensions_path = ':'.join(neutron.extensions.__path__)
class TestTagPlugin(test_securitygroup.SecurityGroupTestPlugin,
                    test_l3.TestL3NatBasePlugin):
    """Core plugin used by the tag tests.

    Combines the security-group test plugin with the L3 NAT base test
    plugin so that every resource type listed in the tag test scenarios
    can be created through a single core plugin.
    """

    supported_extension_aliases = ["external-net", "security-group"]

    # NOTE: the double leading underscore is deliberate. Python mangles
    # these names to ``_TestTagPlugin__native_*_support``; presumably the
    # API layer looks the flags up under the mangled name -- confirm before
    # renaming them.
    __native_sorting_support = True
    __native_pagination_support = True
class TestTagApiBase(test_securitygroup.SecurityGroupsTestCase,
                     test_l3.L3NatTestCaseMixin):
    """Shared fixtures and request helpers for the tag extension tests.

    Each entry in ``scenarios`` supplies ``collection`` (the API collection
    name) and ``member`` (the key of a single resource in a response body);
    the helpers below read them as ``self.collection`` and ``self.member``.

    Helpers that address one resource expect ``self.resource_id`` to have
    been set by the concrete test case.
    """

    scenarios = [
        ('Network Tag Test',
         dict(collection='networks',
              member='network')),
        ('Subnet Tag Test',
         dict(collection='subnets',
              member='subnet')),
        ('Port Tag Test',
         dict(collection='ports',
              member='port')),
        ('Subnetpool Tag Test',
         dict(collection='subnetpools',
              member='subnetpool')),
        ('Router Tag Test',
         dict(collection='routers',
              member='router')),
        ('Floatingip Tag Test',
         dict(collection='floatingips',
              member='floatingip')),
        ('Securitygroup Tag Test',
         dict(collection='security-groups',
              member='security_group')),
        ('QoS Policy Tag Test',
         dict(collection='policies',
              member='policy')),
        ('Trunk Tag Test',
         dict(collection='trunks',
              member='trunk')),
    ]

    def setUp(self):
        """Start the test plugins and build the extension API middleware.

        The resulting WSGI application is stored in ``self.ext_api``; all
        tag requests issued by the helpers go through it.
        """
        service_plugins = {
            'TAG': "neutron.services.tag.tag_plugin.TagPlugin",
            'router':
            "neutron.tests.unit.extensions.test_l3.TestL3NatServicePlugin"}
        super(TestTagApiBase, self).setUp(plugin=DB_PLUGIN_KLASS,
                                          service_plugins=service_plugins)
        plugin = tag_plugin.TagPlugin()
        l3_plugin = test_l3.TestL3NatServicePlugin()
        sec_plugin = test_securitygroup.SecurityGroupTestPlugin()
        ext_mgr = extensions.PluginAwareExtensionManager(
            extensions_path, {'router': l3_plugin, 'TAG': plugin,
                              'sec': sec_plugin}
        )
        ext_mgr.extend_resources("2.0", attributes.RESOURCES)
        app = config.load_paste_app('extensions_test_app')
        self.ext_api = extensions.ExtensionMiddleware(app, ext_mgr=ext_mgr)

    def _is_object(self):
        """Return True for collections created and queried as objects.

        QoS policies and trunks are built through their object classes in
        ``_make_object`` instead of the ``_make_*`` API helpers.
        """
        return self.collection in ['policies', 'trunks']

    def _prepare_make_resource(self):
        """Create the prerequisites needed before making floating IPs.

        For the ``floatingips`` scenario this creates a network that is
        marked external, a subnet on it and a router whose gateway points
        at that network; the network is kept in ``self.net``. Other
        scenarios need no preparation.
        """
        if self.collection != "floatingips":
            return
        net = self._make_network(self.fmt, 'net1', True)
        self._set_net_external(net['network']['id'])
        self._make_subnet(self.fmt, net, '10.0.0.1', '10.0.0.0/24')
        info = {'network_id': net['network']['id']}
        self._make_router(self.fmt, None,
                          external_gateway_info=info)
        self.net = net['network']

    def _make_object(self):
        """Create a QoS policy or trunk object and return its id.

        The created object is also kept in ``self.obj``.
        """
        ctxt = context.get_admin_context()
        if self.collection == "policies":
            self.obj = policy.QosPolicy(context=ctxt,
                                        id=uuidutils.generate_uuid(),
                                        project_id='tenant', name='pol1',
                                        rules=[])
        elif self.collection == "trunks":
            net = self._make_network(self.fmt, 'net1', True)
            port = self._make_port(self.fmt, net['network']['id'])
            self.obj = trunk.Trunk(context=ctxt,
                                   id=uuidutils.generate_uuid(),
                                   project_id='tenant', name='',
                                   port_id=port['port']['id'])
        else:
            # Fail loudly instead of silently re-creating a stale self.obj
            # (or hitting AttributeError when it was never set).
            self.fail("Unsupported object collection: %s" % self.collection)
        self.obj.create()
        return self.obj.id

    def _make_resource(self):
        """Create one resource of the scenario's collection; return its id."""
        if self._is_object():
            return self._make_object()
        if self.collection == "networks":
            res = self._make_network(self.fmt, 'net1', True)
        elif self.collection == "subnets":
            net = self._make_network(self.fmt, 'net1', True)
            res = self._make_subnet(self.fmt, net, '10.0.0.1', '10.0.0.0/24')
        elif self.collection == "ports":
            net = self._make_network(self.fmt, 'net1', True)
            res = self._make_port(self.fmt, net['network']['id'])
        elif self.collection == "subnetpools":
            res = self._make_subnetpool(self.fmt, ['10.0.0.0/8'],
                                        name='my pool', tenant_id="tenant")
        elif self.collection == "routers":
            res = self._make_router(self.fmt, None)
        elif self.collection == "floatingips":
            # self.net is set up by _prepare_make_resource().
            res = self._make_floatingip(self.fmt, self.net['id'])
        elif self.collection == "security-groups":
            res = self._make_security_group(self.fmt, 'sec1', '')
        else:
            # Without this branch an unknown collection would surface as an
            # UnboundLocalError on ``res`` below.
            self.fail("Unsupported collection: %s" % self.collection)
        return res[self.member]['id']

    def _get_object_tags(self):
        """Return the tags of the object identified by ``self.resource_id``.

        Relies on ``self.obj`` having been set by ``_make_object``.
        """
        ctxt = context.get_admin_context()
        res = self.obj.get_object(ctxt, id=self.resource_id)
        return res.to_dict()['tags']

    def _get_resource_tags(self):
        """Return the current tags of the resource under test."""
        if self._is_object():
            return self._get_object_tags()
        res = self._show(self.collection, self.resource_id)
        return res[self.member]['tags']

    def _put_tag(self, tag):
        """PUT a single tag sub-resource and return the response."""
        req = self._req('PUT', self.collection, id=self.resource_id,
                        subresource='tags', sub_id=tag)
        return req.get_response(self.ext_api)

    def _put_tags(self, tags=None, body=None):
        """PUT the ``tags`` sub-resource and return the response.

        :param tags: when truthy, sent wrapped as ``{'tags': tags}`` and
            takes precedence over ``body``.
        :param body: raw request body used when ``tags`` is falsy; a falsy
            ``body`` is replaced by an empty dict.
        """
        if tags:
            body = {'tags': tags}
        elif not body:
            body = {}
        req = self._req('PUT', self.collection, data=body, id=self.resource_id,
                        subresource='tags')
        return req.get_response(self.ext_api)

    def _get_tag(self, tag):
        """GET a single tag sub-resource and return the response."""
        req = self._req('GET', self.collection, id=self.resource_id,
                        subresource='tags', sub_id=tag)
        return req.get_response(self.ext_api)

    def _delete_tag(self, tag):
        """DELETE a single tag sub-resource and return the response."""
        req = self._req('DELETE', self.collection, id=self.resource_id,
                        subresource='tags', sub_id=tag)
        return req.get_response(self.ext_api)

    def _delete_tags(self):
        """DELETE the whole ``tags`` sub-resource and return the response."""
        req = self._req('DELETE', self.collection, id=self.resource_id,
                        subresource='tags')
        return req.get_response(self.ext_api)

    def _assertEqualTags(self, expected, actual):
        """Assert both tag collections hold the same tags, ignoring order."""
        self.assertEqual(set(expected), set(actual))

    def _get_tags_filter_objects(self, tags, tags_any, not_tags,
                                 not_tags_any):
        """Return ids of the objects matching the given tag filters.

        Only non-empty filters are passed on to ``get_objects``.
        """
        candidates = (('tags', tags), ('tags-any', tags_any),
                      ('not-tags', not_tags),
                      ('not-tags-any', not_tags_any))
        filters = {name: values for name, values in candidates if values}
        if self.collection == "policies":
            obj_class = policy.QosPolicy
        elif self.collection == "trunks":
            obj_class = trunk.Trunk
        else:
            # Avoid an UnboundLocalError on ``obj_class`` below.
            self.fail("Unsupported object collection: %s" % self.collection)
        ctxt = context.get_admin_context()
        res = obj_class.get_objects(ctxt, **filters)
        return [n.id for n in res]

    def _make_query_string(self, tags, tags_any, not_tags, not_tags_any):
        """Build the query string for the non-empty tag filters.

        Each filter is rendered as ``<name>=<comma-joined tags>`` and the
        parts are joined with ``&``; an empty string is returned when no
        filter is set.
        """
        candidates = (('tags', tags), ('tags-any', tags_any),
                      ('not-tags', not_tags),
                      ('not-tags-any', not_tags_any))
        return '&'.join(name + "=" + ','.join(values)
                        for name, values in candidates if values)

    def _get_tags_filter_resources(self, tags=None, tags_any=None,
                                   not_tags=None, not_tags_any=None):
        """Return ids of the resources matching the given tag filters."""
        if self._is_object():
            return self._get_tags_filter_objects(tags, tags_any, not_tags,
                                                 not_tags_any)
        params = self._make_query_string(tags, tags_any, not_tags,
                                         not_tags_any)
        res = self._list(self.collection, query_params=params)
        # The response key uses underscores where the collection name has
        # hyphens (e.g. 'security-groups' -> 'security_groups').
        return [n['id'] for n in res[self.collection.replace('-', '_')]]

    def _test_notification_report(self, expect_notify):
        """Assert every expected event type was notified, then reset.

        :param expect_notify: set of event-type strings that must all appear
            in ``fake_notifier.NOTIFICATIONS``.
        """
        notify = {n['event_type'] for n in fake_notifier.NOTIFICATIONS}
        matched = expect_notify & notify
        self.assertEqual(expect_notify, matched)
        fake_notifier.reset()
class TestResourceTagApi(TestTagApiBase):
    """Exercise the tag create/replace/read/delete calls on one resource."""

    def setUp(self):
        """Create the single resource every test in this class tags."""
        super(TestResourceTagApi, self).setUp()
        self._prepare_make_resource()
        self.resource_id = self._make_resource()

    def test_put_tag(self):
        """Adding tags one by one accumulates them and emits create events."""
        expected_events = {'tag.create.start', 'tag.create.end'}
        present = []
        for tag in ('red', 'blue'):
            response = self._put_tag(tag)
            self.assertEqual(201, response.status_int)
            present.append(tag)
            current = self._get_resource_tags()
            self._assertEqualTags(present, current)
            self._test_notification_report(expected_events)

    def test_put_tag_exists(self):
        """Putting the same tag twice returns 201 both times."""
        for _ in range(2):
            response = self._put_tag('blue')
            self.assertEqual(201, response.status_int)

    def test_put_tags(self):
        """Replacing the tag list returns 200 and emits update events."""
        expected_events = {'tag.update.start', 'tag.update.end'}
        response = self._put_tags(['red', 'green'])
        self.assertEqual(200, response.status_int)
        current = self._get_resource_tags()
        self._assertEqualTags(['red', 'green'], current)
        self._test_notification_report(expected_events)

    def test_put_invalid_tags(self):
        """Empty, non-dict and wrongly keyed bodies are rejected with 400."""
        invalid_calls = ({}, {'body': 7}, {'body': {'invalid': True}})
        for kwargs in invalid_calls:
            response = self._put_tags(**kwargs)
            self.assertEqual(400, response.status_int)

    def test_put_tags_replace(self):
        """A second bulk PUT replaces the previous tag set."""
        for wanted in (['red', 'green'], ['blue', 'red']):
            response = self._put_tags(wanted)
            self.assertEqual(200, response.status_int)
            current = self._get_resource_tags()
            self._assertEqualTags(wanted, current)

    def test_get_tag(self):
        """Reading an existing tag returns 204."""
        response = self._put_tag('red')
        self.assertEqual(201, response.status_int)
        response = self._get_tag('red')
        self.assertEqual(204, response.status_int)

    def test_get_tag_notfound(self):
        """Reading a tag the resource does not have returns 404."""
        response = self._put_tag('red')
        self.assertEqual(201, response.status_int)
        response = self._get_tag('green')
        self.assertEqual(404, response.status_int)

    def test_delete_tag(self):
        """Deleting one tag leaves the others and emits delete events."""
        expected_events = {'tag.delete.start', 'tag.delete.end'}
        response = self._put_tags(['red', 'green'])
        self.assertEqual(200, response.status_int)
        response = self._delete_tag('red')
        self.assertEqual(204, response.status_int)
        current = self._get_resource_tags()
        self._assertEqualTags(['green'], current)
        self._test_notification_report(expected_events)

    def test_delete_tag_notfound(self):
        """Deleting a tag the resource does not have returns 404."""
        response = self._put_tags(['red', 'green'])
        self.assertEqual(200, response.status_int)
        response = self._delete_tag('blue')
        self.assertEqual(404, response.status_int)

    def test_delete_tags(self):
        """Deleting all tags empties the list and emits delete_all events."""
        expected_events = {'tag.delete_all.start', 'tag.delete_all.end'}
        response = self._put_tags(['red', 'green'])
        self.assertEqual(200, response.status_int)
        response = self._delete_tags()
        self.assertEqual(204, response.status_int)
        current = self._get_resource_tags()
        self._assertEqualTags([], current)
        self._test_notification_report(expected_events)
class TestResourceTagFilter(TestTagApiBase):
    """Exercise the tags / tags-any / not-tags / not-tags-any filters.

    Fixture created in ``setUp`` (five resources of the scenario's type):

    * res1: red
    * res2: red, blue
    * res3: red, blue, green
    * res4: green
    * res5: no tags
    """

    def setUp(self):
        """Create and tag the five fixture resources."""
        super(TestResourceTagFilter, self).setUp()
        self._prepare_resource_tags()

    def _make_tags(self, resource_id, tags):
        """PUT ``tags`` as the tag list of ``resource_id``.

        Returns the response of the request.
        """
        body = {'tags': tags}
        req = self._req('PUT', self.collection, data=body, id=resource_id,
                        subresource='tags')
        return req.get_response(self.ext_api)

    def _prepare_resource_tags(self):
        """Create five resources and tag the first four.

        The ids are stored in ``self.res1`` .. ``self.res5`` and collected
        in ``self.res_ids``.
        """
        self._prepare_make_resource()
        self.res1 = self._make_resource()
        self.res2 = self._make_resource()
        self.res3 = self._make_resource()
        self.res4 = self._make_resource()
        self.res5 = self._make_resource()
        self.res_ids = [self.res1, self.res2, self.res3, self.res4, self.res5]

        # res5 intentionally gets no tags.
        fixture_tags = (
            (self.res1, ['red']),
            (self.res2, ['red', 'blue']),
            (self.res3, ['red', 'blue', 'green']),
            (self.res4, ['green']),
        )
        for resource_id, tags in fixture_tags:
            res = self._make_tags(resource_id, tags)
            # Fail here if the fixture could not be tagged; otherwise the
            # filter tests would only report confusing result mismatches.
            self.assertEqual(200, res.status_int)

    def _assertEqualResources(self, expected, resources):
        """Assert the filter returned exactly ``expected`` fixture resources.

        Ids in ``resources`` that are not part of the fixture are ignored,
        and ordering is not significant.
        """
        actual = {n for n in resources if n in self.res_ids}
        self.assertEqual(set(expected), actual)

    def test_filter_tags_single(self):
        """tags=red matches every resource carrying 'red'."""
        resources = self._get_tags_filter_resources(tags=['red'])
        self._assertEqualResources([self.res1, self.res2, self.res3],
                                   resources)

    def test_filter_tags_multi(self):
        """tags=red,blue matches resources carrying both tags."""
        resources = self._get_tags_filter_resources(tags=['red', 'blue'])
        self._assertEqualResources([self.res2, self.res3], resources)

    def test_filter_tags_any_single(self):
        """tags-any=blue matches every resource carrying 'blue'."""
        resources = self._get_tags_filter_resources(tags_any=['blue'])
        self._assertEqualResources([self.res2, self.res3], resources)

    def test_filter_tags_any_multi(self):
        """tags-any=red,blue matches resources carrying either tag."""
        resources = self._get_tags_filter_resources(tags_any=['red', 'blue'])
        self._assertEqualResources([self.res1, self.res2, self.res3],
                                   resources)

    def test_filter_not_tags_single(self):
        """not-tags=red excludes every resource carrying 'red'."""
        resources = self._get_tags_filter_resources(not_tags=['red'])
        self._assertEqualResources([self.res4, self.res5], resources)

    def test_filter_not_tags_multi(self):
        """not-tags=red,blue excludes resources carrying both tags."""
        resources = self._get_tags_filter_resources(not_tags=['red', 'blue'])
        self._assertEqualResources([self.res1, self.res4, self.res5],
                                   resources)

    def test_filter_not_tags_any_single(self):
        """not-tags-any=blue excludes every resource carrying 'blue'."""
        resources = self._get_tags_filter_resources(not_tags_any=['blue'])
        self._assertEqualResources([self.res1, self.res4, self.res5],
                                   resources)

    def test_filter_not_tags_any_multi(self):
        """not-tags-any=red,blue excludes resources carrying either tag."""
        resources = self._get_tags_filter_resources(not_tags_any=['red',
                                                                  'blue'])
        self._assertEqualResources([self.res4, self.res5], resources)