Adding security groups models and client

- Adding security groups request and response json models
- Adding security groups api client
- Adding security groups models metatests
- Updating test_network and test_port metatest considering a kwargs previous update
- Adding the security groups composite
- Adding the security groups config file

Change-Id: Icb1a249ebb7c992c0595e4274defb8e3c46cd4e7
This commit is contained in:
Leonardo Maycotte 2014-12-11 10:41:26 -06:00
parent 13bc2d4b86
commit bbc780c681
14 changed files with 1037 additions and 73 deletions

View File

@ -0,0 +1,15 @@
"""
Copyright 2014 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

View File

@ -0,0 +1,15 @@
"""
Copyright 2014 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

View File

@ -0,0 +1,281 @@
"""
Copyright 2014 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from cafe.engine.http.client import AutoMarshallingHTTPClient
from cloudcafe.networking.networks.extensions.security_groups_api.models.\
request import SecurityGroupRequest, SecurityGroupRuleRequest
from cloudcafe.networking.networks.extensions.security_groups_api.models.\
response import SecurityGroup, SecurityGroups, SecurityGroupRule, \
SecurityGroupRules
class SecurityGroupsClient(AutoMarshallingHTTPClient):
def __init__(self, url, auth_token, serialize_format=None,
deserialize_format=None, tenant_id=None):
"""
@param url: Base URL for the networks service
@type url: string
@param auth_token: Auth token to be used for all requests
@type auth_token: string
@param serialize_format: Format for serializing requests
@type serialize_format: string
@param deserialize_format: Format for de-serializing responses
@type deserialize_format: string
@param tenant_id: optional tenant id to be included in the
header if given
@type tenant_id: string
"""
super(SecurityGroupsClient, self).__init__(serialize_format,
deserialize_format)
self.auth_token = auth_token
self.default_headers['X-Auth-Token'] = auth_token
ct = '{content_type}/{content_subtype}'.format(
content_type='application',
content_subtype=self.serialize_format)
accept = '{content_type}/{content_subtype}'.format(
content_type='application',
content_subtype=self.deserialize_format)
self.default_headers['Content-Type'] = ct
self.default_headers['Accept'] = accept
if tenant_id:
self.default_headers['X-Auth-Project-Id'] = tenant_id
self.url = url
self.security_groups_url = '{url}/security-groups'.format(url=self.url)
self.security_group_rules_url = '{url}/security-group-rules'.format(
url=self.url)
def create_security_group(self, name=None, description=None,
tenant_id=None, requestslib_kwargs=None):
"""
@summary: Creates a security group
@param name: A symbolic name for the security group. Not required to
be unique.
@type name: string
@param description: (optional) Description of a security group.
@type description: string
@param tenant_id: (admin use only) Owner of the security group.
@type tenant_id: string
@return: security group create response
@rtype: Requests.response
"""
url = self.security_groups_url
request = SecurityGroupRequest(name=name, description=description,
tenant_id=tenant_id)
resp = self.request('POST', url,
response_entity_type=SecurityGroup,
request_entity=request,
requestslib_kwargs=requestslib_kwargs)
return resp
def get_security_group(self, security_group_id, requestslib_kwargs=None):
"""
@summary: Shows information for a specified security group
@param security_group_id: The UUID for the security group
@type security_group_id: string
@return: get security group response
@rtype: Requests.response
"""
url = '{base_url}/{security_group_id}'.format(
base_url=self.security_groups_url,
security_group_id=security_group_id)
resp = self.request('GET', url,
response_entity_type=SecurityGroup,
requestslib_kwargs=requestslib_kwargs)
return resp
def list_security_groups(self, security_group_id=None, name=None,
description=None, tenant_id=None,
limit=None, marker=None, page_reverse=None,
requestslib_kwargs=None):
"""
@summary: Lists security groups, filtered by params if given
@param security_group_id: The UUID for the security group to filter by
@type security_group_id: string
@param name: name for the security group to filter by
@type name: string
@param description: security group description to filter by
@type description: string
@param tenant_id: security group tenant ID to filter by
@type tenant_id: string
@param limit: page size
@type limit: int
@param marker: Id of the last item of the previous page
@type marker: string
@param page_reverse: direction of the page
@type page_reverse: bool
@return: list security groups response
@rtype: Requests.response
"""
params = {'id': security_group_id, 'name': name,
'description': description, 'tenant_id': tenant_id,
'limit': limit, 'marker': marker,
'page_reverse': page_reverse}
url = self.security_groups_url
resp = self.request('GET', url, params=params,
response_entity_type=SecurityGroups,
requestslib_kwargs=requestslib_kwargs)
return resp
def delete_security_group(self, security_group_id,
requestslib_kwargs=None):
"""
@summary: Deletes a specified security group and its associated
security group rules
@param security_group_id: The UUID for the security group
@type security_group_id: string
@return: delete security group response
@rtype: Requests.response
"""
url = '{base_url}/{security_group_id}'.format(
base_url=self.security_groups_url,
security_group_id=security_group_id)
resp = self.request('DELETE', url,
requestslib_kwargs=requestslib_kwargs)
return resp
def create_security_group_rule(self, security_group_id, direction,
ethertype, protocol=None,
port_range_min=None, port_range_max=None,
remote_group_id=None, remote_ip_prefix=None,
requestslib_kwargs=None):
"""
@summary: Creates a security group rule
@param security_group_id: The security group ID to associate with
@type security_group_id: string
@param direction: ingress or egress security group rule direction
@type direction: string
@param ethertype: Must be IPv4 or IPv6
@type ethertype: string
@param protocol: protocol matched by the security group rule.
Valid values are null, tcp, udp, and icmp.
@type protocol: string
@param port_range_min: The minimum port number in the range
that is matched by the security group rule. Value must be less
than or equal to the port_range_max for tcp or udp. If the protocol
is ICMP, this value must be an ICMP type.
@type port_range_min: int
@param port_range_max: The maximum port number in the range
@type port_range_max: int
@param remote_group_id: The remote group ID to be associated with
@type remote_group_id: string
@param remote_ip_prefix: The remote IP prefix to be associated
with, remote_group_id or remote_ip_prefix can be specified
@type remote_ip_prefix: string
@return: security group rule create response
@rtype: Requests.response
"""
url = self.security_group_rules_url
request = SecurityGroupRuleRequest(
direction=direction, ethertype=ethertype,
security_group_id=security_group_id, port_range_min=port_range_min,
port_range_max=port_range_max, protocol=protocol,
remote_group_id=remote_group_id, remote_ip_prefix=remote_ip_prefix)
resp = self.request('POST', url,
response_entity_type=SecurityGroupRule,
request_entity=request,
requestslib_kwargs=requestslib_kwargs)
return resp
def get_security_group_rule(self, security_group_rule_id,
requestslib_kwargs=None):
"""
@summary: Shows information for a specified security group rule
@param security_group_rule_id: The UUID for the security group rule
@type security_group_rule_id: string
@return: get security group rule response
@rtype: Requests.response
"""
url = '{base_url}/{security_group_rule_id}'.format(
base_url=self.security_group_rules_url,
security_group_rule_id=security_group_rule_id)
resp = self.request('GET', url,
response_entity_type=SecurityGroupRule,
requestslib_kwargs=requestslib_kwargs)
return resp
def list_security_group_rules(self, security_group_rule_id=None,
security_group_id=None, direction=None,
ethertype=None, protocol=None,
port_range_min=None, port_range_max=None,
remote_group_id=None, remote_ip_prefix=None,
tenant_id=None, limit=None, marker=None,
page_reverse=None, requestslib_kwargs=None):
"""
@summary: Lists security group rules, filtered by params if given
@param security_group_rule_id: security group rule ID to filter by
@type security_group_rule_id: string
@param security_group_id: The security group ID to filter by
@type security_group_id: string
@param direction: direction to filter by
@type direction: string
@param ethertype: IPv4 or IPv6 ethertype to filter by
@type ethertype: string
@param protocol: protocol like tcp, udp, or icmp to filter by
@type protocol: string
@param port_range_min: The minimum port number to filter by
@type port_range_min: int
@param port_range_max: The maximum port number to filter by
@type port_range_max: int
@param remote_group_id: The remote group ID filter by
@type remote_group_id: string
@param remote_ip_prefix: The remote IP prefix to filter by
@type remote_ip_prefix: string
@param tenant_id: security group rule tenant ID to filter by
@type tenant_id: string
@param limit: page size
@type limit: int
@param marker: Id of the last item of the previous page
@type marker: string
@param page_reverse: direction of the page
@type page_reverse: bool
@return: list security groups rules response
@rtype: Requests.response
"""
params = {'id': security_group_rule_id,
'security_group_id': security_group_id,
'direction': direction, 'ethertype': ethertype,
'protocol': protocol, 'port_range_min': port_range_min,
'port_range_max': port_range_max,
'remote_group_id': remote_group_id,
'remote_ip_prefix': remote_ip_prefix, 'tenant_id': tenant_id,
'limit': limit, 'marker': marker,
'page_reverse': page_reverse}
url = self.security_group_rules_url
resp = self.request('GET', url, params=params,
response_entity_type=SecurityGroupRules,
requestslib_kwargs=requestslib_kwargs)
return resp
def delete_security_group_rule(self, security_group_rule_id,
requestslib_kwargs=None):
"""
@summary: Deletes a specified security group rule
@param security_group_rule_id: The UUID for the security group rule
@type security_group_rule_id: string
@return: delete security group rule response
@rtype: Requests.response
"""
url = '{base_url}/{security_group_rule_id}'.format(
base_url=self.security_group_rules_url,
security_group_rule_id=security_group_rule_id)
resp = self.request('DELETE', url,
requestslib_kwargs=requestslib_kwargs)
return resp

View File

@ -0,0 +1,32 @@
"""
Copyright 2014 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from cloudcafe.networking.networks.composites import _NetworkingAuthComposite
from cloudcafe.networking.networks.extensions.security_groups_api.client \
import SecurityGroupsClient
from cloudcafe.networking.networks.extensions.security_groups_api.config \
import SecurityGroupsConfig
class SecurityGroupsComposite(object):
networking_auth_composite = _NetworkingAuthComposite
def __init__(self):
auth_composite = self.networking_auth_composite()
self.url = auth_composite.networking_url
self.user = auth_composite._auth_user_config
self.config = SecurityGroupsConfig()
self.client = SecurityGroupsClient(**auth_composite.client_args)

