Add client API for share-networks and security-services.

Partially implements bp: join-tenant-network

Change-Id: I964240d8041a90446c16f879447e6bec85a45b35
This commit is contained in:
Aleks Chirko 2013-12-02 18:02:48 +02:00
parent a7a1d9758b
commit e7b5f18a98
10 changed files with 909 additions and 3 deletions

View File

@ -268,3 +268,21 @@ def slugify(value):
value = unicodedata.normalize('NFKD', value).encode('ascii', 'ignore')
value = unicode(_slugify_strip_re.sub('', value).strip().lower())
return _slugify_hyphenate_re.sub('-', value)
def make_metadata_dict(metadata):
"""
Converts given metadata in form of list of 'key=value' strings into
{'key': 'value'} dictionary
"""
metadata_dict = {}
for item in metadata:
try:
key, value = item.split('=')
except ValueError:
msg = "Wrong argument format: '%s'" % item
raise exceptions.CommandError(msg)
if 'password' in key:
value = value.strip('"').strip("'")
metadata_dict[key] = value
return metadata_dict

View File

@ -1,5 +1,7 @@
from manilaclient import client
from manilaclient.v1 import limits
from manilaclient.v1 import share_networks
from manilaclient.v1 import security_services
from manilaclient.v1 import quota_classes
from manilaclient.v1 import quotas
from manilaclient.v1 import shares
@ -31,6 +33,8 @@ class Client(object):
# know it's not being used as keyword argument
password = api_key
self.limits = limits.LimitsManager(self)
self.security_services = security_services.SecurityServiceManager(self)
self.share_networks = share_networks.ShareNetworkManager(self)
self.quota_classes = quota_classes.QuotaClassSetManager(self)
self.quotas = quotas.QuotaSetManager(self)

View File

@ -0,0 +1,145 @@
# Copyright 2013 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import urllib
from manilaclient import base
from manilaclient import exceptions
from manilaclient import utils
RESOURCES_PATH = '/security-services'
RESOURCE_PATH = "/security-services/%s"
RESOURCE_NAME = 'security_service'
RESOURCES_NAME = 'security_services'
class SecurityService(base.Resource):
"""Security service for Manila shares """
def __repr__(self):
return "<SecurityService: %s>" % self.id
class SecurityServiceManager(base.Manager):
"""Manage :class:`SecurityService` resources."""
resource_class = SecurityService
def create(self, type, dns_ip=None, server=None, domain=None, sid=None,
password=None, name=None, description=None):
"""Create security service for NAS.
:param type: security service type - 'ldap', 'kerberos' or
'active_directory'
:param dns_ip: dns ip address used inside tenant's network
:param server: security service server ip address or hostname
:param domain: security service domain
:param sid: security identifier used by tenant
:param password: password used by sid
:param name: security service name
:param description: security service description
:rtype: :class:`SecurityService`
"""
values = {'type': type}
if dns_ip:
values['dns_ip'] = dns_ip
if server:
values['server'] = server
if domain:
values['domain'] = domain
if sid:
values['sid'] = sid
if password:
values['password'] = password
if name:
values['name'] = name
if description:
values['description'] = description
body = {RESOURCE_NAME: values}
return self._create(RESOURCES_PATH, body, RESOURCE_NAME)
def get(self, security_service):
"""Get a security service info.
:param security_service: security service to get.
:rtype: :class:`SecurityService`
"""
return self._get(RESOURCE_PATH % security_service, RESOURCE_NAME)
def update(self, security_service, dns_ip=None, server=None, domain=None,
password=None, sid=None, name=None, description=None):
"""Updates a security service.
:param security_service: security service to update.
:param dns_ip: dns ip address used inside tenant's network
:param server: security service server ip address or hostname
:param domain: security service domain
:param sid: security identifier used by tenant
:param password: password used by sid
:param name: security service name
:param description: security service description
:rtype: :class:`SecurityService`
"""
values = {}
if dns_ip:
values['dns_ip'] = dns_ip
if server:
values['server'] = server
if domain:
values['domain'] = domain
if sid:
values['sid'] = sid
if password:
values['password'] = password
if name:
values['name'] = name
if description:
values['description'] = description
if not values:
msg = "Must specify fields to be updated"
raise exceptions.CommandError(msg)
body = {RESOURCE_NAME: values}
return self._update(RESOURCE_PATH % security_service,
body,
RESOURCE_NAME)
def delete(self, security_service):
"""Delete a security service.
:param security_service: security service to be deleted.
"""
self._delete(RESOURCE_PATH % security_service)
def list(self, search_opts=None):
"""Get a list of all security services.
:rtype: list of :class:`SecurityService`
"""
if search_opts:
query_string = urllib.urlencode([(key, value)
for (key, value)
in search_opts.items()
if value])
if query_string:
query_string = "?%s" % query_string
else:
query_string = ''
path = RESOURCES_PATH + "%s" % query_string
return self._list(path, RESOURCES_NAME)

