neutron/neutron/db/address_scope_db.py

139 lines
6.0 KiB
Python

# Copyright (c) 2015 Huawei Technologies Co.,LTD.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.api.definitions import address_scope as apidef
from neutron_lib.api.definitions import network as net_def
from neutron_lib import constants
from neutron_lib.db import api as db_api
from neutron_lib.db import resource_extend
from neutron_lib.db import utils as db_utils
from neutron_lib.exceptions import address_scope as api_err
from oslo_utils import uuidutils
from neutron._i18n import _
from neutron.extensions import address_scope as ext_address_scope
from neutron.objects import address_scope as obj_addr_scope
from neutron.objects import base as base_obj
from neutron.objects import subnetpool as subnetpool_obj
@resource_extend.has_resource_extenders
class AddressScopeDbMixin(ext_address_scope.AddressScopePluginBase):
"""Mixin class to add address scope to db_base_plugin_v2."""
__native_bulk_support = True
@staticmethod
def _make_address_scope_dict(address_scope, fields=None):
res = address_scope.to_dict()
return db_utils.resource_fields(res, fields)
def _get_address_scope(self, context, id):
obj = obj_addr_scope.AddressScope.get_object(context, id=id)
if obj is None:
raise api_err.AddressScopeNotFound(address_scope_id=id)
return obj
def is_address_scope_owned_by_tenant(self, context, id):
"""Check if address scope id is owned by the tenant or not.
AddressScopeNotFound is raised if the
- address scope id doesn't exist or
- if the (unshared) address scope id is not owned by this tenant.
@return Returns true if the user is admin or tenant is owner
Returns false if the address scope id is shared and not
owned by the tenant.
"""
address_scope = self._get_address_scope(context, id)
return context.is_admin or (
address_scope.project_id == context.project_id)
def get_ip_version_for_address_scope(self, context, id):
address_scope = self._get_address_scope(context, id)
return address_scope.ip_version
def create_address_scope(self, context, address_scope):
"""Create an address scope."""
a_s = address_scope['address_scope']
address_scope_id = a_s.get('id') or uuidutils.generate_uuid()
# TODO(ralonsoh): remove tenant_id reference once bp/keystone-v3
# migration finishes.
project_id = a_s.get('project_id') or a_s['tenant_id']
pool_args = {'project_id': project_id,
'id': address_scope_id,
'name': a_s['name'],
'shared': a_s['shared'],
'ip_version': a_s['ip_version']}
address_scope = obj_addr_scope.AddressScope(context, **pool_args)
address_scope.create()
return self._make_address_scope_dict(address_scope)
def update_address_scope(self, context, id, address_scope):
a_s = address_scope['address_scope']
address_scope = self._get_address_scope(context, id)
if address_scope.shared and not a_s.get('shared', True):
reason = _("Shared address scope can't be unshared")
raise api_err.AddressScopeUpdateError(
address_scope_id=id, reason=reason)
address_scope.update_fields(a_s)
address_scope.update()
return self._make_address_scope_dict(address_scope)
def get_address_scope(self, context, id, fields=None):
address_scope = self._get_address_scope(context, id)
return self._make_address_scope_dict(address_scope, fields)
def get_address_scopes(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
pager = base_obj.Pager(sorts, limit, page_reverse, marker)
address_scopes = obj_addr_scope.AddressScope.get_objects(
context, _pager=pager, **filters)
return [
self._make_address_scope_dict(addr_scope, fields)
for addr_scope in address_scopes
]
def get_address_scopes_count(self, context, filters=None):
return obj_addr_scope.AddressScope.count(context, **filters)
def delete_address_scope(self, context, id):
with db_api.CONTEXT_WRITER.using(context):
if subnetpool_obj.SubnetPool.get_objects(context,
address_scope_id=id):
raise api_err.AddressScopeInUse(address_scope_id=id)
address_scope = self._get_address_scope(context, id)
address_scope.delete()
@staticmethod
@resource_extend.extends([net_def.COLLECTION_NAME])
def _extend_network_dict_address_scope(network_res, network_db):
network_res[apidef.IPV4_ADDRESS_SCOPE] = None
network_res[apidef.IPV6_ADDRESS_SCOPE] = None
subnetpools = {subnet.subnetpool for subnet in network_db.subnets
if subnet.subnetpool}
for subnetpool in subnetpools:
# A network will be constrained to only one subnetpool per address
# family. Retrieve the address scope of subnetpools as the address
# scopes of network.
as_id = subnetpool[apidef.ADDRESS_SCOPE_ID]
if subnetpool['ip_version'] == constants.IP_VERSION_4:
network_res[apidef.IPV4_ADDRESS_SCOPE] = as_id
if subnetpool['ip_version'] == constants.IP_VERSION_6:
network_res[apidef.IPV6_ADDRESS_SCOPE] = as_id
return network_res