View File

@ -0,0 +1,23 @@
"""
Copyright 2014 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from cloudcafe.networking.networks.common.config import NetworkingBaseConfig
class SecurityGroupsConfig(NetworkingBaseConfig):
"""Security Groups and Rules configuration parameters"""
SECTION_NAME = 'security_groups'

View File

@ -0,0 +1,15 @@
"""
Copyright 2014 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

View File

@ -0,0 +1,122 @@
"""
Copyright 2014 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import json
from cafe.engine.models.base import AutoMarshallingModel
class SecurityGroupRequest(AutoMarshallingModel):
"""
@summary: Security Group model request object for the OpenStack Networking
v2.0 API extension.
@param name: A symbolic name for the security group. Not required to be
unique.
@type name: string
@param description: (optional) Description of a security group.
@type description: string
@param tenant_id: (admin use only) Owner of the security group.
@type tenant_id: string
"""
def __init__(self, name=None, description=None, tenant_id=None, **kwargs):
super(SecurityGroupRequest, self).__init__()
self.name = name
self.description = description
self.tenant_id = tenant_id
def _obj_to_json(self):
body = {
'name': self.name,
'description': self.description,
'tenant_id': self.tenant_id
}
# Removing optional params not given
body = self._remove_empty_values(body)
main_body = {'security_group': body}
return json.dumps(main_body)
class SecurityGroupRuleRequest(AutoMarshallingModel):
"""
@summary: Security Group Rules model request object for the OpenStack
Networking v2.0 API extension.
@param direction: Ingress or egress: The direction in which the security
group rule is applied.
@type direction: string
@param ethertype: Must be IPv4 or IPv6, and addresses represented in CIDR
must match the ingress or egress rules.
@type ethertype: string
@param security_group_id: The security group ID to associate with this
security group rule.
@type security_group_id: string
@param port_range_min: (optional) The minimum port number in the range
that is matched by the security group rule. If the protocol is TCP or
UDP, this value must be less than or equal to the value of the
port_range_max attribute. If the protocol is ICMP, this value must be
an ICMP type.
@type port_range_min: int
@param port_range_max: (optional) The maximum port number in the range
that is matched by the security group rule. The port_range_min
attribute constrains the port_range_max attribute. If the protocol is
ICMP, this value must be an ICMP type.
@type port_range_max: int
@param protocol: (optional) The protocol that is matched by the security
group rule. Valid values are null, tcp, udp, and icmp.
@type protocol: string
@param remote_group_id: (optional) The remote group ID to be associated
with this security group rule.You can specify either remote_group_id
or remote_ip_prefix in the request body.
@type remote_group_id: string
@param remote_ip_prefix: (optional) The remote IP prefix to be associated
with this security group rule. You can specify either remote_group_id
or remote_ip_prefix in the request body. This attribute matches the
specified IP prefix as the source IP address of the IP packet.
@type remote_ip_prefix: string
"""
def __init__(self, direction=None, ethertype=None, security_group_id=None,
port_range_min=None, port_range_max=None, protocol=None,
remote_group_id=None, remote_ip_prefix=None, **kwargs):
super(SecurityGroupRuleRequest, self).__init__()
self.direction = direction
self.ethertype = ethertype
self.security_group_id = security_group_id
self.port_range_min = port_range_min
self.port_range_max = port_range_max
self.protocol = protocol
self.remote_group_id = remote_group_id
self.remote_ip_prefix = remote_ip_prefix
def _obj_to_json(self):
body = {
'direction': self.direction,
'ethertype': self.ethertype,
'security_group_id': self.security_group_id,
'port_range_min': self.port_range_min,
'port_range_max': self.port_range_max,
'protocol': self.protocol,
'remote_group_id': self.remote_group_id,
'remote_ip_prefix': self.remote_ip_prefix
}
# Removing optional params not given
body = self._remove_empty_values(body)
main_body = {'security_group_rule': body}
return json.dumps(main_body)

View File

@ -0,0 +1,214 @@
"""
Copyright 2014 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import json
from cafe.engine.models.base import AutoMarshallingListModel, \
AutoMarshallingModel
class SecurityGroup(AutoMarshallingModel):
"""
@summary: Security Group model response object for the OpenStack
Networking v2.0 API extension.
@param id_: UUID for the security group
@type id_: string
@param name: name for the security group
@type name: string
@param description: Description of the security group
@type description: string
@param security_group_rules: rules of the security group
@type security_group_rules: list
@param tenant_id: Owner of the security group
@type tenant_id: string
"""
SECURITY_GROUP = 'security_group'
def __init__(self, id_=None, name=None, description=None,
security_group_rules=None, tenant_id=None, **kwargs):
# kwargs is to be used for extensions or checking unexpected attrs
super(SecurityGroup, self).__init__()
self.id = id_
self.name = name
self.description = description
self.security_group_rules = security_group_rules
self.tenant_id = tenant_id
self.kwargs = kwargs
@classmethod
def _json_to_obj(cls, serialized_str):
"""Return security group object from a JSON serialized string"""
ret = None
json_dict = json.loads(serialized_str)
# Replacing attribute response names if they are Python reserved words
# with a trailing underscore, for ex. id for id_ or if they have a
# special character within the name replacing it for an underscore too
json_dict = cls._replace_dict_key(
json_dict, 'id', 'id_', recursion=True)
if cls.SECURITY_GROUP in json_dict:
security_group_dict = json_dict.get(cls.SECURITY_GROUP)
ret = SecurityGroup(**security_group_dict)
if ret.security_group_rules:
security_group_rules = []
for rule in ret.security_group_rules:
security_group_rules.append(SecurityGroupRule(**rule))
ret.security_group_rules = security_group_rules
return ret
class SecurityGroups(AutoMarshallingListModel):
SECURITY_GROUPS = 'security_groups'
@classmethod
def _json_to_obj(cls, serialized_str):
"""
Return a list of security group objects from a JSON
serialized string
"""
ret = cls()
json_dict = json.loads(serialized_str)
# Replacing attribute response names if they are Python reserved words
# with a trailing underscore, for ex. id for id_ or if they have a
# special character within the name replacing it for an underscore too
json_dict = cls._replace_dict_key(
json_dict, 'id', 'id_', recursion=True)
if cls.SECURITY_GROUPS in json_dict:
security_groups = json_dict.get(cls.SECURITY_GROUPS)
for security_group in security_groups:
result = SecurityGroup(**security_group)
if result.security_group_rules:
security_group_rules = []
for rule in result.security_group_rules:
security_group_rules.append(SecurityGroupRule(**rule))
result.security_group_rules = security_group_rules
ret.append(result)
return ret
class SecurityGroupRule(AutoMarshallingModel):
"""
@summary: Security Group Rules model response object for the OpenStack
Networking v2.0 API extension.
@param id_: UUID for the security group rule
@type id_: string
@param direction: Ingress or egress: The direction in which the security
group rule is applied.
@type direction: string
@param ethertype: Must be IPv4 or IPv6, and addresses represented in CIDR
must match the ingress or egress rules.
@type ethertype: string
@param security_group_id: The security group ID to associate with this
security group rule.
@type security_group_id: string
@param port_range_min: (optional) The minimum port number in the range
that is matched by the security group rule. If the protocol is TCP or
UDP, this value must be less than or equal to the value of the
port_range_max attribute. If the protocol is ICMP, this value must be
an ICMP type.
@type port_range_min: int
@param port_range_max: (optional) The maximum port number in the range
that is matched by the security group rule. The port_range_min
attribute constrains the port_range_max attribute. If the protocol is
ICMP, this value must be an ICMP type.
@type port_range_max: int
@param protocol: (optional) The protocol that is matched by the security
group rule. Valid values are null, tcp, udp, and icmp.
@type protocol: string
@param remote_group_id: (optional) The remote group ID to be associated
with this security group rule.You can specify either remote_group_id
or remote_ip_prefix in the request body.
@type remote_group_id: string
@param remote_ip_prefix: (optional) The remote IP prefix to be associated
with this security group rule. You can specify either remote_group_id
or remote_ip_prefix in the request body. This attribute matches the
specified IP prefix as the source IP address of the IP packet.
@type remote_ip_prefix: string
@param tenant_id: owner of the security group rule
@type tenant_id: string
"""
SECURITY_GROUP_RULE = 'security_group_rule'
def __init__(self, id_=None, direction=None, ethertype=None,
security_group_id=None, port_range_min=None,
port_range_max=None, protocol=None, remote_group_id=None,
remote_ip_prefix=None, tenant_id=None, **kwargs):
# kwargs is to be used for extensions or checking unexpected attrs
super(SecurityGroupRule, self).__init__()
self.id = id_
self.direction = direction
self.ethertype = ethertype
self.security_group_id = security_group_id
self.port_range_min = port_range_min
self.port_range_max = port_range_max
self.protocol = protocol
self.remote_group_id = remote_group_id
self.remote_ip_prefix = remote_ip_prefix
self.tenant_id = tenant_id
self.kwargs = kwargs
@classmethod
def _json_to_obj(cls, serialized_str):
"""Return security group rule object from a JSON serialized string"""
ret = None
json_dict = json.loads(serialized_str)
# Replacing attribute response names if they are Python reserved words
# with a trailing underscore, for ex. id for id_ or if they have a
# special character within the name replacing it for an underscore too
json_dict = cls._replace_dict_key(
json_dict, 'id', 'id_', recursion=True)
if cls.SECURITY_GROUP_RULE in json_dict:
security_group_rule_dict = json_dict.get(cls.SECURITY_GROUP_RULE)
ret = SecurityGroupRule(**security_group_rule_dict)
return ret
class SecurityGroupRules(AutoMarshallingListModel):
SECURITY_GROUP_RULES = 'security_group_rules'
@classmethod
def _json_to_obj(cls, serialized_str):
"""
Return a list of security group rule objects from a JSON
serialized string
"""
ret = cls()
json_dict = json.loads(serialized_str)
# Replacing attribute response names if they are Python reserved words
# with a trailing underscore, for ex. id for id_ or if they have a
# special character within the name replacing it for an underscore too
json_dict = cls._replace_dict_key(
json_dict, 'id', 'id_', recursion=True)
if cls.SECURITY_GROUP_RULES in json_dict:
security_group_rules = json_dict.get(cls.SECURITY_GROUP_RULES)
for security_group_rule in security_group_rules:
ret.append(SecurityGroupRule(**security_group_rule))
return ret