View File

@ -0,0 +1,147 @@
# Copyright 2013 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import urllib
from manilaclient import base
from manilaclient import exceptions
RESOURCES_PATH = '/share-networks'
RESOURCE_PATH = "/share-networks/%s"
RESOURCE_NAME = 'share_network'
RESOURCES_NAME = 'share_networks'
class ShareNetwork(base.Resource):
"""Network info for Manila shares """
def __repr__(self):
return "<ShareNetwork: %s>" % self.id
class ShareNetworkManager(base.Manager):
"""Manage :class:`ShareNetwork` resources."""
resource_class = ShareNetwork
def create(self, neutron_net_id=None, neutron_subnet_id=None,
name=None, description=None):
"""Create share network.
:param metadata: metadata specific to the manila network plugin in use
:param name: share network name
:param description: share network description
:rtype: :class:`ShareNetwork`
"""
values = {}
if neutron_net_id:
values['neutron_net_id'] = neutron_net_id
if neutron_subnet_id:
values['neutron_subnet_id'] = neutron_subnet_id
if name:
values['name'] = name
if description:
values['description'] = description
body = {RESOURCE_NAME: values}
return self._create(RESOURCES_PATH, body, RESOURCE_NAME)
def add_security_service(self, share_network, security_service):
"""Associate given security service with a share network
:param share_network: share network name or id
:param security_service: security service name or id
:rtype: :class:`ShareNetwork`
"""
body = {'add_security_service': {'security_service_id':
security_service}}
return self._create(RESOURCE_PATH % share_network + '/action',
body,
RESOURCE_NAME)
def remove_security_service(self, share_network, security_service):
"""Dissociate security service from a share network
:param share_network: share network name or id
:param security_service: security service name or id
:rtype: :class:`ShareNetwork`
"""
body = {'remove_security_service': {'security_service_id':
security_service}}
return self._create(RESOURCE_PATH % share_network + '/action',
body,
RESOURCE_NAME)
def get(self, share_network):
"""Get a share network.
:param policy: share network to get.
:rtype: :class:`NetworkInfo`
"""
return self._get(RESOURCE_PATH % base.getid(share_network),
RESOURCE_NAME)
def update(self, share_network, neutron_net_id=None,
neutron_subnet_id=None, name=None, description=None):
"""Updates a share network.
:param share_network: share network to update.
:rtype: :class:`NetworkInfo`
"""
values = {}
if neutron_net_id:
values['neutron_net_id'] = neutron_net_id
if neutron_subnet_id:
values['neutron_subnet_id'] = neutron_subnet_id
if name:
values['name'] = name
if description:
values['description'] = description
if not values:
msg = "Must specify fields to be updated"
raise exceptions.CommandError(msg)
body = {RESOURCE_NAME: values}
return self._update(RESOURCE_PATH % share_network,
body,
RESOURCE_NAME)
def delete(self, share_network):
"""Delete a share network.
:param share_network: share network to be deleted.
"""
self._delete(RESOURCE_PATH % share_network)
def list(self, search_opts=None):
"""Get a list of all share network.
:rtype: list of :class:`NetworkInfo`
"""
if search_opts:
query_string = urllib.urlencode([(key, value)
for (key, value)
in search_opts.items()
if value])
if query_string:
query_string = "?%s" % query_string
else:
query_string = ''
path = RESOURCES_PATH + "%s" % query_string
return self._list(path, RESOURCES_NAME)

View File

