Security Group support for Nova and Neutron

We have implemented the API and helpers for Nova and Neutron for
the Security Group Resource and Link. Security Group is working
just by using json requests.

Change-Id: I15ce76a42caa75f158ac1a792aa44d2da8c7e404
This commit is contained in:
Jorge Sevilla 2017-02-24 17:59:57 +01:00
parent 647740eb9c
commit ecf29929d7
11 changed files with 1561 additions and 31 deletions

View File

@ -114,6 +114,13 @@ class BaseHelper(object):
self.app = app
self.openstack_version = openstack_version
@staticmethod
def tenant_from_req(req):
try:
return req.environ["HTTP_X_PROJECT_ID"]
except KeyError:
raise exception.Forbidden(reason="Cannot find project ID")
def _get_req(self, req, method,
path=None,
content_type="application/json",
@ -198,17 +205,21 @@ class BaseHelper(object):
return self._get_req(req, path=path,
query_string=query_string, method="GET")
def _make_create_request(self, req, resource, parameters):
def _make_create_request(self, req, resource, parameters,
resource_object_name=None):
"""Create CREATE request
This method creates a CREATE Request instance
:param req: the incoming request
:param parameters: parameters with values
:param resource_object_name: in case resource name is different
to the response one.
"""
path = "/%s" % resource
single_resource = resource[:-1]
body = utils.make_body(single_resource, parameters)
if not resource_object_name:
resource_object_name = resource[:-1]
body = utils.make_body(resource_object_name, parameters)
return self._get_req(req, path=path,
content_type="application/json",
body=json.dumps(body), method="POST")
@ -245,13 +256,6 @@ class OpenStackHelper(BaseHelper):
}
}
@staticmethod
def tenant_from_req(req):
try:
return req.environ["HTTP_X_PROJECT_ID"]
except KeyError:
raise exception.Forbidden(reason="Cannot find project ID")
def _get_index_req(self, req):
tenant_id = self.tenant_from_req(req)
path = "/%s/servers" % tenant_id
@ -1009,3 +1013,237 @@ class OpenStackHelper(BaseHelper):
os_req = self._get_req(req, path=path, method="DELETE")
os_req.get_response(self.app)
return []
def _get_security_group(self, req, sec_id):
"""Retrieve info about a security group.
:param req: the incoming request
:param sec_id: security group id to show
"""
path = "os-security-groups"
tenant_id = self.tenant_from_req(req)
path = "/%s/%s/%s" % (tenant_id, path, sec_id)
os_req = self._get_req(req, path=path,
method="GET")
response = os_req.get_response(self.app)
return self.get_from_response(response, "security_group", [])
def get_security_group_details(self, req, sec_id):
"""Get details about a security group.
:param req: the incoming request
:param sec_id: security group id to show
"""
net = self._get_security_group(req, sec_id)
ooi_sec = os_helpers.build_security_group_from_nova([net])
return ooi_sec[0]
def list_security_groups(self, req):
"""List security groups
:param req: the incoming request
"""
path = "os-security-groups"
tenant_id = self.tenant_from_req(req)
path = "/%s/%s" % (tenant_id, path)
os_req = self._get_req(req, path=path,
method="GET")
response = os_req.get_response(self.app)
sec = self.get_from_response(response, "security_groups", [])
ooi_sec = os_helpers.build_security_group_from_nova(sec)
return ooi_sec
def create_security_group(self, req, name, description, rules):
"""Create security group
:param req: the incoming request
:param name: security group name
:param description: security group description
:param rules: security group rules
"""
try:
tenant_id = self.tenant_from_req(req)
path = "os-security-groups"
path = "/%s/%s" % (tenant_id, path)
param_group = {
"description": description,
"name": name,
}
body = utils.make_body('security_group', param_group)
os_req = self._get_req(req,
path=path,
content_type="application/json",
body=json.dumps(body),
method="POST")
response_group = os_req.get_response(self.app)
secgroup = self.get_from_response(
response_group, "security_group", {})
sec_id = secgroup["id"]
secgroup["rules"] = []
for rule in rules:
port_min, port_max = os_helpers.security_group_rule_port(
rule["port"]
)
param_rules = {
"parent_group_id": sec_id,
"ip_protocol": rule["protocol"],
"from_port": port_min,
"to_port": port_max,
"cidr": rule.get("range", "0.0.0.0/0")
}
body_rules = utils.make_body('security_group_rule',
param_rules)
path = "/%s/os-security-group-rules" % (tenant_id)
os_req_rules = self._get_req(req,
path=path,
content_type="application/json",
body=json.dumps(body_rules),
method="POST")
response_rules = os_req_rules.get_response(self.app)
secrules = self.get_from_response(
response_rules, "security_group_rule", {})
secgroup["rules"].append(secrules)
ooi_sec = os_helpers.build_security_group_from_nova(
[secgroup]
)
return ooi_sec[0]
except Exception as ex:
raise ex
def delete_security_group(self, req, sec_id):
"""Delete info about a security group.
:param req: the incoming request
:param sec_id: security group id to delete
"""
path = "os-security-groups"
tenant_id = self.tenant_from_req(req)
path = "/%s/%s/%s" % (tenant_id, path, sec_id)
os_req = self._get_req(req, path=path,
method="DELETE")
os_req.get_response(self.app)
return []
def _get_server_security_group(self, req, server_id):
"""Get security group from a server
:param req: incoming request
:param server_id: server id
:return: information about the security group
"""
path = "os-security-groups"
tenant_id = self.tenant_from_req(req)
path = "/%s/servers/%s/%s" % (tenant_id,
server_id,
path
)
os_req = self._get_req(req, path=path,
method="GET")
response = os_req.get_response(self.app)
sec = self.get_from_response(response,
"security_groups", [])
ooi_sec = os_helpers.build_security_group_from_nova(sec)
return ooi_sec
def list_server_security_groups(self, req,
server_id=None):
"""List security groups associated to a server
:param req: incoming request
:param server_id: server id
:return: security groups associated to servers
"""
return self._get_server_security_group(
req, server_id)
def list_server_security_links(self, req, server_id=None):
"""List security groups associated to servers
:param req: incoming request
:param server_id: server id
:return: security groups associated to servers
"""
link_list = []
if server_id:
compute_list = [self.get_server(req, server_id)]
else:
compute_list = self.index(req)
for c in compute_list:
server_id = c["id"]
server_secgroups = self._get_server_security_group(
req, server_id)
for sec in server_secgroups:
link = {
"compute_id": server_id,
"securitygroup": sec
}
link_list.append(link)
return link_list
def get_server_security_link(self, req, server_id,
securitygroup_id):
"""Show security group link from a server
:param req: incoming request
:param server_id: server id
:param securitygroup_id: security group id
:return: information about the link of security group
"""
ooi_sec = self._get_server_security_group(req, server_id)
for sg in ooi_sec:
if sg["id"] == securitygroup_id:
link = {"compute_id": server_id,
"securitygroup": sg
}
return [link]
return None
def delete_server_security_link(self, req, server_id,
securitygroup_id):
"""Delete security group link from a server
:param req: incoming request
:param server_id: server id
:param securitygroup_id: segurity group id
:return: empty
"""
tenant_id = self.tenant_from_req(req)
path = "/%s/servers/%s/action" % (tenant_id, server_id)
sg = self._get_security_group(req, securitygroup_id)
if "name" not in sg:
raise exception.NotFound("Security group %s not found."
% securitygroup_id)
param = {"name": sg["name"]}
body = utils.make_body('removeSecurityGroup', param)
os_req = self._get_req(req,
path=path,
content_type="application/json",
body=json.dumps(body),
method="POST")
os_req.get_response(self.app)
return []
def create_server_security_link(self, req, server_id,
securitygroup_id):
"""Create security group link in a server
:param req: incoming request
:param server_id: server id
:param securitygroup_id: segurity group id
:return: empty
"""
tenant_id = self.tenant_from_req(req)
path = "/%s/servers/%s/action" % (tenant_id, server_id)
sg = self._get_security_group(req, securitygroup_id)
if "name" not in sg:
raise exception.NotFound("Security group %s not found."
% securitygroup_id)
param = {"name": sg["name"]}
body = utils.make_body('addSecurityGroup', param)
os_req = self._get_req(req,
path=path,
content_type="application/json",
body=json.dumps(body),
method="POST")
os_req.get_response(self.app)
return []

