Allow sharing of security groups via RBAC mechanism

Neutron-lib api ref: https://review.openstack.org/#/c/635313/
Tempest tests: https://review.openstack.org/#/c/635312/
Client: https://review.openstack.org/#/c/635428/

Partial-Bug: #1817119
Depends-On: https://review.openstack.org/635313
Change-Id: I974b0a603b6ca75cf080fb7b0751c7fb87df8443
This commit is contained in:
Doug Wiegley 2019-02-06 14:05:26 -07:00 committed by Miguel Lavalle
parent fe73e8c9b3
commit 5e0fc3d2da
22 changed files with 342 additions and 49 deletions

View File

@ -17,6 +17,7 @@ is supported by:
* Regular port creation permissions on networks (since Liberty). * Regular port creation permissions on networks (since Liberty).
* Binding QoS policies permissions to networks or ports (since Mitaka). * Binding QoS policies permissions to networks or ports (since Mitaka).
* Attaching router gateways to networks (since Mitaka). * Attaching router gateways to networks (since Mitaka).
* Binding security groups to ports (since Stein).
Sharing an object with specific projects Sharing an object with specific projects
@ -201,11 +202,91 @@ This process can be repeated any number of times to share a qos-policy
with an arbitrary number of projects. with an arbitrary number of projects.
Sharing a security group with specific projects
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Create a security group to share:
.. code-block:: console
$ openstack security group create my_security_group
+-------------------+--------------------------------------+
| Field | Value |
+-------------------+--------------------------------------+
| created_at | 2019-02-07T06:09:59Z |
| description | my_security_group |
| id | 5ba835b7-22b0-4be6-bdbe-e0722d1b5f24 |
| location | None |
| name | my_security_group |
| project_id | 077e8f39d3db4c9e998d842b0503283a |
| revision_number | 1 |
| rules | ... |
| tags | [] |
| updated_at | 2019-02-07T06:09:59Z |
+-------------------+--------------------------------------+
Create the RBAC policy entry using the :command:`openstack network rbac create`
command (in this example, the ID of the project we want to share with is
``32016615de5d43bb88de99e7f2e26a1e``):
.. code-block:: console
$ openstack network rbac create --target-project \
32016615de5d43bb88de99e7f2e26a1e --action access_as_shared \
--type security_group 5ba835b7-22b0-4be6-bdbe-e0722d1b5f24
+-------------------+--------------------------------------+
| Field | Value |
+-------------------+--------------------------------------+
| action | access_as_shared |
| id | 8828e38d-a0df-4c78-963b-e5f215d3d550 |
| name | None |
| object_id | 5ba835b7-22b0-4be6-bdbe-e0722d1b5f24 |
| object_type | security_group |
| project_id | 077e8f39d3db4c9e998d842b0503283a |
| target_project_id | 32016615de5d43bb88de99e7f2e26a1e |
+-------------------+--------------------------------------+
The ``target-project`` parameter specifies the project that requires
access to the security group. The ``action`` parameter specifies what
the project is allowed to do. The ``type`` parameter says
that the target object is a security group. The final parameter is the ID of
the security group we are granting access to.
Project ``32016615de5d43bb88de99e7f2e26a1e`` will now be able to see
the security group when running :command:`openstack security group list` and
:command:`openstack security group show` and will also be able to bind
it to its ports. No other users (other than admins and the owner)
will be able to see the security group.
To remove access for that project, delete the RBAC policy that allows
it using the :command:`openstack network rbac delete` command:
.. code-block:: console
$ openstack network rbac delete 8828e38d-a0df-4c78-963b-e5f215d3d550
If that project has ports with the security group applied to them,
the server will not delete the RBAC policy until
the security group is no longer in use:
.. code-block:: console
$ openstack network rbac delete 8828e38d-a0df-4c78-963b-e5f215d3d550
RBAC policy on object 8828e38d-a0df-4c78-963b-e5f215d3d550
cannot be removed because other objects depend on it.
This process can be repeated any number of times to share a security-group
with an arbitrary number of projects.
How the 'shared' flag relates to these entries How the 'shared' flag relates to these entries
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
As introduced in other guide entries, neutron provides a means of As introduced in other guide entries, neutron provides a means of
making an object (``network``, ``qos-policy``) available to every project. making an object (``network``, ``qos-policy``, ``security-group``) available
to every project.
This is accomplished using the ``shared`` flag on the supported object: This is accomplished using the ``shared`` flag on the supported object:
.. code-block:: console .. code-block:: console

