132 lines
5.3 KiB
Python
132 lines
5.3 KiB
Python
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
|
|
import functools
|
|
|
|
from neutron_lib.objects import exceptions as obj_exc
|
|
from neutron_lib.plugins import directory
|
|
from oslo_log import helpers as log_helpers
|
|
from sqlalchemy.orm import exc
|
|
|
|
from neutron.db import _model_query as model_query
|
|
from neutron.db import _resource_extend as resource_extend
|
|
from neutron.db import api as db_api
|
|
from neutron.db import common_db_mixin
|
|
from neutron.db import standard_attr
|
|
from neutron.db import tag_db as tag_methods
|
|
from neutron.extensions import tagging
|
|
from neutron.objects import tag as tag_obj
|
|
|
|
|
|
# Taggable resources
|
|
resource_model_map = standard_attr.get_standard_attr_resource_model_map()
|
|
|
|
|
|
@resource_extend.has_resource_extenders
|
|
class TagPlugin(common_db_mixin.CommonDbMixin, tagging.TagPluginBase):
|
|
"""Implementation of the Neutron Tag Service Plugin."""
|
|
|
|
supported_extension_aliases = ['standard-attr-tag']
|
|
|
|
__filter_validation_support = True
|
|
|
|
def __new__(cls, *args, **kwargs):
|
|
inst = super(TagPlugin, cls).__new__(cls, *args, **kwargs)
|
|
inst._filter_methods = [] # prevent GC of our partial functions
|
|
for model in resource_model_map.values():
|
|
method = functools.partial(tag_methods.apply_tag_filters, model)
|
|
inst._filter_methods.append(method)
|
|
model_query.register_hook(model, "tag",
|
|
query_hook=None,
|
|
filter_hook=None,
|
|
result_filters=method)
|
|
return inst
|
|
|
|
@staticmethod
|
|
@resource_extend.extends(list(resource_model_map))
|
|
def _extend_tags_dict(response_data, db_data):
|
|
if not directory.get_plugin(tagging.TAG_PLUGIN_TYPE):
|
|
return
|
|
tags = [tag_db.tag for tag_db in db_data.standard_attr.tags]
|
|
response_data['tags'] = tags
|
|
|
|
def _get_resource(self, context, resource, resource_id):
|
|
model = resource_model_map[resource]
|
|
try:
|
|
return model_query.get_by_id(context, model, resource_id)
|
|
except exc.NoResultFound:
|
|
raise tagging.TagResourceNotFound(resource=resource,
|
|
resource_id=resource_id)
|
|
|
|
@log_helpers.log_method_call
|
|
def get_tags(self, context, resource, resource_id):
|
|
res = self._get_resource(context, resource, resource_id)
|
|
tags = [tag_db.tag for tag_db in res.standard_attr.tags]
|
|
return dict(tags=tags)
|
|
|
|
@log_helpers.log_method_call
|
|
def get_tag(self, context, resource, resource_id, tag):
|
|
res = self._get_resource(context, resource, resource_id)
|
|
if not any(tag == tag_db.tag for tag_db in res.standard_attr.tags):
|
|
raise tagging.TagNotFound(tag=tag)
|
|
|
|
@log_helpers.log_method_call
|
|
@db_api.retry_if_session_inactive()
|
|
def update_tags(self, context, resource, resource_id, body):
|
|
with db_api.context_manager.writer.using(context):
|
|
# We get and do all operations with objects in one session
|
|
res = self._get_resource(context, resource, resource_id)
|
|
new_tags = set(body['tags'])
|
|
old_tags = {tag_db.tag for tag_db in res.standard_attr.tags}
|
|
tags_added = new_tags - old_tags
|
|
tags_removed = old_tags - new_tags
|
|
if tags_removed:
|
|
tag_obj.Tag.delete_objects(
|
|
context,
|
|
standard_attr_id=res.standard_attr_id,
|
|
tag=[
|
|
tag_db.tag
|
|
for tag_db in res.standard_attr.tags
|
|
if tag_db.tag in tags_removed
|
|
]
|
|
)
|
|
for tag in tags_added:
|
|
tag_obj.Tag(context, standard_attr_id=res.standard_attr_id,
|
|
tag=tag).create()
|
|
return body
|
|
|
|
@log_helpers.log_method_call
|
|
def update_tag(self, context, resource, resource_id, tag):
|
|
res = self._get_resource(context, resource, resource_id)
|
|
if any(tag == tag_db.tag for tag_db in res.standard_attr.tags):
|
|
return
|
|
try:
|
|
tag_obj.Tag(context, standard_attr_id=res.standard_attr_id,
|
|
tag=tag).create()
|
|
except obj_exc.NeutronDbObjectDuplicateEntry:
|
|
pass
|
|
|
|
@log_helpers.log_method_call
|
|
def delete_tags(self, context, resource, resource_id):
|
|
res = self._get_resource(context, resource, resource_id)
|
|
tag_obj.Tag.delete_objects(context,
|
|
standard_attr_id=res.standard_attr_id)
|
|
|
|
@log_helpers.log_method_call
|
|
def delete_tag(self, context, resource, resource_id, tag):
|
|
res = self._get_resource(context, resource, resource_id)
|
|
if not tag_obj.Tag.delete_objects(context,
|
|
tag=tag, standard_attr_id=res.standard_attr_id):
|
|
raise tagging.TagNotFound(tag=tag)
|