View File

@ -539,3 +539,91 @@ class OpenStackNeutron(helpers.BaseHelper):
response = os_req.get_response()
if response.status_int != 202:
raise helpers.exception_from_response(response)
def get_security_group_details(self, req, sec_id):
"""Get info about a security group.
:param req: the incoming request
:param sec_id: security group id to show
"""
try:
secgroup = self.get_resource(req, 'security-groups', sec_id,
response_resource="security_group")
ooi_sec = os_helpers.build_security_group_from_neutron(
[secgroup]
)
return ooi_sec[0]
except Exception:
raise exception.NotFound()
def list_security_groups(self, req):
"""List security groups
:param req: the incoming request
"""
try:
secgroup = self.list_resources(req, 'security-groups',
response_resource="security_groups")
ooi_sec = os_helpers.build_security_group_from_neutron(
secgroup
)
return ooi_sec
except Exception:
raise exception.NotFound()
def create_security_group(self, req, name, description, rules):
"""Create security group
:param req: the incoming request
:param name: security group name
:param description: security group description
:param rules: security group rules
"""
try:
tenant_id = self.tenant_from_req(req)
param_group = {"tenant_id": tenant_id,
"description": description,
"name": name,
}
secgroup = self.create_resource(
req, 'security-groups', param_group,
response_resource="security_group")
sec_id = secgroup["id"]
secgroup["security_group_rules"] = []
for rule in rules:
port_min, port_max = os_helpers.security_group_rule_port(
rule["port"]
)
param_rule = {
"ethertype": rule.get("ipversion", "IPv4"),
"port_range_max": port_max,
"port_range_min": port_min,
"direction": os_helpers.security_group_rule_type(
rule["type"]),
"remote_ip_prefix": rule.get("range", "0.0.0.0/0"),
"protocol": rule["protocol"],
"security_group_id": sec_id,
}
secrule = self.create_resource(
req,
'security-group-rules', param_rule,
response_resource="security_group_rule")
secgroup["security_group_rules"].append(secrule)
ooi_sec = os_helpers.build_security_group_from_neutron(
[secgroup]
)
return ooi_sec[0]
except Exception as ex:
raise ex
def delete_security_group(self, req, sec_id):
"""Delete info about a security group.
:param req: the incoming request
:param sec_id: security group id to delete
"""
try:
secgroup = self.delete_resource(req, 'security-groups', sec_id)
return secgroup
except Exception:
raise exception.NotFound()

192
ooi/api/securitygroup.py Normal file
View File

@ -0,0 +1,192 @@
# Copyright 2015 LIP - INDIGO-DataCloud
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from ooi.api import base
from ooi.api import helpers
from ooi.api import helpers_neutron
from ooi import exception
from ooi.occi.core import collection
from ooi.occi.infrastructure import securitygroup
from ooi.occi import validator as occi_validator
def parse_validate_schema(req, scheme=None,
required_attr=None):
"""Parse attributes and validate scheme
Returns attributes from request
If scheme is specified, it validates the OCCI scheme:
-Raises exception in case of being invalid
:param req: request
:param: scheme: scheme to validate
:param: required_attr: attributes required
"""
parser = req.get_parser()(req.headers, req.body.decode("utf8"))
if scheme:
attributes = parser.parse()
validator = occi_validator.Validator(attributes)
validator.validate(scheme)
validator.validate_attributes(required_attr)
else:
attributes = parser.parse_attributes(req.headers)
return attributes
def process_parameters(req, scheme=None,
required_attr=None):
"""Get attributes from request parameters
:param req: request
:param scheme: scheme to validate
:param required_attr: attributes required
"""
parameters = parse_validate_schema(req, scheme, required_attr)
try:
attributes = {}
if 'X_PROJECT_ID' in req.headers:
attributes["X_PROJECT_ID"] = req.headers["X_PROJECT_ID"]
if "attributes" in parameters:
for k, v in parameters.get("attributes", None).items():
attributes[k.strip()] = v
if not attributes:
attributes = None
except Exception:
raise exception.Invalid
return attributes
class Controller(base.Controller):
def __init__(self, app=None, openstack_version=None,
neutron_ooi_endpoint=None):
"""Security group controller initialization
:param app: application
:param: openstack_version: nova version
:param: neutron_ooi_endpoint: This parameter
indicates the Neutron endpoint to load the Neutron Helper.
If it is None, Nova api is used.
"""
super(Controller, self).__init__(
app=app,
openstack_version=openstack_version)
if neutron_ooi_endpoint:
self.os_helper = helpers_neutron.OpenStackNeutron(
neutron_ooi_endpoint
)
else:
self.os_helper = helpers.OpenStackHelper(
self.app,
self.openstack_version
)
@staticmethod
def _get_security_group_resources(securitygroup_list):
"""Create OCCI security group instances from list
:param securitygroup_list: security group objects
provides by the cloud infrastructure
:return occi security group list
"""
occi_securitygroup_resources = []
if securitygroup_list:
for s in securitygroup_list:
s_rules = s['rules']
s_id = s["id"]
s_name = s["title"]
s_summary = s["summary"]
s = securitygroup.SecurityGroupResource(title=s_name,
id=s_id,
rules=s_rules,
summary=s_summary)
occi_securitygroup_resources.append(s)
return occi_securitygroup_resources
def index(self, req):
"""List security groups
:param req: request object
"""
occi_sc = self.os_helper.list_security_groups(req)
occi_sc_resources = self._get_security_group_resources(
occi_sc)
return collection.Collection(
resources=occi_sc_resources)
def show(self, req, id):
"""Get security group details
:param req: request object
:param id: security group identification
"""
resp = self.os_helper.get_security_group_details(req, id)
occi_sc_resources = self._get_security_group_resources(
[resp])
return occi_sc_resources[0]
def create(self, req, body=None):
"""Create a network instance in the cloud
:param req: request object
:param body: body request (not used)
"""
scheme = {
"category": securitygroup.SecurityGroupResource.kind,
}
required = ["occi.core.title",
"occi.securitygroup.rules"
]
attributes = process_parameters(req, scheme, required)
name = attributes.get('occi.core.title')
description = attributes.get("occi.core.summary", "")
try:
rules = attributes.get('occi.securitygroup.rules')
except Exception:
raise exception.Invalid(
"Bad JSON format for occi.securitygroup.rules: %s"
% attributes.get(
'occi.securitygroup.rules'))
sec = self.os_helper.create_security_group(
req, name, description, rules
)
occi_sec_resources = self._get_security_group_resources([sec])
return collection.Collection(
resources=occi_sec_resources)
def delete(self, req, id):
"""delete security groups which satisfy the parameters
:param req: current request
:param id: identification
"""
response = self.os_helper.delete_security_group(req, id)
return response
def run_action(self, req, id, body):
"""Run action over the security group
:param req: current request
:param id: security group
:param body: body
"""
action = req.GET.get("action", None)
occi_actions = [a.term for a in
securitygroup.SecurityGroupResource.actions]
if action is None or action not in occi_actions:
raise exception.InvalidAction(action=action)
raise exception.NotImplemented("Security group actions are not implemented")