@ -108,7 +108,7 @@ class ShareManager(base.ManagerWithFind):
resource_class = Share
def create(self, share_proto, size, snapshot_id=None, name=None,
description=None, metadata=None):
description=None, metadata=None, share_network_id=None):
"""Create NAS.
:param size: Size of NAS in GB
@ -130,7 +130,8 @@ class ShareManager(base.ManagerWithFind):
'name': name,
'description': description,
'metadata': share_metadata,
'share_proto': share_proto}}
'share_proto': share_proto,
'share_network_id': share_network_id}}
return self._create('/shares', body, 'share')
def get(self, share_id):

View File

@ -291,6 +291,11 @@ def do_rate_limits(cs, args):
metavar='<key=value>',
help='Metadata key=value pairs (Optional, Default=None)',
default=None)
@utils.arg(
'--share-network-id',
metavar='<network-info-id>',
help='Optional network info id',
default=None)
@utils.arg(
'--description',
metavar='<description>',
@ -306,7 +311,8 @@ def do_create(cs, args):
share = cs.shares.create(args.share_protocol, args.size, args.snapshot_id,
args.name, args.description,
metadata=share_metadata)
metadata=share_metadata,
share_network_id=args.share_network_id)
_print_share(cs, share)
@ -640,3 +646,316 @@ def do_reset_state(cs, args):
"""Explicitly update the state of a share."""
share = _find_share(cs, args.share)
share.reset_state(args.state)
@utils.arg(
'--neutron-net-id',
metavar='neutron-net-id',
default=None,
help="Neutron network id. If using manila network neutron plug-in, this "
"value should be specified. Tenant can't have more than one share's"
" network with the same neutron_net_id and neutron_subnet_id ")
@utils.arg(
'--neutron-subnet-id',
metavar='neutron-subnet-id',
default=None,
help="Neutron subnet id. If using manila network neutron plug-in, this "
"value should be specified. Tenant can't have more than one share's"
" network with the same neutron_net_id and neutron_subnet_id ")
@utils.arg(
'--name',
metavar='<name>',
default=None,
help="Share network name")
@utils.arg(
'--description',
metavar='<description>',
default=None,
help="Share network description")
def do_share_network_create(cs, args):
"""Create description for network used by the tenant"""
values = {'neutron_net_id': args.neutron_net_id,
'neutron_subnet_id': args.neutron_subnet_id,
'name': args.name,
'description': args.description}
share_network = cs.share_networks.create(**values)
info = share_network._info.copy()
utils.print_dict(info)
@utils.arg(
'share_network',
metavar='<share-network>',
help='Share network to update.')
@utils.arg(
'--neutron-net-id',
metavar='neutron-net-id',
default=None,
help="Neutron network id. Tenant can't have more than one share's"
" network with the same neutron_net_id and neutron_subnet_id ")
@utils.arg(
'--neutron-subnet-id',
metavar='neutron-subnet-id',
default=None,
help="Neutron subnet id. Tenant can't have more than one share's"
" network with the same neutron_net_id and neutron_subnet_id ")
@utils.arg(
'--name',
metavar='<name>',
default=None,
help="Share network name")
@utils.arg(
'--description',
metavar='<description>',
default=None,
help="Share network description")
def do_share_network_update(cs, args):
"""Update share network data"""
values = {'neutron_net_id': args.neutron_net_id,
'neutron_subnet_id': args.neutron_subnet_id,
'name': args.name,
'description': args.description}
share_network = cs.share_networks.update(args.share_network, **values)
info = share_network._info.copy()
utils.print_dict(info)
@utils.arg(
'share_network',
metavar='<share-network>',
help='Share network to show.')
def do_share_network_show(cs, args):
"""Get a description for network used by the tenant"""
share_network = cs.share_networks.get(args.share_network)
info = share_network._info.copy()
utils.print_dict(info)
@utils.arg(
'--all-tenants',
dest='all_tenants',
metavar='<0|1>',
nargs='?',
type=int,
const=1,
default=0,
help='Display information from all tenants (Admin only).')
@utils.arg(
'--status',
metavar='<status>',
default=None,
help='Filter results by status')
def do_share_network_list(cs, args):
"""Get a list of network info"""
all_tenants = int(os.environ.get("ALL_TENANTS", args.all_tenants))
search_opts = {
'all_tenants': all_tenants,
'status': args.status,
}
share_networks = cs.share_networks.list(search_opts=search_opts)
fields = ['id', 'name', 'status']
utils.print_list(share_networks, fields=fields)
@utils.arg(
'share_network',
metavar='<share-network>',
help='Share network')
@utils.arg(
'security_service',
metavar='<security-service>',
help='Security service to associate with.')
def do_share_network_security_service_add(cs, args):
"""Associate security service with share network"""
cs.share_networks.add_security_service(args.share_network,
args.security_service)
@utils.arg(
'share_network',
metavar='<share-network>',
help='Share network.')
@utils.arg(
'security_service',
metavar='<security-service>',
help='Security service to dissociate.')
def do_share_network_security_service_remove(cs, args):
"""Dissociate security service from share network"""
cs.share_networks.remove_security_service(args.share_network,
args.security_service)
@utils.arg(
'share_network',
metavar='<share-network>',
help='Share network.')
def do_share_network_security_service_list(cs, args):
"""Get a list of security services associated with a given share network"""
search_opts = {
'share_network_id': args.share_network,
}
security_services = cs.security_services.list(search_opts=search_opts)
fields = ['id', 'name', 'status']
utils.print_list(security_services, fields=fields)
@utils.arg(
'share_network',
metavar='<share-network>',
help='Share network to be deleted.')
def do_share_network_delete(cs, args):
"""Delete share network"""
cs.share_networks.delete(args.share_network)
@utils.arg(
'type',
metavar='<type>',
help="Security service type: 'ldap', 'kerberos' or 'active_directory'")
@utils.arg(
'--dns-ip',
metavar='<dns_ip>',
default=None,
help="dns ip address used inside tenant's network")
@utils.arg(
'--server',
metavar='<server>',
default=None,
help="security service ip address or hostname")
@utils.arg(
'--domain',
metavar='<domain>',
default=None,
help="security service domain")
@utils.arg(
'--sid',
metavar='<security identifier>',
default=None,
help="security service user or group used by tenant")
@utils.arg(
'--password',
metavar='<password>',
default=None,
help="password used by sid")
@utils.arg(
'--name',
metavar='<name>',
default=None,
help="security service name")
@utils.arg(
'--description',
metavar='<description>',
default=None,
help="security service description")
def do_security_service_create(cs, args):
"""Create security service used by tenant"""
values = {'dns_ip': args.dns_ip,
'server': args.server,
'domain': args.domain,
'sid': args.sid,
'password': args.password,
'name': args.name,
'description': args.description}
security_service = cs.security_services.create(args.type, **values)
info = security_service._info.copy()
utils.print_dict(info)
@utils.arg(
'security_service',
metavar='<security-service>',
help='Security service to update.')
@utils.arg(
'--dns-ip',
metavar='<dns-ip>',
default=None,
help="dns ip address used inside tenant's network")
@utils.arg(
'--server',
metavar='<server>',
default=None,
help="security service ip address or hostname")
@utils.arg(
'--domain',
metavar='<domain>',
default=None,
help="security service domain")
@utils.arg(
'--sid',
metavar='<security identifier>',
default=None,
help="security service user or group used by tenant")
@utils.arg(
'--password',
metavar='<password>',
default=None,
help="password used by sid")
@utils.arg(
'--name',
metavar='<name>',
default=None,
help="security service name")
@utils.arg(
'--description',
metavar='<description>',
default=None,
help="security service description")
def do_security_service_update(cs, args):
"""Update security service"""
values = {'dns_ip': args.dns_ip,
'server': args.server,
'domain': args.domain,
'sid': args.sid,
'password': args.password,
'name': args.name,
'description': args.description}
security_service = cs.security_services.update(args.security_service,
**values)
info = security_service._info.copy()
utils.print_dict(info)
@utils.arg(
'security_service',
metavar='<security-service>',
help='Security service to show.')
def do_security_service_show(cs, args):
"""Show security service"""
security_service = cs.security_services.get(args.security_service)
info = security_service._info.copy()
utils.print_dict(info)
@utils.arg(
'--all-tenants',
dest='all_tenants',
metavar='<0|1>',
nargs='?',
type=int,
const=1,
default=0,
help='Display information from all tenants (Admin only).')
@utils.arg(
'--status',
metavar='<status>',
default=None,
help='Filter results by status')
def do_security_service_list(cs, args):
"""Get a list of security services"""
all_tenants = int(os.environ.get("ALL_TENANTS", args.all_tenants))
search_opts = {
'all_tenants': all_tenants,
'status': args.status,
}
security_services = cs.security_services.list(search_opts=search_opts)
fields = ['id', 'name', 'status']
utils.print_list(security_services, fields=fields)
@utils.arg(
'security_service',
metavar='<security-service>',
help='Security service to delete.')
def do_security_service_delete(cs, args):
"""Delete security service"""
cs.security_services.delete(args.security_service)

View File

@ -128,3 +128,11 @@ class FakeHTTPClient(fakes.FakeHTTPClient):
def get_shares_1234_metadata(self, **kw):
return (200, {}, {"metadata": {"key1": "val1", "key2": "val2"}})
def fake_create(url, body, response_key):
return {'url': url, 'body': body, 'resp_key': response_key}
def fake_update(url, body, response_key):
return {'url': url, 'body': body, 'resp_key': response_key}

View File

@ -0,0 +1,129 @@
# Copyright 2013 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
try:
from collections import OrderedDict
except ImportError:
from ordereddict import OrderedDict
import mock
import unittest
from manilaclient import exceptions
from manilaclient.v1 import security_services
from tests.v1 import fakes
class SecurityServiceTest(unittest.TestCase):
def setUp(self):
super(SecurityServiceTest, self).setUp()
self.manager = security_services.SecurityServiceManager(api=None)
def test_create_all_fields(self):
values = {'type': 'ldap',
'dns_ip': 'fake dns ip',
'server': 'fake.ldap.server',
'domain': 'fake.ldap.domain',
'sid': 'fake sid',
'password': 'fake password',
'name': 'fake name',
'description': 'fake description'}
with mock.patch.object(self.manager, '_create', fakes.fake_create):
result = self.manager.create(**values)
self.assertEqual(result['url'], security_services.RESOURCES_PATH)
self.assertEqual(result['resp_key'],
security_services.RESOURCE_NAME)
self.assertTrue(security_services.RESOURCE_NAME in result['body'])
self.assertEqual(result['body'][security_services.RESOURCE_NAME],
values)
def test_create_some_fields(self):
values = {'type': 'ldap',
'dns_ip': 'fake dns ip',
'server': 'fake.ldap.server',
'domain': 'fake.ldap.domain',
'sid': 'fake sid'}
with mock.patch.object(self.manager, '_create', fakes.fake_create):
result = self.manager.create(**values)
self.assertEqual(result['url'], security_services.RESOURCES_PATH)
self.assertEqual(result['resp_key'],
security_services.RESOURCE_NAME)
self.assertTrue(security_services.RESOURCE_NAME in result['body'])
self.assertEqual(result['body'][security_services.RESOURCE_NAME],
values)
def test_delete(self):
security_service = 'fake service'
with mock.patch.object(self.manager, '_delete', mock.Mock()):
self.manager.delete(security_service)
self.manager._delete.assert_called_once_with(
security_services.RESOURCE_PATH % security_service)
def test_get(self):
security_service = 'fake service'
with mock.patch.object(self.manager, '_get', mock.Mock()):
self.manager.get(security_service)
self.manager._get.assert_called_once_with(
security_services.RESOURCE_PATH % security_service,
security_services.RESOURCE_NAME)
def test_list_no_filters(self):
with mock.patch.object(self.manager, '_list',
mock.Mock(return_value=None)):
self.manager.list()
self.manager._list.assert_called_once_with(
security_services.RESOURCES_PATH,
security_services.RESOURCES_NAME)
def test_list_with_filters(self):
filters = OrderedDict([('all_tenants', 1),
('status', 'ERROR'),
('network', 'fake_network')])
expected_postfix = '?all_tenants=1&status=ERROR&network=fake_network'
with mock.patch.object(self.manager, '_list',
mock.Mock(return_value=None)):
self.manager.list(search_opts=filters)
self.manager._list.assert_called_once_with(
security_services.RESOURCES_PATH + expected_postfix,
security_services.RESOURCES_NAME)
def test_update(self):
security_service = 'fake service'
values = {'dns_ip': 'new dns ip',
'server': 'new.ldap.server',
'domain': 'new.ldap.domain',
'sid': 'new sid',
'password': 'fake password',}
with mock.patch.object(self.manager, '_update', fakes.fake_update):
result = self.manager.update(security_service, **values)
self.assertEqual(
result['url'],
security_services.RESOURCE_PATH % security_service)
self.assertEqual(result['resp_key'],
security_services.RESOURCE_NAME)
self.assertEqual(result['body'][security_services.RESOURCE_NAME],
values)
def test_update_no_fields_specified(self):
security_service = 'fake service'
self.assertRaises(exceptions.CommandError,
self.manager.update,
security_service)

View File

@ -0,0 +1,134 @@
# Copyright 2013 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
try:
from collections import OrderedDict
except ImportError:
from ordereddict import OrderedDict
import mock
import unittest
from manilaclient import exceptions
from manilaclient.v1 import share_networks
from tests.v1 import fakes
class ShareNetworkTest(unittest.TestCase):
def setUp(self):
super(ShareNetworkTest, self).setUp()
self.manager = share_networks.ShareNetworkManager(api=None)
def test_create(self):
values = {'neutron_net_id': 'fake net id',
'neutron_subnet_id': 'fake subnet id',
'name': 'fake name',
'description': 'whatever'}
body_expected = {share_networks.RESOURCE_NAME: values}
with mock.patch.object(self.manager, '_create', fakes.fake_create):
result = self.manager.create(**values)
self.assertEqual(result['url'], share_networks.RESOURCES_PATH)
self.assertEqual(result['resp_key'], share_networks.RESOURCE_NAME)
self.assertEqual(
result['body'],
body_expected)
def test_delete(self):
share_nw = 'fake share nw'
with mock.patch.object(self.manager, '_delete', mock.Mock()):
self.manager.delete(share_nw)
self.manager._delete.assert_called_once_with(
share_networks.RESOURCE_PATH % share_nw)
def test_get(self):
share_nw = 'fake share nw'
with mock.patch.object(self.manager, '_get', mock.Mock()):
self.manager.get(share_nw)
self.manager._get.assert_called_once_with(
share_networks.RESOURCE_PATH % share_nw,
share_networks.RESOURCE_NAME)
def test_list_no_filters(self):
with mock.patch.object(self.manager, '_list',
mock.Mock(return_value=None)):
self.manager.list()
self.manager._list.assert_called_once_with(
share_networks.RESOURCES_PATH,
share_networks.RESOURCES_NAME)
def test_list_with_filters(self):
filters = OrderedDict([('all_tenants', 1), ('status', 'ERROR')])
expected_path = \
share_networks.RESOURCES_PATH + '?all_tenants=1&status=ERROR'
with mock.patch.object(self.manager, '_list',
mock.Mock(return_value=None)):
self.manager.list(search_opts=filters)
self.manager._list.assert_called_once_with(
expected_path,
share_networks.RESOURCES_NAME)
def test_update(self):
share_nw = 'fake share nw'
values = {'neutron_net_id': 'new net id',
'neutron_subnet_id': 'new subnet id',
'name': 'new name',
'description': 'new whatever'}
body_expected = {share_networks.RESOURCE_NAME: values}
with mock.patch.object(self.manager, '_update', fakes.fake_update):
result = self.manager.update(share_nw, **values)
self.assertEqual(result['url'],
share_networks.RESOURCE_PATH % share_nw)
self.assertEqual(result['resp_key'], share_networks.RESOURCE_NAME)
self.assertEqual(result['body'], body_expected)
def test_update_with_exception(self):
share_nw = 'fake share nw'
self.assertRaises(exceptions.CommandError,
self.manager.update,
share_nw)
def test_add_security_service(self):
security_service = 'fake security service'
share_nw = 'fake share nw'
expected_path = (share_networks.RESOURCE_PATH + '/action') % share_nw
expected_body = {'add_security_service': {'security_service_id':
security_service}}
with mock.patch.object(self.manager, '_create', mock.Mock()):
self.manager.add_security_service(share_nw, security_service)
self.manager._create.assert_called_once_with(
expected_path,
expected_body,
share_networks.RESOURCE_NAME)
def test_remove_security_service(self):
security_service = 'fake security service'
share_nw = 'fake share nw'
expected_path = (share_networks.RESOURCE_PATH + '/action') % share_nw
expected_body = {'remove_security_service': {'security_service_id':
security_service}}
with mock.patch.object(self.manager, '_create', mock.Mock()):
self.manager.remove_security_service(share_nw, security_service)
self.manager._create.assert_called_once_with(
expected_path,
expected_body,
share_networks.RESOURCE_NAME)

View File

@ -2,6 +2,7 @@ coverage>=3.6
discover
fixtures>=0.3.14
mock>=1.0
ordereddict
pep8==1.3.3
sphinx>=1.1.2
testrepository>=0.0.17