View File

@ -0,0 +1,15 @@
"""
Copyright 2014 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

View File

@ -0,0 +1,15 @@
"""
Copyright 2014 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

View File

@ -0,0 +1,15 @@
"""
Copyright 2014 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

View File

@ -0,0 +1,261 @@
"""
Copyright 2014 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import unittest
from cloudcafe.networking.networks.extensions.security_groups_api.models.\
request import SecurityGroupRequest, SecurityGroupRuleRequest
from cloudcafe.networking.networks.extensions.security_groups_api.models.\
response import SecurityGroup, SecurityGroups, SecurityGroupRule, \
SecurityGroupRules
SECURITY_GROUP_TAG = SecurityGroup.SECURITY_GROUP
SECURITY_GROUPS_TAG = SecurityGroups.SECURITY_GROUPS
SECURITY_GROUP_RULE_TAG = SecurityGroupRule.SECURITY_GROUP_RULE
SECURITY_GROUP_RULES_TAG = SecurityGroupRules.SECURITY_GROUP_RULES
SECURITY_GROUP_RULES_DATA = (
"""
[{
"direction": "egress",
"ethertype": "IPv4",
"id": "38ce2d8e-e8f1-48bd-83c2-d33cb9f50c3d",
"port_range_max": null,
"port_range_min": null,
"protocol": null,
"remote_group_id": null,
"remote_ip_prefix": null,
"security_group_id": "2076db17-a522-4506-91de-c6dd8e837028",
"tenant_id": "e4f50856753b4dc6afee5fa6b9b6c550"
},
{
"direction": "egress",
"ethertype": "IPv6",
"id": "565b9502-12de-4ffd-91e9-68885cff6ae1",
"port_range_max": null,
"port_range_min": null,
"protocol": null,
"remote_group_id": null,
"remote_ip_prefix": null,
"security_group_id": "2076db17-a522-4506-91de-c6dd8e837028",
"tenant_id": "e4f50856753b4dc6afee5fa6b9b6c550"
}]""")
class CreateSecurityGroupTest(unittest.TestCase):
"""Test for the Security Groups Create (POST) Model object request"""
@classmethod
def setUpClass(cls):
create_attrs = dict(
name='test_name_value', description='test_description_value',
tenant_id='test_tenant_id_value')
cls.security_group_model = SecurityGroupRequest(**create_attrs)
def test_json_request(self):
"""JSON test with all possible create attrs"""
expected_json_output = (
'{{"{tag}": {{"tenant_id": "test_tenant_id_value", "name": '
'"test_name_value", "description": "test_description_value"}}}}').\
format(tag=SECURITY_GROUP_TAG)
request_body = self.security_group_model._obj_to_json()
msg = ('Unexpected JSON Network request serialization. Expected {0} '
'instead of {1}'.format(expected_json_output, request_body))
self.assertEqual(request_body, expected_json_output, msg)
class CreateSecurityGroupRuleTest(unittest.TestCase):
"""Test for the Security Groups Rule Create (POST) Model object request"""
@classmethod
def setUpClass(cls):
create_attrs = dict(
direction='dir_val', ethertype='eth_val', security_group_id='s_id',
port_range_min=1, port_range_max=250, protocol='protocol_val',
remote_group_id='r_id', remote_ip_prefix='prefix_val')
cls.rules_model = SecurityGroupRuleRequest(**create_attrs)
def test_json_request(self):
"""JSON test with all possible create attrs"""
expected_json_output = (
'{{"{tag}": {{"remote_group_id": "r_id", "direction": '
'"dir_val", "protocol": "protocol_val", "ethertype": "eth_val", '
'"port_range_max": 250, "security_group_id": "s_id", '
'"port_range_min": 1, "remote_ip_prefix": "prefix_val"}}}}').\
format(tag=SECURITY_GROUP_RULE_TAG)
request_body = self.rules_model._obj_to_json()
msg = ('Unexpected JSON Network request serialization. Expected {0} '
'instead of {1}'.format(expected_json_output, request_body))
self.assertEqual(request_body, expected_json_output, msg)
class GetSecurityGroupTest(unittest.TestCase):
"""Test for the Security Groups Show (GET) Model object response"""
@classmethod
def setUpClass(cls):
"""Creating security groups model"""
rule1_attrs = dict(
id_=u'38ce2d8e-e8f1-48bd-83c2-d33cb9f50c3d', direction=u'egress',
ethertype=u'IPv4',
security_group_id=u'2076db17-a522-4506-91de-c6dd8e837028',
tenant_id=u'e4f50856753b4dc6afee5fa6b9b6c550')
rule2_attrs = dict(
id_=u'565b9502-12de-4ffd-91e9-68885cff6ae1', direction=u'egress',
ethertype=u'IPv6',
security_group_id=u'2076db17-a522-4506-91de-c6dd8e837028',
tenant_id=u'e4f50856753b4dc6afee5fa6b9b6c550')
rules = [SecurityGroupRule(**rule1_attrs),
SecurityGroupRule(**rule2_attrs)]
get_attrs = dict(
name='security_group_name_1',
description='group text description',
tenant_id='e4f50856753b4dc6afee5fa6b9b6c550',
id_='4e8e5957-649f-477b-9e5b-f1f75b21c03c',
security_group_rules=rules)
cls.expected_response = SecurityGroup(**get_attrs)
def test_json_response(self):
api_json_resp = (
"""{{
"{tag}": {{
"description": "group text description",
"id": "4e8e5957-649f-477b-9e5b-f1f75b21c03c",
"name": "security_group_name_1",
"security_group_rules": {rules_data},
"tenant_id": "e4f50856753b4dc6afee5fa6b9b6c550"}}}}
""").format(tag=SECURITY_GROUP_TAG,
rules_data=SECURITY_GROUP_RULES_DATA)
response_obj = SecurityGroup()._json_to_obj(api_json_resp)
self.assertEqual(response_obj, self.expected_response,
'JSON to Obj response different than expected')
class GetSecurityGroupRuleTest(unittest.TestCase):
"""Test for the Security Groups Rule Show (GET) Model object response"""
@classmethod
def setUpClass(cls):
"""Creating security groups rule model"""
get_attrs = dict(
id_=u'3c0e45ff-adaf-4124-b083-bf390e5482ff', direction=u'egress',
ethertype=u'IPv6',
security_group_id=u'85cc3048-abc3-43cc-89b3-377341426ac5',
tenant_id=u'e4f50856753b4dc6afee5fa6b9b6c550')
cls.expected_response = SecurityGroupRule(**get_attrs)
def test_json_response(self):
api_json_resp = (
"""{{
"{tag}": {{
"direction": "egress",
"ethertype": "IPv6",
"id": "3c0e45ff-adaf-4124-b083-bf390e5482ff",
"port_range_max": null,
"port_range_min": null,
"protocol": null,
"remote_group_id": null,
"remote_ip_prefix": null,
"security_group_id": "85cc3048-abc3-43cc-89b3-377341426ac5",
"tenant_id": "e4f50856753b4dc6afee5fa6b9b6c550"
}}}}""").format(tag=SECURITY_GROUP_RULE_TAG)
response_obj = SecurityGroupRule()._json_to_obj(api_json_resp)
self.assertEqual(response_obj, self.expected_response,
'JSON to Obj response different than expected')
class ListSecurityGroupsTest(unittest.TestCase):
"""Test for the Security Groups List (GET) Model object response"""
@classmethod
def setUpClass(cls):
"""Creating security groups model"""
rule1_attrs = dict(
id_=u'38ce2d8e-e8f1-48bd-83c2-d33cb9f50c3d', direction=u'egress',
ethertype=u'IPv4',
security_group_id=u'2076db17-a522-4506-91de-c6dd8e837028',
tenant_id=u'e4f50856753b4dc6afee5fa6b9b6c550')
rule2_attrs = dict(
id_=u'565b9502-12de-4ffd-91e9-68885cff6ae1', direction=u'egress',
ethertype=u'IPv6',
security_group_id=u'2076db17-a522-4506-91de-c6dd8e837028',
tenant_id=u'e4f50856753b4dc6afee5fa6b9b6c550')
rules = [SecurityGroupRule(**rule1_attrs),
SecurityGroupRule(**rule2_attrs)]
get1_attrs = dict(
name='security_group_name_1',
description='group text description',
tenant_id='e4f50856753b4dc6afee5fa6b9b6c550',
id_='4e8e5957-649f-477b-9e5b-f1f75b21c03c',
security_group_rules=rules)
get2_attrs = dict(
name='security_group_name_2',
description='group 2 text description',
tenant_id='a_tenant_id',
id_='a_security_group_id',
security_group_rules=rules)
cls.expected_response = [SecurityGroup(**get1_attrs),
SecurityGroup(**get2_attrs)]
def test_json_response(self):
api_json_resp = (
"""{{
"{tag}": [{{
"description": "group text description",
"id": "4e8e5957-649f-477b-9e5b-f1f75b21c03c",
"name": "security_group_name_1",
"security_group_rules": {rules_data},
"tenant_id": "e4f50856753b4dc6afee5fa6b9b6c550"}},
{{
"description": "group 2 text description",
"id": "a_security_group_id",
"name": "security_group_name_2",
"security_group_rules": {rules_data},
"tenant_id": "a_tenant_id"}}]
}}""").format(tag=SECURITY_GROUPS_TAG,
rules_data=SECURITY_GROUP_RULES_DATA)
response_obj = SecurityGroups()._json_to_obj(api_json_resp)
self.assertEqual(response_obj, self.expected_response,
'JSON to Obj response different than expected')
class ListSecurityGroupRulesTest(unittest.TestCase):
"""Test for the Security Groups List (GET) Model object response"""
@classmethod
def setUpClass(cls):
"""Creating security groups model"""
rule1_attrs = dict(
id_=u'38ce2d8e-e8f1-48bd-83c2-d33cb9f50c3d', direction=u'egress',
ethertype=u'IPv4',
security_group_id=u'2076db17-a522-4506-91de-c6dd8e837028',
tenant_id=u'e4f50856753b4dc6afee5fa6b9b6c550')
rule2_attrs = dict(
id_=u'565b9502-12de-4ffd-91e9-68885cff6ae1', direction=u'egress',
ethertype=u'IPv6',
security_group_id=u'2076db17-a522-4506-91de-c6dd8e837028',
tenant_id=u'e4f50856753b4dc6afee5fa6b9b6c550')
cls.expected_response = [SecurityGroupRule(**rule1_attrs),
SecurityGroupRule(**rule2_attrs)]
def test_json_response(self):
api_json_resp = (
"""{{
"{tag}": {rules_data}
}}""").format(tag=SECURITY_GROUP_RULES_TAG,
rules_data=SECURITY_GROUP_RULES_DATA)
response_obj = SecurityGroupRules()._json_to_obj(api_json_resp)
self.assertEqual(response_obj, self.expected_response,
'JSON to Obj response different than expected')
if __name__ == "__main__":
unittest.main()