View File

@ -0,0 +1,144 @@
# Copyright 2015 LIP - INDIGO-DataCloud
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from ooi.api import base
import ooi.api.helpers
from ooi import exception
from ooi.occi.core import collection
from ooi.occi.infrastructure import compute
from ooi.occi.infrastructure import securitygroup
from ooi.occi.infrastructure import securitygroup_link
from ooi.occi import validator as occi_validator
def _get_security_link_resources(link_list):
"""Create OCCI security group instances from a list
:param link_list: provides by the cloud infrastructure
"""
occi_secgropu_resources = []
if link_list:
for l in link_list:
compute_id = l['compute_id']
sec = l['securitygroup']
sec_id = sec.get("id")
sec_name = sec.get("title", "")
sec_rules = sec.get("rules", [])
s = securitygroup.SecurityGroupResource(title=sec_name,
id=sec_id,
rules=sec_rules)
c = compute.ComputeResource(title="Compute",
id=compute_id
)
link = securitygroup_link.SecurityGroupLink(source=c,
target=s)
occi_secgropu_resources.append(link)
return occi_secgropu_resources
class Controller(base.Controller):
def __init__(self, *args, **kwargs):
super(Controller, self).__init__(*args, **kwargs)
# TODO(jorgesece): add neutron controller to list securitygroups
self.os_helper = ooi.api.helpers.OpenStackHelper(
self.app,
self.openstack_version
)
def _get_attachment_from_id(self, req, attachment_id):
try:
server_id, security_id = attachment_id.split('_', 1)
return {"server_id": server_id,
"securitygroup_id": security_id}
except ValueError:
raise exception.LinkNotFound(link_id=attachment_id)
def index(self, req):
"""List security group links
:param req: request object
"""
link_list = self.os_helper.list_server_security_links(req)
occi_link_resources = _get_security_link_resources(link_list)
return collection.Collection(resources=occi_link_resources)
def show(self, req, id):
"""Get security group details
:param req: request object
:param id: security group identification
"""
try:
link_info = self._get_attachment_from_id(req, id)
server_id = link_info["server_id"]
security_name = link_info["securitygroup_id"]
link = self.os_helper.get_server_security_link(
req, server_id, security_name
)
occi_instance = _get_security_link_resources(
link
)[0]
return occi_instance
except Exception:
raise exception.LinkNotFound(link_id=id)
def create(self, req, body=None):
"""Create a security group link
Creates a link between a server and a securitygroup.
:param req: request object
:param body: body request (not used)
"""
parser = req.get_parser()(req.headers, req.body)
scheme = {
"category": securitygroup_link.SecurityGroupLink.kind,
}
obj = parser.parse()
validator = occi_validator.Validator(obj)
validator.validate(scheme)
attrs = obj.get("attributes", {})
_, securitygroup_id = ooi.api.helpers.get_id_with_kind(
req,
attrs.get("occi.core.target"),
securitygroup.SecurityGroupResource.kind)
_, server_id = ooi.api.helpers.get_id_with_kind(
req,
attrs.get("occi.core.source"),
compute.ComputeResource.kind)
self.os_helper.create_server_security_link(
req, server_id,
securitygroup_id)
link = {"compute_id": server_id,
"securitygroup": {"id": securitygroup_id}
}
occi_instance = _get_security_link_resources([link])
return collection.Collection(resources=occi_instance)
def delete(self, req, id):
"""Delete security group link
:param req: current request
:param id: identification
"""
link_info = self._get_attachment_from_id(req, id)
server_id = link_info["server_id"]
security_id = link_info["securitygroup_id"]
try:
self.os_helper.delete_server_security_link(
req, server_id, security_id)
except Exception:
raise exception.LinkNotFound(link_id=id)
return []

View File

@ -45,4 +45,119 @@ def network_status(neutron_status):
if neutron_status == "ACTIVE":
return "active"
else:
return "inactive"
return "inactive"
def security_group_rule_type(neutron_type):
"""Translate neutron rule type.
Translate to/from openstack - occi
:param neutron_type: neutron status
"""
if neutron_type == "ingress":
return "inbound"
elif neutron_type == "egress":
return "outbound"
elif neutron_type == "inbound":
return "ingress"
elif neutron_type == "outbound":
return "egress"
else:
return None
def security_group_rule_port(os_port):
"""Translate openstack rule port
Translate to/from openstack - occi
:param neutron_type: neutron status
"""
ports = str(os_port).split('-')
if ports.__len__() == 1:
port_min = port_max = ports[0]
elif ports.__len__() == 2:
port_min, port_max = ports
else:
raise Exception("Port value")
return port_min, port_max
def build_security_group_from_neutron(sec_groups):
"""Translate neutron security group
Translate to the ooi a standard security group format.
:param sec_groups: array of security groups
"""
sec_list = []
for sec in sec_groups:
ooi_sec = {}
rules_list = []
ooi_sec["id"] = sec["id"]
ooi_sec["title"] = sec.get("name", None)
ooi_sec["summary"] = sec.get("description", "")
for rule in sec["security_group_rules"]:
ipversion = rule.get("ethertype", "IPv4")
rule_type = security_group_rule_type(
rule["direction"]
)
rule_protocol = rule.get("protocol", None)
port_min = rule["port_range_min"]
port_max = rule["port_range_max"]
if port_min and (port_min != port_max):
rule_port = "%s-%s" % (port_min,
port_max
)
else:
rule_port = port_min
rule_range = str(rule["remote_ip_prefix"])
rules_list.append({"type": rule_type,
"protocol": rule_protocol,
"port": rule_port,
"range": rule_range,
"ipversion": ipversion}
)
ooi_sec["rules"] = rules_list
sec_list.append(ooi_sec)
return sec_list
def build_security_group_from_nova(sec_groups):
"""Translate nova security group
Translate to the ooi a standard security group format.
:param sec_groups: array of security groups
"""
sec_list = []
for sec in sec_groups:
ooi_sec = {}
rules_list = []
ooi_sec["id"] = sec["id"]
ooi_sec["title"] = sec.get("name", None)
ooi_sec["summary"] = sec.get("description", "")
for rule in sec["rules"]:
ipversion = "IPv4"
rule_protocol = rule.get("ip_protocol", None)
port_min = rule["from_port"]
port_max = rule["to_port"]
if port_min and (port_min != port_max):
rule_port = "%s-%s" % (port_min,
port_max
)
else:
rule_port = port_min
rule_range = str(rule["ip_range"].get("cidr", ""))
# BUG(jorgesce): type is alwayns inbound because nova
# does not provide that information.
rules_list.append({"type": "inbound",
"protocol": rule_protocol,
"port": rule_port,
"range": rule_range,
"ipversion": ipversion}
)
ooi_sec["rules"] = rules_list
sec_list.append(ooi_sec)
return sec_list