View File

@ -16,6 +16,7 @@
import functools import functools
from neutron_lib.api.definitions import rbac_security_groups as rbac_sg_apidef
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
import oslo_messaging import oslo_messaging
@ -43,6 +44,7 @@ def disable_security_group_extension_by_config(aliases):
if not is_firewall_enabled(): if not is_firewall_enabled():
LOG.info('Disabled security-group extension.') LOG.info('Disabled security-group extension.')
_disable_extension('security-group', aliases) _disable_extension('security-group', aliases)
_disable_extension(rbac_sg_apidef.ALIAS, aliases)
LOG.info('Disabled allowed-address-pairs extension.') LOG.info('Disabled allowed-address-pairs extension.')
_disable_extension('allowed-address-pairs', aliases) _disable_extension('allowed-address-pairs', aliases)

View File

@ -37,7 +37,7 @@ rules = [
), ),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
'get_security_group', 'get_security_group',
base.RULE_ADMIN_OR_OWNER, base.RULE_ANY,
'Get a security group', 'Get a security group',
[ [
{ {

View File

@ -1 +1 @@
0ff9e3881597 9bfad3f1e780

View File

@ -0,0 +1,48 @@
# Copyright 2019 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from alembic import op
import sqlalchemy as sa
"""support shared security groups
Revision ID: 9bfad3f1e780
Revises: 0ff9e3881597
Create Date: 2019-02-05 15:24:45.011378
"""
# revision identifiers, used by Alembic.
revision = '9bfad3f1e780'
down_revision = '0ff9e3881597'
def upgrade():
op.create_table('securitygrouprbacs',
sa.Column('project_id', sa.String(length=255), nullable=True),
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('target_tenant', sa.String(length=255), nullable=False),
sa.Column('action', sa.String(length=255), nullable=False),
sa.Column('object_id', sa.String(length=36), nullable=False),
sa.ForeignKeyConstraint(['object_id'], ['securitygroups.id'],
ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('target_tenant', 'object_id', 'action',
name='uniq_securitygrouprbacs0'
'target_tenant0object_id0action')
)
op.create_index(op.f('ix_securitygrouprbacs_project_id'),
'securitygrouprbacs', ['project_id'], unique=False)

View File

@ -18,6 +18,7 @@ import sqlalchemy as sa
from sqlalchemy import orm from sqlalchemy import orm
from neutron.db import models_v2 from neutron.db import models_v2
from neutron.db import rbac_db_models
from neutron.db import standard_attr from neutron.db import standard_attr
from neutron.extensions import securitygroup as sg from neutron.extensions import securitygroup as sg
@ -27,6 +28,10 @@ class SecurityGroup(standard_attr.HasStandardAttributes, model_base.BASEV2,
"""Represents a v2 neutron security group.""" """Represents a v2 neutron security group."""
name = sa.Column(sa.String(db_const.NAME_FIELD_SIZE)) name = sa.Column(sa.String(db_const.NAME_FIELD_SIZE))
rbac_entries = sa.orm.relationship(rbac_db_models.SecurityGroupRBAC,
backref='security_group',
lazy='subquery',
cascade='all, delete, delete-orphan')
api_collections = [sg.SECURITYGROUPS] api_collections = [sg.SECURITYGROUPS]
collection_resource_map = {sg.SECURITYGROUPS: 'security_group'} collection_resource_map = {sg.SECURITYGROUPS: 'security_group'}
tag_support = True tag_support = True

View File

@ -115,3 +115,14 @@ class QosPolicyRBAC(RBACColumns, model_base.BASEV2):
@staticmethod @staticmethod
def get_valid_actions(): def get_valid_actions():
return (ACCESS_SHARED,) return (ACCESS_SHARED,)
class SecurityGroupRBAC(RBACColumns, model_base.BASEV2):
"""RBAC table for security groups."""
object_id = _object_id_column('securitygroups.id')
object_type = 'security_group'
@staticmethod
def get_valid_actions():
return (ACCESS_SHARED,)

View File

@ -35,6 +35,7 @@ from neutron._i18n import _
from neutron.common import constants as n_const from neutron.common import constants as n_const
from neutron.common import utils from neutron.common import utils
from neutron.db.models import securitygroup as sg_models from neutron.db.models import securitygroup as sg_models
from neutron.db import rbac_db_mixin as rbac_mixin
from neutron.extensions import securitygroup as ext_sg from neutron.extensions import securitygroup as ext_sg
from neutron.objects import base as base_obj from neutron.objects import base as base_obj
from neutron.objects import securitygroup as sg_obj from neutron.objects import securitygroup as sg_obj
@ -42,7 +43,8 @@ from neutron.objects import securitygroup as sg_obj
@resource_extend.has_resource_extenders @resource_extend.has_resource_extenders
@registry.has_registry_receivers @registry.has_registry_receivers
class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase): class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase,
rbac_mixin.RbacPluginMixin):
"""Mixin class to add security group to db_base_plugin_v2.""" """Mixin class to add security group to db_base_plugin_v2."""
__native_bulk_support = True __native_bulk_support = True
@ -794,13 +796,14 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
return return
port_sg = port.get(ext_sg.SECURITYGROUPS, []) port_sg = port.get(ext_sg.SECURITYGROUPS, [])
filters = {'id': port_sg}
tenant_id = port.get('tenant_id') tenant_id = port.get('tenant_id')
if tenant_id:
filters['tenant_id'] = [tenant_id] sg_objs = sg_obj.SecurityGroup.get_objects(context, id=port_sg)
valid_groups = set(g['id'] for g in
self.get_security_groups(context, fields=['id'], valid_groups = set(g.id for g in sg_objs
filters=filters)) if not tenant_id or g.tenant_id == tenant_id or
sg_obj.SecurityGroup.is_shared_with_tenant(context,
g.id, tenant_id))
requested_groups = set(port_sg) requested_groups = set(port_sg)
port_sg_missing = requested_groups - valid_groups port_sg_missing = requested_groups - valid_groups

View File

@ -0,0 +1,22 @@
# Copyright (c) 2019 Salesforce. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.api.definitions import rbac_security_groups
from neutron_lib.api import extensions
class Rbac_security_groups(extensions.APIExtensionDescriptor):
"""Extension class supporting security groups RBAC."""
api_definition = rbac_security_groups

View File

@ -153,38 +153,26 @@ class QosPolicy(rbac_db.NeutronRbacObject):
@classmethod @classmethod
def get_object(cls, context, **kwargs): def get_object(cls, context, **kwargs):
# We want to get the policy regardless of its tenant id. We'll make policy_obj = super(QosPolicy, cls).get_object(context, **kwargs)
# sure the tenant has permission to access the policy later on. if not policy_obj:
admin_context = context.elevated() return
with cls.db_context_reader(admin_context):
policy_obj = super(QosPolicy, cls).get_object(admin_context,
**kwargs)
if (not policy_obj or
not cls.is_accessible(context, policy_obj)):
return
policy_obj.obj_load_attr('rules') policy_obj.obj_load_attr('rules')
policy_obj.obj_load_attr('is_default') policy_obj.obj_load_attr('is_default')
return policy_obj return policy_obj
@classmethod @classmethod
def get_objects(cls, context, _pager=None, validate_filters=True, def get_objects(cls, context, _pager=None, validate_filters=True,
**kwargs): **kwargs):
# We want to get the policy regardless of its tenant id. We'll make objs = super(QosPolicy, cls).get_objects(context, _pager,
# sure the tenant has permission to access the policy later on. validate_filters,
admin_context = context.elevated() **kwargs)
with cls.db_context_reader(admin_context): result = []
objs = super(QosPolicy, cls).get_objects(admin_context, _pager, for obj in objs:
validate_filters, obj.obj_load_attr('rules')
**kwargs) obj.obj_load_attr('is_default')
result = [] result.append(obj)
for obj in objs: return result
if not cls.is_accessible(context, obj):
continue
obj.obj_load_attr('rules')
obj.obj_load_attr('is_default')
result.append(obj)
return result
@classmethod @classmethod
def _get_object_policy(cls, context, binding_cls, **kwargs): def _get_object_policy(cls, context, binding_cls, **kwargs):

View File

@ -88,6 +88,35 @@ class RbacNeutronDbObjectMixin(rbac_db_mixin.RbacPluginMixin,
cls.is_shared_with_tenant(context, db_obj.id, cls.is_shared_with_tenant(context, db_obj.id,
context.tenant_id)) context.tenant_id))
@classmethod
def get_object(cls, context, **kwargs):
# We want to get the policy regardless of its tenant id. We'll make
# sure the tenant has permission to access the policy later on.
admin_context = context.elevated()
with cls.db_context_reader(admin_context):
obj = super(RbacNeutronDbObjectMixin,
cls).get_object(admin_context, **kwargs)
if (not obj or not cls.is_accessible(context, obj)):
return
return obj
@classmethod
def get_objects(cls, context, _pager=None, validate_filters=True,
**kwargs):
# We want to get the policy regardless of its tenant id. We'll make
# sure the tenant has permission to access the policy later on.
admin_context = context.elevated()
with cls.db_context_reader(admin_context):
objs = super(RbacNeutronDbObjectMixin,
cls).get_objects(admin_context, _pager,
validate_filters, **kwargs)
result = []
for obj in objs:
if not cls.is_accessible(context, obj):
continue
result.append(obj)
return result
@classmethod @classmethod
def _get_db_obj_rbac_entries(cls, context, rbac_obj_id, rbac_action): def _get_db_obj_rbac_entries(cls, context, rbac_obj_id, rbac_action):
rbac_db_model = cls.rbac_db_cls.db_model rbac_db_model = cls.rbac_db_cls.db_model

View File

@ -10,25 +10,42 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from oslo_utils import versionutils
from oslo_versionedobjects import fields as obj_fields from oslo_versionedobjects import fields as obj_fields
from neutron.common import utils from neutron.common import utils
from neutron.db.models import securitygroup as sg_models from neutron.db.models import securitygroup as sg_models
from neutron.db import rbac_db_models
from neutron.objects import base from neutron.objects import base
from neutron.objects import common_types from neutron.objects import common_types
from neutron.objects import ports
from neutron.objects import rbac
from neutron.objects import rbac_db
@base.NeutronObjectRegistry.register @base.NeutronObjectRegistry.register
class SecurityGroup(base.NeutronDbObject): class SecurityGroupRBAC(rbac.RBACBaseObject):
# Version 1.0: Initial version # Version 1.0: Initial version
VERSION = '1.0' VERSION = '1.0'
db_model = rbac_db_models.SecurityGroupRBAC
@base.NeutronObjectRegistry.register
class SecurityGroup(rbac_db.NeutronRbacObject):
# Version 1.0: Initial version
# Version 1.1: Add RBAC support
VERSION = '1.1'
# required by RbacNeutronMetaclass
rbac_db_cls = SecurityGroupRBAC
db_model = sg_models.SecurityGroup db_model = sg_models.SecurityGroup
fields = { fields = {
'id': common_types.UUIDField(), 'id': common_types.UUIDField(),
'name': obj_fields.StringField(nullable=True), 'name': obj_fields.StringField(nullable=True),
'project_id': obj_fields.StringField(nullable=True), 'project_id': obj_fields.StringField(nullable=True),
'shared': obj_fields.BooleanField(default=False),
'is_default': obj_fields.BooleanField(default=False), 'is_default': obj_fields.BooleanField(default=False),
'rules': obj_fields.ListOfObjectsField( 'rules': obj_fields.ListOfObjectsField(
'SecurityGroupRule', nullable=True 'SecurityGroupRule', nullable=True
@ -64,6 +81,17 @@ class SecurityGroup(base.NeutronDbObject):
bool(db_obj.get('default_security_group'))) bool(db_obj.get('default_security_group')))
self.obj_reset_changes(['is_default']) self.obj_reset_changes(['is_default'])
def obj_make_compatible(self, primitive, target_version):
_target_version = versionutils.convert_version_to_tuple(target_version)
if _target_version < (1, 1):
primitive.pop('shared')
@classmethod
def get_bound_tenant_ids(cls, context, obj_id):
port_objs = ports.Port.get_objects(context,
security_group_ids=[obj_id])
return {port.tenant_id for port in port_objs}
@base.NeutronObjectRegistry.register @base.NeutronObjectRegistry.register
class DefaultSecurityGroup(base.NeutronDbObject): class DefaultSecurityGroup(base.NeutronDbObject):

View File

@ -40,6 +40,7 @@ from neutron_lib.api.definitions import port_security as psec
from neutron_lib.api.definitions import portbindings from neutron_lib.api.definitions import portbindings
from neutron_lib.api.definitions import portbindings_extended as pbe_ext from neutron_lib.api.definitions import portbindings_extended as pbe_ext
from neutron_lib.api.definitions import provider_net from neutron_lib.api.definitions import provider_net
from neutron_lib.api.definitions import rbac_security_groups as rbac_sg_apidef
from neutron_lib.api.definitions import security_groups_port_filtering from neutron_lib.api.definitions import security_groups_port_filtering
from neutron_lib.api.definitions import subnet as subnet_def from neutron_lib.api.definitions import subnet as subnet_def
from neutron_lib.api.definitions import subnet_onboard as subnet_onboard_def from neutron_lib.api.definitions import subnet_onboard as subnet_onboard_def
@ -174,6 +175,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
_supported_extension_aliases = [provider_net.ALIAS, _supported_extension_aliases = [provider_net.ALIAS,
external_net.ALIAS, portbindings.ALIAS, external_net.ALIAS, portbindings.ALIAS,
"quotas", "security-group", "quotas", "security-group",
rbac_sg_apidef.ALIAS,
agent_apidef.ALIAS, agent_apidef.ALIAS,
dhcpagentscheduler.ALIAS, dhcpagentscheduler.ALIAS,
multiprovidernet.ALIAS, multiprovidernet.ALIAS,

View File

@ -43,6 +43,7 @@ NETWORK_API_EXTENSIONS+=",qos-gateway-ip"
NETWORK_API_EXTENSIONS+=",quotas" NETWORK_API_EXTENSIONS+=",quotas"
NETWORK_API_EXTENSIONS+=",quota_details" NETWORK_API_EXTENSIONS+=",quota_details"
NETWORK_API_EXTENSIONS+=",rbac-policies" NETWORK_API_EXTENSIONS+=",rbac-policies"
NETWORK_API_EXTENSIONS+=",rbac-security-groups""
NETWORK_API_EXTENSIONS+=",router" NETWORK_API_EXTENSIONS+=",router"
NETWORK_API_EXTENSIONS+=",router_availability_zone" NETWORK_API_EXTENSIONS+=",router_availability_zone"
NETWORK_API_EXTENSIONS+=",security-group" NETWORK_API_EXTENSIONS+=",security-group"

View File

@ -91,7 +91,9 @@ class SecurityGroupServerAPIShimTestCase(base.BaseTestCase):
self.rcache.record_resource_update(self.ctx, 'Port', p) self.rcache.record_resource_update(self.ctx, 'Port', p)
return p return p
def _make_security_group_ovo(self, **kwargs): @mock.patch.object(securitygroup.SecurityGroup, 'is_shared_with_tenant',
return_value=False)
def _make_security_group_ovo(self, *args, **kwargs):
attrs = {'id': uuidutils.generate_uuid(), 'revision_number': 1} attrs = {'id': uuidutils.generate_uuid(), 'revision_number': 1}
sg_rule = securitygroup.SecurityGroupRule( sg_rule = securitygroup.SecurityGroupRule(
id=uuidutils.generate_uuid(), id=uuidutils.generate_uuid(),

View File

@ -755,7 +755,9 @@ class BaseObjectIfaceTestCase(_BaseObjectTestCase, test_base.BaseTestCase):
return self.model_map[obj_cls.db_model] return self.model_map[obj_cls.db_model]
# TODO(ihrachys) document the intent of all common test cases in docstrings # TODO(ihrachys) document the intent of all common test cases in docstrings
def test_get_object(self): def test_get_object(self, context=None):
if context is None:
context = self.context
with mock.patch.object( with mock.patch.object(
obj_db_api, 'get_object', obj_db_api, 'get_object',
return_value=self.db_objs[0]) as get_object_mock: return_value=self.db_objs[0]) as get_object_mock:
@ -766,7 +768,7 @@ class BaseObjectIfaceTestCase(_BaseObjectTestCase, test_base.BaseTestCase):
self.assertTrue(self._is_test_class(obj)) self.assertTrue(self._is_test_class(obj))
self._check_equal(self.objs[0], obj) self._check_equal(self.objs[0], obj)
get_object_mock.assert_called_once_with( get_object_mock.assert_called_once_with(
self._test_class, self.context, self._test_class, context,
**self._test_class.modify_fields_to_db(obj_keys)) **self._test_class.modify_fields_to_db(obj_keys))
def test_get_object_missing_object(self): def test_get_object_missing_object(self):
@ -827,7 +829,9 @@ class BaseObjectIfaceTestCase(_BaseObjectTestCase, test_base.BaseTestCase):
**filter_kwargs)) **filter_kwargs))
return mock_calls return mock_calls
def test_get_objects(self): def test_get_objects(self, context=None):
if context is None:
context = self.context
'''Test that get_objects fetches data from database.''' '''Test that get_objects fetches data from database.'''
with mock.patch.object( with mock.patch.object(
obj_db_api, 'get_objects', obj_db_api, 'get_objects',
@ -837,7 +841,7 @@ class BaseObjectIfaceTestCase(_BaseObjectTestCase, test_base.BaseTestCase):
[get_obj_persistent_fields(obj) for obj in self.objs], [get_obj_persistent_fields(obj) for obj in self.objs],
[get_obj_persistent_fields(obj) for obj in objs]) [get_obj_persistent_fields(obj) for obj in objs])
get_objects_mock.assert_any_call( get_objects_mock.assert_any_call(
self._test_class, self.context, self._test_class, context,
_pager=self.pager_map[self._test_class.obj_name()] _pager=self.pager_map[self._test_class.obj_name()]
) )

View File

@ -20,6 +20,7 @@ from neutron.objects import network
from neutron.objects.qos import binding from neutron.objects.qos import binding
from neutron.objects.qos import policy from neutron.objects.qos import policy
from neutron.tests.unit.objects import test_base as obj_test_base from neutron.tests.unit.objects import test_base as obj_test_base
from neutron.tests.unit.objects import test_rbac
from neutron.tests.unit import testlib_api from neutron.tests.unit import testlib_api
@ -156,7 +157,7 @@ class NetworkSegmentDbObjTestCase(obj_test_base.BaseDbObjectTestCase,
self.assertFalse(obj.hosts) self.assertFalse(obj.hosts)
class NetworkObjectIfaceTestCase(obj_test_base.BaseObjectIfaceTestCase): class NetworkObjectIfaceTestCase(test_rbac.RBACBaseObjectIfaceTestCase):
_test_class = network.Network _test_class = network.Network
def setUp(self): def setUp(self):

View File

@ -95,8 +95,9 @@ object_data = {
'RouterL3AgentBinding': '1.0-c5ba6c95e3a4c1236a55f490cd67da82', 'RouterL3AgentBinding': '1.0-c5ba6c95e3a4c1236a55f490cd67da82',
'RouterPort': '1.0-c8c8f499bcdd59186fcd83f323106908', 'RouterPort': '1.0-c8c8f499bcdd59186fcd83f323106908',
'RouterRoute': '1.0-07fc5337c801fb8c6ccfbcc5afb45907', 'RouterRoute': '1.0-07fc5337c801fb8c6ccfbcc5afb45907',
'SecurityGroup': '1.0-e26b90c409b31fd2e3c6fcec402ac0b9', 'SecurityGroup': '1.1-f712265418f154f7c080e02857ffe2ef',
'SecurityGroupPortBinding': '1.0-6879d5c0af80396ef5a72934b6a6ef20', 'SecurityGroupPortBinding': '1.0-6879d5c0af80396ef5a72934b6a6ef20',
'SecurityGroupRBAC': '1.0-192845c5ed0718e1c54fac36936fcd7d',
'SecurityGroupRule': '1.0-e9b8dace9d48b936c62ad40fe1f339d5', 'SecurityGroupRule': '1.0-e9b8dace9d48b936c62ad40fe1f339d5',
'SegmentHostMapping': '1.0-521597cf82ead26217c3bd10738f00f0', 'SegmentHostMapping': '1.0-521597cf82ead26217c3bd10738f00f0',
'ServiceProfile': '1.0-9beafc9e7d081b8258f3c5cb66ac5eed', 'ServiceProfile': '1.0-9beafc9e7d081b8258f3c5cb66ac5eed',

View File

@ -10,15 +10,31 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import mock
from neutron.objects import network from neutron.objects import network
from neutron.objects.qos import policy from neutron.objects.qos import policy
from neutron.objects import rbac from neutron.objects import rbac
from neutron.objects import securitygroup
from neutron.tests import base as neutron_test_base from neutron.tests import base as neutron_test_base
from neutron.tests.unit.objects import test_base
class RBACBaseObjectTestCase(neutron_test_base.BaseTestCase): class RBACBaseObjectTestCase(neutron_test_base.BaseTestCase):
def test_get_type_class_map(self): def test_get_type_class_map(self):
class_map = {'qos_policy': policy.QosPolicyRBAC, class_map = {'qos_policy': policy.QosPolicyRBAC,
'network': network.NetworkRBAC} 'network': network.NetworkRBAC,
'security_group': securitygroup.SecurityGroupRBAC}
self.assertEqual(class_map, rbac.RBACBaseObject.get_type_class_map()) self.assertEqual(class_map, rbac.RBACBaseObject.get_type_class_map())
class RBACBaseObjectIfaceTestCase(test_base.BaseObjectIfaceTestCase):
def test_get_object(self, context=None):
super(RBACBaseObjectIfaceTestCase,
self).test_get_object(context=mock.ANY)
def test_get_objects(self, context=None):
super(RBACBaseObjectIfaceTestCase,
self).test_get_objects(context=mock.ANY)

View File

@ -25,7 +25,7 @@ from neutron.objects import base
from neutron.objects import common_types from neutron.objects import common_types
from neutron.objects.db import api as obj_db_api from neutron.objects.db import api as obj_db_api
from neutron.objects import rbac_db from neutron.objects import rbac_db
from neutron.tests.unit.objects import test_base from neutron.tests.unit.objects import test_rbac
from neutron.tests.unit import testlib_api from neutron.tests.unit import testlib_api
@ -77,7 +77,7 @@ class FakeNeutronDbObject(rbac_db.NeutronRbacObject):
pass pass
class RbacNeutronDbObjectTestCase(test_base.BaseObjectIfaceTestCase, class RbacNeutronDbObjectTestCase(test_rbac.RBACBaseObjectIfaceTestCase,
testlib_api.SqlTestCase): testlib_api.SqlTestCase):
_test_class = FakeNeutronDbObject _test_class = FakeNeutronDbObject

View File

@ -10,12 +10,57 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import random
from neutron.objects import securitygroup from neutron.objects import securitygroup
from neutron.tests.unit.objects import test_base from neutron.tests.unit.objects import test_base
from neutron.tests.unit.objects import test_rbac
from neutron.tests.unit import testlib_api from neutron.tests.unit import testlib_api
class SecurityGroupIfaceObjTestCase(test_base.BaseObjectIfaceTestCase): class _SecurityGroupRBACBase(object):
def get_random_object_fields(self, obj_cls=None):
fields = (super(_SecurityGroupRBACBase, self).
get_random_object_fields(obj_cls))
rnd_actions = self._test_class.db_model.get_valid_actions()
idx = random.randint(0, len(rnd_actions) - 1)
fields['action'] = rnd_actions[idx]
return fields
class SecurityGroupRBACDbObjectTestCase(_SecurityGroupRBACBase,
test_base.BaseDbObjectTestCase,
testlib_api.SqlTestCase):
_test_class = securitygroup.SecurityGroupRBAC
def setUp(self):
super(SecurityGroupRBACDbObjectTestCase, self).setUp()
for obj in self.db_objs:
sg_obj = securitygroup.SecurityGroup(self.context,
id=obj['object_id'],
project_id=obj['project_id'])
sg_obj.create()
def _create_test_security_group_rbac(self):
self.objs[0].create()
return self.objs[0]
def test_object_version_degradation_1_1_to_1_0_no_shared(self):
security_group_rbac_obj = self._create_test_security_group_rbac()
x = security_group_rbac_obj.obj_to_primitive('1.0')
security_group_rbac_dict = x
self.assertNotIn('shared',
security_group_rbac_dict['versioned_object.data'])
class SecurityGroupRBACIfaceObjectTestCase(_SecurityGroupRBACBase,
test_base.BaseObjectIfaceTestCase):
_test_class = securitygroup.SecurityGroupRBAC
class SecurityGroupIfaceObjTestCase(test_rbac.RBACBaseObjectIfaceTestCase):
_test_class = securitygroup.SecurityGroup _test_class = securitygroup.SecurityGroup

View File

@ -0,0 +1,4 @@
features:
- |
Security groups are now supported via the network RBAC mechanism.
Please refer to the admin guide for further details.