Merge "Security Group support for Nova and Neutron"
This commit is contained in:
commit
98af646b0c
@ -114,6 +114,13 @@ class BaseHelper(object):
|
||||
self.app = app
|
||||
self.openstack_version = openstack_version
|
||||
|
||||
@staticmethod
|
||||
def tenant_from_req(req):
|
||||
try:
|
||||
return req.environ["HTTP_X_PROJECT_ID"]
|
||||
except KeyError:
|
||||
raise exception.Forbidden(reason="Cannot find project ID")
|
||||
|
||||
def _get_req(self, req, method,
|
||||
path=None,
|
||||
content_type="application/json",
|
||||
@ -198,17 +205,21 @@ class BaseHelper(object):
|
||||
return self._get_req(req, path=path,
|
||||
query_string=query_string, method="GET")
|
||||
|
||||
def _make_create_request(self, req, resource, parameters):
|
||||
def _make_create_request(self, req, resource, parameters,
|
||||
resource_object_name=None):
|
||||
"""Create CREATE request
|
||||
|
||||
This method creates a CREATE Request instance
|
||||
|
||||
:param req: the incoming request
|
||||
:param parameters: parameters with values
|
||||
:param resource_object_name: in case resource name is different
|
||||
to the response one.
|
||||
"""
|
||||
path = "/%s" % resource
|
||||
single_resource = resource[:-1]
|
||||
body = utils.make_body(single_resource, parameters)
|
||||
if not resource_object_name:
|
||||
resource_object_name = resource[:-1]
|
||||
body = utils.make_body(resource_object_name, parameters)
|
||||
return self._get_req(req, path=path,
|
||||
content_type="application/json",
|
||||
body=json.dumps(body), method="POST")
|
||||
@ -245,13 +256,6 @@ class OpenStackHelper(BaseHelper):
|
||||
}
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def tenant_from_req(req):
|
||||
try:
|
||||
return req.environ["HTTP_X_PROJECT_ID"]
|
||||
except KeyError:
|
||||
raise exception.Forbidden(reason="Cannot find project ID")
|
||||
|
||||
def _get_index_req(self, req):
|
||||
tenant_id = self.tenant_from_req(req)
|
||||
path = "/%s/servers" % tenant_id
|
||||
@ -1009,3 +1013,237 @@ class OpenStackHelper(BaseHelper):
|
||||
os_req = self._get_req(req, path=path, method="DELETE")
|
||||
os_req.get_response(self.app)
|
||||
return []
|
||||
|
||||
def _get_security_group(self, req, sec_id):
|
||||
"""Retrieve info about a security group.
|
||||
|
||||
:param req: the incoming request
|
||||
:param sec_id: security group id to show
|
||||
"""
|
||||
path = "os-security-groups"
|
||||
tenant_id = self.tenant_from_req(req)
|
||||
path = "/%s/%s/%s" % (tenant_id, path, sec_id)
|
||||
os_req = self._get_req(req, path=path,
|
||||
method="GET")
|
||||
response = os_req.get_response(self.app)
|
||||
return self.get_from_response(response, "security_group", [])
|
||||
|
||||
def get_security_group_details(self, req, sec_id):
|
||||
"""Get details about a security group.
|
||||
|
||||
:param req: the incoming request
|
||||
:param sec_id: security group id to show
|
||||
"""
|
||||
net = self._get_security_group(req, sec_id)
|
||||
ooi_sec = os_helpers.build_security_group_from_nova([net])
|
||||
return ooi_sec[0]
|
||||
|
||||
def list_security_groups(self, req):
|
||||
"""List security groups
|
||||
|
||||
:param req: the incoming request
|
||||
"""
|
||||
path = "os-security-groups"
|
||||
tenant_id = self.tenant_from_req(req)
|
||||
path = "/%s/%s" % (tenant_id, path)
|
||||
os_req = self._get_req(req, path=path,
|
||||
method="GET")
|
||||
response = os_req.get_response(self.app)
|
||||
sec = self.get_from_response(response, "security_groups", [])
|
||||
ooi_sec = os_helpers.build_security_group_from_nova(sec)
|
||||
return ooi_sec
|
||||
|
||||
def create_security_group(self, req, name, description, rules):
|
||||
"""Create security group
|
||||
|
||||
:param req: the incoming request
|
||||
:param name: security group name
|
||||
:param description: security group description
|
||||
:param rules: security group rules
|
||||
"""
|
||||
try:
|
||||
tenant_id = self.tenant_from_req(req)
|
||||
path = "os-security-groups"
|
||||
path = "/%s/%s" % (tenant_id, path)
|
||||
param_group = {
|
||||
"description": description,
|
||||
"name": name,
|
||||
}
|
||||
body = utils.make_body('security_group', param_group)
|
||||
os_req = self._get_req(req,
|
||||
path=path,
|
||||
content_type="application/json",
|
||||
body=json.dumps(body),
|
||||
method="POST")
|
||||
response_group = os_req.get_response(self.app)
|
||||
secgroup = self.get_from_response(
|
||||
response_group, "security_group", {})
|
||||
sec_id = secgroup["id"]
|
||||
secgroup["rules"] = []
|
||||
for rule in rules:
|
||||
port_min, port_max = os_helpers.security_group_rule_port(
|
||||
rule["port"]
|
||||
)
|
||||
param_rules = {
|
||||
"parent_group_id": sec_id,
|
||||
"ip_protocol": rule["protocol"],
|
||||
"from_port": port_min,
|
||||
"to_port": port_max,
|
||||
"cidr": rule.get("range", "0.0.0.0/0")
|
||||
}
|
||||
body_rules = utils.make_body('security_group_rule',
|
||||
param_rules)
|
||||
path = "/%s/os-security-group-rules" % (tenant_id)
|
||||
os_req_rules = self._get_req(req,
|
||||
path=path,
|
||||
content_type="application/json",
|
||||
body=json.dumps(body_rules),
|
||||
method="POST")
|
||||
response_rules = os_req_rules.get_response(self.app)
|
||||
secrules = self.get_from_response(
|
||||
response_rules, "security_group_rule", {})
|
||||
secgroup["rules"].append(secrules)
|
||||
ooi_sec = os_helpers.build_security_group_from_nova(
|
||||
[secgroup]
|
||||
)
|
||||
return ooi_sec[0]
|
||||
except Exception as ex:
|
||||
raise ex
|
||||
|
||||
def delete_security_group(self, req, sec_id):
|
||||
"""Delete info about a security group.
|
||||
|
||||
:param req: the incoming request
|
||||
:param sec_id: security group id to delete
|
||||
"""
|
||||
path = "os-security-groups"
|
||||
tenant_id = self.tenant_from_req(req)
|
||||
path = "/%s/%s/%s" % (tenant_id, path, sec_id)
|
||||
os_req = self._get_req(req, path=path,
|
||||
method="DELETE")
|
||||
os_req.get_response(self.app)
|
||||
return []
|
||||
|
||||
def _get_server_security_group(self, req, server_id):
|
||||
"""Get security group from a server
|
||||
|
||||
:param req: incoming request
|
||||
:param server_id: server id
|
||||
:return: information about the security group
|
||||
"""
|
||||
path = "os-security-groups"
|
||||
tenant_id = self.tenant_from_req(req)
|
||||
path = "/%s/servers/%s/%s" % (tenant_id,
|
||||
server_id,
|
||||
path
|
||||
)
|
||||
os_req = self._get_req(req, path=path,
|
||||
method="GET")
|
||||
response = os_req.get_response(self.app)
|
||||
sec = self.get_from_response(response,
|
||||
"security_groups", [])
|
||||
ooi_sec = os_helpers.build_security_group_from_nova(sec)
|
||||
return ooi_sec
|
||||
|
||||
def list_server_security_groups(self, req,
|
||||
server_id=None):
|
||||
"""List security groups associated to a server
|
||||
|
||||
:param req: incoming request
|
||||
:param server_id: server id
|
||||
:return: security groups associated to servers
|
||||
"""
|
||||
return self._get_server_security_group(
|
||||
req, server_id)
|
||||
|
||||
def list_server_security_links(self, req, server_id=None):
|
||||
"""List security groups associated to servers
|
||||
|
||||
:param req: incoming request
|
||||
:param server_id: server id
|
||||
:return: security groups associated to servers
|
||||
"""
|
||||
link_list = []
|
||||
if server_id:
|
||||
compute_list = [self.get_server(req, server_id)]
|
||||
else:
|
||||
compute_list = self.index(req)
|
||||
for c in compute_list:
|
||||
server_id = c["id"]
|
||||
server_secgroups = self._get_server_security_group(
|
||||
req, server_id)
|
||||
for sec in server_secgroups:
|
||||
link = {
|
||||
"compute_id": server_id,
|
||||
"securitygroup": sec
|
||||
}
|
||||
link_list.append(link)
|
||||
return link_list
|
||||
|
||||
def get_server_security_link(self, req, server_id,
|
||||
securitygroup_id):
|
||||
"""Show security group link from a server
|
||||
|
||||
:param req: incoming request
|
||||
:param server_id: server id
|
||||
:param securitygroup_id: security group id
|
||||
:return: information about the link of security group
|
||||
"""
|
||||
ooi_sec = self._get_server_security_group(req, server_id)
|
||||
for sg in ooi_sec:
|
||||
if sg["id"] == securitygroup_id:
|
||||
link = {"compute_id": server_id,
|
||||
"securitygroup": sg
|
||||
}
|
||||
return [link]
|
||||
return None
|
||||
|
||||
def delete_server_security_link(self, req, server_id,
|
||||
securitygroup_id):
|
||||
"""Delete security group link from a server
|
||||
|
||||
:param req: incoming request
|
||||
:param server_id: server id
|
||||
:param securitygroup_id: segurity group id
|
||||
:return: empty
|
||||
"""
|
||||
tenant_id = self.tenant_from_req(req)
|
||||
path = "/%s/servers/%s/action" % (tenant_id, server_id)
|
||||
sg = self._get_security_group(req, securitygroup_id)
|
||||
if "name" not in sg:
|
||||
raise exception.NotFound("Security group %s not found."
|
||||
% securitygroup_id)
|
||||
param = {"name": sg["name"]}
|
||||
body = utils.make_body('removeSecurityGroup', param)
|
||||
os_req = self._get_req(req,
|
||||
path=path,
|
||||
content_type="application/json",
|
||||
body=json.dumps(body),
|
||||
method="POST")
|
||||
os_req.get_response(self.app)
|
||||
return []
|
||||
|
||||
def create_server_security_link(self, req, server_id,
|
||||
securitygroup_id):
|
||||
"""Create security group link in a server
|
||||
|
||||
:param req: incoming request
|
||||
:param server_id: server id
|
||||
:param securitygroup_id: segurity group id
|
||||
:return: empty
|
||||
"""
|
||||
tenant_id = self.tenant_from_req(req)
|
||||
path = "/%s/servers/%s/action" % (tenant_id, server_id)
|
||||
sg = self._get_security_group(req, securitygroup_id)
|
||||
if "name" not in sg:
|
||||
raise exception.NotFound("Security group %s not found."
|
||||
% securitygroup_id)
|
||||
param = {"name": sg["name"]}
|
||||
body = utils.make_body('addSecurityGroup', param)
|
||||
os_req = self._get_req(req,
|
||||
path=path,
|
||||
content_type="application/json",
|
||||
body=json.dumps(body),
|
||||
method="POST")
|
||||
os_req.get_response(self.app)
|
||||
return []
|
||||
|
@ -539,3 +539,91 @@ class OpenStackNeutron(helpers.BaseHelper):
|
||||
response = os_req.get_response()
|
||||
if response.status_int != 202:
|
||||
raise helpers.exception_from_response(response)
|
||||
|
||||
def get_security_group_details(self, req, sec_id):
|
||||
"""Get info about a security group.
|
||||
|
||||
:param req: the incoming request
|
||||
:param sec_id: security group id to show
|
||||
"""
|
||||
try:
|
||||
secgroup = self.get_resource(req, 'security-groups', sec_id,
|
||||
response_resource="security_group")
|
||||
ooi_sec = os_helpers.build_security_group_from_neutron(
|
||||
[secgroup]
|
||||
)
|
||||
return ooi_sec[0]
|
||||
except Exception:
|
||||
raise exception.NotFound()
|
||||
|
||||
def list_security_groups(self, req):
|
||||
"""List security groups
|
||||
|
||||
:param req: the incoming request
|
||||
"""
|
||||
try:
|
||||
secgroup = self.list_resources(req, 'security-groups',
|
||||
response_resource="security_groups")
|
||||
ooi_sec = os_helpers.build_security_group_from_neutron(
|
||||
secgroup
|
||||
)
|
||||
return ooi_sec
|
||||
except Exception:
|
||||
raise exception.NotFound()
|
||||
|
||||
def create_security_group(self, req, name, description, rules):
|
||||
"""Create security group
|
||||
|
||||
:param req: the incoming request
|
||||
:param name: security group name
|
||||
:param description: security group description
|
||||
:param rules: security group rules
|
||||
"""
|
||||
try:
|
||||
tenant_id = self.tenant_from_req(req)
|
||||
param_group = {"tenant_id": tenant_id,
|
||||
"description": description,
|
||||
"name": name,
|
||||
}
|
||||
secgroup = self.create_resource(
|
||||
req, 'security-groups', param_group,
|
||||
response_resource="security_group")
|
||||
sec_id = secgroup["id"]
|
||||
secgroup["security_group_rules"] = []
|
||||
for rule in rules:
|
||||
port_min, port_max = os_helpers.security_group_rule_port(
|
||||
rule["port"]
|
||||
)
|
||||
param_rule = {
|
||||
"ethertype": rule.get("ipversion", "IPv4"),
|
||||
"port_range_max": port_max,
|
||||
"port_range_min": port_min,
|
||||
"direction": os_helpers.security_group_rule_type(
|
||||
rule["type"]),
|
||||
"remote_ip_prefix": rule.get("range", "0.0.0.0/0"),
|
||||
"protocol": rule["protocol"],
|
||||
"security_group_id": sec_id,
|
||||
}
|
||||
secrule = self.create_resource(
|
||||
req,
|
||||
'security-group-rules', param_rule,
|
||||
response_resource="security_group_rule")
|
||||
secgroup["security_group_rules"].append(secrule)
|
||||
ooi_sec = os_helpers.build_security_group_from_neutron(
|
||||
[secgroup]
|
||||
)
|
||||
return ooi_sec[0]
|
||||
except Exception as ex:
|
||||
raise ex
|
||||
|
||||
def delete_security_group(self, req, sec_id):
|
||||
"""Delete info about a security group.
|
||||
|
||||
:param req: the incoming request
|
||||
:param sec_id: security group id to delete
|
||||
"""
|
||||
try:
|
||||
secgroup = self.delete_resource(req, 'security-groups', sec_id)
|
||||
return secgroup
|
||||
except Exception:
|
||||
raise exception.NotFound()
|
192
ooi/api/securitygroup.py
Normal file
192
ooi/api/securitygroup.py
Normal file
@ -0,0 +1,192 @@
|
||||
# Copyright 2015 LIP - INDIGO-DataCloud
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from ooi.api import base
|
||||
from ooi.api import helpers
|
||||
from ooi.api import helpers_neutron
|
||||
from ooi import exception
|
||||
from ooi.occi.core import collection
|
||||
from ooi.occi.infrastructure import securitygroup
|
||||
from ooi.occi import validator as occi_validator
|
||||
|
||||
|
||||
def parse_validate_schema(req, scheme=None,
|
||||
required_attr=None):
|
||||
"""Parse attributes and validate scheme
|
||||
|
||||
|
||||
Returns attributes from request
|
||||
If scheme is specified, it validates the OCCI scheme:
|
||||
-Raises exception in case of being invalid
|
||||
|
||||
:param req: request
|
||||
:param: scheme: scheme to validate
|
||||
:param: required_attr: attributes required
|
||||
"""
|
||||
parser = req.get_parser()(req.headers, req.body.decode("utf8"))
|
||||
if scheme:
|
||||
attributes = parser.parse()
|
||||
validator = occi_validator.Validator(attributes)
|
||||
validator.validate(scheme)
|
||||
validator.validate_attributes(required_attr)
|
||||
else:
|
||||
attributes = parser.parse_attributes(req.headers)
|
||||
return attributes
|
||||
|
||||
|
||||
def process_parameters(req, scheme=None,
|
||||
required_attr=None):
|
||||
"""Get attributes from request parameters
|
||||
|
||||
:param req: request
|
||||
:param scheme: scheme to validate
|
||||
:param required_attr: attributes required
|
||||
"""
|
||||
parameters = parse_validate_schema(req, scheme, required_attr)
|
||||
try:
|
||||
attributes = {}
|
||||
if 'X_PROJECT_ID' in req.headers:
|
||||
attributes["X_PROJECT_ID"] = req.headers["X_PROJECT_ID"]
|
||||
if "attributes" in parameters:
|
||||
for k, v in parameters.get("attributes", None).items():
|
||||
attributes[k.strip()] = v
|
||||
if not attributes:
|
||||
attributes = None
|
||||
except Exception:
|
||||
raise exception.Invalid
|
||||
return attributes
|
||||
|
||||
|
||||
class Controller(base.Controller):
|
||||
def __init__(self, app=None, openstack_version=None,
|
||||
neutron_ooi_endpoint=None):
|
||||
"""Security group controller initialization
|
||||
|
||||
:param app: application
|
||||
:param: openstack_version: nova version
|
||||
:param: neutron_ooi_endpoint: This parameter
|
||||
indicates the Neutron endpoint to load the Neutron Helper.
|
||||
If it is None, Nova api is used.
|
||||
"""
|
||||
|
||||
super(Controller, self).__init__(
|
||||
app=app,
|
||||
openstack_version=openstack_version)
|
||||
if neutron_ooi_endpoint:
|
||||
self.os_helper = helpers_neutron.OpenStackNeutron(
|
||||
neutron_ooi_endpoint
|
||||
)
|
||||
else:
|
||||
self.os_helper = helpers.OpenStackHelper(
|
||||
self.app,
|
||||
self.openstack_version
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _get_security_group_resources(securitygroup_list):
|
||||
"""Create OCCI security group instances from list
|
||||
|
||||
:param securitygroup_list: security group objects
|
||||
provides by the cloud infrastructure
|
||||
:return occi security group list
|
||||
"""
|
||||
occi_securitygroup_resources = []
|
||||
if securitygroup_list:
|
||||
for s in securitygroup_list:
|
||||
s_rules = s['rules']
|
||||
s_id = s["id"]
|
||||
s_name = s["title"]
|
||||
s_summary = s["summary"]
|
||||
s = securitygroup.SecurityGroupResource(title=s_name,
|
||||
id=s_id,
|
||||
rules=s_rules,
|
||||
summary=s_summary)
|
||||
occi_securitygroup_resources.append(s)
|
||||
return occi_securitygroup_resources
|
||||
|
||||
def index(self, req):
|
||||
"""List security groups
|
||||
|
||||
:param req: request object
|
||||
"""
|
||||
occi_sc = self.os_helper.list_security_groups(req)
|
||||
occi_sc_resources = self._get_security_group_resources(
|
||||
occi_sc)
|
||||
|
||||
return collection.Collection(
|
||||
resources=occi_sc_resources)
|
||||
|
||||
def show(self, req, id):
|
||||
"""Get security group details
|
||||
|
||||
:param req: request object
|
||||
:param id: security group identification
|
||||
"""
|
||||
resp = self.os_helper.get_security_group_details(req, id)
|
||||
occi_sc_resources = self._get_security_group_resources(
|
||||
[resp])
|
||||
return occi_sc_resources[0]
|
||||
|
||||
def create(self, req, body=None):
|
||||
"""Create a network instance in the cloud
|
||||
|
||||
:param req: request object
|
||||
:param body: body request (not used)
|
||||
"""
|
||||
scheme = {
|
||||
"category": securitygroup.SecurityGroupResource.kind,
|
||||
}
|
||||
required = ["occi.core.title",
|
||||
"occi.securitygroup.rules"
|
||||
]
|
||||
attributes = process_parameters(req, scheme, required)
|
||||
name = attributes.get('occi.core.title')
|
||||
description = attributes.get("occi.core.summary", "")
|
||||
try:
|
||||
rules = attributes.get('occi.securitygroup.rules')
|
||||
except Exception:
|
||||
raise exception.Invalid(
|
||||
"Bad JSON format for occi.securitygroup.rules: %s"
|
||||
% attributes.get(
|
||||
'occi.securitygroup.rules'))
|
||||
sec = self.os_helper.create_security_group(
|
||||
req, name, description, rules
|
||||
)
|
||||
occi_sec_resources = self._get_security_group_resources([sec])
|
||||
return collection.Collection(
|
||||
resources=occi_sec_resources)
|
||||
|
||||
def delete(self, req, id):
|
||||
"""delete security groups which satisfy the parameters
|
||||
|
||||
:param req: current request
|
||||
:param id: identification
|
||||
"""
|
||||
response = self.os_helper.delete_security_group(req, id)
|
||||
return response
|
||||
|
||||
def run_action(self, req, id, body):
|
||||
"""Run action over the security group
|
||||
|
||||
:param req: current request
|
||||
:param id: security group
|
||||
:param body: body
|
||||
"""
|
||||
action = req.GET.get("action", None)
|
||||
occi_actions = [a.term for a in
|
||||
securitygroup.SecurityGroupResource.actions]
|
||||
|
||||
if action is None or action not in occi_actions:
|
||||
raise exception.InvalidAction(action=action)
|
||||
raise exception.NotImplemented("Security group actions are not implemented")
|
144
ooi/api/securitygroup_link.py
Normal file
144
ooi/api/securitygroup_link.py
Normal file
@ -0,0 +1,144 @@
|
||||
# Copyright 2015 LIP - INDIGO-DataCloud
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from ooi.api import base
|
||||
import ooi.api.helpers
|
||||
from ooi import exception
|
||||
from ooi.occi.core import collection
|
||||
from ooi.occi.infrastructure import compute
|
||||
from ooi.occi.infrastructure import securitygroup
|
||||
from ooi.occi.infrastructure import securitygroup_link
|
||||
from ooi.occi import validator as occi_validator
|
||||
|
||||
|
||||
def _get_security_link_resources(link_list):
|
||||
"""Create OCCI security group instances from a list
|
||||
|
||||
:param link_list: provides by the cloud infrastructure
|
||||
"""
|
||||
occi_secgropu_resources = []
|
||||
if link_list:
|
||||
for l in link_list:
|
||||
compute_id = l['compute_id']
|
||||
sec = l['securitygroup']
|
||||
sec_id = sec.get("id")
|
||||
sec_name = sec.get("title", "")
|
||||
sec_rules = sec.get("rules", [])
|
||||
s = securitygroup.SecurityGroupResource(title=sec_name,
|
||||
id=sec_id,
|
||||
rules=sec_rules)
|
||||
c = compute.ComputeResource(title="Compute",
|
||||
id=compute_id
|
||||
)
|
||||
link = securitygroup_link.SecurityGroupLink(source=c,
|
||||
target=s)
|
||||
occi_secgropu_resources.append(link)
|
||||
return occi_secgropu_resources
|
||||
|
||||
|
||||
class Controller(base.Controller):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(Controller, self).__init__(*args, **kwargs)
|
||||
# TODO(jorgesece): add neutron controller to list securitygroups
|
||||
self.os_helper = ooi.api.helpers.OpenStackHelper(
|
||||
self.app,
|
||||
self.openstack_version
|
||||
)
|
||||
|
||||
def _get_attachment_from_id(self, req, attachment_id):
|
||||
try:
|
||||
server_id, security_id = attachment_id.split('_', 1)
|
||||
return {"server_id": server_id,
|
||||
"securitygroup_id": security_id}
|
||||
except ValueError:
|
||||
raise exception.LinkNotFound(link_id=attachment_id)
|
||||
|
||||
def index(self, req):
|
||||
"""List security group links
|
||||
|
||||
:param req: request object
|
||||
"""
|
||||
link_list = self.os_helper.list_server_security_links(req)
|
||||
occi_link_resources = _get_security_link_resources(link_list)
|
||||
return collection.Collection(resources=occi_link_resources)
|
||||
|
||||
def show(self, req, id):
|
||||
"""Get security group details
|
||||
|
||||
:param req: request object
|
||||
:param id: security group identification
|
||||
"""
|
||||
try:
|
||||
link_info = self._get_attachment_from_id(req, id)
|
||||
server_id = link_info["server_id"]
|
||||
security_name = link_info["securitygroup_id"]
|
||||
link = self.os_helper.get_server_security_link(
|
||||
req, server_id, security_name
|
||||
)
|
||||
occi_instance = _get_security_link_resources(
|
||||
link
|
||||
)[0]
|
||||
return occi_instance
|
||||
except Exception:
|
||||
raise exception.LinkNotFound(link_id=id)
|
||||
|
||||
def create(self, req, body=None):
|
||||
"""Create a security group link
|
||||
|
||||
Creates a link between a server and a securitygroup.
|
||||
|
||||
:param req: request object
|
||||
:param body: body request (not used)
|
||||
"""
|
||||
parser = req.get_parser()(req.headers, req.body)
|
||||
scheme = {
|
||||
"category": securitygroup_link.SecurityGroupLink.kind,
|
||||
}
|
||||
obj = parser.parse()
|
||||
validator = occi_validator.Validator(obj)
|
||||
validator.validate(scheme)
|
||||
attrs = obj.get("attributes", {})
|
||||
_, securitygroup_id = ooi.api.helpers.get_id_with_kind(
|
||||
req,
|
||||
attrs.get("occi.core.target"),
|
||||
securitygroup.SecurityGroupResource.kind)
|
||||
_, server_id = ooi.api.helpers.get_id_with_kind(
|
||||
req,
|
||||
attrs.get("occi.core.source"),
|
||||
compute.ComputeResource.kind)
|
||||
self.os_helper.create_server_security_link(
|
||||
req, server_id,
|
||||
securitygroup_id)
|
||||
link = {"compute_id": server_id,
|
||||
"securitygroup": {"id": securitygroup_id}
|
||||
}
|
||||
occi_instance = _get_security_link_resources([link])
|
||||
return collection.Collection(resources=occi_instance)
|
||||
|
||||
def delete(self, req, id):
|
||||
"""Delete security group link
|
||||
|
||||
:param req: current request
|
||||
:param id: identification
|
||||
"""
|
||||
link_info = self._get_attachment_from_id(req, id)
|
||||
server_id = link_info["server_id"]
|
||||
security_id = link_info["securitygroup_id"]
|
||||
try:
|
||||
self.os_helper.delete_server_security_link(
|
||||
req, server_id, security_id)
|
||||
except Exception:
|
||||
raise exception.LinkNotFound(link_id=id)
|
||||
return []
|
@ -46,3 +46,118 @@ def network_status(neutron_status):
|
||||
return "active"
|
||||
else:
|
||||
return "inactive"
|
||||
|
||||
|
||||
def security_group_rule_type(neutron_type):
|
||||
"""Translate neutron rule type.
|
||||
|
||||
Translate to/from openstack - occi
|
||||
|
||||
:param neutron_type: neutron status
|
||||
"""
|
||||
if neutron_type == "ingress":
|
||||
return "inbound"
|
||||
elif neutron_type == "egress":
|
||||
return "outbound"
|
||||
elif neutron_type == "inbound":
|
||||
return "ingress"
|
||||
elif neutron_type == "outbound":
|
||||
return "egress"
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
def security_group_rule_port(os_port):
|
||||
"""Translate openstack rule port
|
||||
|
||||
Translate to/from openstack - occi
|
||||
|
||||
:param neutron_type: neutron status
|
||||
"""
|
||||
ports = str(os_port).split('-')
|
||||
if ports.__len__() == 1:
|
||||
port_min = port_max = ports[0]
|
||||
elif ports.__len__() == 2:
|
||||
port_min, port_max = ports
|
||||
else:
|
||||
raise Exception("Port value")
|
||||
return port_min, port_max
|
||||
|
||||
|
||||
def build_security_group_from_neutron(sec_groups):
|
||||
"""Translate neutron security group
|
||||
|
||||
Translate to the ooi a standard security group format.
|
||||
|
||||
:param sec_groups: array of security groups
|
||||
"""
|
||||
sec_list = []
|
||||
for sec in sec_groups:
|
||||
ooi_sec = {}
|
||||
rules_list = []
|
||||
ooi_sec["id"] = sec["id"]
|
||||
ooi_sec["title"] = sec.get("name", None)
|
||||
ooi_sec["summary"] = sec.get("description", "")
|
||||
for rule in sec["security_group_rules"]:
|
||||
ipversion = rule.get("ethertype", "IPv4")
|
||||
rule_type = security_group_rule_type(
|
||||
rule["direction"]
|
||||
)
|
||||
rule_protocol = rule.get("protocol", None)
|
||||
port_min = rule["port_range_min"]
|
||||
port_max = rule["port_range_max"]
|
||||
if port_min and (port_min != port_max):
|
||||
rule_port = "%s-%s" % (port_min,
|
||||
port_max
|
||||
)
|
||||
else:
|
||||
rule_port = port_min
|
||||
rule_range = str(rule["remote_ip_prefix"])
|
||||
rules_list.append({"type": rule_type,
|
||||
"protocol": rule_protocol,
|
||||
"port": rule_port,
|
||||
"range": rule_range,
|
||||
"ipversion": ipversion}
|
||||
)
|
||||
ooi_sec["rules"] = rules_list
|
||||
sec_list.append(ooi_sec)
|
||||
return sec_list
|
||||
|
||||
|
||||
def build_security_group_from_nova(sec_groups):
|
||||
"""Translate nova security group
|
||||
|
||||
Translate to the ooi a standard security group format.
|
||||
|
||||
:param sec_groups: array of security groups
|
||||
"""
|
||||
sec_list = []
|
||||
for sec in sec_groups:
|
||||
ooi_sec = {}
|
||||
rules_list = []
|
||||
ooi_sec["id"] = sec["id"]
|
||||
ooi_sec["title"] = sec.get("name", None)
|
||||
ooi_sec["summary"] = sec.get("description", "")
|
||||
for rule in sec["rules"]:
|
||||
ipversion = "IPv4"
|
||||
rule_protocol = rule.get("ip_protocol", None)
|
||||
port_min = rule["from_port"]
|
||||
port_max = rule["to_port"]
|
||||
if port_min and (port_min != port_max):
|
||||
rule_port = "%s-%s" % (port_min,
|
||||
port_max
|
||||
)
|
||||
else:
|
||||
rule_port = port_min
|
||||
rule_range = str(rule["ip_range"].get("cidr", ""))
|
||||
# BUG(jorgesce): type is alwayns inbound because nova
|
||||
# does not provide that information.
|
||||
rules_list.append({"type": "inbound",
|
||||
"protocol": rule_protocol,
|
||||
"port": rule_port,
|
||||
"range": rule_range,
|
||||
"ipversion": ipversion}
|
||||
)
|
||||
ooi_sec["rules"] = rules_list
|
||||
sec_list.append(ooi_sec)
|
||||
return sec_list
|
@ -194,6 +194,10 @@ servers = {
|
||||
"flavor": {"id": flavors[1]["id"]},
|
||||
"image": {"id": images["foo"]["id"]},
|
||||
"status": "ACTIVE",
|
||||
"security_groups":[
|
||||
{"name": "group1"},
|
||||
{"name": "group2"}
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": uuid.uuid4().hex,
|
||||
@ -201,6 +205,9 @@ servers = {
|
||||
"flavor": {"id": flavors[2]["id"]},
|
||||
"image": {"id": images["bar"]["id"]},
|
||||
"status": "SHUTOFF",
|
||||
"security_groups":[
|
||||
{"name": "group1"}
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": uuid.uuid4().hex,
|
||||
@ -208,6 +215,9 @@ servers = {
|
||||
"flavor": {"id": flavors[1]["id"]},
|
||||
"image": {"id": images["bar"]["id"]},
|
||||
"status": "ERROR",
|
||||
"security_groups":[
|
||||
{"name": "group2"}
|
||||
]
|
||||
},
|
||||
],
|
||||
tenants["bar"]["id"]: [],
|
||||
@ -237,7 +247,10 @@ servers = {
|
||||
"OS-EXT-IPS:type": "floating",
|
||||
"OS-EXT-IPS-MAC:mac_addr": "1234"},
|
||||
]
|
||||
}
|
||||
},
|
||||
"security_groups":[
|
||||
{"name": "group1"}
|
||||
]
|
||||
}
|
||||
],
|
||||
}
|
||||
@ -265,6 +278,54 @@ volumes[tenants["baz"]["id"]][1]["attachments"] = [{
|
||||
"id": volumes[tenants["baz"]["id"]][0]["id"],
|
||||
}]
|
||||
|
||||
security_groups = {
|
||||
tenants["foo"]["id"]: [],
|
||||
tenants["baz"]["id"]: [
|
||||
{
|
||||
"name": "group1",
|
||||
"id": uuid.uuid4().hex,
|
||||
"description": "group one",
|
||||
"rules": [
|
||||
{"from_port": 443,
|
||||
"to_port": 443, "ip_range": {"cidr": "10.0.0.0/32"},
|
||||
"ip_protocol": "tcp"},
|
||||
{"from_port": "1000",
|
||||
"to_port": 2000, "ip_range": {"cidr": "11.0.0.0/32"},
|
||||
"ip_protocol": "udp"},
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "group2",
|
||||
"id": uuid.uuid4().hex,
|
||||
"description": "group two",
|
||||
"rules": [
|
||||
{"from_port": 80,
|
||||
"to_port": 80, "ip_range": {"cidr": "10.0.0.0/32"},
|
||||
"ip_protocol": "tcp"},
|
||||
{"from_port": "4000",
|
||||
"to_port": 7000, "ip_range": {"cidr": "13.0.0.0/32"},
|
||||
"ip_protocol": "udp"},
|
||||
]
|
||||
}
|
||||
|
||||
],
|
||||
tenants["bar"]["id"]: [
|
||||
{
|
||||
"name": "group3",
|
||||
"id": uuid.uuid4().hex,
|
||||
"description": "group three",
|
||||
"rules": [
|
||||
{"from_port": 443,
|
||||
"to_port": 443, "ip_range": {"cidr": "10.0.0.0/32"},
|
||||
"ip_protocol": "tcp"},
|
||||
{"from_port": "1000",
|
||||
"to_port": 2000, "ip_range": {"cidr": "11.0.0.0/32"},
|
||||
"ip_protocol": "udp"},
|
||||
]
|
||||
},
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
def fake_query_results():
|
||||
cats = []
|
||||
@ -485,8 +546,17 @@ class FakeApp(object):
|
||||
"os-floating-ip-pools")
|
||||
self._populate(path, "floating_ip", floating_ips[tenant["id"]],
|
||||
"os-floating-ips")
|
||||
self._populate_ports(path, servers[tenant["id"]],
|
||||
self._populate_server_links(path, "os-interface",
|
||||
"interfaceAttachments",
|
||||
servers[tenant["id"]],
|
||||
ports[tenant["id"]])
|
||||
self._populate_server_links(path, "os-security-groups",
|
||||
"security_groups",
|
||||
servers[tenant["id"]],
|
||||
security_groups[tenant["id"]])
|
||||
self._populate(path, "security_group",
|
||||
security_groups[tenant["id"]],
|
||||
"os-security-groups")
|
||||
# NOTE(aloga): dict_values un Py3 is not serializable in JSON
|
||||
self._populate(path, "image", list(images.values()))
|
||||
self._populate(path, "flavor", list(flavors.values()))
|
||||
@ -530,16 +600,20 @@ class FakeApp(object):
|
||||
self.routes[obj_path] = create_fake_json_resp(
|
||||
{"volumeAttachment": attach})
|
||||
|
||||
def _populate_ports(self, path, servers_list, ports_list):
|
||||
def _populate_server_links(self, path, resource, obj,
|
||||
servers_list, link_list):
|
||||
if servers_list:
|
||||
for p in ports_list:
|
||||
for s in servers_list:
|
||||
list_obj = []
|
||||
path_base = "%s/servers/%s/%s" % (
|
||||
path,
|
||||
servers_list[0]["id"],
|
||||
"os-interface"
|
||||
s["id"],
|
||||
resource
|
||||
)
|
||||
for l in link_list:
|
||||
list_obj.append(l)
|
||||
self.routes[path_base] = create_fake_json_resp(
|
||||
{"interfaceAttachments": [p]})
|
||||
{obj: list_obj})
|
||||
|
||||
@webob.dec.wsgify()
|
||||
def __call__(self, req):
|
||||
@ -619,7 +693,9 @@ class FakeApp(object):
|
||||
body = req.json_body.copy()
|
||||
action = body.popitem()
|
||||
if action[0] in ["os-start", "os-stop", "reboot",
|
||||
"addFloatingIp", "removeFloatingIp"]:
|
||||
"addFloatingIp", "removeFloatingIp",
|
||||
"removeSecurityGroup",
|
||||
"addSecurityGroup"]:
|
||||
return self._get_from_routes(req)
|
||||
elif req.path_info.endswith("os-volume_attachments"):
|
||||
return self._do_create_attachment(req)
|
||||
|
@ -29,6 +29,8 @@ application_url = "https://foo.example.org:8774/ooiv1"
|
||||
tenants = {
|
||||
"foo": {"id": uuid.uuid4().hex,
|
||||
"name": "foo"},
|
||||
"baz": {"id": uuid.uuid4().hex,
|
||||
"name": "foo"},
|
||||
"bar": {"id": uuid.uuid4().hex,
|
||||
"name": "bar"},
|
||||
"public": {"id": uuid.uuid4().hex,
|
||||
@ -153,6 +155,40 @@ network_links = {
|
||||
],
|
||||
}
|
||||
|
||||
security_groups = {
|
||||
tenants["bar"]["id"]: [],
|
||||
tenants["foo"]["id"]: [],
|
||||
tenants["baz"]["id"]: [
|
||||
{
|
||||
"name": "group1",
|
||||
"id": uuid.uuid4().hex,
|
||||
"description": "group one",
|
||||
"security_group_rules": [
|
||||
{"ethertype": "IPv4", "port_range_min": 443,
|
||||
"port_range_max": 443, "remote_ip_prefix": "10.0.0.0/32",
|
||||
"protocol": "tcp", "direction": "ingress"},
|
||||
{"ethertype": "IPv4", "port_range_min": "8000",
|
||||
"port_range_max": 9000, "remote_ip_prefix": "11.0.0.0/24",
|
||||
"protocol": "udp", "direction": "egress"}
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "group2",
|
||||
"id": uuid.uuid4().hex,
|
||||
"description": "group two",
|
||||
"security_group_rules": [
|
||||
{"ethertype": "IPv4", "port_range_min": 80,
|
||||
"port_range_max": 80, "remote_ip_prefix": "10.0.0.0/32",
|
||||
"protocol": "tcp", "direction": "ingress"},
|
||||
{"ethertype": "IPv4", "port_range_min": "5000",
|
||||
"port_range_max": 6000, "remote_ip_prefix": "11.0.0.0/24",
|
||||
"protocol": "udp", "direction": "egress"}
|
||||
]
|
||||
}
|
||||
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
def create_fake_json_resp(data, status=200):
|
||||
r = webob.Response()
|
||||
@ -201,6 +237,22 @@ def create_header(params, schemes, project=None):
|
||||
return headers
|
||||
|
||||
|
||||
def create_req_json_occi(params, category, method="POST"):
|
||||
headers = create_headers(category,
|
||||
content_type="application/occi+json")
|
||||
body = {}
|
||||
for c in category:
|
||||
body["kind"] = "%s%s" % (
|
||||
c.scheme, c.term)
|
||||
|
||||
body["attributes"] = params
|
||||
req = webob.Request.blank(path="")
|
||||
req.headers = headers
|
||||
req.method = method
|
||||
req.body = json.dumps(body).encode("utf8")
|
||||
return wsgi.Request(req.environ)
|
||||
|
||||
|
||||
def create_req_test_occi(params, category):
|
||||
headers = create_header_occi(params, category)
|
||||
req = webob.Request.blank(path="")
|
||||
@ -209,7 +261,7 @@ def create_req_test_occi(params, category):
|
||||
|
||||
|
||||
def create_header_occi(params, category, project=None):
|
||||
headers = {}
|
||||
headers = create_headers(category, project)
|
||||
att = ""
|
||||
if params is not None:
|
||||
for k, v in params.items():
|
||||
@ -218,6 +270,13 @@ def create_header_occi(params, category, project=None):
|
||||
else:
|
||||
att = "%s, %s=%s" % (att, k, v)
|
||||
headers["X_OCCI_Attribute"] = att
|
||||
|
||||
return headers
|
||||
|
||||
|
||||
def create_headers(category, content_type=None,
|
||||
project=None):
|
||||
headers = {}
|
||||
if category is not None:
|
||||
cat = ""
|
||||
for c in category:
|
||||
@ -227,6 +286,8 @@ def create_header_occi(params, category, project=None):
|
||||
headers['Category'] = cat[:-1]
|
||||
if project is not None:
|
||||
headers['X_PROJECT_ID'] = project
|
||||
if content_type is not None:
|
||||
headers['Content-Type'] = content_type
|
||||
return headers
|
||||
|
||||
|
||||
@ -376,3 +437,35 @@ def build_occi_nova(network):
|
||||
for l in links:
|
||||
result.append(("Link", l))
|
||||
return result
|
||||
|
||||
|
||||
def build_occi_securitygroup(secgroup):
|
||||
name = secgroup["title"]
|
||||
secgroup_id = secgroup["id"]
|
||||
rules = secgroup["rules"]
|
||||
summary = secgroup["summary"]
|
||||
app_url = application_url
|
||||
cats = []
|
||||
cats.append('securitygroup; '
|
||||
'scheme='
|
||||
'"http://schemas.ogf.org/occi/infrastructure#";'
|
||||
' class="kind"; title="securitygroup resource";'
|
||||
' rel='
|
||||
'"http://schemas.ogf.org/occi/core#resource";'
|
||||
' location="%s/securitygroup/"' % app_url)
|
||||
links = []
|
||||
|
||||
attrs = [
|
||||
'occi.core.id="%s"' % secgroup_id,
|
||||
'occi.core.title="%s"' % name,
|
||||
'occi.core.summary="%s"' % summary,
|
||||
'occi.securitygroup.rules="%s"' % json.dumps(rules).replace('"', "'"),
|
||||
]
|
||||
result = []
|
||||
for c in cats:
|
||||
result.append(("Category", c))
|
||||
for a in attrs:
|
||||
result.append(("X-OCCI-Attribute", a))
|
||||
for l in links:
|
||||
result.append(("Link", l))
|
||||
return result
|
@ -17,8 +17,10 @@ import uuid
|
||||
|
||||
import mock
|
||||
|
||||
from ooi.api import helpers
|
||||
from ooi.api import helpers_neutron
|
||||
from ooi import exception
|
||||
from ooi.openstack import helpers as openstack_helper
|
||||
from ooi.tests import base
|
||||
from ooi.tests import fakes_network as fakes
|
||||
from ooi import utils
|
||||
@ -593,3 +595,80 @@ class TestNetOpenStackHelper(base.TestCase):
|
||||
self.helper.delete_port,
|
||||
None,
|
||||
iface)
|
||||
|
||||
@mock.patch.object(helpers_neutron.OpenStackNeutron, "create_resource")
|
||||
@mock.patch.object(helpers.BaseHelper, "tenant_from_req")
|
||||
def test_create_security_groups(self, m_tenant, m_create):
|
||||
tenant_id = fakes.tenants["baz"]["id"]
|
||||
sec_group = fakes.security_groups[tenant_id][0]
|
||||
expected = openstack_helper.build_security_group_from_neutron(
|
||||
[sec_group])[0]
|
||||
m_tenant.return_value = uuid.uuid4().hex
|
||||
group_info = {"name": sec_group["name"], "id": sec_group["id"],
|
||||
"description": sec_group["description"]}
|
||||
rules_out_1 = sec_group["security_group_rules"][0]
|
||||
rules_out_2 = sec_group["security_group_rules"][1]
|
||||
m_create.side_effect = [group_info, rules_out_1, rules_out_2]
|
||||
ret = self.helper.create_security_group(None, expected["title"],
|
||||
expected["summary"],
|
||||
expected["rules"])
|
||||
self.assertEqual(expected, ret)
|
||||
self.assertEqual(3, m_create.call_count)
|
||||
|
||||
@mock.patch.object(helpers_neutron.OpenStackNeutron, "list_resources")
|
||||
def test_list_security_group(self, m_list):
|
||||
tenant_id = fakes.tenants["baz"]["id"]
|
||||
sec_group = fakes.security_groups[tenant_id]
|
||||
expected = openstack_helper.build_security_group_from_neutron(
|
||||
sec_group)
|
||||
m_list.return_value = sec_group
|
||||
ret = self.helper.list_security_groups(None)
|
||||
self.assertEqual(2, ret.__len__())
|
||||
self.assertEqual(expected, ret)
|
||||
m_list.assert_called_with(None, 'security-groups',
|
||||
response_resource="security_groups")
|
||||
|
||||
@mock.patch.object(helpers_neutron.OpenStackNeutron, "list_resources")
|
||||
def test_list_security_group_empty(self, m_list):
|
||||
tenant_id = fakes.tenants["bar"]["id"]
|
||||
m_list.return_value = fakes.security_groups[tenant_id]
|
||||
ret = self.helper.list_security_groups(None)
|
||||
self.assertEqual(0, ret.__len__())
|
||||
m_list.assert_called_with(None, 'security-groups',
|
||||
response_resource="security_groups")
|
||||
|
||||
@mock.patch.object(helpers_neutron.OpenStackNeutron, "get_resource")
|
||||
def test_show_security_group(self, m_list):
|
||||
tenant_id = fakes.tenants["baz"]["id"]
|
||||
sec_group = fakes.security_groups[tenant_id][0]
|
||||
expected = openstack_helper.build_security_group_from_neutron(
|
||||
[sec_group])[0]
|
||||
list_sec = sec_group
|
||||
m_list.return_value = list_sec
|
||||
ret = self.helper.get_security_group_details(None, None)
|
||||
self.assertEqual(expected, ret)
|
||||
m_list.assert_called_with(None, 'security-groups', None,
|
||||
response_resource="security_group")
|
||||
|
||||
@mock.patch.object(helpers_neutron.OpenStackNeutron, "get_resource")
|
||||
def test_show_security_group_not_found(self, m_list):
|
||||
m_list.return_value = []
|
||||
self.assertRaises(exception.NotFound,
|
||||
self.helper.get_security_group_details,
|
||||
None,
|
||||
None)
|
||||
|
||||
@mock.patch.object(helpers_neutron.OpenStackNeutron, "delete_resource")
|
||||
def test_delete_security_group(self, m_list):
|
||||
m_list.return_value = None
|
||||
ret = self.helper.delete_security_group(None, None)
|
||||
self.assertIsNone(ret)
|
||||
m_list.assert_called_with(None, 'security-groups', None)
|
||||
|
||||
@mock.patch.object(helpers_neutron.OpenStackNeutron, "delete_resource")
|
||||
def test_delete_security_group_not_found(self, m_list):
|
||||
m_list.side_effect = exception.OCCIException()
|
||||
self.assertRaises(exception.NotFound,
|
||||
self.helper.delete_security_group,
|
||||
None,
|
||||
None)
|
@ -18,8 +18,10 @@ import uuid
|
||||
import mock
|
||||
|
||||
from ooi.api import helpers
|
||||
from ooi.openstack import helpers as os_helpers
|
||||
from ooi.tests import base
|
||||
from ooi.tests import fakes_network as fakes
|
||||
from ooi.tests import fakes as fakes_nova
|
||||
from ooi.tests import fakes_network
|
||||
from ooi import utils
|
||||
|
||||
|
||||
@ -33,10 +35,12 @@ class TestNovaNetOpenStackHelper(base.TestCase):
|
||||
@mock.patch.object(helpers.OpenStackHelper, "tenant_from_req")
|
||||
def test_list_networks_with_public(self, m_t, m_rq):
|
||||
id = uuid.uuid4().hex
|
||||
resp = fakes.create_fake_json_resp({"networks": [{"id": id}]}, 200)
|
||||
resp = fakes_network.create_fake_json_resp(
|
||||
{"networks": [{"id": id}]},
|
||||
200)
|
||||
req_mock = mock.MagicMock()
|
||||
req_mock.get_response.return_value = resp
|
||||
resp_float = fakes.create_fake_json_resp(
|
||||
resp_float = fakes_network.create_fake_json_resp(
|
||||
{"floating_ip_pools": [{"id": id}]}, 200
|
||||
)
|
||||
req_mock_float = mock.MagicMock()
|
||||
@ -49,10 +53,11 @@ class TestNovaNetOpenStackHelper(base.TestCase):
|
||||
@mock.patch.object(helpers.OpenStackHelper, "tenant_from_req")
|
||||
def test_list_networks_with_no_public(self, m_t, m_rq):
|
||||
id = uuid.uuid4().hex
|
||||
resp = fakes.create_fake_json_resp({"networks": [{"id": id}]}, 200)
|
||||
resp = fakes_network.create_fake_json_resp(
|
||||
{"networks": [{"id": id}]}, 200)
|
||||
req_mock = mock.MagicMock()
|
||||
req_mock.get_response.return_value = resp
|
||||
resp_float = fakes.create_fake_json_resp(
|
||||
resp_float = fakes_network.create_fake_json_resp(
|
||||
{"floating_ip_pools": []}, 204
|
||||
)
|
||||
req_mock_float = mock.MagicMock()
|
||||
@ -67,8 +72,9 @@ class TestNovaNetOpenStackHelper(base.TestCase):
|
||||
id = uuid.uuid4().hex
|
||||
tenant_id = uuid.uuid4().hex
|
||||
m_t.return_value = tenant_id
|
||||
resp = fakes.create_fake_json_resp({"networks": [{"id": id}]}, 200)
|
||||
resp_float = fakes.create_fake_json_resp(
|
||||
resp = fakes_network.create_fake_json_resp(
|
||||
{"networks": [{"id": id}]}, 200)
|
||||
resp_float = fakes_network.create_fake_json_resp(
|
||||
{"floating_ip_pools": [{"id": id}]}, 200
|
||||
)
|
||||
req_mock = mock.MagicMock()
|
||||
@ -103,7 +109,7 @@ class TestNovaNetOpenStackHelper(base.TestCase):
|
||||
label = "network11"
|
||||
tenant_id = uuid.uuid4().hex
|
||||
m_t.return_value = tenant_id
|
||||
resp = fakes.create_fake_json_resp(
|
||||
resp = fakes_network.create_fake_json_resp(
|
||||
{"network": {"id": id, "label": label,
|
||||
"cidr": address,
|
||||
"gateway": gateway}}, 200
|
||||
@ -134,7 +140,7 @@ class TestNovaNetOpenStackHelper(base.TestCase):
|
||||
"cidr": cidr,
|
||||
"gateway": gateway
|
||||
}
|
||||
resp = fakes.create_fake_json_resp(
|
||||
resp = fakes_network.create_fake_json_resp(
|
||||
{"network": {"id": net_id, "label": name,
|
||||
"cidr": cidr,
|
||||
"gateway": gateway}}, 200
|
||||
@ -174,3 +180,249 @@ class TestNovaNetOpenStackHelper(base.TestCase):
|
||||
None, method="DELETE",
|
||||
path="/%s/os-networks/%s" % (tenant_id, net_id),
|
||||
)
|
||||
|
||||
@mock.patch.object(helpers.OpenStackHelper, "_get_req")
|
||||
@mock.patch.object(helpers.OpenStackHelper, "tenant_from_req")
|
||||
def test_list_security_groups(self, m_t, m_rq):
|
||||
tenant_id = fakes_nova.tenants["baz"]["id"]
|
||||
m_t.return_value = tenant_id
|
||||
sc_groups = fakes_nova.security_groups[tenant_id]
|
||||
resp = fakes_network.create_fake_json_resp(
|
||||
{"security_groups": sc_groups}, 200)
|
||||
req_mock = mock.MagicMock()
|
||||
req_mock.get_response.return_value = resp
|
||||
m_rq.side_effect = [req_mock]
|
||||
ret = self.helper.list_security_groups(None)
|
||||
cont = 0
|
||||
for sc in sc_groups:
|
||||
self.assertEqual(sc['id'], ret[cont]['id'])
|
||||
cont = cont + 1
|
||||
self.assertEqual(
|
||||
{'method': 'GET',
|
||||
'path': '/%s/os-security-groups' % (tenant_id)},
|
||||
m_rq.call_args_list[0][1]
|
||||
)
|
||||
|
||||
@mock.patch.object(helpers.OpenStackHelper, "_get_req")
|
||||
@mock.patch.object(helpers.OpenStackHelper, "tenant_from_req")
|
||||
def test_delete_security_groups(self, m_t, m_rq):
|
||||
tenant_id = fakes_nova.tenants["baz"]["id"]
|
||||
m_t.return_value = tenant_id
|
||||
sc_id = fakes_nova.security_groups[tenant_id][0]['id']
|
||||
req_mock = mock.MagicMock()
|
||||
req_mock.get_response.return_value = []
|
||||
m_rq.return_value = req_mock
|
||||
ret = self.helper.delete_security_group(None, sc_id)
|
||||
self.assertEqual(ret, [])
|
||||
m_rq.assert_called_with(
|
||||
None, method="DELETE",
|
||||
path="/%s/os-security-groups/%s" % (tenant_id, sc_id),
|
||||
)
|
||||
|
||||
@mock.patch.object(helpers.OpenStackHelper, "_get_req")
|
||||
@mock.patch.object(helpers.OpenStackHelper, "tenant_from_req")
|
||||
def test_get_security_group(self, m_t, m_rq):
|
||||
tenant_id = fakes_nova.tenants["baz"]["id"]
|
||||
m_t.return_value = tenant_id
|
||||
sc_group = fakes_nova.security_groups[tenant_id][0]
|
||||
id = sc_group['id']
|
||||
m_t.return_value = tenant_id
|
||||
resp = fakes_network.create_fake_json_resp(
|
||||
{"security_group": sc_group}, 200
|
||||
)
|
||||
req_mock = mock.MagicMock()
|
||||
req_mock.get_response.return_value = resp
|
||||
m_rq.return_value = req_mock
|
||||
ret = self.helper.get_security_group_details(None, id)
|
||||
self.assertEqual(sc_group['id'], ret["id"])
|
||||
self.assertEqual(sc_group['description'], ret["summary"])
|
||||
occi_os_group = os_helpers.build_security_group_from_nova(
|
||||
fakes_nova.security_groups[tenant_id]
|
||||
)[0]
|
||||
cont = 0
|
||||
for r in ret["rules"]:
|
||||
self.assertEqual(
|
||||
occi_os_group['rules'][cont]['protocol'], r["protocol"])
|
||||
self.assertEqual(
|
||||
occi_os_group['rules'][cont]['range'], r["range"])
|
||||
self.assertEqual(
|
||||
occi_os_group['rules'][cont]['port'], r["port"])
|
||||
self.assertEqual(
|
||||
occi_os_group['rules'][cont]['type'], r["type"])
|
||||
cont += 1
|
||||
|
||||
m_rq.assert_called_with(
|
||||
None, method="GET",
|
||||
path="/%s/os-security-groups/%s" % (tenant_id, id),
|
||||
)
|
||||
|
||||
@mock.patch.object(helpers.OpenStackHelper, "_get_req")
|
||||
@mock.patch.object(helpers.OpenStackHelper, "tenant_from_req")
|
||||
def test_create_security_group(self, m_t, m_rq):
|
||||
tenant_id = fakes_nova.tenants["baz"]["id"]
|
||||
m_t.return_value = tenant_id
|
||||
sc_group = fakes_nova.security_groups[tenant_id][0]
|
||||
occi_os_group = os_helpers.build_security_group_from_nova(
|
||||
fakes_nova.security_groups[tenant_id]
|
||||
)[0]
|
||||
resp = fakes_network.create_fake_json_resp(
|
||||
{"security_group": sc_group}, 200
|
||||
)
|
||||
req_mock = mock.MagicMock()
|
||||
req_mock.get_response.return_value = resp
|
||||
resp_rule1 = fakes_network.create_fake_json_resp(
|
||||
{"security_group_rule": sc_group['rules'][0]}, 200
|
||||
)
|
||||
req_mock_rule1 = mock.MagicMock()
|
||||
req_mock_rule1.get_response.return_value = resp_rule1
|
||||
resp_rule2 = fakes_network.create_fake_json_resp(
|
||||
{"security_group_rule": sc_group['rules'][1]}, 200
|
||||
)
|
||||
req_mock_rule2 = mock.MagicMock()
|
||||
req_mock_rule2.get_response.return_value = resp_rule2
|
||||
m_rq.side_effect = [req_mock, req_mock_rule1, req_mock_rule2]
|
||||
ret = self.helper.create_security_group(
|
||||
None,
|
||||
name=occi_os_group['title'],
|
||||
description=occi_os_group['summary'],
|
||||
rules=occi_os_group['rules']
|
||||
)
|
||||
cont = 0
|
||||
for r in ret["rules"]:
|
||||
self.assertEqual(
|
||||
occi_os_group['rules'][cont]['protocol'], r["protocol"]
|
||||
)
|
||||
self.assertEqual(
|
||||
occi_os_group['rules'][cont]['range'], r["range"]
|
||||
)
|
||||
self.assertEqual(
|
||||
occi_os_group['rules'][cont]['port'], r["port"])
|
||||
self.assertEqual(
|
||||
occi_os_group['rules'][cont]['type'], r["type"])
|
||||
cont += 1
|
||||
|
||||
@mock.patch.object(helpers.OpenStackHelper, "_get_req")
|
||||
@mock.patch.object(helpers.OpenStackHelper, "tenant_from_req")
|
||||
def test_get_server_security_group(self, mock_tenant, mock_get):
|
||||
tenant_id = fakes_nova.tenants["baz"]["id"]
|
||||
server_id = uuid.uuid4().hex
|
||||
sc_group = fakes_nova.security_groups[tenant_id]
|
||||
mock_tenant.return_value = tenant_id
|
||||
resp = fakes_network.create_fake_json_resp(
|
||||
{"security_groups": sc_group}, 200
|
||||
)
|
||||
req_mock = mock.MagicMock()
|
||||
req_mock.get_response.return_value = resp
|
||||
mock_get.return_value = req_mock
|
||||
ret = self.helper._get_server_security_group(None, server_id)
|
||||
segroup = os_helpers.build_security_group_from_nova(
|
||||
sc_group
|
||||
)
|
||||
cont = 0
|
||||
for s in segroup:
|
||||
self.assertEqual(s, ret[cont])
|
||||
cont += 1
|
||||
mock_get.assert_called_with(
|
||||
None, method="GET",
|
||||
path="/%s/servers/%s/os-security-groups" % (tenant_id,
|
||||
server_id),
|
||||
)
|
||||
|
||||
@mock.patch.object(helpers.OpenStackHelper, "index")
|
||||
@mock.patch.object(helpers.OpenStackHelper, "_get_server_security_group")
|
||||
def test_list_server_security_links(self, mock_get, mock_list):
|
||||
tenant_id = fakes_nova.tenants["baz"]["id"]
|
||||
servers = fakes_nova.servers[tenant_id]
|
||||
mock_list.return_value = servers
|
||||
sg = fakes_nova.security_groups[tenant_id]
|
||||
segroup = os_helpers.build_security_group_from_nova(sg)[0]
|
||||
mock_get.return_value = [segroup]
|
||||
ret = self.helper.list_server_security_links(None)
|
||||
cont = 0
|
||||
for server in servers:
|
||||
self.assertEqual(server["id"],
|
||||
ret[cont]['compute_id'])
|
||||
self.assertEqual(segroup["title"],
|
||||
ret[cont]['securitygroup']["title"])
|
||||
|
||||
cont += 1
|
||||
|
||||
@mock.patch.object(helpers.OpenStackHelper, "_get_server_security_group")
|
||||
def test_get_server_security_link(self, mock_get):
|
||||
tenant_id = fakes_nova.tenants["baz"]["id"]
|
||||
server_id = uuid.uuid4().hex
|
||||
sg = fakes_nova.security_groups[tenant_id]
|
||||
segroup = os_helpers.build_security_group_from_nova(sg)[0]
|
||||
mock_get.return_value = [segroup]
|
||||
ret = self.helper.get_server_security_link(None, server_id,
|
||||
segroup["id"])
|
||||
self.assertEqual(server_id,
|
||||
ret[0]['compute_id'])
|
||||
self.assertEqual(segroup["title"],
|
||||
ret[0]['securitygroup']["title"])
|
||||
|
||||
@mock.patch.object(helpers.OpenStackHelper, "_get_req")
|
||||
@mock.patch.object(helpers.OpenStackHelper, "tenant_from_req")
|
||||
def test_delete_server_security_link(self, mock_tenant, mock_req):
|
||||
tenant_id = fakes_nova.tenants["baz"]["id"]
|
||||
server_id = uuid.uuid4().hex
|
||||
sg_name = "baz"
|
||||
mock_tenant.return_value = tenant_id
|
||||
sc_group = fakes_nova.security_groups[tenant_id][0]
|
||||
sg_name = sc_group["name"]
|
||||
resp_get = fakes_network.create_fake_json_resp(
|
||||
{"security_group": sc_group}, 200
|
||||
)
|
||||
req_mock_get = mock.MagicMock()
|
||||
req_mock_get.get_response.return_value = resp_get
|
||||
resp_cre = fakes_network.create_fake_json_resp(
|
||||
{}, 204
|
||||
)
|
||||
req_mock_del = mock.MagicMock()
|
||||
req_mock_del.get_response.return_value = resp_cre
|
||||
mock_req.side_effect = [req_mock_get, req_mock_del]
|
||||
ret = self.helper.delete_server_security_link(None,
|
||||
server_id,
|
||||
sg_name)
|
||||
self.assertEqual([], ret)
|
||||
mock_req.assert_called_with(
|
||||
None, method="POST",
|
||||
path="/%s/servers/%s/action" % (tenant_id,
|
||||
server_id),
|
||||
body='{"removeSecurityGroup": {"name": "%s"}}' % sg_name,
|
||||
content_type='application/json'
|
||||
|
||||
)
|
||||
|
||||
@mock.patch.object(helpers.OpenStackHelper, "_get_req")
|
||||
@mock.patch.object(helpers.OpenStackHelper, "tenant_from_req")
|
||||
def test_create_server_security_link(self, mock_tenant, mock_req):
|
||||
tenant_id = fakes_nova.tenants["baz"]["id"]
|
||||
server_id = uuid.uuid4().hex
|
||||
sg_id = "baz"
|
||||
mock_tenant.return_value = tenant_id
|
||||
sc_group = fakes_nova.security_groups[tenant_id][0]
|
||||
resp_get = fakes_network.create_fake_json_resp(
|
||||
{"security_group": sc_group}, 200
|
||||
)
|
||||
req_mock_get = mock.MagicMock()
|
||||
req_mock_get.get_response.return_value = resp_get
|
||||
resp_create = fakes_network.create_fake_json_resp(
|
||||
{}, 204
|
||||
)
|
||||
req_mock_cre = mock.MagicMock()
|
||||
req_mock_cre.get_response.return_value = resp_create
|
||||
mock_req.side_effect = [req_mock_get, req_mock_cre]
|
||||
ret = self.helper.create_server_security_link(None,
|
||||
server_id,
|
||||
sg_id)
|
||||
self.assertEqual([], ret)
|
||||
sg_name = sc_group["name"]
|
||||
mock_req.assert_called_with(
|
||||
None, method="POST",
|
||||
path="/%s/servers/%s/action" % (tenant_id,
|
||||
server_id),
|
||||
body='{"addSecurityGroup": {"name": "%s"}}' % sg_name,
|
||||
content_type='application/json'
|
||||
|
||||
)
|
||||
|
149
ooi/tests/unit/controllers/test_security_group.py
Normal file
149
ooi/tests/unit/controllers/test_security_group.py
Normal file
@ -0,0 +1,149 @@
|
||||
# Copyright 2015 LIP - INDIGO-DataCloud
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from ooi.api import helpers_neutron
|
||||
from ooi.api import securitygroup as security_group_api
|
||||
from ooi import exception
|
||||
from ooi.occi.infrastructure import securitygroup as occi_security_group
|
||||
from ooi.openstack import helpers as openstack_helper
|
||||
from ooi.tests import base
|
||||
from ooi.tests import fakes_network as fakes
|
||||
|
||||
|
||||
class TestSecurityGroupControllerNeutron(base.TestController):
|
||||
|
||||
def setUp(self):
|
||||
super(TestSecurityGroupControllerNeutron, self).setUp()
|
||||
self.controller = security_group_api.Controller(
|
||||
neutron_ooi_endpoint="ff")
|
||||
|
||||
@mock.patch.object(helpers_neutron.OpenStackNeutron,
|
||||
"list_security_groups")
|
||||
def test_list_security_group(self, m_list):
|
||||
tenant_id = fakes.tenants["baz"]["id"]
|
||||
sec_group = openstack_helper.build_security_group_from_neutron(
|
||||
fakes.security_groups[tenant_id]
|
||||
)
|
||||
req = fakes.create_req_test(None, None)
|
||||
m_list.return_value = sec_group
|
||||
result = self.controller.index(req)
|
||||
expected = self.controller._get_security_group_resources(sec_group)
|
||||
self.assertEqual(result.resources.__len__(),
|
||||
expected.__len__())
|
||||
for r in result.resources:
|
||||
self.assertIsInstance(r, occi_security_group.SecurityGroupResource)
|
||||
m_list.assert_called_with(req)
|
||||
|
||||
@mock.patch.object(helpers_neutron.OpenStackNeutron,
|
||||
"list_security_groups")
|
||||
def test_list_security_group_empty(self, m_list):
|
||||
tenant_id = fakes.tenants["foo"]["id"]
|
||||
sec_group = openstack_helper.build_security_group_from_neutron(
|
||||
fakes.security_groups[tenant_id]
|
||||
)
|
||||
req = fakes.create_req_test(None, None)
|
||||
m_list.return_value = sec_group
|
||||
result = self.controller.index(req)
|
||||
self.assertEqual(result.resources.__len__(), 0)
|
||||
|
||||
@mock.patch.object(helpers_neutron.OpenStackNeutron,
|
||||
"get_security_group_details")
|
||||
def test_show_security_group(self, m_list):
|
||||
tenant_id = fakes.tenants["baz"]["id"]
|
||||
sec_group = openstack_helper.build_security_group_from_neutron(
|
||||
[fakes.security_groups[tenant_id][0]]
|
||||
)
|
||||
req = fakes.create_req_test(None, None)
|
||||
m_list.return_value = sec_group[0]
|
||||
result = self.controller.show(req, None)
|
||||
expected = self.controller._get_security_group_resources(sec_group)[0]
|
||||
self.assertIsInstance(
|
||||
result,
|
||||
occi_security_group.SecurityGroupResource)
|
||||
self.assertEqual(result, expected)
|
||||
m_list.assert_called_with(req, None)
|
||||
|
||||
@mock.patch.object(helpers_neutron.OpenStackNeutron, "get_resource")
|
||||
def test_show_security_group_not_found(self, m_list):
|
||||
tenant_id = fakes.tenants["baz"]["id"]
|
||||
sec_group = openstack_helper.build_security_group_from_neutron(
|
||||
fakes.security_groups[tenant_id]
|
||||
)
|
||||
m_list.return_value = sec_group
|
||||
req = fakes.create_req_test(None, None)
|
||||
self.assertRaises(exception.NotFound,
|
||||
self.controller.show,
|
||||
req,
|
||||
None)
|
||||
|
||||
@mock.patch.object(helpers_neutron.OpenStackNeutron, "delete_resource")
|
||||
def test_delete_security_group(self, m_list):
|
||||
m_list.return_value = None
|
||||
ret = self.controller.delete(None, None)
|
||||
self.assertIsNone(ret)
|
||||
m_list.assert_called_with(None, 'security-groups', None)
|
||||
|
||||
@mock.patch.object(helpers_neutron.OpenStackNeutron,
|
||||
"delete_security_group")
|
||||
def test_delete_security_group_not_found(self, m_list):
|
||||
m_list.side_effect = exception.NotFound
|
||||
req = fakes.create_req_test(None, None)
|
||||
self.assertRaises(exception.NotFound,
|
||||
self.controller.delete,
|
||||
req,
|
||||
None)
|
||||
|
||||
@mock.patch.object(helpers_neutron.OpenStackNeutron,
|
||||
"create_security_group")
|
||||
def test_create_security_groups(self, m_create):
|
||||
tenant_id = fakes.tenants["baz"]["id"]
|
||||
sec_group = openstack_helper.build_security_group_from_neutron(
|
||||
fakes.security_groups[tenant_id]
|
||||
)[0]
|
||||
params = {"occi.core.title": sec_group["title"],
|
||||
"occi.securitygroup.rules": sec_group["rules"],
|
||||
"occi.core.summary": sec_group["summary"]
|
||||
}
|
||||
categories = {occi_security_group.SecurityGroupResource.kind}
|
||||
req = fakes.create_req_json_occi(params, categories)
|
||||
m_create.return_value = sec_group
|
||||
ret = self.controller.create(req, params)
|
||||
expected = self.controller._get_security_group_resources(
|
||||
[sec_group])
|
||||
self.assertIsInstance(ret.resources[0],
|
||||
occi_security_group.SecurityGroupResource)
|
||||
self.assertEqual(expected[0], ret.resources[0])
|
||||
m_create.assert_called_with(req, sec_group["title"],
|
||||
sec_group["summary"],
|
||||
sec_group["rules"])
|
||||
|
||||
def test_create_error(self):
|
||||
test_networks = fakes.networks[fakes.tenants["foo"]["id"]]
|
||||
schema1 = occi_security_group.SecurityGroupResource.kind.scheme
|
||||
net = test_networks[0]
|
||||
schemes = {schema1: net}
|
||||
parameters = {"occi.core.title": "name"}
|
||||
req = fakes.create_req_test(parameters, schemes)
|
||||
|
||||
self.assertRaises(exception.Invalid, self.controller.create, req)
|
||||
|
||||
def test_create_invalid_param_rule(self):
|
||||
params = {"occi.core.title": "group",
|
||||
"occi.securitygroup.rules": "{'wrong': 'value'}]"
|
||||
}
|
||||
categories = {occi_security_group.SecurityGroupResource.kind}
|
||||
req = fakes.create_req_test_occi(params, categories)
|
||||
self.assertRaises(exception.Invalid, self.controller.create, req)
|
104
ooi/tests/unit/controllers/test_securitygroup_link.py
Normal file
104
ooi/tests/unit/controllers/test_securitygroup_link.py
Normal file
@ -0,0 +1,104 @@
|
||||
# Copyright 2015 Spanish National Research Council
|
||||
# Copyright 2015 LIP - INDIGO-DataCloud
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import uuid
|
||||
|
||||
import mock
|
||||
|
||||
from ooi.api import helpers
|
||||
from ooi.api import securitygroup_link as securitygroup_link_api
|
||||
from ooi.occi.core import collection
|
||||
from ooi.occi.infrastructure import securitygroup_link
|
||||
from ooi.openstack import helpers as os_helpers
|
||||
from ooi.tests import base
|
||||
from ooi.tests import fakes as fakes_nova
|
||||
|
||||
|
||||
class TestNetworkLinkController(base.TestController):
|
||||
def setUp(self):
|
||||
super(TestNetworkLinkController, self).setUp()
|
||||
self.controller = securitygroup_link_api.Controller(
|
||||
mock.MagicMock(), None)
|
||||
|
||||
@mock.patch.object(helpers.OpenStackHelper, "list_server_security_links")
|
||||
def test_index(self, mock_list):
|
||||
tenant_id = fakes_nova.tenants['bar']["id"]
|
||||
servers = fakes_nova.servers[tenant_id]
|
||||
sg = fakes_nova.security_groups[tenant_id]
|
||||
segroup = os_helpers.build_security_group_from_nova(sg)[0]
|
||||
links = []
|
||||
for server in servers:
|
||||
link = {
|
||||
"compute_id": server["id"],
|
||||
"securitygroup": segroup
|
||||
}
|
||||
links.append(link)
|
||||
mock_list.return_value = links
|
||||
ret = self.controller.index(None)
|
||||
self.assertIsInstance(ret, collection.Collection)
|
||||
|
||||
@mock.patch.object(helpers.OpenStackHelper, "get_server_security_link")
|
||||
@mock.patch.object(helpers.OpenStackHelper, "list_security_groups")
|
||||
def test_show(self, mock_list, mock_get):
|
||||
tenant_id = fakes_nova.tenants['baz']["id"]
|
||||
server = fakes_nova.servers[tenant_id][0]
|
||||
server_id = server['id']
|
||||
secgroup_name = server['security_groups'][0]["name"]
|
||||
link_id = '%s_%s' % (server_id, secgroup_name)
|
||||
sec_group = os_helpers.build_security_group_from_nova(
|
||||
fakes_nova.security_groups[tenant_id]
|
||||
)
|
||||
link = {
|
||||
"compute_id": server_id,
|
||||
"securitygroup": sec_group[0]
|
||||
}
|
||||
|
||||
mock_get.return_value = [link]
|
||||
mock_list.return_value = sec_group
|
||||
ret = self.controller.show(None, link_id)
|
||||
self.assertIsInstance(ret, securitygroup_link.SecurityGroupLink)
|
||||
|
||||
@mock.patch.object(helpers.OpenStackHelper, "delete_server_security_link")
|
||||
def test_delete(self, mock_del):
|
||||
tenant_id = fakes_nova.tenants['baz']["id"]
|
||||
server = fakes_nova.servers[tenant_id][0]
|
||||
server_id = server['id']
|
||||
secgroup_name = server['security_groups'][0]["name"]
|
||||
link_id = '%s_%s' % (server_id, secgroup_name)
|
||||
mock_del.return_value = []
|
||||
ret = self.controller.delete(None, link_id)
|
||||
self.assertEqual([], ret)
|
||||
|
||||
@mock.patch.object(helpers.OpenStackHelper, "create_server_security_link")
|
||||
@mock.patch("ooi.occi.validator.Validator")
|
||||
@mock.patch("ooi.api.helpers.get_id_with_kind")
|
||||
def test_create(self, m_get_id, m_validator, m_create):
|
||||
compute_id = uuid.uuid4().hex
|
||||
sec_id = uuid.uuid4().hex
|
||||
obj = {
|
||||
"attributes": {
|
||||
"occi.core.target": sec_id,
|
||||
"occi.core.source": compute_id
|
||||
}
|
||||
}
|
||||
req = self._build_req(uuid.uuid4().hex)
|
||||
req.get_parser = mock.MagicMock()
|
||||
req.get_parser.return_value.return_value.parse.return_value = obj
|
||||
m_validator.validate.return_value = True
|
||||
m_get_id.side_effect = [('', compute_id), ('', sec_id)]
|
||||
m_create.return_value = []
|
||||
ret = self.controller.create(req, None)
|
||||
link = ret.resources.pop()
|
||||
self.assertIsInstance(link, securitygroup_link.SecurityGroupLink)
|
Loading…
Reference in New Issue
Block a user