View File

@ -194,6 +194,10 @@ servers = {
"flavor": {"id": flavors[1]["id"]},
"image": {"id": images["foo"]["id"]},
"status": "ACTIVE",
"security_groups":[
{"name": "group1"},
{"name": "group2"}
]
},
{
"id": uuid.uuid4().hex,
@ -201,6 +205,9 @@ servers = {
"flavor": {"id": flavors[2]["id"]},
"image": {"id": images["bar"]["id"]},
"status": "SHUTOFF",
"security_groups":[
{"name": "group1"}
]
},
{
"id": uuid.uuid4().hex,
@ -208,6 +215,9 @@ servers = {
"flavor": {"id": flavors[1]["id"]},
"image": {"id": images["bar"]["id"]},
"status": "ERROR",
"security_groups":[
{"name": "group2"}
]
},
],
tenants["bar"]["id"]: [],
@ -237,7 +247,10 @@ servers = {
"OS-EXT-IPS:type": "floating",
"OS-EXT-IPS-MAC:mac_addr": "1234"},
]
}
},
"security_groups":[
{"name": "group1"}
]
}
],
}
@ -265,6 +278,54 @@ volumes[tenants["baz"]["id"]][1]["attachments"] = [{
"id": volumes[tenants["baz"]["id"]][0]["id"],
}]
security_groups = {
tenants["foo"]["id"]: [],
tenants["baz"]["id"]: [
{
"name": "group1",
"id": uuid.uuid4().hex,
"description": "group one",
"rules": [
{"from_port": 443,
"to_port": 443, "ip_range": {"cidr": "10.0.0.0/32"},
"ip_protocol": "tcp"},
{"from_port": "1000",
"to_port": 2000, "ip_range": {"cidr": "11.0.0.0/32"},
"ip_protocol": "udp"},
]
},
{
"name": "group2",
"id": uuid.uuid4().hex,
"description": "group two",
"rules": [
{"from_port": 80,
"to_port": 80, "ip_range": {"cidr": "10.0.0.0/32"},
"ip_protocol": "tcp"},
{"from_port": "4000",
"to_port": 7000, "ip_range": {"cidr": "13.0.0.0/32"},
"ip_protocol": "udp"},
]
}
],
tenants["bar"]["id"]: [
{
"name": "group3",
"id": uuid.uuid4().hex,
"description": "group three",
"rules": [
{"from_port": 443,
"to_port": 443, "ip_range": {"cidr": "10.0.0.0/32"},
"ip_protocol": "tcp"},
{"from_port": "1000",
"to_port": 2000, "ip_range": {"cidr": "11.0.0.0/32"},
"ip_protocol": "udp"},
]
},
]
}
def fake_query_results():
cats = []
@ -485,8 +546,17 @@ class FakeApp(object):
"os-floating-ip-pools")
self._populate(path, "floating_ip", floating_ips[tenant["id"]],
"os-floating-ips")
self._populate_ports(path, servers[tenant["id"]],
ports[tenant["id"]])
self._populate_server_links(path, "os-interface",
"interfaceAttachments",
servers[tenant["id"]],
ports[tenant["id"]])
self._populate_server_links(path, "os-security-groups",
"security_groups",
servers[tenant["id"]],
security_groups[tenant["id"]])
self._populate(path, "security_group",
security_groups[tenant["id"]],
"os-security-groups")
# NOTE(aloga): dict_values un Py3 is not serializable in JSON
self._populate(path, "image", list(images.values()))
self._populate(path, "flavor", list(flavors.values()))
@ -530,16 +600,20 @@ class FakeApp(object):
self.routes[obj_path] = create_fake_json_resp(
{"volumeAttachment": attach})
def _populate_ports(self, path, servers_list, ports_list):
def _populate_server_links(self, path, resource, obj,
servers_list, link_list):
if servers_list:
for p in ports_list:
for s in servers_list:
list_obj = []
path_base = "%s/servers/%s/%s" % (
path,
servers_list[0]["id"],
"os-interface"
s["id"],
resource
)
for l in link_list:
list_obj.append(l)
self.routes[path_base] = create_fake_json_resp(
{"interfaceAttachments": [p]})
{obj: list_obj})
@webob.dec.wsgify()
def __call__(self, req):
@ -619,7 +693,9 @@ class FakeApp(object):
body = req.json_body.copy()
action = body.popitem()
if action[0] in ["os-start", "os-stop", "reboot",
"addFloatingIp", "removeFloatingIp"]:
"addFloatingIp", "removeFloatingIp",
"removeSecurityGroup",
"addSecurityGroup"]:
return self._get_from_routes(req)
elif req.path_info.endswith("os-volume_attachments"):
return self._do_create_attachment(req)

View File

