Merge "Low-level Compute v2 API: security group"
This commit is contained in:
commit
9a1c9cabd4
openstackclient
api
compute
network/v2
tests
functional/compute/v2
unit
211
openstackclient/api/compute_v2.py
Normal file
211
openstackclient/api/compute_v2.py
Normal file
@ -0,0 +1,211 @@
|
|||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
#
|
||||||
|
|
||||||
|
"""Compute v2 API Library"""
|
||||||
|
|
||||||
|
from keystoneauth1 import exceptions as ksa_exceptions
|
||||||
|
from osc_lib.api import api
|
||||||
|
from osc_lib import exceptions
|
||||||
|
from osc_lib.i18n import _
|
||||||
|
|
||||||
|
|
||||||
|
class APIv2(api.BaseAPI):
|
||||||
|
"""Compute v2 API"""
|
||||||
|
|
||||||
|
def __init__(self, **kwargs):
|
||||||
|
super(APIv2, self).__init__(**kwargs)
|
||||||
|
|
||||||
|
# Overrides
|
||||||
|
|
||||||
|
# TODO(dtroyer): Override find() until these fixes get into an osc-lib
|
||||||
|
# minimum release
|
||||||
|
def find(
|
||||||
|
self,
|
||||||
|
path,
|
||||||
|
value=None,
|
||||||
|
attr=None,
|
||||||
|
):
|
||||||
|
"""Find a single resource by name or ID
|
||||||
|
|
||||||
|
:param string path:
|
||||||
|
The API-specific portion of the URL path
|
||||||
|
:param string value:
|
||||||
|
search expression (required, really)
|
||||||
|
:param string attr:
|
||||||
|
name of attribute for secondary search
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
ret = self._request('GET', "/%s/%s" % (path, value)).json()
|
||||||
|
if isinstance(ret, dict):
|
||||||
|
# strip off the enclosing dict
|
||||||
|
key = list(ret.keys())[0]
|
||||||
|
ret = ret[key]
|
||||||
|
except (
|
||||||
|
ksa_exceptions.NotFound,
|
||||||
|
ksa_exceptions.BadRequest,
|
||||||
|
):
|
||||||
|
kwargs = {attr: value}
|
||||||
|
try:
|
||||||
|
ret = self.find_one(path, **kwargs)
|
||||||
|
except ksa_exceptions.NotFound:
|
||||||
|
msg = _("%s not found") % value
|
||||||
|
raise exceptions.NotFound(msg)
|
||||||
|
|
||||||
|
return ret
|
||||||
|
|
||||||
|
# Security Groups
|
||||||
|
|
||||||
|
def security_group_create(
|
||||||
|
self,
|
||||||
|
name=None,
|
||||||
|
description=None,
|
||||||
|
):
|
||||||
|
"""Create a new security group
|
||||||
|
|
||||||
|
https://developer.openstack.org/api-ref/compute/#create-security-group
|
||||||
|
|
||||||
|
:param string name:
|
||||||
|
Security group name
|
||||||
|
:param integer description:
|
||||||
|
Security group description
|
||||||
|
"""
|
||||||
|
|
||||||
|
url = "/os-security-groups"
|
||||||
|
|
||||||
|
params = {
|
||||||
|
'name': name,
|
||||||
|
'description': description,
|
||||||
|
}
|
||||||
|
|
||||||
|
return self.create(
|
||||||
|
url,
|
||||||
|
json={'security_group': params},
|
||||||
|
)['security_group']
|
||||||
|
|
||||||
|
def security_group_delete(
|
||||||
|
self,
|
||||||
|
security_group=None,
|
||||||
|
):
|
||||||
|
"""Delete a security group
|
||||||
|
|
||||||
|
https://developer.openstack.org/api-ref/compute/#delete-security-group
|
||||||
|
|
||||||
|
:param string security_group:
|
||||||
|
Security group name or ID
|
||||||
|
"""
|
||||||
|
|
||||||
|
url = "/os-security-groups"
|
||||||
|
|
||||||
|
security_group = self.find(
|
||||||
|
url,
|
||||||
|
attr='name',
|
||||||
|
value=security_group,
|
||||||
|
)['id']
|
||||||
|
if security_group is not None:
|
||||||
|
return self.delete('/%s/%s' % (url, security_group))
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
def security_group_find(
|
||||||
|
self,
|
||||||
|
security_group=None,
|
||||||
|
):
|
||||||
|
"""Return a security group given name or ID
|
||||||
|
|
||||||
|
https://developer.openstack.org/api-ref/compute/#show-security-group-details
|
||||||
|
|
||||||
|
:param string security_group:
|
||||||
|
Security group name or ID
|
||||||
|
:returns: A dict of the security group attributes
|
||||||
|
"""
|
||||||
|
|
||||||
|
url = "/os-security-groups"
|
||||||
|
|
||||||
|
return self.find(
|
||||||
|
url,
|
||||||
|
attr='name',
|
||||||
|
value=security_group,
|
||||||
|
)
|
||||||
|
|
||||||
|
def security_group_list(
|
||||||
|
self,
|
||||||
|
limit=None,
|
||||||
|
marker=None,
|
||||||
|
search_opts=None,
|
||||||
|
):
|
||||||
|
"""Get security groups
|
||||||
|
|
||||||
|
https://developer.openstack.org/api-ref/compute/#list-security-groups
|
||||||
|
|
||||||
|
:param integer limit:
|
||||||
|
query return count limit
|
||||||
|
:param string marker:
|
||||||
|
query marker
|
||||||
|
:param search_opts:
|
||||||
|
(undocumented) Search filter dict
|
||||||
|
all_tenants: True|False - return all projects
|
||||||
|
:returns:
|
||||||
|
list of security groups names
|
||||||
|
"""
|
||||||
|
|
||||||
|
params = {}
|
||||||
|
if search_opts is not None:
|
||||||
|
params = dict((k, v) for (k, v) in search_opts.items() if v)
|
||||||
|
if limit:
|
||||||
|
params['limit'] = limit
|
||||||
|
if marker:
|
||||||
|
params['offset'] = marker
|
||||||
|
|
||||||
|
url = "/os-security-groups"
|
||||||
|
return self.list(url, **params)["security_groups"]
|
||||||
|
|
||||||
|
def security_group_set(
|
||||||
|
self,
|
||||||
|
security_group=None,
|
||||||
|
# name=None,
|
||||||
|
# description=None,
|
||||||
|
**params
|
||||||
|
):
|
||||||
|
"""Update a security group
|
||||||
|
|
||||||
|
https://developer.openstack.org/api-ref/compute/#update-security-group
|
||||||
|
|
||||||
|
:param string security_group:
|
||||||
|
Security group name or ID
|
||||||
|
|
||||||
|
TODO(dtroyer): Create an update method in osc-lib
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Short-circuit no-op
|
||||||
|
if params is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
url = "/os-security-groups"
|
||||||
|
|
||||||
|
security_group = self.find(
|
||||||
|
url,
|
||||||
|
attr='name',
|
||||||
|
value=security_group,
|
||||||
|
)
|
||||||
|
if security_group is not None:
|
||||||
|
for (k, v) in params.items():
|
||||||
|
# Only set a value if it is already present
|
||||||
|
if k in security_group:
|
||||||
|
security_group[k] = v
|
||||||
|
return self._request(
|
||||||
|
"PUT",
|
||||||
|
"/%s/%s" % (url, security_group['id']),
|
||||||
|
json={'security_group': security_group},
|
||||||
|
).json()['security_group']
|
||||||
|
return None
|
@ -31,6 +31,11 @@ API_VERSIONS = {
|
|||||||
"2.1": "novaclient.client",
|
"2.1": "novaclient.client",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
COMPUTE_API_TYPE = 'compute'
|
||||||
|
COMPUTE_API_VERSIONS = {
|
||||||
|
'2': 'openstackclient.api.compute_v2.APIv2',
|
||||||
|
}
|
||||||
|
|
||||||
# Save the microversion if in use
|
# Save the microversion if in use
|
||||||
_compute_api_version = None
|
_compute_api_version = None
|
||||||
|
|
||||||
@ -58,6 +63,13 @@ def make_client(instance):
|
|||||||
|
|
||||||
LOG.debug('Instantiating compute client for %s', version)
|
LOG.debug('Instantiating compute client for %s', version)
|
||||||
|
|
||||||
|
compute_api = utils.get_client_class(
|
||||||
|
API_NAME,
|
||||||
|
version.ver_major,
|
||||||
|
COMPUTE_API_VERSIONS,
|
||||||
|
)
|
||||||
|
LOG.debug('Instantiating compute api: %s', compute_api)
|
||||||
|
|
||||||
# Set client http_log_debug to True if verbosity level is high enough
|
# Set client http_log_debug to True if verbosity level is high enough
|
||||||
http_log_debug = utils.get_effective_log_level() <= logging.DEBUG
|
http_log_debug = utils.get_effective_log_level() <= logging.DEBUG
|
||||||
|
|
||||||
@ -77,6 +89,16 @@ def make_client(instance):
|
|||||||
**kwargs
|
**kwargs
|
||||||
)
|
)
|
||||||
|
|
||||||
|
client.api = compute_api(
|
||||||
|
session=instance.session,
|
||||||
|
service_type=COMPUTE_API_TYPE,
|
||||||
|
endpoint=instance.get_endpoint_for_service_type(
|
||||||
|
COMPUTE_API_TYPE,
|
||||||
|
region_name=instance.region_name,
|
||||||
|
interface=instance.interface,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
return client
|
return client
|
||||||
|
|
||||||
|
|
||||||
|
@ -22,6 +22,7 @@ import logging
|
|||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
|
from novaclient.v2 import servers
|
||||||
from osc_lib.cli import parseractions
|
from osc_lib.cli import parseractions
|
||||||
from osc_lib.command import command
|
from osc_lib.command import command
|
||||||
from osc_lib import exceptions
|
from osc_lib import exceptions
|
||||||
@ -29,11 +30,6 @@ from osc_lib import utils
|
|||||||
from oslo_utils import timeutils
|
from oslo_utils import timeutils
|
||||||
import six
|
import six
|
||||||
|
|
||||||
try:
|
|
||||||
from novaclient.v2 import servers
|
|
||||||
except ImportError:
|
|
||||||
from novaclient.v1_1 import servers
|
|
||||||
|
|
||||||
from openstackclient.i18n import _
|
from openstackclient.i18n import _
|
||||||
from openstackclient.identity import common as identity_common
|
from openstackclient.identity import common as identity_common
|
||||||
|
|
||||||
@ -316,12 +312,11 @@ class AddServerSecurityGroup(command.Command):
|
|||||||
compute_client.servers,
|
compute_client.servers,
|
||||||
parsed_args.server,
|
parsed_args.server,
|
||||||
)
|
)
|
||||||
security_group = utils.find_resource(
|
security_group = compute_client.api.security_group_find(
|
||||||
compute_client.security_groups,
|
|
||||||
parsed_args.group,
|
parsed_args.group,
|
||||||
)
|
)
|
||||||
|
|
||||||
server.add_security_group(security_group.id)
|
server.add_security_group(security_group['id'])
|
||||||
|
|
||||||
|
|
||||||
class AddServerVolume(command.Command):
|
class AddServerVolume(command.Command):
|
||||||
@ -1437,12 +1432,11 @@ class RemoveServerSecurityGroup(command.Command):
|
|||||||
compute_client.servers,
|
compute_client.servers,
|
||||||
parsed_args.server,
|
parsed_args.server,
|
||||||
)
|
)
|
||||||
security_group = utils.find_resource(
|
security_group = compute_client.api.security_group_find(
|
||||||
compute_client.security_groups,
|
|
||||||
parsed_args.group,
|
parsed_args.group,
|
||||||
)
|
)
|
||||||
|
|
||||||
server.remove_security_group(security_group.id)
|
server.remove_security_group(security_group['id'])
|
||||||
|
|
||||||
|
|
||||||
class RemoveServerVolume(command.Command):
|
class RemoveServerVolume(command.Command):
|
||||||
|
@ -140,13 +140,13 @@ class CreateSecurityGroup(common.NetworkAndComputeShowOne):
|
|||||||
|
|
||||||
def take_action_compute(self, client, parsed_args):
|
def take_action_compute(self, client, parsed_args):
|
||||||
description = self._get_description(parsed_args)
|
description = self._get_description(parsed_args)
|
||||||
obj = client.security_groups.create(
|
obj = client.api.security_group_create(
|
||||||
parsed_args.name,
|
parsed_args.name,
|
||||||
description,
|
description,
|
||||||
)
|
)
|
||||||
display_columns, property_columns = _get_columns(obj)
|
display_columns, property_columns = _get_columns(obj)
|
||||||
data = utils.get_dict_properties(
|
data = utils.get_dict_properties(
|
||||||
obj._info,
|
obj,
|
||||||
property_columns,
|
property_columns,
|
||||||
formatters=_formatters_compute
|
formatters=_formatters_compute
|
||||||
)
|
)
|
||||||
@ -174,8 +174,7 @@ class DeleteSecurityGroup(common.NetworkAndComputeDelete):
|
|||||||
client.delete_security_group(obj)
|
client.delete_security_group(obj)
|
||||||
|
|
||||||
def take_action_compute(self, client, parsed_args):
|
def take_action_compute(self, client, parsed_args):
|
||||||
data = utils.find_resource(client.security_groups, self.r)
|
client.api.security_group_delete(self.r)
|
||||||
client.security_groups.delete(data.id)
|
|
||||||
|
|
||||||
|
|
||||||
# TODO(rauta): Use the SDK resource mapped attribute names once
|
# TODO(rauta): Use the SDK resource mapped attribute names once
|
||||||
@ -242,7 +241,10 @@ class ListSecurityGroup(common.NetworkAndComputeLister):
|
|||||||
|
|
||||||
def take_action_compute(self, client, parsed_args):
|
def take_action_compute(self, client, parsed_args):
|
||||||
search = {'all_tenants': parsed_args.all_projects}
|
search = {'all_tenants': parsed_args.all_projects}
|
||||||
data = client.security_groups.list(search_opts=search)
|
data = client.api.security_group_list(
|
||||||
|
# TODO(dtroyer): add limit, marker
|
||||||
|
search_opts=search,
|
||||||
|
)
|
||||||
|
|
||||||
columns = (
|
columns = (
|
||||||
"ID",
|
"ID",
|
||||||
@ -254,7 +256,7 @@ class ListSecurityGroup(common.NetworkAndComputeLister):
|
|||||||
columns = columns + ('Tenant ID',)
|
columns = columns + ('Tenant ID',)
|
||||||
column_headers = column_headers + ('Project',)
|
column_headers = column_headers + ('Project',)
|
||||||
return (column_headers,
|
return (column_headers,
|
||||||
(utils.get_item_properties(
|
(utils.get_dict_properties(
|
||||||
s, columns,
|
s, columns,
|
||||||
) for s in data))
|
) for s in data))
|
||||||
|
|
||||||
@ -294,23 +296,20 @@ class SetSecurityGroup(common.NetworkAndComputeCommand):
|
|||||||
client.update_security_group(obj, **attrs)
|
client.update_security_group(obj, **attrs)
|
||||||
|
|
||||||
def take_action_compute(self, client, parsed_args):
|
def take_action_compute(self, client, parsed_args):
|
||||||
data = utils.find_resource(
|
data = client.api.security_group_find(parsed_args.group)
|
||||||
client.security_groups,
|
|
||||||
parsed_args.group,
|
|
||||||
)
|
|
||||||
|
|
||||||
if parsed_args.name is not None:
|
if parsed_args.name is not None:
|
||||||
data.name = parsed_args.name
|
data['name'] = parsed_args.name
|
||||||
if parsed_args.description is not None:
|
if parsed_args.description is not None:
|
||||||
data.description = parsed_args.description
|
data['description'] = parsed_args.description
|
||||||
|
|
||||||
# NOTE(rtheis): Previous behavior did not raise a CommandError
|
# NOTE(rtheis): Previous behavior did not raise a CommandError
|
||||||
# if there were no updates. Maintain this behavior and issue
|
# if there were no updates. Maintain this behavior and issue
|
||||||
# the update.
|
# the update.
|
||||||
client.security_groups.update(
|
client.api.security_group_set(
|
||||||
data,
|
data,
|
||||||
data.name,
|
data['name'],
|
||||||
data.description,
|
data['description'],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@ -337,13 +336,10 @@ class ShowSecurityGroup(common.NetworkAndComputeShowOne):
|
|||||||
return (display_columns, data)
|
return (display_columns, data)
|
||||||
|
|
||||||
def take_action_compute(self, client, parsed_args):
|
def take_action_compute(self, client, parsed_args):
|
||||||
obj = utils.find_resource(
|
obj = client.api.security_group_find(parsed_args.group)
|
||||||
client.security_groups,
|
|
||||||
parsed_args.group,
|
|
||||||
)
|
|
||||||
display_columns, property_columns = _get_columns(obj)
|
display_columns, property_columns = _get_columns(obj)
|
||||||
data = utils.get_dict_properties(
|
data = utils.get_dict_properties(
|
||||||
obj._info,
|
obj,
|
||||||
property_columns,
|
property_columns,
|
||||||
formatters=_formatters_compute
|
formatters=_formatters_compute
|
||||||
)
|
)
|
||||||
|
@ -16,11 +16,6 @@
|
|||||||
import argparse
|
import argparse
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
try:
|
|
||||||
from novaclient.v2 import security_group_rules as compute_secgroup_rules
|
|
||||||
except ImportError:
|
|
||||||
from novaclient.v1_1 import security_group_rules as compute_secgroup_rules
|
|
||||||
|
|
||||||
from osc_lib.cli import parseractions
|
from osc_lib.cli import parseractions
|
||||||
from osc_lib import exceptions
|
from osc_lib import exceptions
|
||||||
from osc_lib import utils
|
from osc_lib import utils
|
||||||
@ -348,10 +343,7 @@ class CreateSecurityGroupRule(common.NetworkAndComputeShowOne):
|
|||||||
return (display_columns, data)
|
return (display_columns, data)
|
||||||
|
|
||||||
def take_action_compute(self, client, parsed_args):
|
def take_action_compute(self, client, parsed_args):
|
||||||
group = utils.find_resource(
|
group = client.api.security_group_find(parsed_args.group)
|
||||||
client.security_groups,
|
|
||||||
parsed_args.group,
|
|
||||||
)
|
|
||||||
protocol = self._get_protocol(parsed_args)
|
protocol = self._get_protocol(parsed_args)
|
||||||
if protocol == 'icmp':
|
if protocol == 'icmp':
|
||||||
from_port, to_port = -1, -1
|
from_port, to_port = -1, -1
|
||||||
@ -363,10 +355,9 @@ class CreateSecurityGroupRule(common.NetworkAndComputeShowOne):
|
|||||||
remote_ip = None
|
remote_ip = None
|
||||||
if not (parsed_args.remote_group is None and
|
if not (parsed_args.remote_group is None and
|
||||||
parsed_args.src_group is None):
|
parsed_args.src_group is None):
|
||||||
parsed_args.remote_group = utils.find_resource(
|
parsed_args.remote_group = client.api.security_group_find(
|
||||||
client.security_groups,
|
|
||||||
parsed_args.remote_group or parsed_args.src_group,
|
parsed_args.remote_group or parsed_args.src_group,
|
||||||
).id
|
)['id']
|
||||||
if parsed_args.src_group:
|
if parsed_args.src_group:
|
||||||
LOG.warning(
|
LOG.warning(
|
||||||
_("The %(old)s option is deprecated, "
|
_("The %(old)s option is deprecated, "
|
||||||
@ -384,8 +375,9 @@ class CreateSecurityGroupRule(common.NetworkAndComputeShowOne):
|
|||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
remote_ip = '0.0.0.0/0'
|
remote_ip = '0.0.0.0/0'
|
||||||
|
|
||||||
obj = client.security_group_rules.create(
|
obj = client.security_group_rules.create(
|
||||||
group.id,
|
group['id'],
|
||||||
protocol,
|
protocol,
|
||||||
from_port,
|
from_port,
|
||||||
to_port,
|
to_port,
|
||||||
@ -567,27 +559,29 @@ class ListSecurityGroupRule(common.NetworkAndComputeLister):
|
|||||||
|
|
||||||
rules_to_list = []
|
rules_to_list = []
|
||||||
if parsed_args.group is not None:
|
if parsed_args.group is not None:
|
||||||
group = utils.find_resource(
|
group = client.api.security_group_find(
|
||||||
client.security_groups,
|
|
||||||
parsed_args.group,
|
parsed_args.group,
|
||||||
)
|
)
|
||||||
rules_to_list = group.rules
|
rules_to_list = group['rules']
|
||||||
else:
|
else:
|
||||||
columns = columns + ('parent_group_id',)
|
columns = columns + ('parent_group_id',)
|
||||||
search = {'all_tenants': parsed_args.all_projects}
|
search = {'all_tenants': parsed_args.all_projects}
|
||||||
for group in client.security_groups.list(search_opts=search):
|
for group in client.api.security_group_list(search_opts=search):
|
||||||
rules_to_list.extend(group.rules)
|
rules_to_list.extend(group['rules'])
|
||||||
|
|
||||||
# NOTE(rtheis): Turn the raw rules into resources.
|
# NOTE(rtheis): Turn the raw rules into resources.
|
||||||
rules = []
|
rules = []
|
||||||
for rule in rules_to_list:
|
for rule in rules_to_list:
|
||||||
rules.append(compute_secgroup_rules.SecurityGroupRule(
|
rules.append(
|
||||||
client.security_group_rules,
|
|
||||||
network_utils.transform_compute_security_group_rule(rule),
|
network_utils.transform_compute_security_group_rule(rule),
|
||||||
))
|
)
|
||||||
|
# rules.append(compute_secgroup_rules.SecurityGroupRule(
|
||||||
|
# client.security_group_rules,
|
||||||
|
# network_utils.transform_compute_security_group_rule(rule),
|
||||||
|
# ))
|
||||||
|
|
||||||
return (column_headers,
|
return (column_headers,
|
||||||
(utils.get_item_properties(
|
(utils.get_dict_properties(
|
||||||
s, columns,
|
s, columns,
|
||||||
) for s in rules))
|
) for s in rules))
|
||||||
|
|
||||||
@ -617,8 +611,8 @@ class ShowSecurityGroupRule(common.NetworkAndComputeShowOne):
|
|||||||
# the requested rule.
|
# the requested rule.
|
||||||
obj = None
|
obj = None
|
||||||
security_group_rules = []
|
security_group_rules = []
|
||||||
for security_group in client.security_groups.list():
|
for security_group in client.api.security_group_list():
|
||||||
security_group_rules.extend(security_group.rules)
|
security_group_rules.extend(security_group['rules'])
|
||||||
for security_group_rule in security_group_rules:
|
for security_group_rule in security_group_rules:
|
||||||
if parsed_args.rule == str(security_group_rule.get('id')):
|
if parsed_args.rule == str(security_group_rule.get('id')):
|
||||||
obj = security_group_rule
|
obj = security_group_rule
|
||||||
|
@ -371,7 +371,7 @@ class ServerTests(common.ComputeTestCase):
|
|||||||
server_name = uuid.uuid4().hex
|
server_name = uuid.uuid4().hex
|
||||||
server = json.loads(self.openstack(
|
server = json.loads(self.openstack(
|
||||||
# auto/none enable in nova micro version (v2.37+)
|
# auto/none enable in nova micro version (v2.37+)
|
||||||
'--os-compute-api-version 2.latest ' +
|
'--os-compute-api-version 2.37 ' +
|
||||||
'server create -f json ' +
|
'server create -f json ' +
|
||||||
'--flavor ' + self.flavor_name + ' ' +
|
'--flavor ' + self.flavor_name + ' ' +
|
||||||
'--image ' + self.image_name + ' ' +
|
'--image ' + self.image_name + ' ' +
|
||||||
@ -394,7 +394,7 @@ class ServerTests(common.ComputeTestCase):
|
|||||||
try:
|
try:
|
||||||
self.openstack(
|
self.openstack(
|
||||||
# auto/none enable in nova micro version (v2.37+)
|
# auto/none enable in nova micro version (v2.37+)
|
||||||
'--os-compute-api-version 2.latest ' +
|
'--os-compute-api-version 2.37 ' +
|
||||||
'server create -f json ' +
|
'server create -f json ' +
|
||||||
'--flavor ' + self.flavor_name + ' ' +
|
'--flavor ' + self.flavor_name + ' ' +
|
||||||
'--image ' + self.image_name + ' ' +
|
'--image ' + self.image_name + ' ' +
|
||||||
|
228
openstackclient/tests/unit/api/test_compute_v2.py
Normal file
228
openstackclient/tests/unit/api/test_compute_v2.py
Normal file
@ -0,0 +1,228 @@
|
|||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
#
|
||||||
|
|
||||||
|
"""Compute v2 API Library Tests"""
|
||||||
|
|
||||||
|
from requests_mock.contrib import fixture
|
||||||
|
|
||||||
|
from keystoneclient import session
|
||||||
|
from openstackclient.api import compute_v2 as compute
|
||||||
|
from openstackclient.tests.unit import utils
|
||||||
|
from osc_lib import exceptions as osc_lib_exceptions
|
||||||
|
|
||||||
|
|
||||||
|
FAKE_PROJECT = 'xyzpdq'
|
||||||
|
FAKE_URL = 'http://gopher.com/v2'
|
||||||
|
|
||||||
|
|
||||||
|
class TestComputeAPIv2(utils.TestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestComputeAPIv2, self).setUp()
|
||||||
|
sess = session.Session()
|
||||||
|
self.api = compute.APIv2(session=sess, endpoint=FAKE_URL)
|
||||||
|
self.requests_mock = self.useFixture(fixture.Fixture())
|
||||||
|
|
||||||
|
|
||||||
|
class TestSecurityGroup(TestComputeAPIv2):
|
||||||
|
|
||||||
|
FAKE_SECURITY_GROUP_RESP = {
|
||||||
|
'id': '1',
|
||||||
|
'name': 'sg1',
|
||||||
|
'description': 'test security group',
|
||||||
|
'tenant_id': '0123456789',
|
||||||
|
'rules': []
|
||||||
|
}
|
||||||
|
FAKE_SECURITY_GROUP_RESP_2 = {
|
||||||
|
'id': '2',
|
||||||
|
'name': 'sg2',
|
||||||
|
'description': 'another test security group',
|
||||||
|
'tenant_id': '0123456789',
|
||||||
|
'rules': []
|
||||||
|
}
|
||||||
|
LIST_SECURITY_GROUP_RESP = [
|
||||||
|
FAKE_SECURITY_GROUP_RESP_2,
|
||||||
|
FAKE_SECURITY_GROUP_RESP,
|
||||||
|
]
|
||||||
|
|
||||||
|
def test_security_group_create_default(self):
|
||||||
|
self.requests_mock.register_uri(
|
||||||
|
'POST',
|
||||||
|
FAKE_URL + '/os-security-groups',
|
||||||
|
json={'security_group': self.FAKE_SECURITY_GROUP_RESP},
|
||||||
|
status_code=200,
|
||||||
|
)
|
||||||
|
ret = self.api.security_group_create('sg1')
|
||||||
|
self.assertEqual(self.FAKE_SECURITY_GROUP_RESP, ret)
|
||||||
|
|
||||||
|
def test_security_group_create_options(self):
|
||||||
|
self.requests_mock.register_uri(
|
||||||
|
'POST',
|
||||||
|
FAKE_URL + '/os-security-groups',
|
||||||
|
json={'security_group': self.FAKE_SECURITY_GROUP_RESP},
|
||||||
|
status_code=200,
|
||||||
|
)
|
||||||
|
ret = self.api.security_group_create(
|
||||||
|
name='sg1',
|
||||||
|
description='desc',
|
||||||
|
)
|
||||||
|
self.assertEqual(self.FAKE_SECURITY_GROUP_RESP, ret)
|
||||||
|
|
||||||
|
def test_security_group_delete_id(self):
|
||||||
|
self.requests_mock.register_uri(
|
||||||
|
'GET',
|
||||||
|
FAKE_URL + '/os-security-groups/1',
|
||||||
|
json={'security_group': self.FAKE_SECURITY_GROUP_RESP},
|
||||||
|
status_code=200,
|
||||||
|
)
|
||||||
|
self.requests_mock.register_uri(
|
||||||
|
'DELETE',
|
||||||
|
FAKE_URL + '/os-security-groups/1',
|
||||||
|
status_code=202,
|
||||||
|
)
|
||||||
|
ret = self.api.security_group_delete('1')
|
||||||
|
self.assertEqual(202, ret.status_code)
|
||||||
|
self.assertEqual("", ret.text)
|
||||||
|
|
||||||
|
def test_security_group_delete_name(self):
|
||||||
|
self.requests_mock.register_uri(
|
||||||
|
'GET',
|
||||||
|
FAKE_URL + '/os-security-groups/sg1',
|
||||||
|
status_code=404,
|
||||||
|
)
|
||||||
|
self.requests_mock.register_uri(
|
||||||
|
'GET',
|
||||||
|
FAKE_URL + '/os-security-groups',
|
||||||
|
json={'security_groups': self.LIST_SECURITY_GROUP_RESP},
|
||||||
|
status_code=200,
|
||||||
|
)
|
||||||
|
self.requests_mock.register_uri(
|
||||||
|
'DELETE',
|
||||||
|
FAKE_URL + '/os-security-groups/1',
|
||||||
|
status_code=202,
|
||||||
|
)
|
||||||
|
ret = self.api.security_group_delete('sg1')
|
||||||
|
self.assertEqual(202, ret.status_code)
|
||||||
|
self.assertEqual("", ret.text)
|
||||||
|
|
||||||
|
def test_security_group_delete_not_found(self):
|
||||||
|
self.requests_mock.register_uri(
|
||||||
|
'GET',
|
||||||
|
FAKE_URL + '/os-security-groups/sg3',
|
||||||
|
status_code=404,
|
||||||
|
)
|
||||||
|
self.requests_mock.register_uri(
|
||||||
|
'GET',
|
||||||
|
FAKE_URL + '/os-security-groups',
|
||||||
|
json={'security_groups': self.LIST_SECURITY_GROUP_RESP},
|
||||||
|
status_code=200,
|
||||||
|
)
|
||||||
|
self.assertRaises(
|
||||||
|
osc_lib_exceptions.NotFound,
|
||||||
|
self.api.security_group_delete,
|
||||||
|
'sg3',
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_security_group_find_id(self):
|
||||||
|
self.requests_mock.register_uri(
|
||||||
|
'GET',
|
||||||
|
FAKE_URL + '/os-security-groups/1',
|
||||||
|
json={'security_group': self.FAKE_SECURITY_GROUP_RESP},
|
||||||
|
status_code=200,
|
||||||
|
)
|
||||||
|
ret = self.api.security_group_find('1')
|
||||||
|
self.assertEqual(self.FAKE_SECURITY_GROUP_RESP, ret)
|
||||||
|
|
||||||
|
def test_security_group_find_name(self):
|
||||||
|
self.requests_mock.register_uri(
|
||||||
|
'GET',
|
||||||
|
FAKE_URL + '/os-security-groups/sg2',
|
||||||
|
status_code=404,
|
||||||
|
)
|
||||||
|
self.requests_mock.register_uri(
|
||||||
|
'GET',
|
||||||
|
FAKE_URL + '/os-security-groups',
|
||||||
|
json={'security_groups': self.LIST_SECURITY_GROUP_RESP},
|
||||||
|
status_code=200,
|
||||||
|
)
|
||||||
|
ret = self.api.security_group_find('sg2')
|
||||||
|
self.assertEqual(self.FAKE_SECURITY_GROUP_RESP_2, ret)
|
||||||
|
|
||||||
|
def test_security_group_find_not_found(self):
|
||||||
|
self.requests_mock.register_uri(
|
||||||
|
'GET',
|
||||||
|
FAKE_URL + '/os-security-groups/sg3',
|
||||||
|
status_code=404,
|
||||||
|
)
|
||||||
|
self.requests_mock.register_uri(
|
||||||
|
'GET',
|
||||||
|
FAKE_URL + '/os-security-groups',
|
||||||
|
json={'security_groups': self.LIST_SECURITY_GROUP_RESP},
|
||||||
|
status_code=200,
|
||||||
|
)
|
||||||
|
self.assertRaises(
|
||||||
|
osc_lib_exceptions.NotFound,
|
||||||
|
self.api.security_group_find,
|
||||||
|
'sg3',
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_security_group_list_no_options(self):
|
||||||
|
self.requests_mock.register_uri(
|
||||||
|
'GET',
|
||||||
|
FAKE_URL + '/os-security-groups',
|
||||||
|
json={'security_groups': self.LIST_SECURITY_GROUP_RESP},
|
||||||
|
status_code=200,
|
||||||
|
)
|
||||||
|
ret = self.api.security_group_list()
|
||||||
|
self.assertEqual(self.LIST_SECURITY_GROUP_RESP, ret)
|
||||||
|
|
||||||
|
def test_security_group_set_options_id(self):
|
||||||
|
self.requests_mock.register_uri(
|
||||||
|
'GET',
|
||||||
|
FAKE_URL + '/os-security-groups/1',
|
||||||
|
json={'security_group': self.FAKE_SECURITY_GROUP_RESP},
|
||||||
|
status_code=200,
|
||||||
|
)
|
||||||
|
self.requests_mock.register_uri(
|
||||||
|
'PUT',
|
||||||
|
FAKE_URL + '/os-security-groups/1',
|
||||||
|
json={'security_group': self.FAKE_SECURITY_GROUP_RESP},
|
||||||
|
status_code=200,
|
||||||
|
)
|
||||||
|
ret = self.api.security_group_set(
|
||||||
|
security_group='1',
|
||||||
|
description='desc2')
|
||||||
|
self.assertEqual(self.FAKE_SECURITY_GROUP_RESP, ret)
|
||||||
|
|
||||||
|
def test_security_group_set_options_name(self):
|
||||||
|
self.requests_mock.register_uri(
|
||||||
|
'GET',
|
||||||
|
FAKE_URL + '/os-security-groups/sg2',
|
||||||
|
status_code=404,
|
||||||
|
)
|
||||||
|
self.requests_mock.register_uri(
|
||||||
|
'GET',
|
||||||
|
FAKE_URL + '/os-security-groups',
|
||||||
|
json={'security_groups': self.LIST_SECURITY_GROUP_RESP},
|
||||||
|
status_code=200,
|
||||||
|
)
|
||||||
|
self.requests_mock.register_uri(
|
||||||
|
'PUT',
|
||||||
|
FAKE_URL + '/os-security-groups/2',
|
||||||
|
json={'security_group': self.FAKE_SECURITY_GROUP_RESP_2},
|
||||||
|
status_code=200,
|
||||||
|
)
|
||||||
|
ret = self.api.security_group_set(
|
||||||
|
security_group='sg2',
|
||||||
|
description='desc2')
|
||||||
|
self.assertEqual(self.FAKE_SECURITY_GROUP_RESP_2, ret)
|
@ -17,6 +17,7 @@ import copy
|
|||||||
import mock
|
import mock
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
|
from openstackclient.api import compute_v2
|
||||||
from openstackclient.tests.unit import fakes
|
from openstackclient.tests.unit import fakes
|
||||||
from openstackclient.tests.unit.identity.v2_0 import fakes as identity_fakes
|
from openstackclient.tests.unit.identity.v2_0 import fakes as identity_fakes
|
||||||
from openstackclient.tests.unit.image.v2 import fakes as image_fakes
|
from openstackclient.tests.unit.image.v2 import fakes as image_fakes
|
||||||
@ -180,9 +181,6 @@ class FakeComputev2Client(object):
|
|||||||
self.hypervisors_stats = mock.Mock()
|
self.hypervisors_stats = mock.Mock()
|
||||||
self.hypervisors_stats.resource_class = fakes.FakeResource(None, {})
|
self.hypervisors_stats.resource_class = fakes.FakeResource(None, {})
|
||||||
|
|
||||||
self.security_groups = mock.Mock()
|
|
||||||
self.security_groups.resource_class = fakes.FakeResource(None, {})
|
|
||||||
|
|
||||||
self.security_group_rules = mock.Mock()
|
self.security_group_rules = mock.Mock()
|
||||||
self.security_group_rules.resource_class = fakes.FakeResource(None, {})
|
self.security_group_rules.resource_class = fakes.FakeResource(None, {})
|
||||||
|
|
||||||
@ -222,6 +220,11 @@ class TestComputev2(utils.TestCommand):
|
|||||||
token=fakes.AUTH_TOKEN,
|
token=fakes.AUTH_TOKEN,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
self.app.client_manager.compute.api = compute_v2.APIv2(
|
||||||
|
session=self.app.client_manager.session,
|
||||||
|
endpoint=fakes.AUTH_URL,
|
||||||
|
)
|
||||||
|
|
||||||
self.app.client_manager.identity = identity_fakes.FakeIdentityv2Client(
|
self.app.client_manager.identity = identity_fakes.FakeIdentityv2Client(
|
||||||
endpoint=fakes.AUTH_URL,
|
endpoint=fakes.AUTH_URL,
|
||||||
token=fakes.AUTH_TOKEN,
|
token=fakes.AUTH_TOKEN,
|
||||||
@ -485,11 +488,7 @@ class FakeSecurityGroup(object):
|
|||||||
|
|
||||||
# Overwrite default attributes.
|
# Overwrite default attributes.
|
||||||
security_group_attrs.update(attrs)
|
security_group_attrs.update(attrs)
|
||||||
|
return security_group_attrs
|
||||||
security_group = fakes.FakeResource(
|
|
||||||
info=copy.deepcopy(security_group_attrs),
|
|
||||||
loaded=True)
|
|
||||||
return security_group
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def create_security_groups(attrs=None, count=2):
|
def create_security_groups(attrs=None, count=2):
|
||||||
|
@ -41,11 +41,6 @@ class TestServer(compute_fakes.TestComputev2):
|
|||||||
self.flavors_mock = self.app.client_manager.compute.flavors
|
self.flavors_mock = self.app.client_manager.compute.flavors
|
||||||
self.flavors_mock.reset_mock()
|
self.flavors_mock.reset_mock()
|
||||||
|
|
||||||
# Get a shortcut to the compute client SecurityGroupManager Mock
|
|
||||||
self.security_groups_mock = \
|
|
||||||
self.app.client_manager.compute.security_groups
|
|
||||||
self.security_groups_mock.reset_mock()
|
|
||||||
|
|
||||||
# Get a shortcut to the image client ImageManager Mock
|
# Get a shortcut to the image client ImageManager Mock
|
||||||
self.images_mock = self.app.client_manager.image.images
|
self.images_mock = self.app.client_manager.image.images
|
||||||
self.images_mock.reset_mock()
|
self.images_mock.reset_mock()
|
||||||
@ -232,6 +227,9 @@ class TestServerAddPort(TestServer):
|
|||||||
self.find_port.assert_not_called()
|
self.find_port.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
|
@mock.patch(
|
||||||
|
'openstackclient.api.compute_v2.APIv2.security_group_find'
|
||||||
|
)
|
||||||
class TestServerAddSecurityGroup(TestServer):
|
class TestServerAddSecurityGroup(TestServer):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
@ -239,11 +237,9 @@ class TestServerAddSecurityGroup(TestServer):
|
|||||||
|
|
||||||
self.security_group = \
|
self.security_group = \
|
||||||
compute_fakes.FakeSecurityGroup.create_one_security_group()
|
compute_fakes.FakeSecurityGroup.create_one_security_group()
|
||||||
# This is the return value for utils.find_resource() for security group
|
|
||||||
self.security_groups_mock.get.return_value = self.security_group
|
|
||||||
|
|
||||||
attrs = {
|
attrs = {
|
||||||
'security_groups': [{'name': self.security_group.id}]
|
'security_groups': [{'name': self.security_group['id']}]
|
||||||
}
|
}
|
||||||
methods = {
|
methods = {
|
||||||
'add_security_group': None,
|
'add_security_group': None,
|
||||||
@ -259,23 +255,24 @@ class TestServerAddSecurityGroup(TestServer):
|
|||||||
# Get the command object to test
|
# Get the command object to test
|
||||||
self.cmd = server.AddServerSecurityGroup(self.app, None)
|
self.cmd = server.AddServerSecurityGroup(self.app, None)
|
||||||
|
|
||||||
def test_server_add_security_group(self):
|
def test_server_add_security_group(self, sg_find_mock):
|
||||||
|
sg_find_mock.return_value = self.security_group
|
||||||
arglist = [
|
arglist = [
|
||||||
self.server.id,
|
self.server.id,
|
||||||
self.security_group.id
|
self.security_group['id']
|
||||||
]
|
]
|
||||||
verifylist = [
|
verifylist = [
|
||||||
('server', self.server.id),
|
('server', self.server.id),
|
||||||
('group', self.security_group.id),
|
('group', self.security_group['id']),
|
||||||
]
|
]
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
result = self.cmd.take_action(parsed_args)
|
result = self.cmd.take_action(parsed_args)
|
||||||
self.security_groups_mock.get.assert_called_with(
|
sg_find_mock.assert_called_with(
|
||||||
self.security_group.id,
|
self.security_group['id'],
|
||||||
)
|
)
|
||||||
self.servers_mock.get.assert_called_with(self.server.id)
|
self.servers_mock.get.assert_called_with(self.server.id)
|
||||||
self.server.add_security_group.assert_called_with(
|
self.server.add_security_group.assert_called_with(
|
||||||
self.security_group.id,
|
self.security_group['id'],
|
||||||
)
|
)
|
||||||
self.assertIsNone(result)
|
self.assertIsNone(result)
|
||||||
|
|
||||||
@ -1716,6 +1713,9 @@ class TestServerRemovePort(TestServer):
|
|||||||
self.find_port.assert_not_called()
|
self.find_port.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
|
@mock.patch(
|
||||||
|
'openstackclient.api.compute_v2.APIv2.security_group_find'
|
||||||
|
)
|
||||||
class TestServerRemoveSecurityGroup(TestServer):
|
class TestServerRemoveSecurityGroup(TestServer):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
@ -1723,11 +1723,9 @@ class TestServerRemoveSecurityGroup(TestServer):
|
|||||||
|
|
||||||
self.security_group = \
|
self.security_group = \
|
||||||
compute_fakes.FakeSecurityGroup.create_one_security_group()
|
compute_fakes.FakeSecurityGroup.create_one_security_group()
|
||||||
# This is the return value for utils.find_resource() for security group
|
|
||||||
self.security_groups_mock.get.return_value = self.security_group
|
|
||||||
|
|
||||||
attrs = {
|
attrs = {
|
||||||
'security_groups': [{'name': self.security_group.id}]
|
'security_groups': [{'name': self.security_group['id']}]
|
||||||
}
|
}
|
||||||
methods = {
|
methods = {
|
||||||
'remove_security_group': None,
|
'remove_security_group': None,
|
||||||
@ -1743,23 +1741,24 @@ class TestServerRemoveSecurityGroup(TestServer):
|
|||||||
# Get the command object to test
|
# Get the command object to test
|
||||||
self.cmd = server.RemoveServerSecurityGroup(self.app, None)
|
self.cmd = server.RemoveServerSecurityGroup(self.app, None)
|
||||||
|
|
||||||
def test_server_remove_security_group(self):
|
def test_server_remove_security_group(self, sg_find_mock):
|
||||||
|
sg_find_mock.return_value = self.security_group
|
||||||
arglist = [
|
arglist = [
|
||||||
self.server.id,
|
self.server.id,
|
||||||
self.security_group.id
|
self.security_group['id']
|
||||||
]
|
]
|
||||||
verifylist = [
|
verifylist = [
|
||||||
('server', self.server.id),
|
('server', self.server.id),
|
||||||
('group', self.security_group.id),
|
('group', self.security_group['id']),
|
||||||
]
|
]
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
result = self.cmd.take_action(parsed_args)
|
result = self.cmd.take_action(parsed_args)
|
||||||
self.security_groups_mock.get.assert_called_with(
|
sg_find_mock.assert_called_with(
|
||||||
self.security_group.id,
|
self.security_group['id'],
|
||||||
)
|
)
|
||||||
self.servers_mock.get.assert_called_with(self.server.id)
|
self.servers_mock.get.assert_called_with(self.server.id)
|
||||||
self.server.remove_security_group.assert_called_with(
|
self.server.remove_security_group.assert_called_with(
|
||||||
self.security_group.id,
|
self.security_group['id'],
|
||||||
)
|
)
|
||||||
self.assertIsNone(result)
|
self.assertIsNone(result)
|
||||||
|
|
||||||
|
@ -228,6 +228,9 @@ class FakeResource(object):
|
|||||||
def get(self, item, default=None):
|
def get(self, item, default=None):
|
||||||
return self._info.get(item, default)
|
return self._info.get(item, default)
|
||||||
|
|
||||||
|
def pop(self, key, default_value=None):
|
||||||
|
return self.info.pop(key, default_value)
|
||||||
|
|
||||||
|
|
||||||
class FakeResponse(requests.Response):
|
class FakeResponse(requests.Response):
|
||||||
|
|
||||||
|
@ -31,10 +31,14 @@ class TestSecurityGroupCompute(compute_fakes.TestComputev2):
|
|||||||
self.compute = self.app.client_manager.compute
|
self.compute = self.app.client_manager.compute
|
||||||
|
|
||||||
|
|
||||||
|
@mock.patch(
|
||||||
|
'openstackclient.api.compute_v2.APIv2.security_group_create'
|
||||||
|
)
|
||||||
class TestCreateSecurityGroupCompute(TestSecurityGroupCompute):
|
class TestCreateSecurityGroupCompute(TestSecurityGroupCompute):
|
||||||
|
|
||||||
project = identity_fakes.FakeProject.create_one_project()
|
project = identity_fakes.FakeProject.create_one_project()
|
||||||
domain = identity_fakes.FakeDomain.create_one_domain()
|
domain = identity_fakes.FakeDomain.create_one_domain()
|
||||||
|
|
||||||
# The security group to be shown.
|
# The security group to be shown.
|
||||||
_security_group = \
|
_security_group = \
|
||||||
compute_fakes.FakeSecurityGroup.create_one_security_group()
|
compute_fakes.FakeSecurityGroup.create_one_security_group()
|
||||||
@ -48,10 +52,10 @@ class TestCreateSecurityGroupCompute(TestSecurityGroupCompute):
|
|||||||
)
|
)
|
||||||
|
|
||||||
data = (
|
data = (
|
||||||
_security_group.description,
|
_security_group['description'],
|
||||||
_security_group.id,
|
_security_group['id'],
|
||||||
_security_group.name,
|
_security_group['name'],
|
||||||
_security_group.tenant_id,
|
_security_group['tenant_id'],
|
||||||
'',
|
'',
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -60,61 +64,57 @@ class TestCreateSecurityGroupCompute(TestSecurityGroupCompute):
|
|||||||
|
|
||||||
self.app.client_manager.network_endpoint_enabled = False
|
self.app.client_manager.network_endpoint_enabled = False
|
||||||
|
|
||||||
self.compute.security_groups.create.return_value = self._security_group
|
|
||||||
|
|
||||||
# Get the command object to test
|
# Get the command object to test
|
||||||
self.cmd = security_group.CreateSecurityGroup(self.app, None)
|
self.cmd = security_group.CreateSecurityGroup(self.app, None)
|
||||||
|
|
||||||
def test_create_no_options(self):
|
def test_security_group_create_no_options(self, sg_mock):
|
||||||
self.assertRaises(tests_utils.ParserException,
|
self.assertRaises(tests_utils.ParserException,
|
||||||
self.check_parser, self.cmd, [], [])
|
self.check_parser, self.cmd, [], [])
|
||||||
|
|
||||||
def test_create_network_options(self):
|
def test_security_group_create_min_options(self, sg_mock):
|
||||||
|
sg_mock.return_value = self._security_group
|
||||||
arglist = [
|
arglist = [
|
||||||
'--project', self.project.name,
|
self._security_group['name'],
|
||||||
'--project-domain', self.domain.name,
|
|
||||||
self._security_group.name,
|
|
||||||
]
|
|
||||||
self.assertRaises(tests_utils.ParserException,
|
|
||||||
self.check_parser, self.cmd, arglist, [])
|
|
||||||
|
|
||||||
def test_create_min_options(self):
|
|
||||||
arglist = [
|
|
||||||
self._security_group.name,
|
|
||||||
]
|
]
|
||||||
verifylist = [
|
verifylist = [
|
||||||
('name', self._security_group.name),
|
('name', self._security_group['name']),
|
||||||
]
|
]
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
|
||||||
columns, data = self.cmd.take_action(parsed_args)
|
columns, data = self.cmd.take_action(parsed_args)
|
||||||
|
|
||||||
self.compute.security_groups.create.assert_called_once_with(
|
sg_mock.assert_called_once_with(
|
||||||
self._security_group.name,
|
self._security_group['name'],
|
||||||
self._security_group.name)
|
self._security_group['name'],
|
||||||
|
)
|
||||||
self.assertEqual(self.columns, columns)
|
self.assertEqual(self.columns, columns)
|
||||||
self.assertEqual(self.data, data)
|
self.assertEqual(self.data, data)
|
||||||
|
|
||||||
def test_create_all_options(self):
|
def test_security_group_create_all_options(self, sg_mock):
|
||||||
|
sg_mock.return_value = self._security_group
|
||||||
arglist = [
|
arglist = [
|
||||||
'--description', self._security_group.description,
|
'--description', self._security_group['description'],
|
||||||
self._security_group.name,
|
self._security_group['name'],
|
||||||
]
|
]
|
||||||
verifylist = [
|
verifylist = [
|
||||||
('description', self._security_group.description),
|
('description', self._security_group['description']),
|
||||||
('name', self._security_group.name),
|
('name', self._security_group['name']),
|
||||||
]
|
]
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
|
||||||
columns, data = self.cmd.take_action(parsed_args)
|
columns, data = self.cmd.take_action(parsed_args)
|
||||||
|
|
||||||
self.compute.security_groups.create.assert_called_once_with(
|
sg_mock.assert_called_once_with(
|
||||||
self._security_group.name,
|
self._security_group['name'],
|
||||||
self._security_group.description)
|
self._security_group['description'],
|
||||||
|
)
|
||||||
self.assertEqual(self.columns, columns)
|
self.assertEqual(self.columns, columns)
|
||||||
self.assertEqual(self.data, data)
|
self.assertEqual(self.data, data)
|
||||||
|
|
||||||
|
|
||||||
|
@mock.patch(
|
||||||
|
'openstackclient.api.compute_v2.APIv2.security_group_delete'
|
||||||
|
)
|
||||||
class TestDeleteSecurityGroupCompute(TestSecurityGroupCompute):
|
class TestDeleteSecurityGroupCompute(TestSecurityGroupCompute):
|
||||||
|
|
||||||
# The security groups to be deleted.
|
# The security groups to be deleted.
|
||||||
@ -126,9 +126,7 @@ class TestDeleteSecurityGroupCompute(TestSecurityGroupCompute):
|
|||||||
|
|
||||||
self.app.client_manager.network_endpoint_enabled = False
|
self.app.client_manager.network_endpoint_enabled = False
|
||||||
|
|
||||||
self.compute.security_groups.delete = mock.Mock(return_value=None)
|
self.compute.api.security_group_find = (
|
||||||
|
|
||||||
self.compute.security_groups.get = (
|
|
||||||
compute_fakes.FakeSecurityGroup.get_security_groups(
|
compute_fakes.FakeSecurityGroup.get_security_groups(
|
||||||
self._security_groups)
|
self._security_groups)
|
||||||
)
|
)
|
||||||
@ -136,27 +134,30 @@ class TestDeleteSecurityGroupCompute(TestSecurityGroupCompute):
|
|||||||
# Get the command object to test
|
# Get the command object to test
|
||||||
self.cmd = security_group.DeleteSecurityGroup(self.app, None)
|
self.cmd = security_group.DeleteSecurityGroup(self.app, None)
|
||||||
|
|
||||||
def test_security_group_delete(self):
|
def test_security_group_delete(self, sg_mock):
|
||||||
|
sg_mock.return_value = mock.Mock(return_value=None)
|
||||||
arglist = [
|
arglist = [
|
||||||
self._security_groups[0].id,
|
self._security_groups[0]['id'],
|
||||||
]
|
]
|
||||||
verifylist = [
|
verifylist = [
|
||||||
('group', [self._security_groups[0].id]),
|
('group', [self._security_groups[0]['id']]),
|
||||||
]
|
]
|
||||||
|
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
result = self.cmd.take_action(parsed_args)
|
result = self.cmd.take_action(parsed_args)
|
||||||
|
|
||||||
self.compute.security_groups.delete.assert_called_once_with(
|
sg_mock.assert_called_once_with(
|
||||||
self._security_groups[0].id)
|
self._security_groups[0]['id'],
|
||||||
|
)
|
||||||
self.assertIsNone(result)
|
self.assertIsNone(result)
|
||||||
|
|
||||||
def test_multi_security_groups_delete(self):
|
def test_security_group_multi_delete(self, sg_mock):
|
||||||
|
sg_mock.return_value = mock.Mock(return_value=None)
|
||||||
arglist = []
|
arglist = []
|
||||||
verifylist = []
|
verifylist = []
|
||||||
|
|
||||||
for s in self._security_groups:
|
for s in self._security_groups:
|
||||||
arglist.append(s.id)
|
arglist.append(s['id'])
|
||||||
verifylist = [
|
verifylist = [
|
||||||
('group', arglist),
|
('group', arglist),
|
||||||
]
|
]
|
||||||
@ -166,43 +167,39 @@ class TestDeleteSecurityGroupCompute(TestSecurityGroupCompute):
|
|||||||
|
|
||||||
calls = []
|
calls = []
|
||||||
for s in self._security_groups:
|
for s in self._security_groups:
|
||||||
calls.append(call(s.id))
|
calls.append(call(s['id']))
|
||||||
self.compute.security_groups.delete.assert_has_calls(calls)
|
sg_mock.assert_has_calls(calls)
|
||||||
self.assertIsNone(result)
|
self.assertIsNone(result)
|
||||||
|
|
||||||
def test_multi_security_groups_delete_with_exception(self):
|
def test_security_group_multi_delete_with_exception(self, sg_mock):
|
||||||
|
sg_mock.return_value = mock.Mock(return_value=None)
|
||||||
|
sg_mock.side_effect = ([
|
||||||
|
mock.Mock(return_value=None),
|
||||||
|
exceptions.CommandError,
|
||||||
|
])
|
||||||
arglist = [
|
arglist = [
|
||||||
self._security_groups[0].id,
|
self._security_groups[0]['id'],
|
||||||
'unexist_security_group',
|
'unexist_security_group',
|
||||||
]
|
]
|
||||||
verifylist = [
|
verifylist = [
|
||||||
('group',
|
('group',
|
||||||
[self._security_groups[0].id, 'unexist_security_group']),
|
[self._security_groups[0]['id'], 'unexist_security_group']),
|
||||||
]
|
]
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
|
||||||
find_mock_result = [self._security_groups[0], exceptions.CommandError]
|
|
||||||
self.compute.security_groups.get = (
|
|
||||||
mock.Mock(side_effect=find_mock_result)
|
|
||||||
)
|
|
||||||
self.compute.security_groups.find.side_effect = (
|
|
||||||
exceptions.NotFound(None))
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.cmd.take_action(parsed_args)
|
self.cmd.take_action(parsed_args)
|
||||||
self.fail('CommandError should be raised.')
|
self.fail('CommandError should be raised.')
|
||||||
except exceptions.CommandError as e:
|
except exceptions.CommandError as e:
|
||||||
self.assertEqual('1 of 2 groups failed to delete.', str(e))
|
self.assertEqual('1 of 2 groups failed to delete.', str(e))
|
||||||
|
|
||||||
self.compute.security_groups.get.assert_any_call(
|
sg_mock.assert_any_call(self._security_groups[0]['id'])
|
||||||
self._security_groups[0].id)
|
sg_mock.assert_any_call('unexist_security_group')
|
||||||
self.compute.security_groups.get.assert_any_call(
|
|
||||||
'unexist_security_group')
|
|
||||||
self.compute.security_groups.delete.assert_called_once_with(
|
|
||||||
self._security_groups[0].id
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
|
@mock.patch(
|
||||||
|
'openstackclient.api.compute_v2.APIv2.security_group_list'
|
||||||
|
)
|
||||||
class TestListSecurityGroupCompute(TestSecurityGroupCompute):
|
class TestListSecurityGroupCompute(TestSecurityGroupCompute):
|
||||||
|
|
||||||
# The security group to be listed.
|
# The security group to be listed.
|
||||||
@ -224,29 +221,29 @@ class TestListSecurityGroupCompute(TestSecurityGroupCompute):
|
|||||||
data = []
|
data = []
|
||||||
for grp in _security_groups:
|
for grp in _security_groups:
|
||||||
data.append((
|
data.append((
|
||||||
grp.id,
|
grp['id'],
|
||||||
grp.name,
|
grp['name'],
|
||||||
grp.description,
|
grp['description'],
|
||||||
))
|
))
|
||||||
data_all_projects = []
|
data_all_projects = []
|
||||||
for grp in _security_groups:
|
for grp in _security_groups:
|
||||||
data_all_projects.append((
|
data_all_projects.append((
|
||||||
grp.id,
|
grp['id'],
|
||||||
grp.name,
|
grp['name'],
|
||||||
grp.description,
|
grp['description'],
|
||||||
grp.tenant_id,
|
grp['tenant_id'],
|
||||||
))
|
))
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(TestListSecurityGroupCompute, self).setUp()
|
super(TestListSecurityGroupCompute, self).setUp()
|
||||||
|
|
||||||
self.app.client_manager.network_endpoint_enabled = False
|
self.app.client_manager.network_endpoint_enabled = False
|
||||||
self.compute.security_groups.list.return_value = self._security_groups
|
|
||||||
|
|
||||||
# Get the command object to test
|
# Get the command object to test
|
||||||
self.cmd = security_group.ListSecurityGroup(self.app, None)
|
self.cmd = security_group.ListSecurityGroup(self.app, None)
|
||||||
|
|
||||||
def test_security_group_list_no_options(self):
|
def test_security_group_list_no_options(self, sg_mock):
|
||||||
|
sg_mock.return_value = self._security_groups
|
||||||
arglist = []
|
arglist = []
|
||||||
verifylist = [
|
verifylist = [
|
||||||
('all_projects', False),
|
('all_projects', False),
|
||||||
@ -256,11 +253,12 @@ class TestListSecurityGroupCompute(TestSecurityGroupCompute):
|
|||||||
columns, data = self.cmd.take_action(parsed_args)
|
columns, data = self.cmd.take_action(parsed_args)
|
||||||
|
|
||||||
kwargs = {'search_opts': {'all_tenants': False}}
|
kwargs = {'search_opts': {'all_tenants': False}}
|
||||||
self.compute.security_groups.list.assert_called_once_with(**kwargs)
|
sg_mock.assert_called_once_with(**kwargs)
|
||||||
self.assertEqual(self.columns, columns)
|
self.assertEqual(self.columns, columns)
|
||||||
self.assertEqual(self.data, list(data))
|
self.assertEqual(self.data, list(data))
|
||||||
|
|
||||||
def test_security_group_list_all_projects(self):
|
def test_security_group_list_all_projects(self, sg_mock):
|
||||||
|
sg_mock.return_value = self._security_groups
|
||||||
arglist = [
|
arglist = [
|
||||||
'--all-projects',
|
'--all-projects',
|
||||||
]
|
]
|
||||||
@ -272,11 +270,14 @@ class TestListSecurityGroupCompute(TestSecurityGroupCompute):
|
|||||||
columns, data = self.cmd.take_action(parsed_args)
|
columns, data = self.cmd.take_action(parsed_args)
|
||||||
|
|
||||||
kwargs = {'search_opts': {'all_tenants': True}}
|
kwargs = {'search_opts': {'all_tenants': True}}
|
||||||
self.compute.security_groups.list.assert_called_once_with(**kwargs)
|
sg_mock.assert_called_once_with(**kwargs)
|
||||||
self.assertEqual(self.columns_all_projects, columns)
|
self.assertEqual(self.columns_all_projects, columns)
|
||||||
self.assertEqual(self.data_all_projects, list(data))
|
self.assertEqual(self.data_all_projects, list(data))
|
||||||
|
|
||||||
|
|
||||||
|
@mock.patch(
|
||||||
|
'openstackclient.api.compute_v2.APIv2.security_group_set'
|
||||||
|
)
|
||||||
class TestSetSecurityGroupCompute(TestSecurityGroupCompute):
|
class TestSetSecurityGroupCompute(TestSecurityGroupCompute):
|
||||||
|
|
||||||
# The security group to be set.
|
# The security group to be set.
|
||||||
@ -288,54 +289,54 @@ class TestSetSecurityGroupCompute(TestSecurityGroupCompute):
|
|||||||
|
|
||||||
self.app.client_manager.network_endpoint_enabled = False
|
self.app.client_manager.network_endpoint_enabled = False
|
||||||
|
|
||||||
self.compute.security_groups.update = mock.Mock(return_value=None)
|
self.compute.api.security_group_find = mock.Mock(
|
||||||
|
|
||||||
self.compute.security_groups.get = mock.Mock(
|
|
||||||
return_value=self._security_group)
|
return_value=self._security_group)
|
||||||
|
|
||||||
# Get the command object to test
|
# Get the command object to test
|
||||||
self.cmd = security_group.SetSecurityGroup(self.app, None)
|
self.cmd = security_group.SetSecurityGroup(self.app, None)
|
||||||
|
|
||||||
def test_set_no_options(self):
|
def test_security_group_set_no_options(self, sg_mock):
|
||||||
self.assertRaises(tests_utils.ParserException,
|
self.assertRaises(tests_utils.ParserException,
|
||||||
self.check_parser, self.cmd, [], [])
|
self.check_parser, self.cmd, [], [])
|
||||||
|
|
||||||
def test_set_no_updates(self):
|
def test_security_group_set_no_updates(self, sg_mock):
|
||||||
|
sg_mock.return_value = mock.Mock(return_value=None)
|
||||||
arglist = [
|
arglist = [
|
||||||
self._security_group.name,
|
self._security_group['name'],
|
||||||
]
|
]
|
||||||
verifylist = [
|
verifylist = [
|
||||||
('group', self._security_group.name),
|
('group', self._security_group['name']),
|
||||||
]
|
]
|
||||||
|
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
result = self.cmd.take_action(parsed_args)
|
result = self.cmd.take_action(parsed_args)
|
||||||
|
|
||||||
self.compute.security_groups.update.assert_called_once_with(
|
sg_mock.assert_called_once_with(
|
||||||
self._security_group,
|
self._security_group,
|
||||||
self._security_group.name,
|
self._security_group['name'],
|
||||||
self._security_group.description
|
self._security_group['description'],
|
||||||
)
|
)
|
||||||
self.assertIsNone(result)
|
self.assertIsNone(result)
|
||||||
|
|
||||||
def test_set_all_options(self):
|
def test_security_group_set_all_options(self, sg_mock):
|
||||||
new_name = 'new-' + self._security_group.name
|
sg_mock.return_value = mock.Mock(return_value=None)
|
||||||
new_description = 'new-' + self._security_group.description
|
new_name = 'new-' + self._security_group['name']
|
||||||
|
new_description = 'new-' + self._security_group['description']
|
||||||
arglist = [
|
arglist = [
|
||||||
'--name', new_name,
|
'--name', new_name,
|
||||||
'--description', new_description,
|
'--description', new_description,
|
||||||
self._security_group.name,
|
self._security_group['name'],
|
||||||
]
|
]
|
||||||
verifylist = [
|
verifylist = [
|
||||||
('description', new_description),
|
('description', new_description),
|
||||||
('group', self._security_group.name),
|
('group', self._security_group['name']),
|
||||||
('name', new_name),
|
('name', new_name),
|
||||||
]
|
]
|
||||||
|
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
result = self.cmd.take_action(parsed_args)
|
result = self.cmd.take_action(parsed_args)
|
||||||
|
|
||||||
self.compute.security_groups.update.assert_called_once_with(
|
sg_mock.assert_called_once_with(
|
||||||
self._security_group,
|
self._security_group,
|
||||||
new_name,
|
new_name,
|
||||||
new_description
|
new_description
|
||||||
@ -343,6 +344,9 @@ class TestSetSecurityGroupCompute(TestSecurityGroupCompute):
|
|||||||
self.assertIsNone(result)
|
self.assertIsNone(result)
|
||||||
|
|
||||||
|
|
||||||
|
@mock.patch(
|
||||||
|
'openstackclient.api.compute_v2.APIv2.security_group_find'
|
||||||
|
)
|
||||||
class TestShowSecurityGroupCompute(TestSecurityGroupCompute):
|
class TestShowSecurityGroupCompute(TestSecurityGroupCompute):
|
||||||
|
|
||||||
# The security group rule to be shown with the group.
|
# The security group rule to be shown with the group.
|
||||||
@ -364,10 +368,10 @@ class TestShowSecurityGroupCompute(TestSecurityGroupCompute):
|
|||||||
)
|
)
|
||||||
|
|
||||||
data = (
|
data = (
|
||||||
_security_group.description,
|
_security_group['description'],
|
||||||
_security_group.id,
|
_security_group['id'],
|
||||||
_security_group.name,
|
_security_group['name'],
|
||||||
_security_group.tenant_id,
|
_security_group['tenant_id'],
|
||||||
security_group._format_compute_security_group_rules(
|
security_group._format_compute_security_group_rules(
|
||||||
[_security_group_rule._info]),
|
[_security_group_rule._info]),
|
||||||
)
|
)
|
||||||
@ -377,27 +381,25 @@ class TestShowSecurityGroupCompute(TestSecurityGroupCompute):
|
|||||||
|
|
||||||
self.app.client_manager.network_endpoint_enabled = False
|
self.app.client_manager.network_endpoint_enabled = False
|
||||||
|
|
||||||
self.compute.security_groups.get.return_value = self._security_group
|
|
||||||
|
|
||||||
# Get the command object to test
|
# Get the command object to test
|
||||||
self.cmd = security_group.ShowSecurityGroup(self.app, None)
|
self.cmd = security_group.ShowSecurityGroup(self.app, None)
|
||||||
|
|
||||||
def test_show_no_options(self):
|
def test_security_group_show_no_options(self, sg_mock):
|
||||||
self.assertRaises(tests_utils.ParserException,
|
self.assertRaises(tests_utils.ParserException,
|
||||||
self.check_parser, self.cmd, [], [])
|
self.check_parser, self.cmd, [], [])
|
||||||
|
|
||||||
def test_show_all_options(self):
|
def test_security_group_show_all_options(self, sg_mock):
|
||||||
|
sg_mock.return_value = self._security_group
|
||||||
arglist = [
|
arglist = [
|
||||||
self._security_group.id,
|
self._security_group['id'],
|
||||||
]
|
]
|
||||||
verifylist = [
|
verifylist = [
|
||||||
('group', self._security_group.id),
|
('group', self._security_group['id']),
|
||||||
]
|
]
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
|
||||||
columns, data = self.cmd.take_action(parsed_args)
|
columns, data = self.cmd.take_action(parsed_args)
|
||||||
|
|
||||||
self.compute.security_groups.get.assert_called_once_with(
|
sg_mock.assert_called_once_with(self._security_group['id'])
|
||||||
self._security_group.id)
|
|
||||||
self.assertEqual(self.columns, columns)
|
self.assertEqual(self.columns, columns)
|
||||||
self.assertEqual(self.data, data)
|
self.assertEqual(self.data, data)
|
||||||
|
@ -11,7 +11,6 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
#
|
#
|
||||||
|
|
||||||
import copy
|
|
||||||
import mock
|
import mock
|
||||||
from mock import call
|
from mock import call
|
||||||
|
|
||||||
@ -20,7 +19,6 @@ from osc_lib import exceptions
|
|||||||
from openstackclient.network import utils as network_utils
|
from openstackclient.network import utils as network_utils
|
||||||
from openstackclient.network.v2 import security_group_rule
|
from openstackclient.network.v2 import security_group_rule
|
||||||
from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes
|
from openstackclient.tests.unit.compute.v2 import fakes as compute_fakes
|
||||||
from openstackclient.tests.unit import fakes
|
|
||||||
from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes
|
from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes
|
||||||
from openstackclient.tests.unit import utils as tests_utils
|
from openstackclient.tests.unit import utils as tests_utils
|
||||||
|
|
||||||
@ -38,6 +36,7 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
|
|||||||
|
|
||||||
project = identity_fakes.FakeProject.create_one_project()
|
project = identity_fakes.FakeProject.create_one_project()
|
||||||
domain = identity_fakes.FakeDomain.create_one_domain()
|
domain = identity_fakes.FakeDomain.create_one_domain()
|
||||||
|
|
||||||
# The security group rule to be created.
|
# The security group rule to be created.
|
||||||
_security_group_rule = None
|
_security_group_rule = None
|
||||||
|
|
||||||
@ -61,51 +60,53 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
|
|||||||
|
|
||||||
self.app.client_manager.network_endpoint_enabled = False
|
self.app.client_manager.network_endpoint_enabled = False
|
||||||
|
|
||||||
self.compute.security_groups.get.return_value = self._security_group
|
self.compute.api.security_group_find = mock.Mock(
|
||||||
|
return_value=self._security_group,
|
||||||
|
)
|
||||||
|
|
||||||
# Get the command object to test
|
# Get the command object to test
|
||||||
self.cmd = security_group_rule.CreateSecurityGroupRule(self.app, None)
|
self.cmd = security_group_rule.CreateSecurityGroupRule(self.app, None)
|
||||||
|
|
||||||
def test_create_no_options(self):
|
def test_security_group_rule_create_no_options(self):
|
||||||
self.assertRaises(tests_utils.ParserException,
|
self.assertRaises(tests_utils.ParserException,
|
||||||
self.check_parser, self.cmd, [], [])
|
self.check_parser, self.cmd, [], [])
|
||||||
|
|
||||||
def test_create_all_source_options(self):
|
def test_security_group_rule_create_all_source_options(self):
|
||||||
arglist = [
|
arglist = [
|
||||||
'--src-ip', '10.10.0.0/24',
|
'--src-ip', '10.10.0.0/24',
|
||||||
'--src-group', self._security_group.id,
|
'--src-group', self._security_group['id'],
|
||||||
self._security_group.id,
|
self._security_group['id'],
|
||||||
]
|
]
|
||||||
self.assertRaises(tests_utils.ParserException,
|
self.assertRaises(tests_utils.ParserException,
|
||||||
self.check_parser, self.cmd, arglist, [])
|
self.check_parser, self.cmd, arglist, [])
|
||||||
|
|
||||||
def test_create_all_remote_options(self):
|
def test_security_group_rule_create_all_remote_options(self):
|
||||||
arglist = [
|
arglist = [
|
||||||
'--remote-ip', '10.10.0.0/24',
|
'--remote-ip', '10.10.0.0/24',
|
||||||
'--remote-group', self._security_group.id,
|
'--remote-group', self._security_group['id'],
|
||||||
self._security_group.id,
|
self._security_group['id'],
|
||||||
]
|
]
|
||||||
self.assertRaises(tests_utils.ParserException,
|
self.assertRaises(tests_utils.ParserException,
|
||||||
self.check_parser, self.cmd, arglist, [])
|
self.check_parser, self.cmd, arglist, [])
|
||||||
|
|
||||||
def test_create_bad_protocol(self):
|
def test_security_group_rule_create_bad_protocol(self):
|
||||||
arglist = [
|
arglist = [
|
||||||
'--protocol', 'foo',
|
'--protocol', 'foo',
|
||||||
self._security_group.id,
|
self._security_group['id'],
|
||||||
]
|
]
|
||||||
self.assertRaises(tests_utils.ParserException,
|
self.assertRaises(tests_utils.ParserException,
|
||||||
self.check_parser, self.cmd, arglist, [])
|
self.check_parser, self.cmd, arglist, [])
|
||||||
|
|
||||||
def test_create_all_protocol_options(self):
|
def test_security_group_rule_create_all_protocol_options(self):
|
||||||
arglist = [
|
arglist = [
|
||||||
'--protocol', 'tcp',
|
'--protocol', 'tcp',
|
||||||
'--proto', 'tcp',
|
'--proto', 'tcp',
|
||||||
self._security_group.id,
|
self._security_group['id'],
|
||||||
]
|
]
|
||||||
self.assertRaises(tests_utils.ParserException,
|
self.assertRaises(tests_utils.ParserException,
|
||||||
self.check_parser, self.cmd, arglist, [])
|
self.check_parser, self.cmd, arglist, [])
|
||||||
|
|
||||||
def test_create_network_options(self):
|
def test_security_group_rule_create_network_options(self):
|
||||||
arglist = [
|
arglist = [
|
||||||
'--ingress',
|
'--ingress',
|
||||||
'--ethertype', 'IPv4',
|
'--ethertype', 'IPv4',
|
||||||
@ -113,30 +114,32 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
|
|||||||
'--icmp-code', '11',
|
'--icmp-code', '11',
|
||||||
'--project', self.project.name,
|
'--project', self.project.name,
|
||||||
'--project-domain', self.domain.name,
|
'--project-domain', self.domain.name,
|
||||||
self._security_group.id,
|
self._security_group['id'],
|
||||||
]
|
]
|
||||||
self.assertRaises(tests_utils.ParserException,
|
self.assertRaises(tests_utils.ParserException,
|
||||||
self.check_parser, self.cmd, arglist, [])
|
self.check_parser, self.cmd, arglist, [])
|
||||||
|
|
||||||
def test_create_default_rule(self):
|
def test_security_group_rule_create_default_rule(self):
|
||||||
expected_columns, expected_data = self._setup_security_group_rule()
|
expected_columns, expected_data = self._setup_security_group_rule()
|
||||||
dst_port = str(self._security_group_rule.from_port) + ':' + \
|
dst_port = str(self._security_group_rule.from_port) + ':' + \
|
||||||
str(self._security_group_rule.to_port)
|
str(self._security_group_rule.to_port)
|
||||||
arglist = [
|
arglist = [
|
||||||
'--dst-port', dst_port,
|
'--dst-port', dst_port,
|
||||||
self._security_group.id,
|
self._security_group['id'],
|
||||||
]
|
]
|
||||||
verifylist = [
|
verifylist = [
|
||||||
('dst_port', (self._security_group_rule.from_port,
|
('dst_port', (self._security_group_rule.from_port,
|
||||||
self._security_group_rule.to_port)),
|
self._security_group_rule.to_port)),
|
||||||
('group', self._security_group.id),
|
('group', self._security_group['id']),
|
||||||
]
|
]
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
|
||||||
columns, data = self.cmd.take_action(parsed_args)
|
columns, data = self.cmd.take_action(parsed_args)
|
||||||
|
|
||||||
|
# TODO(dtroyer): save this for the security group rule changes
|
||||||
|
# self.compute.api.security_group_rule_create.assert_called_once_with(
|
||||||
self.compute.security_group_rules.create.assert_called_once_with(
|
self.compute.security_group_rules.create.assert_called_once_with(
|
||||||
self._security_group.id,
|
self._security_group['id'],
|
||||||
self._security_group_rule.ip_protocol,
|
self._security_group_rule.ip_protocol,
|
||||||
self._security_group_rule.from_port,
|
self._security_group_rule.from_port,
|
||||||
self._security_group_rule.to_port,
|
self._security_group_rule.to_port,
|
||||||
@ -146,71 +149,75 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
|
|||||||
self.assertEqual(expected_columns, columns)
|
self.assertEqual(expected_columns, columns)
|
||||||
self.assertEqual(expected_data, data)
|
self.assertEqual(expected_data, data)
|
||||||
|
|
||||||
def test_create_source_group(self):
|
def test_security_group_rule_create_source_group(self):
|
||||||
expected_columns, expected_data = self._setup_security_group_rule({
|
expected_columns, expected_data = self._setup_security_group_rule({
|
||||||
'from_port': 22,
|
'from_port': 22,
|
||||||
'to_port': 22,
|
'to_port': 22,
|
||||||
'group': {'name': self._security_group.name},
|
'group': {'name': self._security_group['name']},
|
||||||
})
|
})
|
||||||
arglist = [
|
arglist = [
|
||||||
'--dst-port', str(self._security_group_rule.from_port),
|
'--dst-port', str(self._security_group_rule.from_port),
|
||||||
'--src-group', self._security_group.name,
|
'--src-group', self._security_group['name'],
|
||||||
self._security_group.id,
|
self._security_group['id'],
|
||||||
]
|
]
|
||||||
verifylist = [
|
verifylist = [
|
||||||
('dst_port', (self._security_group_rule.from_port,
|
('dst_port', (self._security_group_rule.from_port,
|
||||||
self._security_group_rule.to_port)),
|
self._security_group_rule.to_port)),
|
||||||
('src_group', self._security_group.name),
|
('src_group', self._security_group['name']),
|
||||||
('group', self._security_group.id),
|
('group', self._security_group['id']),
|
||||||
]
|
]
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
|
||||||
columns, data = self.cmd.take_action(parsed_args)
|
columns, data = self.cmd.take_action(parsed_args)
|
||||||
|
|
||||||
|
# TODO(dtroyer): save this for the security group rule changes
|
||||||
|
# self.compute.api.security_group_rule_create.assert_called_once_with(
|
||||||
self.compute.security_group_rules.create.assert_called_once_with(
|
self.compute.security_group_rules.create.assert_called_once_with(
|
||||||
self._security_group.id,
|
self._security_group['id'],
|
||||||
self._security_group_rule.ip_protocol,
|
self._security_group_rule.ip_protocol,
|
||||||
self._security_group_rule.from_port,
|
self._security_group_rule.from_port,
|
||||||
self._security_group_rule.to_port,
|
self._security_group_rule.to_port,
|
||||||
self._security_group_rule.ip_range['cidr'],
|
self._security_group_rule.ip_range['cidr'],
|
||||||
self._security_group.id,
|
self._security_group['id'],
|
||||||
)
|
)
|
||||||
self.assertEqual(expected_columns, columns)
|
self.assertEqual(expected_columns, columns)
|
||||||
self.assertEqual(expected_data, data)
|
self.assertEqual(expected_data, data)
|
||||||
|
|
||||||
def test_create_remote_group(self):
|
def test_security_group_rule_create_remote_group(self):
|
||||||
expected_columns, expected_data = self._setup_security_group_rule({
|
expected_columns, expected_data = self._setup_security_group_rule({
|
||||||
'from_port': 22,
|
'from_port': 22,
|
||||||
'to_port': 22,
|
'to_port': 22,
|
||||||
'group': {'name': self._security_group.name},
|
'group': {'name': self._security_group['name']},
|
||||||
})
|
})
|
||||||
arglist = [
|
arglist = [
|
||||||
'--dst-port', str(self._security_group_rule.from_port),
|
'--dst-port', str(self._security_group_rule.from_port),
|
||||||
'--remote-group', self._security_group.name,
|
'--remote-group', self._security_group['name'],
|
||||||
self._security_group.id,
|
self._security_group['id'],
|
||||||
]
|
]
|
||||||
verifylist = [
|
verifylist = [
|
||||||
('dst_port', (self._security_group_rule.from_port,
|
('dst_port', (self._security_group_rule.from_port,
|
||||||
self._security_group_rule.to_port)),
|
self._security_group_rule.to_port)),
|
||||||
('remote_group', self._security_group.name),
|
('remote_group', self._security_group['name']),
|
||||||
('group', self._security_group.id),
|
('group', self._security_group['id']),
|
||||||
]
|
]
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
|
||||||
columns, data = self.cmd.take_action(parsed_args)
|
columns, data = self.cmd.take_action(parsed_args)
|
||||||
|
|
||||||
|
# TODO(dtroyer): save this for the security group rule changes
|
||||||
|
# self.compute.api.security_group_rule_create.assert_called_once_with(
|
||||||
self.compute.security_group_rules.create.assert_called_once_with(
|
self.compute.security_group_rules.create.assert_called_once_with(
|
||||||
self._security_group.id,
|
self._security_group['id'],
|
||||||
self._security_group_rule.ip_protocol,
|
self._security_group_rule.ip_protocol,
|
||||||
self._security_group_rule.from_port,
|
self._security_group_rule.from_port,
|
||||||
self._security_group_rule.to_port,
|
self._security_group_rule.to_port,
|
||||||
self._security_group_rule.ip_range['cidr'],
|
self._security_group_rule.ip_range['cidr'],
|
||||||
self._security_group.id,
|
self._security_group['id'],
|
||||||
)
|
)
|
||||||
self.assertEqual(expected_columns, columns)
|
self.assertEqual(expected_columns, columns)
|
||||||
self.assertEqual(expected_data, data)
|
self.assertEqual(expected_data, data)
|
||||||
|
|
||||||
def test_create_source_ip(self):
|
def test_security_group_rule_create_source_ip(self):
|
||||||
expected_columns, expected_data = self._setup_security_group_rule({
|
expected_columns, expected_data = self._setup_security_group_rule({
|
||||||
'ip_protocol': 'icmp',
|
'ip_protocol': 'icmp',
|
||||||
'from_port': -1,
|
'from_port': -1,
|
||||||
@ -220,19 +227,21 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
|
|||||||
arglist = [
|
arglist = [
|
||||||
'--protocol', self._security_group_rule.ip_protocol,
|
'--protocol', self._security_group_rule.ip_protocol,
|
||||||
'--src-ip', self._security_group_rule.ip_range['cidr'],
|
'--src-ip', self._security_group_rule.ip_range['cidr'],
|
||||||
self._security_group.id,
|
self._security_group['id'],
|
||||||
]
|
]
|
||||||
verifylist = [
|
verifylist = [
|
||||||
('protocol', self._security_group_rule.ip_protocol),
|
('protocol', self._security_group_rule.ip_protocol),
|
||||||
('src_ip', self._security_group_rule.ip_range['cidr']),
|
('src_ip', self._security_group_rule.ip_range['cidr']),
|
||||||
('group', self._security_group.id),
|
('group', self._security_group['id']),
|
||||||
]
|
]
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
|
||||||
columns, data = self.cmd.take_action(parsed_args)
|
columns, data = self.cmd.take_action(parsed_args)
|
||||||
|
|
||||||
|
# TODO(dtroyer): save this for the security group rule changes
|
||||||
|
# self.compute.api.security_group_rule_create.assert_called_once_with(
|
||||||
self.compute.security_group_rules.create.assert_called_once_with(
|
self.compute.security_group_rules.create.assert_called_once_with(
|
||||||
self._security_group.id,
|
self._security_group['id'],
|
||||||
self._security_group_rule.ip_protocol,
|
self._security_group_rule.ip_protocol,
|
||||||
self._security_group_rule.from_port,
|
self._security_group_rule.from_port,
|
||||||
self._security_group_rule.to_port,
|
self._security_group_rule.to_port,
|
||||||
@ -242,7 +251,7 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
|
|||||||
self.assertEqual(expected_columns, columns)
|
self.assertEqual(expected_columns, columns)
|
||||||
self.assertEqual(expected_data, data)
|
self.assertEqual(expected_data, data)
|
||||||
|
|
||||||
def test_create_remote_ip(self):
|
def test_security_group_rule_create_remote_ip(self):
|
||||||
expected_columns, expected_data = self._setup_security_group_rule({
|
expected_columns, expected_data = self._setup_security_group_rule({
|
||||||
'ip_protocol': 'icmp',
|
'ip_protocol': 'icmp',
|
||||||
'from_port': -1,
|
'from_port': -1,
|
||||||
@ -252,19 +261,21 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
|
|||||||
arglist = [
|
arglist = [
|
||||||
'--protocol', self._security_group_rule.ip_protocol,
|
'--protocol', self._security_group_rule.ip_protocol,
|
||||||
'--remote-ip', self._security_group_rule.ip_range['cidr'],
|
'--remote-ip', self._security_group_rule.ip_range['cidr'],
|
||||||
self._security_group.id,
|
self._security_group['id'],
|
||||||
]
|
]
|
||||||
verifylist = [
|
verifylist = [
|
||||||
('protocol', self._security_group_rule.ip_protocol),
|
('protocol', self._security_group_rule.ip_protocol),
|
||||||
('remote_ip', self._security_group_rule.ip_range['cidr']),
|
('remote_ip', self._security_group_rule.ip_range['cidr']),
|
||||||
('group', self._security_group.id),
|
('group', self._security_group['id']),
|
||||||
]
|
]
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
|
||||||
columns, data = self.cmd.take_action(parsed_args)
|
columns, data = self.cmd.take_action(parsed_args)
|
||||||
|
|
||||||
|
# TODO(dtroyer): save this for the security group rule changes
|
||||||
|
# self.compute.api.security_group_rule_create.assert_called_once_with(
|
||||||
self.compute.security_group_rules.create.assert_called_once_with(
|
self.compute.security_group_rules.create.assert_called_once_with(
|
||||||
self._security_group.id,
|
self._security_group['id'],
|
||||||
self._security_group_rule.ip_protocol,
|
self._security_group_rule.ip_protocol,
|
||||||
self._security_group_rule.from_port,
|
self._security_group_rule.from_port,
|
||||||
self._security_group_rule.to_port,
|
self._security_group_rule.to_port,
|
||||||
@ -274,7 +285,7 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
|
|||||||
self.assertEqual(expected_columns, columns)
|
self.assertEqual(expected_columns, columns)
|
||||||
self.assertEqual(expected_data, data)
|
self.assertEqual(expected_data, data)
|
||||||
|
|
||||||
def test_create_proto_option(self):
|
def test_security_group_rule_create_proto_option(self):
|
||||||
expected_columns, expected_data = self._setup_security_group_rule({
|
expected_columns, expected_data = self._setup_security_group_rule({
|
||||||
'ip_protocol': 'icmp',
|
'ip_protocol': 'icmp',
|
||||||
'from_port': -1,
|
'from_port': -1,
|
||||||
@ -284,20 +295,22 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
|
|||||||
arglist = [
|
arglist = [
|
||||||
'--proto', self._security_group_rule.ip_protocol,
|
'--proto', self._security_group_rule.ip_protocol,
|
||||||
'--src-ip', self._security_group_rule.ip_range['cidr'],
|
'--src-ip', self._security_group_rule.ip_range['cidr'],
|
||||||
self._security_group.id,
|
self._security_group['id'],
|
||||||
]
|
]
|
||||||
verifylist = [
|
verifylist = [
|
||||||
('proto', self._security_group_rule.ip_protocol),
|
('proto', self._security_group_rule.ip_protocol),
|
||||||
('protocol', None),
|
('protocol', None),
|
||||||
('src_ip', self._security_group_rule.ip_range['cidr']),
|
('src_ip', self._security_group_rule.ip_range['cidr']),
|
||||||
('group', self._security_group.id),
|
('group', self._security_group['id']),
|
||||||
]
|
]
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
|
||||||
columns, data = self.cmd.take_action(parsed_args)
|
columns, data = self.cmd.take_action(parsed_args)
|
||||||
|
|
||||||
|
# TODO(dtroyer): save this for the security group rule changes
|
||||||
|
# self.compute.api.security_group_rule_create.assert_called_once_with(
|
||||||
self.compute.security_group_rules.create.assert_called_once_with(
|
self.compute.security_group_rules.create.assert_called_once_with(
|
||||||
self._security_group.id,
|
self._security_group['id'],
|
||||||
self._security_group_rule.ip_protocol,
|
self._security_group_rule.ip_protocol,
|
||||||
self._security_group_rule.from_port,
|
self._security_group_rule.from_port,
|
||||||
self._security_group_rule.to_port,
|
self._security_group_rule.to_port,
|
||||||
@ -338,7 +351,7 @@ class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
|
|||||||
self._security_group_rules[0].id)
|
self._security_group_rules[0].id)
|
||||||
self.assertIsNone(result)
|
self.assertIsNone(result)
|
||||||
|
|
||||||
def test_multi_security_group_rules_delete(self):
|
def test_security_group_rule_multi_delete(self):
|
||||||
arglist = []
|
arglist = []
|
||||||
verifylist = []
|
verifylist = []
|
||||||
|
|
||||||
@ -357,7 +370,7 @@ class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
|
|||||||
self.compute.security_group_rules.delete.assert_has_calls(calls)
|
self.compute.security_group_rules.delete.assert_has_calls(calls)
|
||||||
self.assertIsNone(result)
|
self.assertIsNone(result)
|
||||||
|
|
||||||
def test_multi_security_group_rules_delete_with_exception(self):
|
def test_security_group_rule_multi_delete_with_exception(self):
|
||||||
arglist = [
|
arglist = [
|
||||||
self._security_group_rules[0].id,
|
self._security_group_rules[0].id,
|
||||||
'unexist_rule',
|
'unexist_rule',
|
||||||
@ -397,7 +410,7 @@ class TestListSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
|
|||||||
'ip_protocol': 'tcp',
|
'ip_protocol': 'tcp',
|
||||||
'from_port': 80,
|
'from_port': 80,
|
||||||
'to_port': 80,
|
'to_port': 80,
|
||||||
'group': {'name': _security_group.name},
|
'group': {'name': _security_group['name']},
|
||||||
})
|
})
|
||||||
_security_group_rule_icmp = \
|
_security_group_rule_icmp = \
|
||||||
compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule({
|
compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule({
|
||||||
@ -405,10 +418,12 @@ class TestListSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
|
|||||||
'from_port': -1,
|
'from_port': -1,
|
||||||
'to_port': -1,
|
'to_port': -1,
|
||||||
'ip_range': {'cidr': '10.0.2.0/24'},
|
'ip_range': {'cidr': '10.0.2.0/24'},
|
||||||
'group': {'name': _security_group.name},
|
'group': {'name': _security_group['name']},
|
||||||
})
|
})
|
||||||
_security_group.rules = [_security_group_rule_tcp._info,
|
_security_group['rules'] = [
|
||||||
_security_group_rule_icmp._info]
|
_security_group_rule_tcp._info,
|
||||||
|
_security_group_rule_icmp._info,
|
||||||
|
]
|
||||||
|
|
||||||
expected_columns_with_group = (
|
expected_columns_with_group = (
|
||||||
'ID',
|
'ID',
|
||||||
@ -422,7 +437,7 @@ class TestListSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
|
|||||||
|
|
||||||
expected_data_with_group = []
|
expected_data_with_group = []
|
||||||
expected_data_no_group = []
|
expected_data_no_group = []
|
||||||
for _security_group_rule in _security_group.rules:
|
for _security_group_rule in _security_group['rules']:
|
||||||
rule = network_utils.transform_compute_security_group_rule(
|
rule = network_utils.transform_compute_security_group_rule(
|
||||||
_security_group_rule
|
_security_group_rule
|
||||||
)
|
)
|
||||||
@ -443,41 +458,43 @@ class TestListSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
|
|||||||
|
|
||||||
self.app.client_manager.network_endpoint_enabled = False
|
self.app.client_manager.network_endpoint_enabled = False
|
||||||
|
|
||||||
self.compute.security_groups.get.return_value = \
|
self.compute.api.security_group_find = mock.Mock(
|
||||||
self._security_group
|
return_value=self._security_group,
|
||||||
self.compute.security_groups.list.return_value = \
|
)
|
||||||
[self._security_group]
|
self.compute.api.security_group_list = mock.Mock(
|
||||||
|
return_value=[self._security_group],
|
||||||
|
)
|
||||||
|
|
||||||
# Get the command object to test
|
# Get the command object to test
|
||||||
self.cmd = security_group_rule.ListSecurityGroupRule(self.app, None)
|
self.cmd = security_group_rule.ListSecurityGroupRule(self.app, None)
|
||||||
|
|
||||||
def test_list_default(self):
|
def test_security_group_rule_list_default(self):
|
||||||
parsed_args = self.check_parser(self.cmd, [], [])
|
parsed_args = self.check_parser(self.cmd, [], [])
|
||||||
|
|
||||||
columns, data = self.cmd.take_action(parsed_args)
|
columns, data = self.cmd.take_action(parsed_args)
|
||||||
self.compute.security_groups.list.assert_called_once_with(
|
self.compute.api.security_group_list.assert_called_once_with(
|
||||||
search_opts={'all_tenants': False}
|
search_opts={'all_tenants': False}
|
||||||
)
|
)
|
||||||
self.assertEqual(self.expected_columns_no_group, columns)
|
self.assertEqual(self.expected_columns_no_group, columns)
|
||||||
self.assertEqual(self.expected_data_no_group, list(data))
|
self.assertEqual(self.expected_data_no_group, list(data))
|
||||||
|
|
||||||
def test_list_with_group(self):
|
def test_security_group_rule_list_with_group(self):
|
||||||
arglist = [
|
arglist = [
|
||||||
self._security_group.id,
|
self._security_group['id'],
|
||||||
]
|
]
|
||||||
verifylist = [
|
verifylist = [
|
||||||
('group', self._security_group.id),
|
('group', self._security_group['id']),
|
||||||
]
|
]
|
||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
|
||||||
columns, data = self.cmd.take_action(parsed_args)
|
columns, data = self.cmd.take_action(parsed_args)
|
||||||
self.compute.security_groups.get.assert_called_once_with(
|
self.compute.api.security_group_find.assert_called_once_with(
|
||||||
self._security_group.id
|
self._security_group['id']
|
||||||
)
|
)
|
||||||
self.assertEqual(self.expected_columns_with_group, columns)
|
self.assertEqual(self.expected_columns_with_group, columns)
|
||||||
self.assertEqual(self.expected_data_with_group, list(data))
|
self.assertEqual(self.expected_data_with_group, list(data))
|
||||||
|
|
||||||
def test_list_all_projects(self):
|
def test_security_group_rule_list_all_projects(self):
|
||||||
arglist = [
|
arglist = [
|
||||||
'--all-projects',
|
'--all-projects',
|
||||||
]
|
]
|
||||||
@ -487,13 +504,13 @@ class TestListSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
|
|||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
|
||||||
columns, data = self.cmd.take_action(parsed_args)
|
columns, data = self.cmd.take_action(parsed_args)
|
||||||
self.compute.security_groups.list.assert_called_once_with(
|
self.compute.api.security_group_list.assert_called_once_with(
|
||||||
search_opts={'all_tenants': True}
|
search_opts={'all_tenants': True}
|
||||||
)
|
)
|
||||||
self.assertEqual(self.expected_columns_no_group, columns)
|
self.assertEqual(self.expected_columns_no_group, columns)
|
||||||
self.assertEqual(self.expected_data_no_group, list(data))
|
self.assertEqual(self.expected_data_no_group, list(data))
|
||||||
|
|
||||||
def test_list_with_ignored_options(self):
|
def test_security_group_rule_list_with_ignored_options(self):
|
||||||
arglist = [
|
arglist = [
|
||||||
'--long',
|
'--long',
|
||||||
]
|
]
|
||||||
@ -503,7 +520,7 @@ class TestListSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
|
|||||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||||
|
|
||||||
columns, data = self.cmd.take_action(parsed_args)
|
columns, data = self.cmd.take_action(parsed_args)
|
||||||
self.compute.security_groups.list.assert_called_once_with(
|
self.compute.api.security_group_list.assert_called_once_with(
|
||||||
search_opts={'all_tenants': False}
|
search_opts={'all_tenants': False}
|
||||||
)
|
)
|
||||||
self.assertEqual(self.expected_columns_no_group, columns)
|
self.assertEqual(self.expected_columns_no_group, columns)
|
||||||
@ -527,20 +544,19 @@ class TestShowSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
|
|||||||
|
|
||||||
# Build a security group fake customized for this test.
|
# Build a security group fake customized for this test.
|
||||||
security_group_rules = [self._security_group_rule._info]
|
security_group_rules = [self._security_group_rule._info]
|
||||||
security_group = fakes.FakeResource(
|
security_group = {'rules': security_group_rules}
|
||||||
info=copy.deepcopy({'rules': security_group_rules}),
|
self.compute.api.security_group_list = mock.Mock(
|
||||||
loaded=True)
|
return_value=[security_group],
|
||||||
security_group.rules = security_group_rules
|
)
|
||||||
self.compute.security_groups.list.return_value = [security_group]
|
|
||||||
|
|
||||||
# Get the command object to test
|
# Get the command object to test
|
||||||
self.cmd = security_group_rule.ShowSecurityGroupRule(self.app, None)
|
self.cmd = security_group_rule.ShowSecurityGroupRule(self.app, None)
|
||||||
|
|
||||||
def test_show_no_options(self):
|
def test_security_group_rule_show_no_options(self):
|
||||||
self.assertRaises(tests_utils.ParserException,
|
self.assertRaises(tests_utils.ParserException,
|
||||||
self.check_parser, self.cmd, [], [])
|
self.check_parser, self.cmd, [], [])
|
||||||
|
|
||||||
def test_show_all_options(self):
|
def test_security_group_rule_show_all_options(self):
|
||||||
arglist = [
|
arglist = [
|
||||||
self._security_group_rule.id,
|
self._security_group_rule.id,
|
||||||
]
|
]
|
||||||
@ -551,6 +567,6 @@ class TestShowSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
|
|||||||
|
|
||||||
columns, data = self.cmd.take_action(parsed_args)
|
columns, data = self.cmd.take_action(parsed_args)
|
||||||
|
|
||||||
self.compute.security_groups.list.assert_called_once_with()
|
self.compute.api.security_group_list.assert_called_once_with()
|
||||||
self.assertEqual(self.columns, columns)
|
self.assertEqual(self.columns, columns)
|
||||||
self.assertEqual(self.data, data)
|
self.assertEqual(self.data, data)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user