Merge "Client Side Changes for Security Groups Support."

This commit is contained in:
Jenkins
2013-03-21 01:29:07 +00:00
committed by Gerrit Code Review
5 changed files with 262 additions and 1 deletions

View File

@@ -236,6 +236,40 @@ class LimitsCommands(common.AuthedCommandsBase):
self._pretty_list(self.dbaas.limits.list)
class SecurityGroupCommands(common.AuthedCommandsBase):
"""Commands to list and show Security Groups For an Instance and """
"""create and delete security group rules for them. """
params = ['id',
'secgroup_id',
'protocol',
'from_port',
'to_port',
'cidr'
]
def get(self):
"""Get a security group associated with an instance."""
self._require('id')
self._pretty_print(self.dbaas.security_groups.get, self.id)
def list(self):
"""List all the Security Groups and the rules"""
self._pretty_paged(self.dbaas.security_groups.list)
def add_rule(self):
"""Add a security group rule"""
self._require('secgroup_id', 'protocol',
'from_port', 'to_port', 'cidr')
self.dbaas.security_group_rules.create(self.secgroup_id, self.protocol,
self.from_port, self.to_port,
self.cidr)
def delete_rule(self):
"""Delete a security group rule"""
self._require('id')
self.dbaas.security_group_rules.delete(self.id)
COMMANDS = {'auth': common.Auth,
'instance': InstanceCommands,
'flavor': FlavorsCommands,
@@ -244,6 +278,7 @@ COMMANDS = {'auth': common.Auth,
'user': UserCommands,
'root': RootCommands,
'version': VersionCommands,
'secgroup': SecurityGroupCommands,
}

View File

@@ -306,6 +306,8 @@ class Dbaas(object):
from reddwarfclient.root import Root
from reddwarfclient.hosts import Hosts
from reddwarfclient.quota import Quotas
from reddwarfclient.security_groups import SecurityGroups
from reddwarfclient.security_groups import SecurityGroupRules
from reddwarfclient.storage import StorageInfo
from reddwarfclient.management import Management
from reddwarfclient.accounts import Accounts
@@ -328,6 +330,8 @@ class Dbaas(object):
self.root = Root(self)
self.hosts = Hosts(self)
self.quota = Quotas(self)
self.security_groups = SecurityGroups(self)
self.security_group_rules = SecurityGroupRules(self)
self.storage = StorageInfo(self)
self.management = Management(self)
self.accounts = Accounts(self)

View File

@@ -0,0 +1,119 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from reddwarfclient import base
import exceptions
import urlparse
from reddwarfclient.common import limit_url
from reddwarfclient.common import Paginated
class SecurityGroup(base.Resource):
"""
Security Group is a resource used to hold security group information.
"""
def __repr__(self):
return "<SecurityGroup: %s>" % self.name
class SecurityGroups(base.ManagerWithFind):
"""
Manage :class:`SecurityGroup` resources.
"""
resource_class = SecurityGroup
def _list(self, url, response_key, limit=None, marker=None):
resp, body = self.api.client.get(limit_url(url, limit, marker))
if not body:
raise Exception("Call to " + url + " did not return a body.")
links = body.get('links', [])
next_links = [link['href'] for link in links if link['rel'] == 'next']
next_marker = None
for link in next_links:
# Extract the marker from the url.
parsed_url = urlparse.urlparse(link)
query_dict = dict(urlparse.parse_qsl(parsed_url.query))
next_marker = query_dict.get('marker', None)
instances = body[response_key]
instances = [self.resource_class(self, res) for res in instances]
return Paginated(instances, next_marker=next_marker, links=links)
def list(self, limit=None, marker=None):
"""
Get a list of all security groups.
:rtype: list of :class:`SecurityGroup`.
"""
return self._list("/security_groups", "security_groups", limit, marker)
def get(self, security_group):
"""
Get a specific security group.
:rtype: :class:`SecurityGroup`
"""
return self._get("/security_groups/%s" % base.getid(security_group),
"security_group")
class SecurityGroupRule(base.Resource):
"""
Security Group Rule is a resource used to hold security group
rule related information.
"""
def __repr__(self):
return \
"<SecurityGroupRule: ( \
Security Group id: %d, \
Protocol: %s, \
From_Port: %d, \
To_Port: %d, \
CIDR: %s )>" % (self.group_id, self.protocol, self.from_port,
self.to_port, self.cidr)
class SecurityGroupRules(base.ManagerWithFind):
"""
Manage :class:`SecurityGroupRules` resources.
"""
resource_class = SecurityGroupRule
def create(self, group_id, protocol, from_port, to_port, cidr):
"""
Create a new security group rule.
"""
body = {"security_group_rule": {
"group_id": group_id,
"protocol": protocol,
"from_port": from_port,
"to_port": to_port,
"cidr": cidr
}}
return self._create("/security_group_rules", body,
"security_group_rule")
def delete(self, security_group_rule):
"""
Delete the specified security group.
:param security_group_id: The security group id to delete
"""
resp, body = self.api.client.delete("/security_group_rules/%s" %
base.getid(security_group_rule))
if resp.status in (422, 500):
raise exceptions.from_response(resp, body)

View File

@@ -19,7 +19,8 @@ LISTIFY = {
"users": [[]],
"versions": [[]],
"attachments": [[]],
"limits": [[]]
"limits": [[]],
"security_groups": [[]]
}
TYPE_MAP = {

102
tests/test_secgroups.py Normal file
View File

@@ -0,0 +1,102 @@
from testtools import TestCase
from mock import Mock
from reddwarfclient import security_groups
from reddwarfclient import base
"""
Unit tests for security_groups.py
"""
class SecGroupTest(TestCase):
def setUp(self):
super(SecGroupTest, self).setUp()
self.orig__init = security_groups.SecurityGroup.__init__
security_groups.SecurityGroup.__init__ = Mock(return_value=None)
self.security_group = security_groups.SecurityGroup()
self.security_groups = security_groups.SecurityGroups(1)
def tearDown(self):
super(SecGroupTest, self).tearDown()
security_groups.SecurityGroup.__init__ = self.orig__init
def test___repr__(self):
self.security_group.name = "security_group-1"
self.assertEqual('<SecurityGroup: security_group-1>',
self.security_group.__repr__())
def test_list(self):
sec_group_list = ['secgroup1', 'secgroup2']
self.security_groups.list = Mock(return_value=sec_group_list)
self.assertEqual(sec_group_list, self.security_groups.list())
def test_get(self):
def side_effect_func(path, inst):
return path, inst
self.security_groups._get = Mock(side_effect=side_effect_func)
self.security_group.id = 1
self.assertEqual(('/security_groups/1', 'security_group'),
self.security_groups.get(self.security_group))
class SecGroupRuleTest(TestCase):
def setUp(self):
super(SecGroupRuleTest, self).setUp()
self.orig__init = security_groups.SecurityGroupRule.__init__
security_groups.SecurityGroupRule.__init__ = Mock(return_value=None)
security_groups.SecurityGroupRules.__init__ = Mock(return_value=None)
self.security_group_rule = security_groups.SecurityGroupRule()
self.security_group_rules = security_groups.SecurityGroupRules()
def tearDown(self):
super(SecGroupRuleTest, self).tearDown()
security_groups.SecurityGroupRule.__init__ = self.orig__init
def test___repr__(self):
self.security_group_rule.group_id = 1
self.security_group_rule.protocol = "tcp"
self.security_group_rule.from_port = 80
self.security_group_rule.to_port = 80
self.security_group_rule.cidr = "0.0.0.0//0"
representation = \
"<SecurityGroupRule: ( \
Security Group id: %d, \
Protocol: %s, \
From_Port: %d, \
To_Port: %d, \
CIDR: %s )>" % (1, "tcp", 80, 80, "0.0.0.0//0")
self.assertEqual(representation,
self.security_group_rule.__repr__())
def test_create(self):
def side_effect_func(path, body, inst):
return path, body, inst
self.security_group_rules._create = Mock(side_effect=side_effect_func)
p, b, i = self.security_group_rules.create(1, "tcp",
80, 80, "0.0.0.0//0")
self.assertEqual("/security_group_rules", p)
self.assertEqual("security_group_rule", i)
self.assertEqual(1, b["security_group_rule"]["group_id"])
self.assertEqual("tcp", b["security_group_rule"]["protocol"])
self.assertEqual(80, b["security_group_rule"]["from_port"])
self.assertEqual(80, b["security_group_rule"]["to_port"])
self.assertEqual("0.0.0.0//0", b["security_group_rule"]["cidr"])
def test_delete(self):
resp = Mock()
resp.status = 200
body = None
self.security_group_rules.api = Mock()
self.security_group_rules.api.client = Mock()
self.security_group_rules.api.client.delete = \
Mock(return_value=(resp, body))
self.security_group_rules.delete(self.id)
resp.status = 500
self.assertRaises(ValueError, self.security_group_rules.delete,
self.id)