@ -29,6 +29,8 @@ application_url = "https://foo.example.org:8774/ooiv1"
tenants = {
"foo": {"id": uuid.uuid4().hex,
"name": "foo"},
"baz": {"id": uuid.uuid4().hex,
"name": "foo"},
"bar": {"id": uuid.uuid4().hex,
"name": "bar"},
"public": {"id": uuid.uuid4().hex,
@ -153,6 +155,40 @@ network_links = {
],
}
security_groups = {
tenants["bar"]["id"]: [],
tenants["foo"]["id"]: [],
tenants["baz"]["id"]: [
{
"name": "group1",
"id": uuid.uuid4().hex,
"description": "group one",
"security_group_rules": [
{"ethertype": "IPv4", "port_range_min": 443,
"port_range_max": 443, "remote_ip_prefix": "10.0.0.0/32",
"protocol": "tcp", "direction": "ingress"},
{"ethertype": "IPv4", "port_range_min": "8000",
"port_range_max": 9000, "remote_ip_prefix": "11.0.0.0/24",
"protocol": "udp", "direction": "egress"}
]
},
{
"name": "group2",
"id": uuid.uuid4().hex,
"description": "group two",
"security_group_rules": [
{"ethertype": "IPv4", "port_range_min": 80,
"port_range_max": 80, "remote_ip_prefix": "10.0.0.0/32",
"protocol": "tcp", "direction": "ingress"},
{"ethertype": "IPv4", "port_range_min": "5000",
"port_range_max": 6000, "remote_ip_prefix": "11.0.0.0/24",
"protocol": "udp", "direction": "egress"}
]
}
]
}
def create_fake_json_resp(data, status=200):
r = webob.Response()
@ -201,6 +237,22 @@ def create_header(params, schemes, project=None):
return headers
def create_req_json_occi(params, category, method="POST"):
headers = create_headers(category,
content_type="application/occi+json")
body = {}
for c in category:
body["kind"] = "%s%s" % (
c.scheme, c.term)
body["attributes"] = params
req = webob.Request.blank(path="")
req.headers = headers
req.method = method
req.body = json.dumps(body).encode("utf8")
return wsgi.Request(req.environ)
def create_req_test_occi(params, category):
headers = create_header_occi(params, category)
req = webob.Request.blank(path="")
@ -209,7 +261,7 @@ def create_req_test_occi(params, category):
def create_header_occi(params, category, project=None):
headers = {}
headers = create_headers(category, project)
att = ""
if params is not None:
for k, v in params.items():
@ -218,6 +270,13 @@ def create_header_occi(params, category, project=None):
else:
att = "%s, %s=%s" % (att, k, v)
headers["X_OCCI_Attribute"] = att
return headers
def create_headers(category, content_type=None,
project=None):
headers = {}
if category is not None:
cat = ""
for c in category:
@ -227,6 +286,8 @@ def create_header_occi(params, category, project=None):
headers['Category'] = cat[:-1]
if project is not None:
headers['X_PROJECT_ID'] = project
if content_type is not None:
headers['Content-Type'] = content_type
return headers
@ -376,3 +437,35 @@ def build_occi_nova(network):
for l in links:
result.append(("Link", l))
return result
def build_occi_securitygroup(secgroup):
name = secgroup["title"]
secgroup_id = secgroup["id"]
rules = secgroup["rules"]
summary = secgroup["summary"]
app_url = application_url
cats = []
cats.append('securitygroup; '
'scheme='
'"http://schemas.ogf.org/occi/infrastructure#";'
' class="kind"; title="securitygroup resource";'
' rel='
'"http://schemas.ogf.org/occi/core#resource";'
' location="%s/securitygroup/"' % app_url)
links = []
attrs = [
'occi.core.id="%s"' % secgroup_id,
'occi.core.title="%s"' % name,
'occi.core.summary="%s"' % summary,
'occi.securitygroup.rules="%s"' % json.dumps(rules).replace('"', "'"),
]
result = []
for c in cats:
result.append(("Category", c))
for a in attrs:
result.append(("X-OCCI-Attribute", a))
for l in links:
result.append(("Link", l))
return result

View File

@ -17,8 +17,10 @@ import uuid
import mock
from ooi.api import helpers
from ooi.api import helpers_neutron
from ooi import exception
from ooi.openstack import helpers as openstack_helper
from ooi.tests import base
from ooi.tests import fakes_network as fakes
from ooi import utils
@ -593,3 +595,80 @@ class TestNetOpenStackHelper(base.TestCase):
self.helper.delete_port,
None,
iface)
@mock.patch.object(helpers_neutron.OpenStackNeutron, "create_resource")
@mock.patch.object(helpers.BaseHelper, "tenant_from_req")
def test_create_security_groups(self, m_tenant, m_create):
tenant_id = fakes.tenants["baz"]["id"]
sec_group = fakes.security_groups[tenant_id][0]
expected = openstack_helper.build_security_group_from_neutron(
[sec_group])[0]
m_tenant.return_value = uuid.uuid4().hex
group_info = {"name": sec_group["name"], "id": sec_group["id"],
"description": sec_group["description"]}
rules_out_1 = sec_group["security_group_rules"][0]
rules_out_2 = sec_group["security_group_rules"][1]
m_create.side_effect = [group_info, rules_out_1, rules_out_2]
ret = self.helper.create_security_group(None, expected["title"],
expected["summary"],
expected["rules"])
self.assertEqual(expected, ret)
self.assertEqual(3, m_create.call_count)
@mock.patch.object(helpers_neutron.OpenStackNeutron, "list_resources")
def test_list_security_group(self, m_list):
tenant_id = fakes.tenants["baz"]["id"]
sec_group = fakes.security_groups[tenant_id]
expected = openstack_helper.build_security_group_from_neutron(
sec_group)
m_list.return_value = sec_group
ret = self.helper.list_security_groups(None)
self.assertEqual(2, ret.__len__())
self.assertEqual(expected, ret)
m_list.assert_called_with(None, 'security-groups',
response_resource="security_groups")
@mock.patch.object(helpers_neutron.OpenStackNeutron, "list_resources")
def test_list_security_group_empty(self, m_list):
tenant_id = fakes.tenants["bar"]["id"]
m_list.return_value = fakes.security_groups[tenant_id]
ret = self.helper.list_security_groups(None)
self.assertEqual(0, ret.__len__())
m_list.assert_called_with(None, 'security-groups',
response_resource="security_groups")
@mock.patch.object(helpers_neutron.OpenStackNeutron, "get_resource")
def test_show_security_group(self, m_list):
tenant_id = fakes.tenants["baz"]["id"]
sec_group = fakes.security_groups[tenant_id][0]
expected = openstack_helper.build_security_group_from_neutron(
[sec_group])[0]
list_sec = sec_group
m_list.return_value = list_sec
ret = self.helper.get_security_group_details(None, None)
self.assertEqual(expected, ret)
m_list.assert_called_with(None, 'security-groups', None,
response_resource="security_group")
@mock.patch.object(helpers_neutron.OpenStackNeutron, "get_resource")
def test_show_security_group_not_found(self, m_list):
m_list.return_value = []
self.assertRaises(exception.NotFound,
self.helper.get_security_group_details,
None,
None)
@mock.patch.object(helpers_neutron.OpenStackNeutron, "delete_resource")
def test_delete_security_group(self, m_list):
m_list.return_value = None
ret = self.helper.delete_security_group(None, None)
self.assertIsNone(ret)
m_list.assert_called_with(None, 'security-groups', None)
@mock.patch.object(helpers_neutron.OpenStackNeutron, "delete_resource")
def test_delete_security_group_not_found(self, m_list):
m_list.side_effect = exception.OCCIException()
self.assertRaises(exception.NotFound,
self.helper.delete_security_group,
None,
None)

View File