View File

@ -75,16 +75,14 @@ class ShowNetworkTest(unittest.TestCase):
def setUpClass(cls): def setUpClass(cls):
"""Creating network_model with currently supported attributes""" """Creating network_model with currently supported attributes"""
show_attrs = dict( show_attrs = dict(
status='ACTIVE', subnets=['54d6f61d-db07-451c-9ab3-b9609b6b6f0b', status='ACTIVE', subnets=[u'54d6f61d-db07-451c-9ab3-b9609b6b6f0b',
'79d6f61d-d007-51cd-9a33-b9609b6b6f0c'], u'79d6f61d-d007-51cd-9a33-b9609b6b6f0c'],
name='net1', admin_state_up=True, name='net1', admin_state_up=True,
tenant_id='9bacb3c5d39d41a79512987f338cf177', shared=False, tenant_id='9bacb3c5d39d41a79512987f338cf177', shared=False,
id_='4e8e5957-649f-477b-9e5b-f1f75b21c03c', router_external=True) id_='4e8e5957-649f-477b-9e5b-f1f75b21c03c')
cls.expected_response = Network(**show_attrs) cls.expected_response = Network(**show_attrs)
def test_json_response(self): def test_json_response(self):
# Response data with extension attributes, if supported later on they
# will need to be added to the setUp object model in this test class
api_json_resp = ( api_json_resp = (
"""{{ """{{
"{tag}": {{ "{tag}": {{
@ -96,24 +94,8 @@ class ShowNetworkTest(unittest.TestCase):
"name": "net1", "name": "net1",
"admin_state_up": true, "admin_state_up": true,
"tenant_id": "9bacb3c5d39d41a79512987f338cf177", "tenant_id": "9bacb3c5d39d41a79512987f338cf177",
"segments": [
{{
"provider:segmentation_id": 2,
"provider:physical_network":
"8bab8453-1bc9-45af-8c70-f83aa9b50453",
"provider:network_type": "vlan"
}},
{{
"provider:segmentation_id": null,
"provider:physical_network":
"8bab8453-1bc9-45af-8c70-f83aa9b50453",
"provider:network_type": "stt"
}}
],
"shared": false, "shared": false,
"port_security_enabled": true, "id": "4e8e5957-649f-477b-9e5b-f1f75b21c03c"
"id": "4e8e5957-649f-477b-9e5b-f1f75b21c03c",
"router:external": true
}} }}
}}""").format(tag=NETWORK_TAG) }}""").format(tag=NETWORK_TAG)
response_obj = Network()._json_to_obj(api_json_resp) response_obj = Network()._json_to_obj(api_json_resp)
@ -127,22 +109,20 @@ class ShowMultipleNetworksTest(unittest.TestCase):
def setUpClass(cls): def setUpClass(cls):
"""Creating network_model with currently supported attributes""" """Creating network_model with currently supported attributes"""
show_attrs_1 = dict( show_attrs_1 = dict(
status='ACTIVE', subnets=['54d6f61d-db07-451c-9ab3-b9609b6b6f0b'], status='ACTIVE', subnets=[u'54d6f61d-db07-451c-9ab3-b9609b6b6f0b'],
name='private-network', admin_state_up=True, name='private-network', admin_state_up=True,
tenant_id='4fd44f30292945e481c7b8a0c8908869', shared=True, tenant_id='4fd44f30292945e481c7b8a0c8908869', shared=True,
id_='d32019d3-bc6e-4319-9c1d-6722fc136a22', router_external=True) id_='d32019d3-bc6e-4319-9c1d-6722fc136a22')
show_attrs_2 = dict( show_attrs_2 = dict(
status='ACTIVE', subnets=['08eae331-0402-425a-923c-34f7cfe39c1b'], status='ACTIVE', subnets=[u'08eae331-0402-425a-923c-34f7cfe39c1b'],
name='private', admin_state_up=True, name='private', admin_state_up=True,
tenant_id='26a7980765d0414dbc1fc1f88cdb7e6e', shared=True, tenant_id='26a7980765d0414dbc1fc1f88cdb7e6e', shared=True,
id_='db193ab3-96e3-4cb3-8fc5-05f4296d0324', router_external=True) id_='db193ab3-96e3-4cb3-8fc5-05f4296d0324')
net1 = Network(**show_attrs_1) net1 = Network(**show_attrs_1)
net2 = Network(**show_attrs_2) net2 = Network(**show_attrs_2)
cls.expected_response = [net1, net2] cls.expected_response = [net1, net2]
def test_json_response(self): def test_json_response(self):
# Response data with extension attributes, if supported later on they
# will need to be added to the setUp object model in this test class
api_json_resp = ( api_json_resp = (
"""{{ """{{
"{tag}": [ "{tag}": [
@ -152,14 +132,10 @@ class ShowMultipleNetworksTest(unittest.TestCase):
"54d6f61d-db07-451c-9ab3-b9609b6b6f0b" "54d6f61d-db07-451c-9ab3-b9609b6b6f0b"
], ],
"name": "private-network", "name": "private-network",
"provider:physical_network": null,
"admin_state_up": true, "admin_state_up": true,
"tenant_id": "4fd44f30292945e481c7b8a0c8908869", "tenant_id": "4fd44f30292945e481c7b8a0c8908869",
"provider:network_type": "local",
"router:external": true,
"shared": true, "shared": true,
"id": "d32019d3-bc6e-4319-9c1d-6722fc136a22", "id": "d32019d3-bc6e-4319-9c1d-6722fc136a22"
"provider:segmentation_id": null
}}, }},
{{ {{
"status": "ACTIVE", "status": "ACTIVE",
@ -167,14 +143,10 @@ class ShowMultipleNetworksTest(unittest.TestCase):
"08eae331-0402-425a-923c-34f7cfe39c1b" "08eae331-0402-425a-923c-34f7cfe39c1b"
], ],
"name": "private", "name": "private",
"provider:physical_network": null,
"admin_state_up": true, "admin_state_up": true,
"tenant_id": "26a7980765d0414dbc1fc1f88cdb7e6e", "tenant_id": "26a7980765d0414dbc1fc1f88cdb7e6e",
"provider:network_type": "local",
"router:external": true,
"shared": true, "shared": true,
"id": "db193ab3-96e3-4cb3-8fc5-05f4296d0324", "id": "db193ab3-96e3-4cb3-8fc5-05f4296d0324"
"provider:segmentation_id": null
}} }}
] ]
}}""").format(tag=NETWORKS_TAG) }}""").format(tag=NETWORKS_TAG)

