Add SecurityGroupRule object

This adds the beginnings of the SecurityGroupRule object, with
linkage back to parent and grantee groups. It is not currently
complete enough to serve as a full implementation, but provides
what we need for virt drivers.

Related to blueprint compute-manager-objects

Change-Id: I4099007c8d4e1c8deaa74cd001cb477a9f164261
This commit is contained in:
Dan Smith 2013-10-14 14:03:55 -07:00
parent 2efcc9dc24
commit dbfbbe7658
2 changed files with 147 additions and 0 deletions

View File

@ -0,0 +1,80 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova import db
from nova.objects import base
from nova.objects import fields
from nova.objects import security_group
OPTIONAL_ATTRS = ['parent_group', 'grantee_group']
class SecurityGroupRule(base.NovaPersistentObject, base.NovaObject):
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'id': fields.IntegerField(),
'protocol': fields.StringField(nullable=True),
'from_port': fields.IntegerField(nullable=True),
'to_port': fields.IntegerField(nullable=True),
'cidr': fields.Field(fields.CIDR(), nullable=True),
'parent_group': fields.ObjectField('SecurityGroup', nullable=True),
'grantee_group': fields.ObjectField('SecurityGroup', nullable=True),
}
@staticmethod
def _from_db_subgroup(context, db_group):
if db_group is None:
return None
return security_group.SecurityGroup._from_db_object(
context, security_group.SecurityGroup(), db_group)
@staticmethod
def _from_db_object(context, rule, db_rule, expected_attrs=None,
expected_sub_attrs=None):
if expected_attrs is None:
expected_attrs = []
if expected_sub_attrs is None:
expected_sub_attrs = {}
for field in rule.fields:
if field in expected_attrs:
rule[field] = rule._from_db_subgroup(context, db_rule[field])
elif field not in OPTIONAL_ATTRS:
rule[field] = db_rule[field]
rule._context = context
rule.obj_reset_changes()
return rule
@base.remotable_classmethod
def get_by_id(cls, context, rule_id):
db_rule = db.security_group_rule_get(context, rule_id)
return cls._from_db_object(context, cls(), db_rule)
class SecurityGroupRuleList(base.ObjectListBase, base.NovaObject):
fields = {
'objects': fields.ListOfObjectsField('SecurityGroupRule'),
}
@base.remotable_classmethod
def get_by_security_group_id(cls, context, secgroup_id):
db_rules = db.security_group_rule_get_by_security_group(
context, secgroup_id, columns_to_join=['grantee_group'])
return base.obj_make_list(context, cls(), SecurityGroupRule, db_rules,
expected_attrs=['grantee_group'])
@classmethod
def get_by_security_group(cls, context, security_group):
return cls.get_by_security_group_id(context, security_group.id)

View File

@ -0,0 +1,67 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from nova import db
from nova.objects import security_group
from nova.objects import security_group_rule
from nova.tests.objects import test_objects
from nova.tests.objects import test_security_group
fake_rule = {
'created_at': None,
'updated_at': None,
'deleted_at': None,
'deleted': False,
'id': 1,
'protocol': 'tcp',
'from_port': 22,
'to_port': 22,
'cidr': '0.0.0.0/0',
}
class _TestSecurityGroupRuleObject(object):
def test_get_by_id(self):
with mock.patch.object(db, 'security_group_rule_get') as sgrg:
sgrg.return_value = fake_rule
rule = security_group_rule.SecurityGroupRule.get_by_id(
self.context, 1)
for field in fake_rule:
self.assertEqual(fake_rule[field], rule[field])
sgrg.assert_called_with(self.context, 1)
def test_get_by_security_group(self):
secgroup = security_group.SecurityGroup()
secgroup.id = 123
rule = dict(fake_rule)
rule['grantee_group'] = dict(test_security_group.fake_secgroup, id=123)
stupid_method = 'security_group_rule_get_by_security_group'
with mock.patch.object(db, stupid_method) as sgrgbsg:
sgrgbsg.return_value = [rule]
rules = (security_group_rule.SecurityGroupRuleList.
get_by_security_group(self.context, secgroup))
self.assertEqual(1, len(rules))
self.assertEqual(123, rules[0].grantee_group.id)
class TestSecurityGroupRuleObject(test_objects._LocalTest,
_TestSecurityGroupRuleObject):
pass
class TestSecurityGroupRuleObjectRemote(test_objects._RemoteTest,
_TestSecurityGroupRuleObject):
pass