@ -18,8 +18,10 @@ import uuid
import mock
from ooi.api import helpers
from ooi.openstack import helpers as os_helpers
from ooi.tests import base
from ooi.tests import fakes_network as fakes
from ooi.tests import fakes as fakes_nova
from ooi.tests import fakes_network
from ooi import utils
@ -33,10 +35,12 @@ class TestNovaNetOpenStackHelper(base.TestCase):
@mock.patch.object(helpers.OpenStackHelper, "tenant_from_req")
def test_list_networks_with_public(self, m_t, m_rq):
id = uuid.uuid4().hex
resp = fakes.create_fake_json_resp({"networks": [{"id": id}]}, 200)
resp = fakes_network.create_fake_json_resp(
{"networks": [{"id": id}]},
200)
req_mock = mock.MagicMock()
req_mock.get_response.return_value = resp
resp_float = fakes.create_fake_json_resp(
resp_float = fakes_network.create_fake_json_resp(
{"floating_ip_pools": [{"id": id}]}, 200
)
req_mock_float = mock.MagicMock()
@ -49,10 +53,11 @@ class TestNovaNetOpenStackHelper(base.TestCase):
@mock.patch.object(helpers.OpenStackHelper, "tenant_from_req")
def test_list_networks_with_no_public(self, m_t, m_rq):
id = uuid.uuid4().hex
resp = fakes.create_fake_json_resp({"networks": [{"id": id}]}, 200)
resp = fakes_network.create_fake_json_resp(
{"networks": [{"id": id}]}, 200)
req_mock = mock.MagicMock()
req_mock.get_response.return_value = resp
resp_float = fakes.create_fake_json_resp(
resp_float = fakes_network.create_fake_json_resp(
{"floating_ip_pools": []}, 204
)
req_mock_float = mock.MagicMock()
@ -67,8 +72,9 @@ class TestNovaNetOpenStackHelper(base.TestCase):
id = uuid.uuid4().hex
tenant_id = uuid.uuid4().hex
m_t.return_value = tenant_id
resp = fakes.create_fake_json_resp({"networks": [{"id": id}]}, 200)
resp_float = fakes.create_fake_json_resp(
resp = fakes_network.create_fake_json_resp(
{"networks": [{"id": id}]}, 200)
resp_float = fakes_network.create_fake_json_resp(
{"floating_ip_pools": [{"id": id}]}, 200
)
req_mock = mock.MagicMock()
@ -103,7 +109,7 @@ class TestNovaNetOpenStackHelper(base.TestCase):
label = "network11"
tenant_id = uuid.uuid4().hex
m_t.return_value = tenant_id
resp = fakes.create_fake_json_resp(
resp = fakes_network.create_fake_json_resp(
{"network": {"id": id, "label": label,
"cidr": address,
"gateway": gateway}}, 200
@ -134,7 +140,7 @@ class TestNovaNetOpenStackHelper(base.TestCase):
"cidr": cidr,
"gateway": gateway
}
resp = fakes.create_fake_json_resp(
resp = fakes_network.create_fake_json_resp(
{"network": {"id": net_id, "label": name,
"cidr": cidr,
"gateway": gateway}}, 200
@ -173,4 +179,250 @@ class TestNovaNetOpenStackHelper(base.TestCase):
m_rq.assert_called_with(
None, method="DELETE",
path="/%s/os-networks/%s" % (tenant_id, net_id),
)
)
@mock.patch.object(helpers.OpenStackHelper, "_get_req")
@mock.patch.object(helpers.OpenStackHelper, "tenant_from_req")
def test_list_security_groups(self, m_t, m_rq):
tenant_id = fakes_nova.tenants["baz"]["id"]
m_t.return_value = tenant_id
sc_groups = fakes_nova.security_groups[tenant_id]
resp = fakes_network.create_fake_json_resp(
{"security_groups": sc_groups}, 200)
req_mock = mock.MagicMock()
req_mock.get_response.return_value = resp
m_rq.side_effect = [req_mock]
ret = self.helper.list_security_groups(None)
cont = 0
for sc in sc_groups:
self.assertEqual(sc['id'], ret[cont]['id'])
cont = cont + 1
self.assertEqual(
{'method': 'GET',
'path': '/%s/os-security-groups' % (tenant_id)},
m_rq.call_args_list[0][1]
)
@mock.patch.object(helpers.OpenStackHelper, "_get_req")
@mock.patch.object(helpers.OpenStackHelper, "tenant_from_req")
def test_delete_security_groups(self, m_t, m_rq):
tenant_id = fakes_nova.tenants["baz"]["id"]
m_t.return_value = tenant_id
sc_id = fakes_nova.security_groups[tenant_id][0]['id']
req_mock = mock.MagicMock()
req_mock.get_response.return_value = []
m_rq.return_value = req_mock
ret = self.helper.delete_security_group(None, sc_id)
self.assertEqual(ret, [])
m_rq.assert_called_with(
None, method="DELETE",
path="/%s/os-security-groups/%s" % (tenant_id, sc_id),
)
@mock.patch.object(helpers.OpenStackHelper, "_get_req")
@mock.patch.object(helpers.OpenStackHelper, "tenant_from_req")
def test_get_security_group(self, m_t, m_rq):
tenant_id = fakes_nova.tenants["baz"]["id"]
m_t.return_value = tenant_id
sc_group = fakes_nova.security_groups[tenant_id][0]
id = sc_group['id']
m_t.return_value = tenant_id
resp = fakes_network.create_fake_json_resp(
{"security_group": sc_group}, 200
)
req_mock = mock.MagicMock()
req_mock.get_response.return_value = resp
m_rq.return_value = req_mock
ret = self.helper.get_security_group_details(None, id)
self.assertEqual(sc_group['id'], ret["id"])
self.assertEqual(sc_group['description'], ret["summary"])
occi_os_group = os_helpers.build_security_group_from_nova(
fakes_nova.security_groups[tenant_id]
)[0]
cont = 0
for r in ret["rules"]:
self.assertEqual(
occi_os_group['rules'][cont]['protocol'], r["protocol"])
self.assertEqual(
occi_os_group['rules'][cont]['range'], r["range"])
self.assertEqual(
occi_os_group['rules'][cont]['port'], r["port"])
self.assertEqual(
occi_os_group['rules'][cont]['type'], r["type"])
cont += 1
m_rq.assert_called_with(
None, method="GET",
path="/%s/os-security-groups/%s" % (tenant_id, id),
)
@mock.patch.object(helpers.OpenStackHelper, "_get_req")
@mock.patch.object(helpers.OpenStackHelper, "tenant_from_req")
def test_create_security_group(self, m_t, m_rq):
tenant_id = fakes_nova.tenants["baz"]["id"]
m_t.return_value = tenant_id
sc_group = fakes_nova.security_groups[tenant_id][0]
occi_os_group = os_helpers.build_security_group_from_nova(
fakes_nova.security_groups[tenant_id]
)[0]
resp = fakes_network.create_fake_json_resp(
{"security_group": sc_group}, 200
)
req_mock = mock.MagicMock()
req_mock.get_response.return_value = resp
resp_rule1 = fakes_network.create_fake_json_resp(
{"security_group_rule": sc_group['rules'][0]}, 200
)
req_mock_rule1 = mock.MagicMock()
req_mock_rule1.get_response.return_value = resp_rule1
resp_rule2 = fakes_network.create_fake_json_resp(
{"security_group_rule": sc_group['rules'][1]}, 200
)
req_mock_rule2 = mock.MagicMock()
req_mock_rule2.get_response.return_value = resp_rule2
m_rq.side_effect = [req_mock, req_mock_rule1, req_mock_rule2]
ret = self.helper.create_security_group(
None,
name=occi_os_group['title'],
description=occi_os_group['summary'],
rules=occi_os_group['rules']
)
cont = 0
for r in ret["rules"]:
self.assertEqual(
occi_os_group['rules'][cont]['protocol'], r["protocol"]
)
self.assertEqual(
occi_os_group['rules'][cont]['range'], r["range"]
)
self.assertEqual(
occi_os_group['rules'][cont]['port'], r["port"])
self.assertEqual(
occi_os_group['rules'][cont]['type'], r["type"])
cont += 1
@mock.patch.object(helpers.OpenStackHelper, "_get_req")
@mock.patch.object(helpers.OpenStackHelper, "tenant_from_req")
def test_get_server_security_group(self, mock_tenant, mock_get):
tenant_id = fakes_nova.tenants["baz"]["id"]
server_id = uuid.uuid4().hex
sc_group = fakes_nova.security_groups[tenant_id]
mock_tenant.return_value = tenant_id
resp = fakes_network.create_fake_json_resp(
{"security_groups": sc_group}, 200
)
req_mock = mock.MagicMock()
req_mock.get_response.return_value = resp
mock_get.return_value = req_mock
ret = self.helper._get_server_security_group(None, server_id)
segroup = os_helpers.build_security_group_from_nova(
sc_group
)
cont = 0
for s in segroup:
self.assertEqual(s, ret[cont])
cont += 1
mock_get.assert_called_with(
None, method="GET",
path="/%s/servers/%s/os-security-groups" % (tenant_id,
server_id),
)
@mock.patch.object(helpers.OpenStackHelper, "index")
@mock.patch.object(helpers.OpenStackHelper, "_get_server_security_group")
def test_list_server_security_links(self, mock_get, mock_list):
tenant_id = fakes_nova.tenants["baz"]["id"]
servers = fakes_nova.servers[tenant_id]
mock_list.return_value = servers
sg = fakes_nova.security_groups[tenant_id]
segroup = os_helpers.build_security_group_from_nova(sg)[0]
mock_get.return_value = [segroup]
ret = self.helper.list_server_security_links(None)
cont = 0
for server in servers:
self.assertEqual(server["id"],
ret[cont]['compute_id'])
self.assertEqual(segroup["title"],
ret[cont]['securitygroup']["title"])
cont += 1
@mock.patch.object(helpers.OpenStackHelper, "_get_server_security_group")
def test_get_server_security_link(self, mock_get):
tenant_id = fakes_nova.tenants["baz"]["id"]
server_id = uuid.uuid4().hex
sg = fakes_nova.security_groups[tenant_id]
segroup = os_helpers.build_security_group_from_nova(sg)[0]
mock_get.return_value = [segroup]
ret = self.helper.get_server_security_link(None, server_id,
segroup["id"])
self.assertEqual(server_id,
ret[0]['compute_id'])
self.assertEqual(segroup["title"],
ret[0]['securitygroup']["title"])
@mock.patch.object(helpers.OpenStackHelper, "_get_req")
@mock.patch.object(helpers.OpenStackHelper, "tenant_from_req")
def test_delete_server_security_link(self, mock_tenant, mock_req):
tenant_id = fakes_nova.tenants["baz"]["id"]
server_id = uuid.uuid4().hex
sg_name = "baz"
mock_tenant.return_value = tenant_id
sc_group = fakes_nova.security_groups[tenant_id][0]
sg_name = sc_group["name"]
resp_get = fakes_network.create_fake_json_resp(
{"security_group": sc_group}, 200
)
req_mock_get = mock.MagicMock()
req_mock_get.get_response.return_value = resp_get
resp_cre = fakes_network.create_fake_json_resp(
{}, 204
)
req_mock_del = mock.MagicMock()
req_mock_del.get_response.return_value = resp_cre
mock_req.side_effect = [req_mock_get, req_mock_del]
ret = self.helper.delete_server_security_link(None,
server_id,
sg_name)
self.assertEqual([], ret)
mock_req.assert_called_with(
None, method="POST",
path="/%s/servers/%s/action" % (tenant_id,
server_id),
body='{"removeSecurityGroup": {"name": "%s"}}' % sg_name,
content_type='application/json'
)
@mock.patch.object(helpers.OpenStackHelper, "_get_req")
@mock.patch.object(helpers.OpenStackHelper, "tenant_from_req")
def test_create_server_security_link(self, mock_tenant, mock_req):
tenant_id = fakes_nova.tenants["baz"]["id"]
server_id = uuid.uuid4().hex
sg_id = "baz"
mock_tenant.return_value = tenant_id
sc_group = fakes_nova.security_groups[tenant_id][0]
resp_get = fakes_network.create_fake_json_resp(
{"security_group": sc_group}, 200
)
req_mock_get = mock.MagicMock()
req_mock_get.get_response.return_value = resp_get
resp_create = fakes_network.create_fake_json_resp(
{}, 204
)
req_mock_cre = mock.MagicMock()
req_mock_cre.get_response.return_value = resp_create
mock_req.side_effect = [req_mock_get, req_mock_cre]
ret = self.helper.create_server_security_link(None,
server_id,
sg_id)
self.assertEqual([], ret)
sg_name = sc_group["name"]
mock_req.assert_called_with(
None, method="POST",
path="/%s/servers/%s/action" % (tenant_id,
server_id),
body='{"addSecurityGroup": {"name": "%s"}}' % sg_name,
content_type='application/json'
)

View File

@ -0,0 +1,149 @@
# Copyright 2015 LIP - INDIGO-DataCloud
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from ooi.api import helpers_neutron
from ooi.api import securitygroup as security_group_api
from ooi import exception
from ooi.occi.infrastructure import securitygroup as occi_security_group
from ooi.openstack import helpers as openstack_helper
from ooi.tests import base
from ooi.tests import fakes_network as fakes
class TestSecurityGroupControllerNeutron(base.TestController):
def setUp(self):
super(TestSecurityGroupControllerNeutron, self).setUp()
self.controller = security_group_api.Controller(
neutron_ooi_endpoint="ff")
@mock.patch.object(helpers_neutron.OpenStackNeutron,
"list_security_groups")
def test_list_security_group(self, m_list):
tenant_id = fakes.tenants["baz"]["id"]
sec_group = openstack_helper.build_security_group_from_neutron(
fakes.security_groups[tenant_id]
)
req = fakes.create_req_test(None, None)
m_list.return_value = sec_group
result = self.controller.index(req)
expected = self.controller._get_security_group_resources(sec_group)
self.assertEqual(result.resources.__len__(),
expected.__len__())
for r in result.resources:
self.assertIsInstance(r, occi_security_group.SecurityGroupResource)
m_list.assert_called_with(req)
@mock.patch.object(helpers_neutron.OpenStackNeutron,
"list_security_groups")
def test_list_security_group_empty(self, m_list):
tenant_id = fakes.tenants["foo"]["id"]
sec_group = openstack_helper.build_security_group_from_neutron(
fakes.security_groups[tenant_id]
)
req = fakes.create_req_test(None, None)
m_list.return_value = sec_group
result = self.controller.index(req)
self.assertEqual(result.resources.__len__(), 0)
@mock.patch.object(helpers_neutron.OpenStackNeutron,
"get_security_group_details")
def test_show_security_group(self, m_list):
tenant_id = fakes.tenants["baz"]["id"]
sec_group = openstack_helper.build_security_group_from_neutron(
[fakes.security_groups[tenant_id][0]]
)
req = fakes.create_req_test(None, None)
m_list.return_value = sec_group[0]
result = self.controller.show(req, None)
expected = self.controller._get_security_group_resources(sec_group)[0]
self.assertIsInstance(
result,
occi_security_group.SecurityGroupResource)
self.assertEqual(result, expected)
m_list.assert_called_with(req, None)
@mock.patch.object(helpers_neutron.OpenStackNeutron, "get_resource")
def test_show_security_group_not_found(self, m_list):
tenant_id = fakes.tenants["baz"]["id"]
sec_group = openstack_helper.build_security_group_from_neutron(
fakes.security_groups[tenant_id]
)
m_list.return_value = sec_group
req = fakes.create_req_test(None, None)
self.assertRaises(exception.NotFound,
self.controller.show,
req,
None)
@mock.patch.object(helpers_neutron.OpenStackNeutron, "delete_resource")
def test_delete_security_group(self, m_list):
m_list.return_value = None
ret = self.controller.delete(None, None)
self.assertIsNone(ret)
m_list.assert_called_with(None, 'security-groups', None)
@mock.patch.object(helpers_neutron.OpenStackNeutron,
"delete_security_group")
def test_delete_security_group_not_found(self, m_list):
m_list.side_effect = exception.NotFound
req = fakes.create_req_test(None, None)
self.assertRaises(exception.NotFound,
self.controller.delete,
req,
None)
@mock.patch.object(helpers_neutron.OpenStackNeutron,
"create_security_group")
def test_create_security_groups(self, m_create):
tenant_id = fakes.tenants["baz"]["id"]
sec_group = openstack_helper.build_security_group_from_neutron(
fakes.security_groups[tenant_id]
)[0]
params = {"occi.core.title": sec_group["title"],
"occi.securitygroup.rules": sec_group["rules"],
"occi.core.summary": sec_group["summary"]
}
categories = {occi_security_group.SecurityGroupResource.kind}
req = fakes.create_req_json_occi(params, categories)
m_create.return_value = sec_group
ret = self.controller.create(req, params)
expected = self.controller._get_security_group_resources(
[sec_group])
self.assertIsInstance(ret.resources[0],
occi_security_group.SecurityGroupResource)
self.assertEqual(expected[0], ret.resources[0])
m_create.assert_called_with(req, sec_group["title"],
sec_group["summary"],
sec_group["rules"])
def test_create_error(self):
test_networks = fakes.networks[fakes.tenants["foo"]["id"]]
schema1 = occi_security_group.SecurityGroupResource.kind.scheme
net = test_networks[0]
schemes = {schema1: net}
parameters = {"occi.core.title": "name"}
req = fakes.create_req_test(parameters, schemes)
self.assertRaises(exception.Invalid, self.controller.create, req)
def test_create_invalid_param_rule(self):
params = {"occi.core.title": "group",
"occi.securitygroup.rules": "{'wrong': 'value'}]"
}
categories = {occi_security_group.SecurityGroupResource.kind}
req = fakes.create_req_test_occi(params, categories)
self.assertRaises(exception.Invalid, self.controller.create, req)

View File

@ -0,0 +1,104 @@
# Copyright 2015 Spanish National Research Council
# Copyright 2015 LIP - INDIGO-DataCloud
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
import mock
from ooi.api import helpers
from ooi.api import securitygroup_link as securitygroup_link_api
from ooi.occi.core import collection
from ooi.occi.infrastructure import securitygroup_link
from ooi.openstack import helpers as os_helpers
from ooi.tests import base
from ooi.tests import fakes as fakes_nova
class TestNetworkLinkController(base.TestController):
def setUp(self):
super(TestNetworkLinkController, self).setUp()
self.controller = securitygroup_link_api.Controller(
mock.MagicMock(), None)
@mock.patch.object(helpers.OpenStackHelper, "list_server_security_links")
def test_index(self, mock_list):
tenant_id = fakes_nova.tenants['bar']["id"]
servers = fakes_nova.servers[tenant_id]
sg = fakes_nova.security_groups[tenant_id]
segroup = os_helpers.build_security_group_from_nova(sg)[0]
links = []
for server in servers:
link = {
"compute_id": server["id"],
"securitygroup": segroup
}
links.append(link)
mock_list.return_value = links
ret = self.controller.index(None)
self.assertIsInstance(ret, collection.Collection)
@mock.patch.object(helpers.OpenStackHelper, "get_server_security_link")
@mock.patch.object(helpers.OpenStackHelper, "list_security_groups")
def test_show(self, mock_list, mock_get):
tenant_id = fakes_nova.tenants['baz']["id"]
server = fakes_nova.servers[tenant_id][0]
server_id = server['id']
secgroup_name = server['security_groups'][0]["name"]
link_id = '%s_%s' % (server_id, secgroup_name)
sec_group = os_helpers.build_security_group_from_nova(
fakes_nova.security_groups[tenant_id]
)
link = {
"compute_id": server_id,
"securitygroup": sec_group[0]
}
mock_get.return_value = [link]
mock_list.return_value = sec_group
ret = self.controller.show(None, link_id)
self.assertIsInstance(ret, securitygroup_link.SecurityGroupLink)
@mock.patch.object(helpers.OpenStackHelper, "delete_server_security_link")
def test_delete(self, mock_del):
tenant_id = fakes_nova.tenants['baz']["id"]
server = fakes_nova.servers[tenant_id][0]
server_id = server['id']
secgroup_name = server['security_groups'][0]["name"]
link_id = '%s_%s' % (server_id, secgroup_name)
mock_del.return_value = []
ret = self.controller.delete(None, link_id)
self.assertEqual([], ret)
@mock.patch.object(helpers.OpenStackHelper, "create_server_security_link")
@mock.patch("ooi.occi.validator.Validator")
@mock.patch("ooi.api.helpers.get_id_with_kind")
def test_create(self, m_get_id, m_validator, m_create):
compute_id = uuid.uuid4().hex
sec_id = uuid.uuid4().hex
obj = {
"attributes": {
"occi.core.target": sec_id,
"occi.core.source": compute_id
}
}
req = self._build_req(uuid.uuid4().hex)
req.get_parser = mock.MagicMock()
req.get_parser.return_value.return_value.parse.return_value = obj
m_validator.validate.return_value = True
m_get_id.side_effect = [('', compute_id), ('', sec_id)]
m_create.return_value = []
ret = self.controller.create(req, None)
link = ret.resources.pop()
self.assertIsInstance(link, securitygroup_link.SecurityGroupLink)