Enhance tag mechanism

This patch enhances the tag mechanism for subnet, port, subnetpool,
router resources. The tag-ext as new extension is added so that
tag supports their resources.

APIImpact: Adds tag support to subnet, port, subnetpool, router
DocImpact: allow users to set tags on some resources

Change-Id: I3ab8c2f47f283bee7219f39f20b07361b8e0c5f1
Closes-Bug: #1661608
(cherry picked from commit b56f008f3a)
This commit is contained in:
Hirofumi Ichihara 2017-01-19 13:52:39 +09:00
parent cdeb93b0b3
commit fbf40fe1ba
9 changed files with 416 additions and 143 deletions

View File

@ -84,6 +84,7 @@ class RouterInterfaceAttachmentConflict(nexception.Conflict):
message = _("Error %(reason)s while attempting the operation.")
ROUTER = 'router'
ROUTERS = 'routers'
FLOATINGIP = 'floatingip'
FLOATINGIPS = '%ss' % FLOATINGIP

View File

@ -35,8 +35,9 @@ MAX_TAG_LEN = 60
TAG_PLUGIN_TYPE = 'TAG'
TAG_SUPPORTED_RESOURCES = {
# We shouldn't add new resources here. If more resources need to be tagged,
# we must add them in new extension.
attributes.NETWORKS: attributes.NETWORK,
# other resources can be added
}
TAG_ATTRIBUTE_MAP = {
@ -52,14 +53,6 @@ class TagNotFound(exceptions.NotFound):
message = _("Tag %(tag)s could not be found.")
def get_parent_resource_and_id(kwargs):
for key in kwargs:
for resource in TAG_SUPPORTED_RESOURCES:
if key == TAG_SUPPORTED_RESOURCES[resource] + '_id':
return resource, kwargs[key]
return None, None
def validate_tag(tag):
msg = validators.validate_string(tag, MAX_TAG_LEN)
if msg:
@ -88,17 +81,25 @@ def notify_tag_action(context, action, parent, parent_id, tags=None):
class TagController(object):
def __init__(self):
self.plugin = directory.get_plugin(TAG_PLUGIN_TYPE)
self.supported_resources = TAG_SUPPORTED_RESOURCES
def _get_parent_resource_and_id(self, kwargs):
for key in kwargs:
for resource in self.supported_resources:
if key == self.supported_resources[resource] + '_id':
return resource, kwargs[key]
return None, None
def index(self, request, **kwargs):
# GET /v2.0/networks/{network_id}/tags
parent, parent_id = get_parent_resource_and_id(kwargs)
parent, parent_id = self._get_parent_resource_and_id(kwargs)
return self.plugin.get_tags(request.context, parent, parent_id)
def show(self, request, id, **kwargs):
# GET /v2.0/networks/{network_id}/tags/{tag}
# id == tag
validate_tag(id)
parent, parent_id = get_parent_resource_and_id(kwargs)
parent, parent_id = self._get_parent_resource_and_id(kwargs)
return self.plugin.get_tag(request.context, parent, parent_id, id)
def create(self, request, **kwargs):
@ -110,7 +111,7 @@ class TagController(object):
# PUT /v2.0/networks/{network_id}/tags/{tag}
# id == tag
validate_tag(id)
parent, parent_id = get_parent_resource_and_id(kwargs)
parent, parent_id = self._get_parent_resource_and_id(kwargs)
notify_tag_action(request.context, 'create.start',
parent, parent_id, [id])
result = self.plugin.update_tag(request.context, parent, parent_id, id)
@ -122,7 +123,7 @@ class TagController(object):
# PUT /v2.0/networks/{network_id}/tags
# body: {"tags": ["aaa", "bbb"]}
validate_tags(body)
parent, parent_id = get_parent_resource_and_id(kwargs)
parent, parent_id = self._get_parent_resource_and_id(kwargs)
notify_tag_action(request.context, 'update.start',
parent, parent_id, body['tags'])
result = self.plugin.update_tags(request.context, parent,
@ -135,7 +136,7 @@ class TagController(object):
# DELETE /v2.0/networks/{network_id}/tags/{tag}
# id == tag
validate_tag(id)
parent, parent_id = get_parent_resource_and_id(kwargs)
parent, parent_id = self._get_parent_resource_and_id(kwargs)
notify_tag_action(request.context, 'delete.start',
parent, parent_id, [id])
result = self.plugin.delete_tag(request.context, parent, parent_id, id)
@ -145,7 +146,7 @@ class TagController(object):
def delete_all(self, request, **kwargs):
# DELETE /v2.0/networks/{network_id}/tags
parent, parent_id = get_parent_resource_and_id(kwargs)
parent, parent_id = self._get_parent_resource_and_id(kwargs)
notify_tag_action(request.context, 'delete_all.start',
parent, parent_id)
result = self.plugin.delete_tags(request.context, parent, parent_id)

View File

@ -0,0 +1,90 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.api import extensions as api_extensions
from neutron_lib.plugins import directory
from neutron.api import extensions
from neutron.api.v2 import attributes
from neutron.api.v2 import base
from neutron.api.v2 import resource as api_resource
from neutron.extensions import l3
from neutron.extensions import tag as tag_base
TAG_SUPPORTED_RESOURCES = {
# We shouldn't add new resources here. If more resources need to be tagged,
# we must add them in new extension.
attributes.SUBNETS: attributes.SUBNET,
attributes.PORTS: attributes.PORT,
attributes.SUBNETPOOLS: attributes.SUBNETPOOL,
l3.ROUTERS: l3.ROUTER,
}
class TagExtController(tag_base.TagController):
def __init__(self):
self.plugin = directory.get_plugin(tag_base.TAG_PLUGIN_TYPE)
self.supported_resources = TAG_SUPPORTED_RESOURCES
class Tag_ext(api_extensions.ExtensionDescriptor):
"""Extension class supporting tags for ext resources."""
@classmethod
def get_name(cls):
return ("Tag support for resources: %s"
% ', '.join(TAG_SUPPORTED_RESOURCES.values()))
@classmethod
def get_alias(cls):
return "tag-ext"
@classmethod
def get_description(cls):
return "Extends tag support to more L2 and L3 resources."
@classmethod
def get_updated(cls):
return "2017-01-01T00:00:00-00:00"
@classmethod
def get_resources(cls):
"""Returns Ext Resources."""
exts = []
action_status = {'index': 200, 'show': 204, 'update': 201,
'update_all': 200, 'delete': 204, 'delete_all': 204}
controller = api_resource.Resource(TagExtController(),
base.FAULT_MAP,
action_status=action_status)
collection_methods = {"delete_all": "DELETE",
"update_all": "PUT"}
exts = []
for collection_name, member_name in TAG_SUPPORTED_RESOURCES.items():
parent = {'member_name': member_name,
'collection_name': collection_name}
exts.append(extensions.ResourceExtension(
tag_base.TAGS, controller, parent,
collection_methods=collection_methods))
return exts
def get_optional_extensions(self):
return ['router']
def get_extended_resources(self, version):
if version != "2.0":
return {}
EXTENDED_ATTRIBUTES_2_0 = {}
for collection_name in TAG_SUPPORTED_RESOURCES:
EXTENDED_ATTRIBUTES_2_0[collection_name] = (
tag_base.TAG_ATTRIBUTE_MAP)
return EXTENDED_ATTRIBUTES_2_0

View File

@ -14,6 +14,7 @@
import functools
from neutron_lib.plugins import directory
from oslo_db import api as oslo_db_api
from oslo_db import exception as db_exc
from oslo_log import helpers as log_helpers
@ -22,19 +23,28 @@ from sqlalchemy.orm import exc
from neutron.api.v2 import attributes
from neutron.db import api as db_api
from neutron.db import common_db_mixin
from neutron.db.models import l3 as l3_model
from neutron.db.models import tag as tag_model
from neutron.db import models_v2
from neutron.db import tag_db as tag_methods
from neutron.extensions import l3 as l3_ext
from neutron.extensions import tag as tag_ext
resource_model_map = {
# When we'll add other resources, we must add new extension for them
# if we don't have better discovery mechanism instead of it.
attributes.NETWORKS: models_v2.Network,
# other resources can be added
attributes.SUBNETS: models_v2.Subnet,
attributes.PORTS: models_v2.Port,
attributes.SUBNETPOOLS: models_v2.SubnetPool,
l3_ext.ROUTERS: l3_model.Router,
}
def _extend_tags_dict(plugin, response_data, db_data):
if not directory.get_plugin(tag_ext.TAG_PLUGIN_TYPE):
return
tags = [tag_db.tag for tag_db in db_data.standard_attr.tags]
response_data['tags'] = tags
@ -42,7 +52,7 @@ def _extend_tags_dict(plugin, response_data, db_data):
class TagPlugin(common_db_mixin.CommonDbMixin, tag_ext.TagPluginBase):
"""Implementation of the Neutron Tag Service Plugin."""
supported_extension_aliases = ['tag']
supported_extension_aliases = ['tag', 'tag-ext']
def _get_resource(self, context, resource, resource_id):
model = resource_model_map[resource]

View File

@ -39,6 +39,7 @@ NETWORK_API_EXTENSIONS="
standard-attr-timestamp, \
subnet_allocation, \
tag, \
tag-ext, \
trunk, \
trunk-details"
NETWORK_API_EXTENSIONS="$(echo $NETWORK_API_EXTENSIONS | tr -d ' ')"

View File

@ -38,6 +38,7 @@ NETWORK_API_EXTENSIONS="
standard-attr-timestamp, \
subnet_allocation, \
tag, \
tag-ext, \
trunk, \
trunk-details"
NETWORK_API_EXTENSIONS="$(echo $NETWORK_API_EXTENSIONS | tr -d ' ')"

View File

@ -83,9 +83,72 @@ class TagNetworkTestJSON(TagTestJSON):
self._test_tag_operations()
class TagSubnetTestJSON(TagTestJSON):
resource = 'subnets'
@classmethod
def _create_resource(cls):
network = cls.create_network()
subnet = cls.create_subnet(network)
return subnet['id']
@test.attr(type='smoke')
@test.idempotent_id('2805aabf-a94c-4e70-a0b2-9814f06beb03')
@test.requires_ext(extension="tag-ext", service="network")
def test_subnet_tags(self):
self._test_tag_operations()
class TagPortTestJSON(TagTestJSON):
resource = 'ports'
@classmethod
def _create_resource(cls):
network = cls.create_network()
port = cls.create_port(network)
return port['id']
@test.attr(type='smoke')
@test.idempotent_id('c7c44f2c-edb0-4ebd-a386-d37cec155c34')
@test.requires_ext(extension="tag-ext", service="network")
def test_port_tags(self):
self._test_tag_operations()
class TagSubnetPoolTestJSON(TagTestJSON):
resource = 'subnetpools'
@classmethod
def _create_resource(cls):
subnetpool = cls.create_subnetpool('subnetpool', default_prefixlen=24,
prefixes=['10.0.0.0/8'])
return subnetpool['id']
@test.attr(type='smoke')
@test.idempotent_id('bdc1c24b-c0b5-4835-953c-8f67dc11edfe')
@test.requires_ext(extension="tag-ext", service="network")
def test_subnetpool_tags(self):
self._test_tag_operations()
class TagRouterTestJSON(TagTestJSON):
resource = 'routers'
@classmethod
def _create_resource(cls):
router = cls.create_router(router_name='test')
return router['id']
@test.attr(type='smoke')
@test.idempotent_id('b898ff92-dc33-4232-8ab9-2c6158c80d28')
@test.requires_ext(extension="router", service="network")
@test.requires_ext(extension="tag-ext", service="network")
def test_router_tags(self):
self._test_tag_operations()
class TagFilterTestJSON(base.BaseAdminNetworkTest):
credentials = ['primary', 'alt', 'admin']
resource = 'networks'
@classmethod
@test.requires_ext(extension="tag", service="network")
@ -166,9 +229,88 @@ class TagFilterNetworkTestJSON(TagFilterTestJSON):
def _list_resource(self, filters):
res = self.client.list_networks(**filters)
return res['networks']
return res[self.resource]
@test.attr(type='smoke')
@test.idempotent_id('a66b5cca-7db2-40f5-a33d-8ac9f864e53e')
def test_filter_network_tags(self):
self._test_filter_tags()
class TagFilterSubnetTestJSON(TagFilterTestJSON):
resource = 'subnets'
@classmethod
def _create_resource(cls, name):
network = cls.create_network()
res = cls.create_subnet(network, name=name)
return res['id']
def _list_resource(self, filters):
res = self.client.list_subnets(**filters)
return res[self.resource]
@test.attr(type='smoke')
@test.idempotent_id('dd8f9ba7-bcf6-496f-bead-714bd3daac10')
@test.requires_ext(extension="tag-ext", service="network")
def test_filter_subnet_tags(self):
self._test_filter_tags()
class TagFilterPortTestJSON(TagFilterTestJSON):
resource = 'ports'
@classmethod
def _create_resource(cls, name):
network = cls.create_network()
res = cls.create_port(network, name=name)
return res['id']
def _list_resource(self, filters):
res = self.client.list_ports(**filters)
return res[self.resource]
@test.attr(type='smoke')
@test.idempotent_id('09c036b8-c8d0-4bee-b776-7f4601512898')
@test.requires_ext(extension="tag-ext", service="network")
def test_filter_port_tags(self):
self._test_filter_tags()
class TagFilterSubnetpoolTestJSON(TagFilterTestJSON):
resource = 'subnetpools'
@classmethod
def _create_resource(cls, name):
res = cls.create_subnetpool(name, default_prefixlen=24,
prefixes=['10.0.0.0/8'])
return res['id']
def _list_resource(self, filters):
res = self.client.list_subnetpools(**filters)
return res[self.resource]
@test.attr(type='smoke')
@test.idempotent_id('16ae7ad2-55c2-4821-9195-bfd04ab245b7')
@test.requires_ext(extension="tag-ext", service="network")
def test_filter_subnetpool_tags(self):
self._test_filter_tags()
class TagFilterRouterTestJSON(TagFilterTestJSON):
resource = 'routers'
@classmethod
def _create_resource(cls, name):
res = cls.create_router(router_name=name)
return res['id']
def _list_resource(self, filters):
res = self.client.list_routers(**filters)
return res[self.resource]
@test.attr(type='smoke')
@test.idempotent_id('cdd3f3ea-073d-4435-a6cb-826a4064193d')
@test.requires_ext(extension="tag-ext", service="network")
def test_filter_router_tags(self):
self._test_filter_tags()

View File

@ -10,56 +10,100 @@
# License for the specific language governing permissions and limitations
# under the License.
import testscenarios
from neutron.api import extensions
from neutron.api.v2 import attributes
from neutron.common import config
import neutron.extensions
from neutron.services.tag import tag_plugin
from neutron.tests import fake_notifier
from neutron.tests.unit.db import test_db_base_plugin_v2
from neutron.tests.unit.extensions import test_l3
load_tests = testscenarios.load_tests_apply_scenarios
extensions_path = ':'.join(neutron.extensions.__path__)
class TestTagApiBase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
class TestTagApiBase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase,
test_l3.L3NatTestCaseMixin):
scenarios = [
('Network Tag Test',
dict(resource='networks',
member='network')),
('Subnet Tag Test',
dict(resource='subnets',
member='subnet')),
('Port Tag Test',
dict(resource='ports',
member='port')),
('Subnetpool Tag Test',
dict(resource='subnetpools',
member='subnetpool')),
('Router Tag Test',
dict(resource='routers',
member='router')),
]
def setUp(self):
service_plugins = {'TAG': "neutron.services.tag.tag_plugin.TagPlugin"}
service_plugins = {
'TAG': "neutron.services.tag.tag_plugin.TagPlugin",
'router':
"neutron.tests.unit.extensions.test_l3.TestL3NatServicePlugin"}
super(TestTagApiBase, self).setUp(service_plugins=service_plugins)
plugin = tag_plugin.TagPlugin()
l3_plugin = test_l3.TestL3NatServicePlugin()
ext_mgr = extensions.PluginAwareExtensionManager(
extensions_path, {'TAG': plugin}
extensions_path, {'router': l3_plugin, 'TAG': plugin}
)
ext_mgr.extend_resources("2.0", attributes.RESOURCE_ATTRIBUTE_MAP)
app = config.load_paste_app('extensions_test_app')
self.ext_api = extensions.ExtensionMiddleware(app, ext_mgr=ext_mgr)
def _get_resource_tags(self, resource_id):
res = self._show(self.resource, resource_id)
def _make_resource(self):
if self.resource == "networks":
res = self._make_network(self.fmt, 'net1', True)
elif self.resource == "subnets":
net = self._make_network(self.fmt, 'net1', True)
res = self._make_subnet(self.fmt, net, '10.0.0.1', '10.0.0.0/24')
elif self.resource == "ports":
net = self._make_network(self.fmt, 'net1', True)
res = self._make_port(self.fmt, net['network']['id'])
elif self.resource == "subnetpools":
res = self._make_subnetpool(self.fmt, ['10.0.0.0/8'],
name='my pool', tenant_id="tenant")
elif self.resource == "routers":
res = self._make_router(self.fmt, None)
return res[self.member]['id']
def _get_resource_tags(self):
res = self._show(self.resource, self.resource_id)
return res[self.member]['tags']
def _put_tag(self, resource_id, tag):
req = self._req('PUT', self.resource, id=resource_id,
def _put_tag(self, tag):
req = self._req('PUT', self.resource, id=self.resource_id,
subresource='tags', sub_id=tag)
return req.get_response(self.ext_api)
def _put_tags(self, resource_id, tags):
def _put_tags(self, tags):
body = {'tags': tags}
req = self._req('PUT', self.resource, data=body, id=resource_id,
req = self._req('PUT', self.resource, data=body, id=self.resource_id,
subresource='tags')
return req.get_response(self.ext_api)
def _get_tag(self, resource_id, tag):
req = self._req('GET', self.resource, id=resource_id,
def _get_tag(self, tag):
req = self._req('GET', self.resource, id=self.resource_id,
subresource='tags', sub_id=tag)
return req.get_response(self.ext_api)
def _delete_tag(self, resource_id, tag):
req = self._req('DELETE', self.resource, id=resource_id,
def _delete_tag(self, tag):
req = self._req('DELETE', self.resource, id=self.resource_id,
subresource='tags', sub_id=tag)
return req.get_response(self.ext_api)
def _delete_tags(self, resource_id):
req = self._req('DELETE', self.resource, id=resource_id,
def _delete_tags(self):
req = self._req('DELETE', self.resource, id=self.resource_id,
subresource='tags')
return req.get_response(self.ext_api)
@ -83,9 +127,7 @@ class TestTagApiBase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
not_tags=None, not_tags_any=None):
params = self._make_query_string(tags, tags_any, not_tags,
not_tags_any)
req = self._req('GET', self.resource, params=params)
res = req.get_response(self.api)
res = self.deserialize(self.fmt, res)
res = self._list(self.resource, query_params=params)
return res[self.resource]
def _test_notification_report(self, expect_notify):
@ -96,168 +138,149 @@ class TestTagApiBase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
fake_notifier.reset()
class TestNetworkTagApi(TestTagApiBase):
resource = 'networks'
member = 'network'
class TestResourceTagApi(TestTagApiBase):
def setUp(self):
super(TestResourceTagApi, self).setUp()
self.resource_id = self._make_resource()
def test_put_tag(self):
expect_notify = set(['tag.create.start',
'tag.create.end'])
with self.network() as net:
net_id = net['network']['id']
res = self._put_tag(net_id, 'red')
self.assertEqual(201, res.status_int)
tags = self._get_resource_tags(net_id)
self._assertEqualTags(['red'], tags)
self._test_notification_report(expect_notify)
res = self._put_tag(net_id, 'blue')
self.assertEqual(201, res.status_int)
tags = self._get_resource_tags(net_id)
self._assertEqualTags(['red', 'blue'], tags)
self._test_notification_report(expect_notify)
res = self._put_tag('red')
self.assertEqual(201, res.status_int)
tags = self._get_resource_tags()
self._assertEqualTags(['red'], tags)
self._test_notification_report(expect_notify)
res = self._put_tag('blue')
self.assertEqual(201, res.status_int)
tags = self._get_resource_tags()
self._assertEqualTags(['red', 'blue'], tags)
self._test_notification_report(expect_notify)
def test_put_tag_exists(self):
with self.network() as net:
net_id = net['network']['id']
res = self._put_tag(net_id, 'blue')
self.assertEqual(201, res.status_int)
res = self._put_tag(net_id, 'blue')
self.assertEqual(201, res.status_int)
res = self._put_tag('blue')
self.assertEqual(201, res.status_int)
res = self._put_tag('blue')
self.assertEqual(201, res.status_int)
def test_put_tags(self):
expect_notify = set(['tag.update.start',
'tag.update.end'])
with self.network() as net:
net_id = net['network']['id']
res = self._put_tags(net_id, ['red', 'green'])
self.assertEqual(200, res.status_int)
tags = self._get_resource_tags(net_id)
self._assertEqualTags(['red', 'green'], tags)
self._test_notification_report(expect_notify)
res = self._put_tags(['red', 'green'])
self.assertEqual(200, res.status_int)
tags = self._get_resource_tags()
self._assertEqualTags(['red', 'green'], tags)
self._test_notification_report(expect_notify)
def test_put_tags_replace(self):
with self.network() as net:
net_id = net['network']['id']
res = self._put_tags(net_id, ['red', 'green'])
self.assertEqual(200, res.status_int)
tags = self._get_resource_tags(net_id)
self._assertEqualTags(['red', 'green'], tags)
res = self._put_tags(net_id, ['blue', 'red'])
self.assertEqual(200, res.status_int)
tags = self._get_resource_tags(net_id)
self._assertEqualTags(['blue', 'red'], tags)
res = self._put_tags(['red', 'green'])
self.assertEqual(200, res.status_int)
tags = self._get_resource_tags()
self._assertEqualTags(['red', 'green'], tags)
res = self._put_tags(['blue', 'red'])
self.assertEqual(200, res.status_int)
tags = self._get_resource_tags()
self._assertEqualTags(['blue', 'red'], tags)
def test_get_tag(self):
with self.network() as net:
net_id = net['network']['id']
res = self._put_tag(net_id, 'red')
self.assertEqual(201, res.status_int)
res = self._get_tag(net_id, 'red')
self.assertEqual(204, res.status_int)
res = self._put_tag('red')
self.assertEqual(201, res.status_int)
res = self._get_tag('red')
self.assertEqual(204, res.status_int)
def test_get_tag_notfound(self):
with self.network() as net:
net_id = net['network']['id']
res = self._put_tag(net_id, 'red')
self.assertEqual(201, res.status_int)
res = self._get_tag(net_id, 'green')
self.assertEqual(404, res.status_int)
res = self._put_tag('red')
self.assertEqual(201, res.status_int)
res = self._get_tag('green')
self.assertEqual(404, res.status_int)
def test_delete_tag(self):
expect_notify = set(['tag.delete.start',
'tag.delete.end'])
with self.network() as net:
net_id = net['network']['id']
res = self._put_tags(net_id, ['red', 'green'])
self.assertEqual(200, res.status_int)
res = self._delete_tag(net_id, 'red')
self.assertEqual(204, res.status_int)
tags = self._get_resource_tags(net_id)
self._assertEqualTags(['green'], tags)
self._test_notification_report(expect_notify)
res = self._put_tags(['red', 'green'])
self.assertEqual(200, res.status_int)
res = self._delete_tag('red')
self.assertEqual(204, res.status_int)
tags = self._get_resource_tags()
self._assertEqualTags(['green'], tags)
self._test_notification_report(expect_notify)
def test_delete_tag_notfound(self):
with self.network() as net:
net_id = net['network']['id']
res = self._put_tags(net_id, ['red', 'green'])
self.assertEqual(200, res.status_int)
res = self._delete_tag(net_id, 'blue')
self.assertEqual(404, res.status_int)
res = self._put_tags(['red', 'green'])
self.assertEqual(200, res.status_int)
res = self._delete_tag('blue')
self.assertEqual(404, res.status_int)
def test_delete_tags(self):
expect_notify = set(['tag.delete_all.start',
'tag.delete_all.end'])
with self.network() as net:
net_id = net['network']['id']
res = self._put_tags(net_id, ['red', 'green'])
self.assertEqual(200, res.status_int)
res = self._delete_tags(net_id)
self.assertEqual(204, res.status_int)
tags = self._get_resource_tags(net_id)
self._assertEqualTags([], tags)
self._test_notification_report(expect_notify)
res = self._put_tags(['red', 'green'])
self.assertEqual(200, res.status_int)
res = self._delete_tags()
self.assertEqual(204, res.status_int)
tags = self._get_resource_tags()
self._assertEqualTags([], tags)
self._test_notification_report(expect_notify)
class TestNetworkTagFilter(TestTagApiBase):
resource = 'networks'
member = 'network'
class TestResourceTagFilter(TestTagApiBase):
def setUp(self):
super(TestNetworkTagFilter, self).setUp()
self._prepare_network_tags()
super(TestResourceTagFilter, self).setUp()
self._prepare_resource_tags()
def _prepare_network_tags(self):
res = self._make_network(self.fmt, 'net1', True)
net1_id = res['network']['id']
res = self._make_network(self.fmt, 'net2', True)
net2_id = res['network']['id']
res = self._make_network(self.fmt, 'net3', True)
net3_id = res['network']['id']
res = self._make_network(self.fmt, 'net4', True)
net4_id = res['network']['id']
res = self._make_network(self.fmt, 'net5', True)
net5_id = res['network']['id']
def _make_tags(self, resource_id, tags):
body = {'tags': tags}
req = self._req('PUT', self.resource, data=body, id=resource_id,
subresource='tags')
return req.get_response(self.ext_api)
self._put_tags(net1_id, ['red'])
self._put_tags(net2_id, ['red', 'blue'])
self._put_tags(net3_id, ['red', 'blue', 'green'])
self._put_tags(net4_id, ['green'])
# net5: no tags
tags = self._get_resource_tags(net5_id)
self._assertEqualTags([], tags)
def _prepare_resource_tags(self):
self.res1 = self._make_resource()
self.res2 = self._make_resource()
self.res3 = self._make_resource()
self.res4 = self._make_resource()
self.res5 = self._make_resource()
self._make_tags(self.res1, ['red'])
self._make_tags(self.res2, ['red', 'blue'])
self._make_tags(self.res3, ['red', 'blue', 'green'])
self._make_tags(self.res4, ['green'])
# res5: no tags
def _assertEqualResources(self, expected, res):
actual = [n['name'] for n in res]
actual = [n['id'] for n in res]
self.assertEqual(set(expected), set(actual))
def test_filter_tags_single(self):
res = self._get_tags_filter_resources(tags=['red'])
self._assertEqualResources(['net1', 'net2', 'net3'], res)
self._assertEqualResources([self.res1, self.res2, self.res3], res)
def test_filter_tags_multi(self):
res = self._get_tags_filter_resources(tags=['red', 'blue'])
self._assertEqualResources(['net2', 'net3'], res)
self._assertEqualResources([self.res2, self.res3], res)
def test_filter_tags_any_single(self):
res = self._get_tags_filter_resources(tags_any=['blue'])
self._assertEqualResources(['net2', 'net3'], res)
self._assertEqualResources([self.res2, self.res3], res)
def test_filter_tags_any_multi(self):
res = self._get_tags_filter_resources(tags_any=['red', 'blue'])
self._assertEqualResources(['net1', 'net2', 'net3'], res)
self._assertEqualResources([self.res1, self.res2, self.res3], res)
def test_filter_not_tags_single(self):
res = self._get_tags_filter_resources(not_tags=['red'])
self._assertEqualResources(['net4', 'net5'], res)
self._assertEqualResources([self.res4, self.res5], res)
def test_filter_not_tags_multi(self):
res = self._get_tags_filter_resources(not_tags=['red', 'blue'])
self._assertEqualResources(['net1', 'net4', 'net5'], res)
self._assertEqualResources([self.res1, self.res4, self.res5], res)
def test_filter_not_tags_any_single(self):
res = self._get_tags_filter_resources(not_tags_any=['blue'])
self._assertEqualResources(['net1', 'net4', 'net5'], res)
self._assertEqualResources([self.res1, self.res4, self.res5], res)
def test_filter_not_tags_any_multi(self):
res = self._get_tags_filter_resources(not_tags_any=['red', 'blue'])
self._assertEqualResources(['net4', 'net5'], res)
self._assertEqualResources([self.res4, self.res5], res)

View File

@ -0,0 +1,4 @@
---
features:
- Resource tag mechanism now supports subnet, port, subnetpool and
router resources.