View File

@ -116,15 +116,14 @@ class ShowPortTest(unittest.TestCase):
def setUpClass(cls): def setUpClass(cls):
"""Creating port_model with with extension included attributes""" """Creating port_model with with extension included attributes"""
show_attrs = dict( show_attrs = dict(
status="ACTIVE", binding_host_id="devstack", name="response_name", status="ACTIVE", name="response_name",
allowed_address_pairs=[], admin_state_up=True, allowed_address_pairs=[], admin_state_up=True,
network_id="a87cc70a-3e15-4acf-8205-9b711a3531b7", network_id="a87cc70a-3e15-4acf-8205-9b711a3531b7",
tenant_id="7e02058126cc4950b75f9970368ba177", tenant_id="7e02058126cc4950b75f9970368ba177",
extra_dhcp_opts=[], binding_vif_details={"port_filter": True, extra_dhcp_opts=[],
"ovs_hybrid_plug": True}, binding_vif_type="ovs",
device_owner="network:router_interface", device_owner="network:router_interface",
mac_address="fa:16:3e:23:fd:d7", binding_profile={}, mac_address="fa:16:3e:23:fd:d7",
binding_vnic_type="normal", fixed_ips=[{"subnet_id": fixed_ips=[{"subnet_id":
"a0304c3a-4f08-4c43-88af-d796509c97d2", "ip_address": "10.0.0.1"}], "a0304c3a-4f08-4c43-88af-d796509c97d2", "ip_address": "10.0.0.1"}],
id_="46d4bfb9-b26e-41f3-bd2e-e6dcc1ccedb2", security_groups=[], id_="46d4bfb9-b26e-41f3-bd2e-e6dcc1ccedb2", security_groups=[],
device_id="5e3898d7-11be-483e-9732-b2f5eccd2b2e") device_id="5e3898d7-11be-483e-9732-b2f5eccd2b2e")
@ -135,22 +134,14 @@ class ShowPortTest(unittest.TestCase):
"""{{ """{{
"{tag}": {{ "{tag}": {{
"status": "ACTIVE", "status": "ACTIVE",
"binding:host_id": "devstack",
"name": "response_name", "name": "response_name",
"allowed_address_pairs": [], "allowed_address_pairs": [],
"admin_state_up": true, "admin_state_up": true,
"network_id": "a87cc70a-3e15-4acf-8205-9b711a3531b7", "network_id": "a87cc70a-3e15-4acf-8205-9b711a3531b7",
"tenant_id": "7e02058126cc4950b75f9970368ba177", "tenant_id": "7e02058126cc4950b75f9970368ba177",
"extra_dhcp_opts": [], "extra_dhcp_opts": [],
"binding:vif_details": {{
"port_filter": true,
"ovs_hybrid_plug": true
}},
"binding:vif_type": "ovs",
"device_owner": "network:router_interface", "device_owner": "network:router_interface",
"mac_address": "fa:16:3e:23:fd:d7", "mac_address": "fa:16:3e:23:fd:d7",
"binding:profile": {{}},
"binding:vnic_type": "normal",
"fixed_ips": [ "fixed_ips": [
{{ {{
"subnet_id": "a0304c3a-4f08-4c43-88af-d796509c97d2", "subnet_id": "a0304c3a-4f08-4c43-88af-d796509c97d2",
@ -196,29 +187,17 @@ class ShowMultiplePortTest(unittest.TestCase):
cls.expected_response = [sub1, sub2] cls.expected_response = [sub1, sub2]
def test_json_response(self): def test_json_response(self):
# Response data with extension attributes, if supported later on they
# will need to be added to the setUp object model in this test class
api_json_resp = ( api_json_resp = (
"""{{ """{{
"{tag}": [ "{tag}": [
{{ {{
"status": "ACTIVE", "status": "ACTIVE",
"binding:host_id": "devstack",
"name": "", "name": "",
"allowed_address_pairs": [],
"admin_state_up": true, "admin_state_up": true,
"network_id": "70c1db1f-b701-45bd-96e0-a313ee3430b3", "network_id": "70c1db1f-b701-45bd-96e0-a313ee3430b3",
"tenant_id": "", "tenant_id": "",
"extra_dhcp_opts": [],
"binding:vif_details": {{
"port_filter": true,
"ovs_hybrid_plug": true
}},
"binding:vif_type": "ovs",
"device_owner": "network:router_gateway", "device_owner": "network:router_gateway",
"mac_address": "fa:16:3e:58:42:ed", "mac_address": "fa:16:3e:58:42:ed",
"binding:profile": {{}},
"binding:vnic_type": "normal",
"fixed_ips": [ "fixed_ips": [
{{ {{
"subnet_id": "008ba151-0b8c-4a67-98b5-0d2b87666062", "subnet_id": "008ba151-0b8c-4a67-98b5-0d2b87666062",
@ -231,22 +210,12 @@ class ShowMultiplePortTest(unittest.TestCase):
}}, }},
{{ {{
"status": "ACTIVE", "status": "ACTIVE",
"binding:host_id": "devstack",
"name": "", "name": "",
"allowed_address_pairs": [],
"admin_state_up": true, "admin_state_up": true,
"network_id": "f27aa545-cbdd-4907-b0c6-c9e8b039dcc2", "network_id": "f27aa545-cbdd-4907-b0c6-c9e8b039dcc2",
"tenant_id": "d397de8a63f341818f198abb0966f6f3", "tenant_id": "d397de8a63f341818f198abb0966f6f3",
"extra_dhcp_opts": [],
"binding:vif_details": {{
"port_filter": true,
"ovs_hybrid_plug": true
}},
"binding:vif_type": "ovs",
"device_owner": "network:router_interface", "device_owner": "network:router_interface",
"mac_address": "fa:16:3e:bb:3c:e4", "mac_address": "fa:16:3e:bb:3c:e4",
"binding:profile": {{}},
"binding:vnic_type": "normal",
"fixed_ips": [ "fixed_ips": [
{{ {{
"subnet_id": "288bf4a1-51ba-43b6-9d0a-520e9005db17", "subnet_id": "288bf4a1-51ba-43b6-9d0a-520e9005db17",