Fix pep8 issues in manila/tests

Fix pep8 issues in some test files.

Partial-Fix: #1333290

Change-Id: If863267f0a3dae1efaf81433814f26a8ebdd1776
This commit is contained in:
Andreas Jaeger 2014-08-15 13:12:58 +02:00
parent 3deb373176
commit 754e5f8587
15 changed files with 371 additions and 363 deletions

View File

@ -38,7 +38,7 @@ class LimiterTest(test.TestCase):
"""
def setUp(self):
"""Run before each test. """
"""Run before each test."""
super(LimiterTest, self).setUp()
self.tiny = range(1)
self.small = range(10)
@ -46,7 +46,7 @@ class LimiterTest(test.TestCase):
self.large = range(10000)
def test_limiter_offset_zero(self):
"""Test offset key works with 0. """
"""Test offset key works with 0."""
req = webob.Request.blank('/?offset=0')
self.assertEqual(common.limited(self.tiny, req), self.tiny)
self.assertEqual(common.limited(self.small, req), self.small)
@ -54,7 +54,7 @@ class LimiterTest(test.TestCase):
self.assertEqual(common.limited(self.large, req), self.large[:1000])
def test_limiter_offset_medium(self):
"""Test offset key works with a medium sized number. """
"""Test offset key works with a medium sized number."""
req = webob.Request.blank('/?offset=10')
self.assertEqual(common.limited(self.tiny, req), [])
self.assertEqual(common.limited(self.small, req), self.small[10:])
@ -62,7 +62,7 @@ class LimiterTest(test.TestCase):
self.assertEqual(common.limited(self.large, req), self.large[10:1010])
def test_limiter_offset_over_max(self):
"""Test offset key works with a number over 1000 (max_limit). """
"""Test offset key works with a number over 1000 (max_limit)."""
req = webob.Request.blank('/?offset=1001')
self.assertEqual(common.limited(self.tiny, req), [])
self.assertEqual(common.limited(self.small, req), [])
@ -71,19 +71,19 @@ class LimiterTest(test.TestCase):
common.limited(self.large, req), self.large[1001:2001])
def test_limiter_offset_blank(self):
"""Test offset key works with a blank offset. """
"""Test offset key works with a blank offset."""
req = webob.Request.blank('/?offset=')
self.assertRaises(
webob.exc.HTTPBadRequest, common.limited, self.tiny, req)
def test_limiter_offset_bad(self):
"""Test offset key works with a BAD offset. """
"""Test offset key works with a BAD offset."""
req = webob.Request.blank(u'/?offset=\u0020aa')
self.assertRaises(
webob.exc.HTTPBadRequest, common.limited, self.tiny, req)
def test_limiter_nothing(self):
"""Test request with no offset or limit """
"""Test request with no offset or limit."""
req = webob.Request.blank('/')
self.assertEqual(common.limited(self.tiny, req), self.tiny)
self.assertEqual(common.limited(self.small, req), self.small)
@ -91,7 +91,7 @@ class LimiterTest(test.TestCase):
self.assertEqual(common.limited(self.large, req), self.large[:1000])
def test_limiter_limit_zero(self):
"""Test limit of zero. """
"""Test limit of zero."""
req = webob.Request.blank('/?limit=0')
self.assertEqual(common.limited(self.tiny, req), self.tiny)
self.assertEqual(common.limited(self.small, req), self.small)
@ -99,7 +99,7 @@ class LimiterTest(test.TestCase):
self.assertEqual(common.limited(self.large, req), self.large[:1000])
def test_limiter_limit_medium(self):
"""Test limit of 10. """
"""Test limit of 10."""
req = webob.Request.blank('/?limit=10')
self.assertEqual(common.limited(self.tiny, req), self.tiny)
self.assertEqual(common.limited(self.small, req), self.small)
@ -107,7 +107,7 @@ class LimiterTest(test.TestCase):
self.assertEqual(common.limited(self.large, req), self.large[:10])
def test_limiter_limit_over_max(self):
"""Test limit of 3000. """
"""Test limit of 3000."""
req = webob.Request.blank('/?limit=3000')
self.assertEqual(common.limited(self.tiny, req), self.tiny)
self.assertEqual(common.limited(self.small, req), self.small)
@ -115,7 +115,7 @@ class LimiterTest(test.TestCase):
self.assertEqual(common.limited(self.large, req), self.large[:1000])
def test_limiter_limit_and_offset(self):
"""Test request with both limit and offset. """
"""Test request with both limit and offset."""
items = range(2000)
req = webob.Request.blank('/?offset=1&limit=3')
self.assertEqual(common.limited(items, req), items[1:4])
@ -127,7 +127,7 @@ class LimiterTest(test.TestCase):
self.assertEqual(common.limited(items, req), [])
def test_limiter_custom_max_limit(self):
"""Test a max_limit other than 1000. """
"""Test a max_limit other than 1000."""
items = range(2000)
req = webob.Request.blank('/?offset=1&limit=3')
self.assertEqual(
@ -142,13 +142,13 @@ class LimiterTest(test.TestCase):
self.assertEqual(common.limited(items, req, max_limit=2000), [])
def test_limiter_negative_limit(self):
"""Test a negative limit. """
"""Test a negative limit."""
req = webob.Request.blank('/?limit=-3000')
self.assertRaises(
webob.exc.HTTPBadRequest, common.limited, self.tiny, req)
def test_limiter_negative_offset(self):
"""Test a negative offset. """
"""Test a negative offset."""
req = webob.Request.blank('/?offset=-30')
self.assertRaises(
webob.exc.HTTPBadRequest, common.limited, self.tiny, req)
@ -162,30 +162,30 @@ class PaginationParamsTest(test.TestCase):
"""
def test_no_params(self):
"""Test no params. """
"""Test no params."""
req = webob.Request.blank('/')
self.assertEqual(common.get_pagination_params(req), {})
def test_valid_marker(self):
"""Test valid marker param. """
"""Test valid marker param."""
req = webob.Request.blank(
'/?marker=263abb28-1de6-412f-b00b-f0ee0c4333c2')
'/?marker=263abb28-1de6-412f-b00b-f0ee0c4333c2')
self.assertEqual(common.get_pagination_params(req),
{'marker': '263abb28-1de6-412f-b00b-f0ee0c4333c2'})
def test_valid_limit(self):
"""Test valid limit param. """
"""Test valid limit param."""
req = webob.Request.blank('/?limit=10')
self.assertEqual(common.get_pagination_params(req), {'limit': 10})
def test_invalid_limit(self):
"""Test invalid limit param. """
"""Test invalid limit param."""
req = webob.Request.blank('/?limit=-2')
self.assertRaises(
webob.exc.HTTPBadRequest, common.get_pagination_params, req)
def test_valid_limit_and_marker(self):
"""Test valid limit and marker parameters. """
"""Test valid limit and marker parameters."""
marker = '263abb28-1de6-412f-b00b-f0ee0c4333c2'
req = webob.Request.blank('/?limit=20&marker=%s' % marker)
self.assertEqual(common.get_pagination_params(req),

View File

@ -165,81 +165,84 @@ class ShareServerAPITest(test.TestCase):
def test_index_no_filters(self):
result = self.controller.index(FakeRequestAdmin)
policy.check_policy.assert_called_once_with(CONTEXT,
share_servers.RESOURCE_NAME, 'index')
policy.check_policy.assert_called_once_with(
CONTEXT, share_servers.RESOURCE_NAME, 'index')
db_api.share_server_get_all.assert_called_once_with(CONTEXT)
self.assertEqual(result, fake_share_server_list)
def test_index_host_filter(self):
result = self.controller.index(FakeRequestWithHost)
policy.check_policy.assert_called_once_with(CONTEXT,
share_servers.RESOURCE_NAME, 'index')
policy.check_policy.assert_called_once_with(
CONTEXT, share_servers.RESOURCE_NAME, 'index')
db_api.share_server_get_all.assert_called_once_with(CONTEXT)
self.assertEqual(result['share_servers'],
[fake_share_server_list['share_servers'][0]])
def test_index_status_filter(self):
result = self.controller.index(FakeRequestWithStatus)
policy.check_policy.assert_called_once_with(CONTEXT,
share_servers.RESOURCE_NAME, 'index')
policy.check_policy.assert_called_once_with(
CONTEXT, share_servers.RESOURCE_NAME, 'index')
db_api.share_server_get_all.assert_called_once_with(CONTEXT)
self.assertEqual(result['share_servers'],
[fake_share_server_list['share_servers'][1]])
def test_index_project_id_filter(self):
result = self.controller.index(FakeRequestWithProjectId)
policy.check_policy.assert_called_once_with(CONTEXT,
share_servers.RESOURCE_NAME, 'index')
policy.check_policy.assert_called_once_with(
CONTEXT, share_servers.RESOURCE_NAME, 'index')
db_api.share_server_get_all.assert_called_once_with(CONTEXT)
self.assertEqual(result['share_servers'],
[fake_share_server_list['share_servers'][0]])
def test_index_share_network_filter_by_name(self):
result = self.controller.index(FakeRequestWithShareNetworkName)
policy.check_policy.assert_called_once_with(CONTEXT,
share_servers.RESOURCE_NAME, 'index')
policy.check_policy.assert_called_once_with(
CONTEXT, share_servers.RESOURCE_NAME, 'index')
db_api.share_server_get_all.assert_called_once_with(CONTEXT)
self.assertEqual(result['share_servers'],
[fake_share_server_list['share_servers'][0]])
def test_index_share_network_filter_by_id(self):
result = self.controller.index(FakeRequestWithShareNetworkId)
policy.check_policy.assert_called_once_with(CONTEXT,
share_servers.RESOURCE_NAME, 'index')
policy.check_policy.assert_called_once_with(
CONTEXT, share_servers.RESOURCE_NAME, 'index')
db_api.share_server_get_all.assert_called_once_with(CONTEXT)
self.assertEqual(result['share_servers'],
[fake_share_server_list['share_servers'][0]])
def test_index_fake_filter(self):
result = self.controller.index(FakeRequestWithFakeFilter)
policy.check_policy.assert_called_once_with(CONTEXT,
share_servers.RESOURCE_NAME, 'index')
policy.check_policy.assert_called_once_with(
CONTEXT, share_servers.RESOURCE_NAME, 'index')
db_api.share_server_get_all.assert_called_once_with(CONTEXT)
self.assertEqual(len(result['share_servers']), 0)
def test_show(self):
self.stubs.Set(db_api, 'share_server_get',
mock.Mock(return_value=fake_share_server_get()))
result = self.controller.show(FakeRequestAdmin,
fake_share_server_get_result['share_server']['id'])
policy.check_policy.assert_called_once_with(CONTEXT,
share_servers.RESOURCE_NAME, 'show')
db_api.share_server_get.assert_called_once_with(CONTEXT,
result = self.controller.show(
FakeRequestAdmin,
fake_share_server_get_result['share_server']['id'])
policy.check_policy.assert_called_once_with(
CONTEXT, share_servers.RESOURCE_NAME, 'show')
db_api.share_server_get.assert_called_once_with(
CONTEXT, fake_share_server_get_result['share_server']['id'])
self.assertEqual(result['share_server'],
fake_share_server_get_result['share_server'])
def test_details(self):
self.stubs.Set(db_api, 'share_server_get',
mock.Mock(return_value=fake_share_server_get()))
self.stubs.Set(db_api, 'share_server_backend_details_get',
self.stubs.Set(
db_api, 'share_server_backend_details_get',
mock.Mock(return_value=fake_share_server_backend_details_get()))
result = self.controller.details(FakeRequestAdmin,
fake_share_server_get_result['share_server']['id'])
policy.check_policy.assert_called_once_with(CONTEXT,
share_servers.RESOURCE_NAME, 'details')
db_api.share_server_get.assert_called_once_with(CONTEXT,
result = self.controller.details(
FakeRequestAdmin,
fake_share_server_get_result['share_server']['id'])
policy.check_policy.assert_called_once_with(
CONTEXT, share_servers.RESOURCE_NAME, 'details')
db_api.share_server_get.assert_called_once_with(
CONTEXT, fake_share_server_get_result['share_server']['id'])
db_api.share_server_backend_details_get.assert_called_once_with(
CONTEXT, fake_share_server_get_result['share_server']['id'])
self.assertEqual(result,
@ -251,12 +254,13 @@ class ShareServerAPITest(test.TestCase):
mock.Mock(return_value=share_server))
self.stubs.Set(self.controller.share_api, 'delete_share_server',
mock.Mock())
result = self.controller.delete(FakeRequestAdmin,
fake_share_server_get_result['share_server']['id'])
policy.check_policy.assert_called_once_with(CONTEXT,
share_servers.RESOURCE_NAME, 'delete')
db_api.share_server_get.assert_called_once_with(CONTEXT,
result = self.controller.delete(
FakeRequestAdmin,
fake_share_server_get_result['share_server']['id'])
policy.check_policy.assert_called_once_with(
CONTEXT, share_servers.RESOURCE_NAME, 'delete')
db_api.share_server_get.assert_called_once_with(
CONTEXT, fake_share_server_get_result['share_server']['id'])
self.controller.share_api.delete_share_server.assert_called_once_with(
CONTEXT, share_server)
@ -266,12 +270,13 @@ class ShareServerAPITest(test.TestCase):
mock.Mock(return_value=share_server))
self.stubs.Set(self.controller.share_api, 'delete_share_server',
mock.Mock())
result = self.controller.delete(FakeRequestAdmin,
fake_share_server_get_result['share_server']['id'])
policy.check_policy.assert_called_once_with(CONTEXT,
share_servers.RESOURCE_NAME, 'delete')
db_api.share_server_get.assert_called_once_with(CONTEXT,
result = self.controller.delete(
FakeRequestAdmin,
fake_share_server_get_result['share_server']['id'])
policy.check_policy.assert_called_once_with(
CONTEXT, share_servers.RESOURCE_NAME, 'delete')
db_api.share_server_get.assert_called_once_with(
CONTEXT, fake_share_server_get_result['share_server']['id'])
self.controller.share_api.delete_share_server.assert_called_once_with(
CONTEXT, share_server)
@ -291,7 +296,7 @@ class ShareServerAPITest(test.TestCase):
FakeRequestAdmin,
share_server_id)
db_api.share_server_get.assert_called_once_with(CONTEXT,
share_server_id)
share_server_id)
self.controller.share_api.delete_share_server.assert_called_once_with(
CONTEXT, share_server)
@ -311,8 +316,8 @@ class ShareServerAPITest(test.TestCase):
share_server_id)
db_api.share_server_get.assert_called_once_with(
CONTEXT, share_server_id)
policy.check_policy.assert_called_once_with(CONTEXT,
share_servers.RESOURCE_NAME, 'delete')
policy.check_policy.assert_called_once_with(
CONTEXT, share_servers.RESOURCE_NAME, 'delete')
def test_delete_creating_server(self):
share_server = FakeShareServer(status=constants.STATUS_CREATING)
@ -322,7 +327,8 @@ class ShareServerAPITest(test.TestCase):
self.controller.delete,
FakeRequestAdmin,
share_server['id'])
policy.check_policy.assert_called_once_with(CONTEXT,
policy.check_policy.assert_called_once_with(
CONTEXT,
share_servers.RESOURCE_NAME, 'delete')
def test_delete_deleting_server(self):
@ -333,5 +339,5 @@ class ShareServerAPITest(test.TestCase):
self.controller.delete,
FakeRequestAdmin,
share_server['id'])
policy.check_policy.assert_called_once_with(CONTEXT,
share_servers.RESOURCE_NAME, 'delete')
policy.check_policy.assert_called_once_with(
CONTEXT, share_servers.RESOURCE_NAME, 'delete')

View File

@ -70,13 +70,13 @@ class NovaApiTestCase(test.TestCase):
self.novaclient = FakeNovaClient()
self.ctx = context.get_admin_context()
self.stubs.Set(nova, 'novaclient',
mock.Mock(return_value=self.novaclient))
mock.Mock(return_value=self.novaclient))
self.stubs.Set(nova, '_untranslate_server_summary_view',
lambda server: server)
def test_server_create(self):
result = self.api.server_create(self.ctx, 'server_name', 'fake_image',
'fake_flavor', None, None, None)
'fake_flavor', None, None, None)
self.assertEqual(result['id'], 'created_id')
def test_server_delete(self):
@ -122,14 +122,14 @@ class NovaApiTestCase(test.TestCase):
def test_server_reboot_hard(self):
self.stubs.Set(self.novaclient.servers, 'reboot', mock.Mock())
self.api.server_reboot(self.ctx, 'id1')
self.novaclient.servers.reboot.assert_called_once_with('id1',
nova_servers.REBOOT_HARD)
self.novaclient.servers.reboot.assert_called_once_with(
'id1', nova_servers.REBOOT_HARD)
def test_server_reboot_soft(self):
self.stubs.Set(self.novaclient.servers, 'reboot', mock.Mock())
self.api.server_reboot(self.ctx, 'id1', True)
self.novaclient.servers.reboot.assert_called_once_with('id1',
nova_servers.REBOOT_SOFT)
self.novaclient.servers.reboot.assert_called_once_with(
'id1', nova_servers.REBOOT_SOFT)
def test_server_rebuild(self):
self.stubs.Set(self.novaclient.servers, 'rebuild', mock.Mock())
@ -144,7 +144,7 @@ class NovaApiTestCase(test.TestCase):
self.api.instance_volume_attach(self.ctx, 'instance_id',
'vol_id', 'device')
self.novaclient.volumes.create_server_volume.\
assert_called_once_with('instance_id', 'vol_id', 'device')
assert_called_once_with('instance_id', 'vol_id', 'device')
def test_instance_volume_detach(self):
self.stubs.Set(self.novaclient.volumes, 'delete_server_volume',
@ -152,7 +152,7 @@ class NovaApiTestCase(test.TestCase):
self.api.instance_volume_detach(self.ctx, 'instance_id',
'att_id')
self.novaclient.volumes.delete_server_volume.\
assert_called_once_with('instance_id', 'att_id')
assert_called_once_with('instance_id', 'att_id')
def test_instance_volumes_list(self):
self.stubs.Set(self.novaclient.volumes, 'get_server_volumes',
@ -177,7 +177,7 @@ class NovaApiTestCase(test.TestCase):
self.api.update_server_volume(self.ctx, 'instance_id', 'att_id',
'new_vol_id')
self.novaclient.volumes.update_server_volume.\
assert_called_once_with('instance_id', 'att_id', 'new_vol_id')
assert_called_once_with('instance_id', 'att_id', 'new_vol_id')
def test_keypair_create(self):
self.stubs.Set(self.novaclient.keypairs, 'create', mock.Mock())
@ -188,13 +188,13 @@ class NovaApiTestCase(test.TestCase):
self.stubs.Set(self.novaclient.keypairs, 'create', mock.Mock())
self.api.keypair_import(self.ctx, 'keypair_name', 'fake_pub_key')
self.novaclient.keypairs.create.\
assert_called_once_with('keypair_name', 'fake_pub_key')
assert_called_once_with('keypair_name', 'fake_pub_key')
def test_keypair_delete(self):
self.stubs.Set(self.novaclient.keypairs, 'delete', mock.Mock())
self.api.keypair_delete(self.ctx, 'fake_keypair_id')
self.novaclient.keypairs.delete.\
assert_called_once_with('fake_keypair_id')
assert_called_once_with('fake_keypair_id')
def test_keypair_list(self):
self.assertEqual([{'id': 'id1'}, {'id': 'id2'}],

View File

@ -26,8 +26,7 @@ from manila import test
class NetAppClusteredDrvTestCase(test.TestCase):
"""Tests for NetApp cmode driver.
"""
"""Tests for NetApp cmode driver."""
def setUp(self):
super(NetAppClusteredDrvTestCase, self).setUp()
self._context = context.get_admin_context()
@ -53,8 +52,7 @@ class NetAppClusteredDrvTestCase(test.TestCase):
'network_allocations': [
{'ip_address': 'ip'}
]
}
}
}}
self.snapshot = {'id': 'fake_snapshot_uuid',
'project_id': 'fake_tenant_id',
'share_id': 'fake_share_id',
@ -101,7 +99,7 @@ class NetAppClusteredDrvTestCase(test.TestCase):
mock.call('vserver-create', vserver_create_args),
mock.call('aggr-get-iter'),
mock.call('vserver-modify', vserver_modify_args),
]
]
)
def test_update_share_stats(self):
@ -290,7 +288,7 @@ class NetAppClusteredDrvTestCase(test.TestCase):
'interface-name': 'os_all_id',
'role': 'data',
'vserver': 'vserver-name',
}
}
self.driver._client.send_request.assert_has_calls([
mock.call('net-vlan-create', vlan_args),
mock.call('net-interface-create', interface_args),
@ -381,9 +379,9 @@ class NetAppClusteredDrvTestCase(test.TestCase):
vserver_info = naapi.NaElement('vserver-info')
vserver_aggr_info_list = naapi.NaElement('vserver-aggr-info-list')
for i in range(1, 4):
vserver_aggr_info_list.add_node_with_children('aggr-attributes',
**{'aggr-name': 'fake%s' % i,
'aggr-availsize': '%s' % i})
vserver_aggr_info_list.add_node_with_children(
'aggr-attributes', **{'aggr-name': 'fake%s' % i,
'aggr-availsize': '%s' % i})
vserver_info.add_child_elem(vserver_aggr_info_list)
attributes.add_child_elem(vserver_info)
root.add_child_elem(attributes)
@ -517,8 +515,7 @@ class NetAppClusteredDrvTestCase(test.TestCase):
class NetAppNFSHelperTestCase(test.TestCase):
"""Tests for NetApp 7mode driver.
"""
"""Tests for NetApp 7mode driver."""
def setUp(self):
super(NetAppNFSHelperTestCase, self).setUp()
self._context = context.get_admin_context()
@ -576,8 +573,7 @@ class NetAppNFSHelperTestCase(test.TestCase):
class NetAppCIFSHelperTestCase(test.TestCase):
"""Tests for NetApp 7mode driver.
"""
"""Tests for NetApp 7mode driver."""
def setUp(self):
super(NetAppCIFSHelperTestCase, self).setUp()
self._context = context.get_admin_context()

View File

@ -37,16 +37,21 @@ class OVS_Lib_Test(test.TestCase):
def test_reset_bridge(self):
self.br.reset_bridge()
self.execute.assert_has_calls([mock.call("ovs-vsctl", self.TO, "--",
"--if-exists", "del-br", self.BR_NAME, run_as_root=True),
mock.call("ovs-vsctl", self.TO, "add-br",
self.BR_NAME, run_as_root=True)])
"--if-exists", "del-br",
self.BR_NAME,
run_as_root=True),
mock.call("ovs-vsctl", self.TO,
"add-br",
self.BR_NAME,
run_as_root=True)])
def test_delete_port(self):
pname = "tap5"
self.br.delete_port(pname)
self.execute.assert_called_once_with("ovs-vsctl", self.TO, "--",
"--if-exists", "del-port", self.BR_NAME, pname,
run_as_root=True)
"--if-exists", "del-port",
self.BR_NAME, pname,
run_as_root=True)
def test_port_id_regex(self):
result = ('external_ids : {attached-mac="fa:16:3e:23:5b:f2",'

View File

@ -114,7 +114,7 @@ class NeutronApiTest(test.TestCase):
'dhcp_opts': 'test dhcp'}
with mock.patch.object(self.neutron_api, '_has_port_binding_extension',
mock.Mock(return_value=True)):
mock.Mock(return_value=True)):
port = self.neutron_api.create_port(**port_args)
self.assertEqual(port['tenant_id'], port_args['tenant_id'])
self.assertEqual(port['network_id'],
@ -139,7 +139,7 @@ class NeutronApiTest(test.TestCase):
port_args = {'tenant_id': 'test tenant', 'network_id': 'test net'}
with mock.patch.object(self.neutron_api, '_has_port_binding_extension',
mock.Mock(return_value=True)):
mock.Mock(return_value=True)):
port = self.neutron_api.create_port(**port_args)
self.assertEqual(port['tenant_id'], port_args['tenant_id'])
@ -153,13 +153,13 @@ class NeutronApiTest(test.TestCase):
side_effect=neutron_client_exc.NeutronClientException)
with mock.patch.object(self.neutron_api, '_has_port_binding_extension',
mock.Mock(return_value=True)):
mock.Mock(return_value=True)):
with mock.patch.object(self.neutron_api.client, 'create_port',
client_create_port_mock):
client_create_port_mock):
self.assertRaises(exception.NetworkException,
self.neutron_api.create_port,
**port_args)
self.neutron_api.create_port,
**port_args)
neutron_api.LOG.exception.assert_called_once()
@mock.patch.object(neutron_api.LOG, 'exception', mock.Mock())
@ -170,9 +170,9 @@ class NeutronApiTest(test.TestCase):
status_code=409))
with mock.patch.object(self.neutron_api, '_has_port_binding_extension',
mock.Mock(return_value=True)):
mock.Mock(return_value=True)):
with mock.patch.object(self.neutron_api.client, 'create_port',
client_create_port_mock):
client_create_port_mock):
self.assertRaises(exception.PortLimitExceeded,
self.neutron_api.create_port,
@ -182,7 +182,7 @@ class NeutronApiTest(test.TestCase):
def test_delete_port(self):
port_id = 'test port id'
with mock.patch.object(self.neutron_api.client, 'delete_port',
mock.Mock()) as client_delete_port_mock:
mock.Mock()) as client_delete_port_mock:
self.neutron_api.delete_port(port_id)
client_delete_port_mock.assert_called_once_with(port_id)
@ -193,7 +193,7 @@ class NeutronApiTest(test.TestCase):
client_list_ports_mock = mock.Mock(return_value={'ports': fake_ports})
with mock.patch.object(self.neutron_api.client, 'list_ports',
client_list_ports_mock):
client_list_ports_mock):
ports = self.neutron_api.list_ports(**search_opts)
client_list_ports_mock.assert_called_once_with(**search_opts)
@ -205,7 +205,7 @@ class NeutronApiTest(test.TestCase):
client_show_port_mock = mock.Mock(return_value={'port': fake_port})
with mock.patch.object(self.neutron_api.client, 'show_port',
client_show_port_mock):
client_show_port_mock):
port = self.neutron_api.show_port(port_id)
client_show_port_mock.assert_called_once_with(port_id)
@ -215,10 +215,10 @@ class NeutronApiTest(test.TestCase):
network_id = 'test network id'
fake_network = {'fake network': 'fake network info'}
client_show_network_mock = mock.Mock(
return_value={'network': fake_network})
return_value={'network': fake_network})
with mock.patch.object(self.neutron_api.client, 'show_network',
client_show_network_mock):
client_show_network_mock):
network = self.neutron_api.get_network(network_id)
client_show_network_mock.assert_called_once_with(network_id)
@ -228,7 +228,7 @@ class NeutronApiTest(test.TestCase):
subnet_id = 'fake subnet id'
with mock.patch.object(self.neutron_api.client, 'show_subnet',
mock.Mock(return_value={'subnet': {}})):
mock.Mock(return_value={'subnet': {}})):
subnet = self.neutron_api.get_subnet(subnet_id)
self.neutron_api.client.show_subnet.assert_called_once_with(
@ -238,10 +238,10 @@ class NeutronApiTest(test.TestCase):
def test_get_all_network(self):
fake_networks = [{'fake network': 'fake network info'}]
client_list_networks_mock = mock.Mock(
return_value={'networks': fake_networks})
return_value={'networks': fake_networks})
with mock.patch.object(self.neutron_api.client, 'list_networks',
client_list_networks_mock):
client_list_networks_mock):
networks = self.neutron_api.get_all_networks()
client_list_networks_mock.assert_any_call()
@ -290,10 +290,10 @@ class NeutronApiTest(test.TestCase):
def test_list_routers(self):
fake_routers = [{'fake router': 'fake router info'}]
client_list_routers_mock = mock.Mock(
return_value={'routers': fake_routers})
return_value={'routers': fake_routers})
with mock.patch.object(self.neutron_api.client, 'list_routers',
client_list_routers_mock):
client_list_routers_mock):
networks = self.neutron_api.router_list()
client_list_routers_mock.assert_any_call()
@ -306,11 +306,11 @@ class NeutronApiTest(test.TestCase):
side_effect=neutron_client_exc.NeutronClientException)
with mock.patch.object(self.neutron_api.client, 'create_network',
client_create_network_mock):
client_create_network_mock):
self.assertRaises(exception.NetworkException,
self.neutron_api.network_create,
**net_args)
self.neutron_api.network_create,
**net_args)
neutron_api.LOG.exception.assert_called_once()
@mock.patch.object(neutron_api.LOG, 'exception', mock.Mock())
@ -321,11 +321,11 @@ class NeutronApiTest(test.TestCase):
side_effect=neutron_client_exc.NeutronClientException)
with mock.patch.object(self.neutron_api.client, 'create_subnet',
client_create_subnet_mock):
client_create_subnet_mock):
self.assertRaises(exception.NetworkException,
self.neutron_api.subnet_create,
**subnet_args)
self.neutron_api.subnet_create,
**subnet_args)
neutron_api.LOG.exception.assert_called_once()
@mock.patch.object(neutron_api.LOG, 'exception', mock.Mock())
@ -335,11 +335,11 @@ class NeutronApiTest(test.TestCase):
side_effect=neutron_client_exc.NeutronClientException)
with mock.patch.object(self.neutron_api.client, 'create_router',
client_create_router_mock):
client_create_router_mock):
self.assertRaises(exception.NetworkException,
self.neutron_api.router_create,
**router_args)
self.neutron_api.router_create,
**router_args)
neutron_api.LOG.exception.assert_called_once()
def test_update_port_fixed_ips(self):
@ -356,11 +356,11 @@ class NeutronApiTest(test.TestCase):
side_effect=neutron_client_exc.NeutronClientException)
with mock.patch.object(self.neutron_api.client, 'update_port',
client_update_port_mock):
client_update_port_mock):
self.assertRaises(exception.NetworkException,
self.neutron_api.update_port_fixed_ips,
port_id, fixed_ips)
self.neutron_api.update_port_fixed_ips,
port_id, fixed_ips)
neutron_api.LOG.exception.assert_called_once()
def test_router_update_routes(self):
@ -379,21 +379,21 @@ class NeutronApiTest(test.TestCase):
side_effect=neutron_client_exc.NeutronClientException)
with mock.patch.object(self.neutron_api.client, 'update_router',
client_update_router_mock):
client_update_router_mock):
self.assertRaises(exception.NetworkException,
self.neutron_api.router_update_routes,
router_id, routes)
self.neutron_api.router_update_routes,
router_id, routes)
neutron_api.LOG.exception.assert_called_once()
def test_show_router(self):
router_id = 'test router id'
fake_router = {'fake router': 'fake router info'}
client_show_router_mock = mock.Mock(return_value={'router':
fake_router})
fake_router})
with mock.patch.object(self.neutron_api.client, 'show_router',
client_show_router_mock):
client_show_router_mock):
port = self.neutron_api.show_router(router_id)
client_show_router_mock.assert_called_once_with(router_id)
@ -403,14 +403,15 @@ class NeutronApiTest(test.TestCase):
router_id = 'test port id'
subnet_id = 'test subnet id'
port_id = 'test port id'
with mock.patch.object(self.neutron_api.client, 'add_interface_router',
mock.Mock()) as client_add_interface_router_mock:
with mock.patch.object(
self.neutron_api.client, 'add_interface_router',
mock.Mock()) as client_add_interface_router_mock:
self.neutron_api.router_add_interface(router_id,
subnet_id,
port_id)
client_add_interface_router_mock.assert_called_once_with(
port_id, {'subnet_id': subnet_id, 'port_id': port_id})
port_id, {'subnet_id': subnet_id, 'port_id': port_id})
@mock.patch.object(neutron_api.LOG, 'exception', mock.Mock())
def test_router_add_interface_exception(self):
@ -421,11 +422,11 @@ class NeutronApiTest(test.TestCase):
side_effect=neutron_client_exc.NeutronClientException)
with mock.patch.object(self.neutron_api.client, 'add_interface_router',
client_add_interface_router_mock):
client_add_interface_router_mock):
self.assertRaises(exception.NetworkException,
self.neutron_api.router_add_interface,
router_id, subnet_id, port_id)
self.neutron_api.router_add_interface,
router_id, subnet_id, port_id)
neutron_api.LOG.exception.assert_called_once()

View File

@ -43,8 +43,8 @@ class SecurityServiceDBTest(test.TestCase):
super(SecurityServiceDBTest, self).__init__(*args, **kwargs)
self.fake_context = context.RequestContext(user_id='fake user',
project_id='fake project',
is_admin=False)
project_id='fake project',
is_admin=False)
def _check_expected_fields(self, result, expected):
for key in expected:
@ -168,15 +168,15 @@ class SecurityServiceDBTest(test.TestCase):
dict2)
result1 = db_api.security_service_get_all_by_project(
self.fake_context,
dict1['project_id'])
self.fake_context,
dict1['project_id'])
self.assertEqual(len(result1), 1)
self._check_expected_fields(result1[0], dict1)
result2 = db_api.security_service_get_all_by_project(
self.fake_context,
dict2['project_id'])
self.fake_context,
dict2['project_id'])
self.assertEqual(len(result2), 1)
self._check_expected_fields(result2[0], dict2)

View File

@ -234,8 +234,8 @@ class ShareNetworkDBTest(test.TestCase):
db_api.share_network_create(self.fake_context, share_nw_dict2)
result = db_api.share_network_get_all_by_project(
self.fake_context,
share_nw_dict2['project_id'])
self.fake_context,
share_nw_dict2['project_id'])
self.assertEqual(len(result), 1)
self._check_fields(expected=share_nw_dict2, actual=result[0])
@ -252,10 +252,10 @@ class ShareNetworkDBTest(test.TestCase):
security_dict1['id'])
result = sqlalchemy_api.model_query(
self.fake_context,
models.ShareNetworkSecurityServiceAssociation).\
filter_by(security_service_id=security_dict1['id']).\
filter_by(share_network_id=self.share_nw_dict['id']).first()
self.fake_context,
models.ShareNetworkSecurityServiceAssociation).\
filter_by(security_service_id=security_dict1['id']).\
filter_by(share_network_id=self.share_nw_dict['id']).first()
self.assertTrue(result is not None)
@ -316,10 +316,10 @@ class ShareNetworkDBTest(test.TestCase):
security_dict1['id'])
result = sqlalchemy_api.model_query(
self.fake_context,
models.ShareNetworkSecurityServiceAssociation).\
filter_by(security_service_id=security_dict1['id']).\
filter_by(share_network_id=self.share_nw_dict['id']).first()
self.fake_context,
models.ShareNetworkSecurityServiceAssociation).\
filter_by(security_service_id=security_dict1['id']).\
filter_by(share_network_id=self.share_nw_dict['id']).first()
self.assertTrue(result is None)

View File

@ -35,8 +35,8 @@ class HostFiltersTestCase(test.TestCase):
super(HostFiltersTestCase, self).setUp()
self.context = context.RequestContext('fake', 'fake')
self.json_query = jsonutils.dumps(
['and', ['>=', '$free_capacity_gb', 1024],
['>=', '$total_capacity_gb', 10 * 1024]])
['and', ['>=', '$free_capacity_gb', 1024],
['>=', '$total_capacity_gb', 10 * 1024]])
# This has a side effect of testing 'get_filter_classes'
# when specifying a method (in this case, our standard filters)
filter_handler = filters.HostFilterHandler('manila.scheduler.filters')

View File

@ -34,7 +34,6 @@ CONF = cfg.CONF
class QuotaIntegrationTestCase(test.TestCase):
def setUp(self):
super(QuotaIntegrationTestCase, self).setUp()
self.flags(quota_shares=2,
@ -580,9 +579,9 @@ class QuotaEngineTestCase(test.TestCase):
'test_project', 'fake_user')
self.assertEqual(driver.called, [
('destroy_all_by_project_and_user', context, 'test_project',
'fake_user'),
])
('destroy_all_by_project_and_user', context, 'test_project',
'fake_user'),
])
def test_destroy_all_by_project(self):
context = FakeContext(None, None)
@ -612,7 +611,6 @@ class QuotaEngineTestCase(test.TestCase):
class DbQuotaDriverTestCase(test.TestCase):
expected_all_context = {
"shares": {"limit": 10, "in_use": 2, "reserved": 0, },
"gigabytes": {"limit": 50, "in_use": 10, "reserved": 0, },
@ -718,11 +716,11 @@ class DbQuotaDriverTestCase(test.TestCase):
quota.QUOTAS._resources, 'test_project', 'fake_user')
self.assertEqual(self.calls, [
'quota_get_all_by_project_and_user',
'quota_get_all_by_project',
'quota_usage_get_all_by_project_and_user',
'quota_class_get_all_by_name',
])
'quota_get_all_by_project_and_user',
'quota_get_all_by_project',
'quota_usage_get_all_by_project_and_user',
'quota_class_get_all_by_name',
])
self.assertEqual(result, self.expected_all_context)
def _stub_get_by_project(self):
@ -760,10 +758,10 @@ class DbQuotaDriverTestCase(test.TestCase):
quota.QUOTAS._resources, 'test_project', 'fake_user')
self.assertEqual(self.calls, [
'quota_get_all_by_project_and_user',
'quota_get_all_by_project',
'quota_usage_get_all_by_project_and_user',
])
'quota_get_all_by_project_and_user',
'quota_get_all_by_project',
'quota_usage_get_all_by_project_and_user',
])
self.assertEqual(result, self.expected_all_context)
def test_get_project_quotas_alt_context_no_class(self):
@ -784,11 +782,11 @@ class DbQuotaDriverTestCase(test.TestCase):
quota_class='test_class')
self.assertEqual(self.calls, [
'quota_get_all_by_project_and_user',
'quota_get_all_by_project',
'quota_usage_get_all_by_project_and_user',
'quota_class_get_all_by_name',
])
'quota_get_all_by_project_and_user',
'quota_get_all_by_project',
'quota_usage_get_all_by_project_and_user',
'quota_class_get_all_by_name',
])
self.assertEqual(result, self.expected_all_context)
def test_get_project_quotas_alt_context_with_class(self):
@ -810,11 +808,11 @@ class DbQuotaDriverTestCase(test.TestCase):
defaults=False)
self.assertEqual(self.calls, [
'quota_get_all_by_project_and_user',
'quota_get_all_by_project',
'quota_usage_get_all_by_project_and_user',
'quota_class_get_all_by_name',
])
'quota_get_all_by_project_and_user',
'quota_get_all_by_project',
'quota_usage_get_all_by_project_and_user',
'quota_class_get_all_by_name',
])
expected = {
"shares": {"limit": 10, "in_use": 2, "reserved": 0, },
"gigabytes": {"limit": 50, "in_use": 10, "reserved": 0, },
@ -843,10 +841,10 @@ class DbQuotaDriverTestCase(test.TestCase):
quota.QUOTAS._resources, 'test_project', 'fake_user', usages=False)
self.assertEqual(self.calls, [
'quota_get_all_by_project_and_user',
'quota_get_all_by_project',
'quota_class_get_all_by_name',
])
'quota_get_all_by_project_and_user',
'quota_get_all_by_project',
'quota_class_get_all_by_name',
])
expected = {
"shares": {"limit": 10, },
"gigabytes": {"limit": 50, },
@ -913,10 +911,10 @@ class DbQuotaDriverTestCase(test.TestCase):
quota.QUOTAS._resources, 'test_project', user_id='test_user')
self.assertEqual(self.calls, [
'get_project_quotas',
'get_user_quotas',
'quota_get_all_by_project_and_user',
])
'get_project_quotas',
'get_user_quotas',
'quota_get_all_by_project_and_user',
])
expected = {
"shares": {"minimum": 0, "maximum": 12, },
"gigabytes": {"minimum": 0, "maximum": 1000, },
@ -932,8 +930,8 @@ class DbQuotaDriverTestCase(test.TestCase):
quota.QUOTAS._resources, 'test_project')
self.assertEqual(self.calls, [
'get_project_quotas',
])
'get_project_quotas',
])
expected = {
"shares": {"minimum": 0, "maximum": -1, },
"gigabytes": {"minimum": 0, "maximum": -1, },
@ -1103,7 +1101,7 @@ class DbQuotaDriverTestCase(test.TestCase):
'test_class'),
'test_project')
self.assertEqual(self.calls, [('quota_destroy_all_by_project',
('test_project')), ])
('test_project')), ])
class FakeSession(object):

View File

@ -85,31 +85,31 @@ class ServiceInstanceManagerTestCase(test.TestCase):
net1['name'] = CONF.service_network_name
net1['id'] = 'fake service network id'
self.stubs.Set(self._manager.neutron_api, 'get_all_tenant_networks',
mock.Mock(return_value=[net1, net2]))
mock.Mock(return_value=[net1, net2]))
result = self._manager._get_service_network()
self.assertEqual(result, net1['id'])
def test_get_service_network_net_does_not_exists(self):
net = fake_network.FakeNetwork()
self.stubs.Set(self._manager.neutron_api, 'get_all_tenant_networks',
mock.Mock(return_value=[]))
mock.Mock(return_value=[]))
self.stubs.Set(self._manager.neutron_api, 'network_create',
mock.Mock(return_value=net))
mock.Mock(return_value=net))
result = self._manager._get_service_network()
self.assertEqual(result, net['id'])
def test_get_service_network_ambiguos(self):
net = fake_network.FakeNetwork(name=CONF.service_network_name)
self.stubs.Set(self._manager.neutron_api, 'get_all_tenant_networks',
mock.Mock(return_value=[net, net]))
mock.Mock(return_value=[net, net]))
self.assertRaises(exception.ManilaException,
self._manager._get_service_network)
def test_get_service_instance_name(self):
result = self._manager._get_service_instance_name(
'fake_share_network_id')
'fake_share_network_id')
self.assertEqual(result, CONF.service_instance_name_template %
'fake_share_network_id')
'fake_share_network_id')
def test_get_server_ip_found_in_networks_section(self):
ip = '10.0.0.1'
@ -233,10 +233,9 @@ class ServiceInstanceManagerTestCase(test.TestCase):
mock.Mock(return_value=[fake_secgroup1,
fake_secgroup2]))
self.assertRaises(exception.ServiceInstanceException,
self._manager._get_or_create_security_group,
self._context,
name,
)
self._manager._get_or_create_security_group,
self._context,
name)
self._manager.compute_api.security_group_list.assert_called_once_with(
self._context)
@ -278,11 +277,10 @@ class ServiceInstanceManagerTestCase(test.TestCase):
mock.Mock(return_value=fake_server))
result = self._manager.ensure_service_instance(self._context,
server_details)
self._manager.compute_api.server_get.\
assert_called_once_with(self._context,
server_details['instance_id'])
self._manager._check_server_availability.\
assert_called_once_with(server_details)
self._manager.compute_api.server_get.assert_called_once_with(
self._context, server_details['instance_id'])
self._manager._check_server_availability.assert_called_once_with(
server_details)
self.assertTrue(result)
def test_ensure_server_not_exists(self):
@ -296,9 +294,8 @@ class ServiceInstanceManagerTestCase(test.TestCase):
instance_id=server_details['instance_id'])))
result = self._manager.ensure_service_instance(self._context,
server_details)
self._manager.compute_api.server_get.\
assert_called_once_with(self._context,
server_details['instance_id'])
self._manager.compute_api.server_get.assert_called_once_with(
self._context, server_details['instance_id'])
self.assertFalse(self._manager._check_server_availability.called)
self.assertFalse(result)
@ -313,9 +310,8 @@ class ServiceInstanceManagerTestCase(test.TestCase):
self._manager.ensure_service_instance,
self._context,
server_details)
self._manager.compute_api.server_get.\
assert_called_once_with(self._context,
server_details['instance_id'])
self._manager.compute_api.server_get.assert_called_once_with(
self._context, server_details['instance_id'])
self.assertFalse(self._manager._check_server_availability.called)
def test_ensure_server_non_active(self):
@ -349,8 +345,8 @@ class ServiceInstanceManagerTestCase(test.TestCase):
def test_get_key_exists(self):
fake_keypair = fake_compute.FakeKeypair(
name=CONF.manila_service_keypair_name,
public_key='fake_public_key')
name=CONF.manila_service_keypair_name,
public_key='fake_public_key')
self.stubs.Set(self._manager.compute_api, 'keypair_list',
mock.Mock(return_value=[fake_keypair]))
self.stubs.Set(self._manager.compute_api, 'keypair_import',
@ -368,8 +364,8 @@ class ServiceInstanceManagerTestCase(test.TestCase):
def test_get_key_exists_recreate(self):
fake_keypair = fake_compute.FakeKeypair(
name=CONF.manila_service_keypair_name,
public_key='fake_public_key1')
name=CONF.manila_service_keypair_name,
public_key='fake_public_key1')
self.stubs.Set(self._manager.compute_api, 'keypair_list',
mock.Mock(return_value=[fake_keypair]))
self.stubs.Set(self._manager.compute_api, 'keypair_import',
@ -383,9 +379,8 @@ class ServiceInstanceManagerTestCase(test.TestCase):
self._manager.compute_api.keypair_list.assert_called_once()
self._manager.compute_api.keypair_delete.assert_called_once()
self._manager.compute_api.keypair_import.\
assert_called_once_with(self._context, fake_keypair.name,
'fake_public_key2')
self._manager.compute_api.keypair_import.assert_called_once_with(
self._context, fake_keypair.name, 'fake_public_key2')
self.assertEqual(result,
(fake_keypair.name,
os.path.expanduser(CONF.path_to_private_key)))
@ -456,11 +451,11 @@ class ServiceInstanceManagerTestCase(test.TestCase):
self._manager._get_key.assert_called_once()
self._manager._setup_network_for_instance.assert_called_once()
self._manager._setup_connectivity_with_service_instances.\
assert_called_once()
assert_called_once()
self._manager.compute_api.server_create.assert_called_once_with(
self._context, name=fake_instance_name, image='fake_image_id',
flavor=CONF.service_instance_flavor_id,
key_name='fake_key_name', nics=[{'port-id': fake_port['id']}])
self._context, name=fake_instance_name, image='fake_image_id',
flavor=CONF.service_instance_flavor_id,
key_name='fake_key_name', nics=[{'port-id': fake_port['id']}])
service_instance.socket.socket.assert_called_once()
self.assertEqual(result, fake_server)
@ -531,8 +526,8 @@ class ServiceInstanceManagerTestCase(test.TestCase):
'fake-neutron-net',
'fake-neutron-subnet')
self._manager.neutron_api.delete_port.\
assert_called_once_with(fake_port['id'])
self._manager.neutron_api.delete_port.assert_called_once_with(
fake_port['id'])
self.assertFalse(self._manager.compute_api.server_create.called)
self.assertFalse(self._manager.compute_api.server_get.called)
self.assertFalse(service_instance.socket.socket.called)
@ -552,43 +547,43 @@ class ServiceInstanceManagerTestCase(test.TestCase):
def test_setup_network_for_instance(self):
fake_service_net = fake_network.FakeNetwork(subnets=[])
fake_service_subnet = fake_network.\
FakeSubnet(name=self.share['share_network_id'])
FakeSubnet(name=self.share['share_network_id'])
fake_router = fake_network.FakeRouter()
fake_port = fake_network.FakePort()
self.stubs.Set(self._manager.neutron_api, 'get_network',
mock.Mock(return_value=fake_service_net))
mock.Mock(return_value=fake_service_net))
self.stubs.Set(self._manager.neutron_api, 'subnet_create',
mock.Mock(return_value=fake_service_subnet))
mock.Mock(return_value=fake_service_subnet))
self.stubs.Set(self._manager.db, 'share_network_get',
mock.Mock(return_value='fake_share_network'))
mock.Mock(return_value='fake_share_network'))
self.stubs.Set(self._manager, '_get_private_router',
mock.Mock(return_value=fake_router))
mock.Mock(return_value=fake_router))
self.stubs.Set(self._manager.neutron_api, 'router_add_interface',
mock.Mock())
mock.Mock())
self.stubs.Set(self._manager.neutron_api, 'create_port',
mock.Mock(return_value=fake_port))
mock.Mock(return_value=fake_port))
self.stubs.Set(self._manager, '_get_cidr_for_subnet',
mock.Mock(return_value='fake_cidr'))
mock.Mock(return_value='fake_cidr'))
result = self._manager._setup_network_for_instance('fake-net',
'fake-subnet')
self._manager.neutron_api.get_network.\
assert_called_once_with(self._manager.service_network_id)
self._manager._get_private_router.\
assert_called_once_with('fake-net', 'fake-subnet')
self._manager.neutron_api.router_add_interface.\
assert_called_once_with('fake_router_id', 'fake_subnet_id')
self._manager.neutron_api.get_network.assert_called_once_with(
self._manager.service_network_id)
self._manager._get_private_router.assert_called_once_with(
'fake-net', 'fake-subnet')
self._manager.neutron_api.router_add_interface.assert_called_once_with(
'fake_router_id', 'fake_subnet_id')
self._manager.neutron_api.subnet_create.assert_called_once_with(
self._manager.service_tenant_id,
self._manager.service_network_id,
'routed_to_fake-subnet',
'fake_cidr')
self._manager.neutron_api.create_port.assert_called_once_with(
self._manager.service_tenant_id,
self._manager.service_network_id,
subnet_id='fake_subnet_id',
device_owner='manila')
self._manager.service_tenant_id,
self._manager.service_network_id,
subnet_id='fake_subnet_id',
device_owner='manila')
self._manager._get_cidr_for_subnet.assert_called_once()
self.assertEqual(result,
('fake_subnet_id', 'fake_router_id', fake_port['id']))
@ -599,28 +594,28 @@ class ServiceInstanceManagerTestCase(test.TestCase):
fake_share_network = {'neutron_net_id': fake_net['id'],
'neutron_subnet_id': fake_subnet['id']}
self.stubs.Set(self._manager.db, 'share_network_get',
mock.Mock(return_value=fake_share_network))
mock.Mock(return_value=fake_share_network))
fake_port = fake_network.FakePort(fixed_ips=[
{'subnet_id': fake_subnet['id'],
'ip_address': fake_subnet['gateway_ip']}],
device_id='fake_router_id')
{'subnet_id': fake_subnet['id'],
'ip_address': fake_subnet['gateway_ip']}],
device_id='fake_router_id')
fake_router = fake_network.FakeRouter(id='fake_router_id')
self.stubs.Set(self._manager.neutron_api, 'get_subnet',
mock.Mock(return_value=fake_subnet))
mock.Mock(return_value=fake_subnet))
self.stubs.Set(self._manager.neutron_api, 'list_ports',
mock.Mock(return_value=[fake_port]))
mock.Mock(return_value=[fake_port]))
self.stubs.Set(self._manager.neutron_api, 'show_router',
mock.Mock(return_value=fake_router))
mock.Mock(return_value=fake_router))
result = self._manager._get_private_router(fake_net['id'],
fake_subnet['id'])
self._manager.neutron_api.get_subnet.\
assert_called_once_with(fake_subnet['id'])
self._manager.neutron_api.list_ports.\
assert_called_once_with(network_id=fake_net['id'])
self._manager.neutron_api.show_router.\
assert_called_once_with(fake_router['id'])
self._manager.neutron_api.get_subnet.assert_called_once_with(
fake_subnet['id'])
self._manager.neutron_api.list_ports.assert_called_once_with(
network_id=fake_net['id'])
self._manager.neutron_api.show_router.assert_called_once_with(
fake_router['id'])
self.assertEqual(result, fake_router)
def test_get_private_router_exception(self):
@ -629,11 +624,11 @@ class ServiceInstanceManagerTestCase(test.TestCase):
fake_share_network = {'neutron_net_id': fake_net['id'],
'neutron_subnet_id': fake_subnet['id']}
self.stubs.Set(self._manager.db, 'share_network_get',
mock.Mock(return_value=fake_share_network))
mock.Mock(return_value=fake_share_network))
self.stubs.Set(self._manager.neutron_api, 'get_subnet',
mock.Mock(return_value=fake_subnet))
mock.Mock(return_value=fake_subnet))
self.stubs.Set(self._manager.neutron_api, 'list_ports',
mock.Mock(return_value=[]))
mock.Mock(return_value=[]))
self.assertRaises(exception.ManilaException,
self._manager._get_private_router,
@ -650,11 +645,11 @@ class ServiceInstanceManagerTestCase(test.TestCase):
mac_address='fake_mac_address')
self.stubs.Set(self._manager, '_get_service_port',
mock.Mock(return_value=fake_port))
mock.Mock(return_value=fake_port))
self.stubs.Set(self._manager.vif_driver, 'get_device_name',
mock.Mock(return_value=interface_name))
mock.Mock(return_value=interface_name))
self.stubs.Set(self._manager.neutron_api, 'get_subnet',
mock.Mock(return_value=fake_subnet))
mock.Mock(return_value=fake_subnet))
self.stubs.Set(self._manager, '_remove_outdated_interfaces',
mock.Mock())
self.stubs.Set(self._manager.vif_driver, 'plug', mock.Mock())
@ -666,7 +661,7 @@ class ServiceInstanceManagerTestCase(test.TestCase):
self._manager._get_service_port.assert_called_once_with()
self._manager.vif_driver.get_device_name.assert_called_once_with(
fake_port)
fake_port)
self._manager.vif_driver.plug.assert_called_once_with(
interface_name, fake_port['id'], fake_port['mac_address'])
self._manager.neutron_api.get_subnet.assert_called_once_with(
@ -677,7 +672,7 @@ class ServiceInstanceManagerTestCase(test.TestCase):
interface_name)
device_mock.route.pullup_route.assert_called_once_with(interface_name)
self._manager._remove_outdated_interfaces.assert_called_once_with(
device_mock)
device_mock)
def test_get_service_port(self):
fake_service_port = fake_network.FakePort(device_id='manila-share')
@ -695,25 +690,26 @@ class ServiceInstanceManagerTestCase(test.TestCase):
result = self._manager._get_service_port()
self._manager.neutron_api.list_ports.\
assert_called_once_with(device_id='manila-share')
self._manager.neutron_api.list_ports.assert_called_once_with(
device_id='manila-share')
self._manager.db.service_get_all_by_topic.assert_called_once()
self._manager.neutron_api.create_port.assert_called_once_with(
self._manager.service_tenant_id,
self._manager.service_network_id,
device_id='manila-share',
device_owner='manila:share',
host_id='fake_host'
)
self._manager.service_tenant_id,
self._manager.service_network_id,
device_id='manila-share',
device_owner='manila:share',
host_id='fake_host'
)
self._manager.neutron_api.get_network.assert_called_once()
self.assertFalse(self._manager.neutron_api.
update_port_fixed_ips.called)
update_port_fixed_ips.called)
self.assertEqual(result, fake_service_port)
def test_get_service_port_ambigious_ports(self):
fake_service_port = fake_network.FakePort(device_id='manila-share')
self.stubs.Set(self._manager.neutron_api, 'list_ports',
mock.Mock(return_value=[fake_service_port, fake_service_port]))
mock.Mock(return_value=[fake_service_port,
fake_service_port]))
self.assertRaises(exception.ManilaException,
self._manager._get_service_port)
@ -723,7 +719,7 @@ class ServiceInstanceManagerTestCase(test.TestCase):
self.stubs.Set(self._manager.neutron_api, 'list_ports',
mock.Mock(return_value=[fake_service_port]))
self.stubs.Set(self._manager.db, 'service_get_all_by_topic',
mock.Mock(return_value=[{'host': 'fake_host'}]))
mock.Mock(return_value=[{'host': 'fake_host'}]))
self.stubs.Set(self._manager.neutron_api, 'create_port',
mock.Mock(return_value=fake_service_port))
self.stubs.Set(self._manager.neutron_api, 'get_network',
@ -734,17 +730,17 @@ class ServiceInstanceManagerTestCase(test.TestCase):
result = self._manager._get_service_port()
self._manager.neutron_api.list_ports.assert_called_once_with(
device_id='manila-share')
device_id='manila-share')
self.assertFalse(self._manager.db.service_get_all_by_topic.called)
self.assertFalse(self._manager.neutron_api.create_port.called)
self._manager.neutron_api.get_network.assert_called_once()
self.assertFalse(self._manager.neutron_api.
update_port_fixed_ips.called)
update_port_fixed_ips.called)
self.assertEqual(result, fake_service_port)
def test_get_cidr_for_subnet(self):
serv_cidr = service_instance.netaddr.IPNetwork(
CONF.service_network_cidr)
CONF.service_network_cidr)
fake_division_mask = CONF.service_network_division_mask
cidrs = serv_cidr.subnet(fake_division_mask)
cidr1 = str(cidrs.next())
@ -766,9 +762,9 @@ class ServiceInstanceManagerTestCase(test.TestCase):
subnet_id = 'fake_subnet_id'
self.stubs.Set(self._manager, '_delete_server', mock.Mock())
self.stubs.Set(self._manager.neutron_api, 'router_remove_interface',
mock.Mock())
mock.Mock())
self.stubs.Set(self._manager.neutron_api, 'update_subnet',
mock.Mock())
mock.Mock())
self._manager.delete_service_instance(
self._context, instance_id, subnet_id, router_id)

View File

@ -453,7 +453,7 @@ class ShareAPITestCase(test.TestCase):
'snapshot_id': share['snapshot_id'],
}
with mock.patch.object(db_driver, 'share_create',
mock.Mock(return_value=share)):
mock.Mock(return_value=share)):
self.api.create(self.context, 'nfs', '1', 'fakename', 'fakedesc',
snapshot=snapshot, availability_zone='fakeaz')
self.scheduler_rpcapi.create_share.assert_called_once_with(

View File

@ -161,7 +161,7 @@ class GenericShareDriverTestCase(test.TestCase):
def test_create_share(self):
self._helper_nfs.create_export.return_value = 'fakelocation'
methods = ('get_service_instance', '_allocate_container',
'_attach_volume', '_format_device', '_mount_device')
'_attach_volume', '_format_device', '_mount_device')
for method in methods:
self.stubs.Set(self._driver, method, mock.Mock())
result = self._driver.create_share(self._context, self.share,
@ -178,33 +178,14 @@ class GenericShareDriverTestCase(test.TestCase):
def test_format_device(self):
volume = {'mountpoint': 'fake_mount_point'}
self._driver._format_device('fake_server', volume)
self._driver._ssh_exec.assert_called_once_with('fake_server',
['sudo', 'mkfs.ext4', volume['mountpoint']])
self._driver._ssh_exec.assert_called_once_with(
'fake_server',
['sudo', 'mkfs.ext4', volume['mountpoint']])
def _test_mount_device(self):
volume = {'mountpoint': 'fake_mount_point'}
self.stubs.Set(self._driver, '_get_mount_path',
mock.Mock(return_value='fake_mount_path'))
self._driver._mount_device(self._context, self.share, 'fake_server',
volume)
self._driver._ssh_exec.assert_has_calls([
mock.call('fake_server', ['sudo', 'mkdir', '-p',
'fake_mount_path',
';', 'sudo', 'mount',
volume['mountpoint'],
'fake_mount_path']),
mock.call('fake_server', ['sudo', 'chmod', '777',
'fake_mount_path'])
])
def test_mount_device_exception_01(self):
volume = {'mountpoint': 'fake_mount_point'}
self._driver._ssh_exec.side_effect = [
exception.ProcessExecutionError(stderr='already mounted'), None]
self.stubs.Set(self._driver, '_get_mount_path',
mock.Mock(return_value='fake_mount_path'))
mock.Mock(return_value='fake_mount_path'))
self._driver._mount_device(self._context, self.share, 'fake_server',
volume)
@ -217,22 +198,43 @@ class GenericShareDriverTestCase(test.TestCase):
'fake_mount_path']),
mock.call('fake_server', ['sudo', 'chmod', '777',
'fake_mount_path'])
])
])
def test_mount_device_exception_01(self):
volume = {'mountpoint': 'fake_mount_point'}
self._driver._ssh_exec.side_effect = [
exception.ProcessExecutionError(stderr='already mounted'), None]
self.stubs.Set(self._driver, '_get_mount_path',
mock.Mock(return_value='fake_mount_path'))
self._driver._mount_device(self._context, self.share, 'fake_server',
volume)
self._driver._ssh_exec.assert_has_calls([
mock.call('fake_server', ['sudo', 'mkdir', '-p',
'fake_mount_path',
';', 'sudo', 'mount',
volume['mountpoint'],
'fake_mount_path']),
mock.call('fake_server', ['sudo', 'chmod', '777',
'fake_mount_path'])
])
def test_mount_device_exception_02(self):
volume = {'mountpoint': 'fake_mount_point'}
self._driver._ssh_exec.side_effect = exception.ManilaException
self.stubs.Set(self._driver, '_get_mount_path',
mock.Mock(return_value='fake_mount_path'))
mock.Mock(return_value='fake_mount_path'))
self.assertRaises(exception.ManilaException,
self._driver._mount_device,
self._context, self.share, 'fake_server', volume)
def test_umount_device(self):
self.stubs.Set(self._driver, '_get_mount_path',
mock.Mock(return_value='fake_mount_path'))
mock.Mock(return_value='fake_mount_path'))
self._driver._unmount_device(self.share, 'fake_server')
self._driver._ssh_exec.assert_called_once_with('fake_server',
self._driver._ssh_exec.assert_called_once_with(
'fake_server',
['sudo', 'umount', 'fake_mount_path', ';', 'sudo', 'rmdir',
'fake_mount_path'])
@ -254,10 +256,10 @@ class GenericShareDriverTestCase(test.TestCase):
'fake_inst_id', availiable_volume)
self._driver.compute_api.instance_volume_attach.\
assert_called_once_with(self._context, 'fake_inst_id',
availiable_volume['id'])
self._driver.volume_api.get.\
assert_called_once_with(self._context, attached_volume['id'])
assert_called_once_with(self._context, 'fake_inst_id',
availiable_volume['id'])
self._driver.volume_api.get.assert_called_once_with(
self._context, attached_volume['id'])
self.assertEqual(result, attached_volume)
def test_attach_volume_attached_correct(self):
@ -285,7 +287,7 @@ class GenericShareDriverTestCase(test.TestCase):
fake_server = fake_compute.FakeServer()
availiable_volume = fake_volume.FakeVolume()
self.stubs.Set(self._driver.compute_api, 'instance_volume_attach',
mock.Mock(side_effect=exception.ManilaException))
mock.Mock(side_effect=exception.ManilaException))
self.assertRaises(exception.ManilaException,
self._driver._attach_volume,
self._context, self.share, fake_server,
@ -306,7 +308,7 @@ class GenericShareDriverTestCase(test.TestCase):
def test_get_volume(self):
volume = fake_volume.FakeVolume(
display_name=CONF.volume_name_template % self.share['id'])
display_name=CONF.volume_name_template % self.share['id'])
self.stubs.Set(self._driver.volume_api, 'get_all',
mock.Mock(return_value=[volume]))
result = self._driver._get_volume(self._context, self.share['id'])
@ -320,11 +322,12 @@ class GenericShareDriverTestCase(test.TestCase):
def test_get_volume_error(self):
volume = fake_volume.FakeVolume(
display_name=CONF.volume_name_template % self.share['id'])
display_name=CONF.volume_name_template % self.share['id'])
self.stubs.Set(self._driver.volume_api, 'get_all',
mock.Mock(return_value=[volume, volume]))
self.assertRaises(exception.ManilaException,
self._driver._get_volume, self._context, self.share['id'])
self._driver._get_volume,
self._context, self.share['id'])
def test_get_volume_snapshot(self):
volume_snapshot = fake_volume.FakeVolumeSnapshot(
@ -333,14 +336,14 @@ class GenericShareDriverTestCase(test.TestCase):
self.stubs.Set(self._driver.volume_api, 'get_all_snapshots',
mock.Mock(return_value=[volume_snapshot]))
result = self._driver._get_volume_snapshot(self._context,
self.snapshot['id'])
self.snapshot['id'])
self.assertEqual(result, volume_snapshot)
def test_get_volume_snapshot_none(self):
self.stubs.Set(self._driver.volume_api, 'get_all_snapshots',
mock.Mock(return_value=[]))
result = self._driver._get_volume_snapshot(self._context,
self.share['id'])
self.share['id'])
self.assertEqual(result, None)
def test_get_volume_snapshot_error(self):
@ -348,9 +351,11 @@ class GenericShareDriverTestCase(test.TestCase):
display_name=CONF.volume_snapshot_name_template %
self.snapshot['id'])
self.stubs.Set(self._driver.volume_api, 'get_all_snapshots',
mock.Mock(return_value=[volume_snapshot, volume_snapshot]))
mock.Mock(return_value=[volume_snapshot,
volume_snapshot]))
self.assertRaises(exception.ManilaException,
self._driver._get_volume_snapshot, self._context, self.share['id'])
self._driver._get_volume_snapshot, self._context,
self.share['id'])
def test_detach_volume(self):
fake_server = fake_compute.FakeServer()
@ -373,8 +378,8 @@ class GenericShareDriverTestCase(test.TestCase):
self._context,
self.server['backend_details']['instance_id'],
availiable_volume['id'])
self._driver.volume_api.get.\
assert_called_once_with(self._context, availiable_volume['id'])
self._driver.volume_api.get.assert_called_once_with(
self._context, availiable_volume['id'])
def test_detach_volume_detached(self):
fake_server = fake_compute.FakeServer()
@ -403,11 +408,12 @@ class GenericShareDriverTestCase(test.TestCase):
result = self._driver._allocate_container(self._context, self.share)
self.assertEqual(result, fake_vol)
self._driver.volume_api.create.assert_called_once_with(self._context,
self.share['size'],
CONF.volume_name_template % self.share['id'],
'',
snapshot=None)
self._driver.volume_api.create.assert_called_once_with(
self._context,
self.share['size'],
CONF.volume_name_template % self.share['id'],
'',
snapshot=None)
def test_allocate_container_with_snaphot(self):
fake_vol = fake_volume.FakeVolume()
@ -421,11 +427,12 @@ class GenericShareDriverTestCase(test.TestCase):
self.share,
self.snapshot)
self.assertEqual(result, fake_vol)
self._driver.volume_api.create.assert_called_once_with(self._context,
self.share['size'],
CONF.volume_name_template % self.share['id'],
'',
snapshot=fake_vol_snap)
self._driver.volume_api.create.assert_called_once_with(
self._context,
self.share['size'],
CONF.volume_name_template % self.share['id'],
'',
snapshot=fake_vol_snap)
def test_allocate_container_error(self):
fake_vol = fake_volume.FakeVolume(status='error')
@ -443,7 +450,7 @@ class GenericShareDriverTestCase(test.TestCase):
mock.Mock(return_value=fake_vol))
self.stubs.Set(self._driver.volume_api, 'delete', mock.Mock())
self.stubs.Set(self._driver.volume_api, 'get', mock.Mock(
side_effect=exception.VolumeNotFound(volume_id=fake_vol['id'])))
side_effect=exception.VolumeNotFound(volume_id=fake_vol['id'])))
self._driver._deallocate_container(self._context, self.share)
@ -454,7 +461,7 @@ class GenericShareDriverTestCase(test.TestCase):
def test_create_share_from_snapshot(self):
self._helper_nfs.create_export.return_value = 'fakelocation'
methods = ('get_service_instance', '_allocate_container',
'_attach_volume', '_mount_device')
'_attach_volume', '_mount_device')
for method in methods:
self.stubs.Set(self._driver, method, mock.Mock())
result = self._driver.create_share_from_snapshot(
@ -495,11 +502,11 @@ class GenericShareDriverTestCase(test.TestCase):
self._driver._get_volume.assert_called_once()
self._driver.volume_api.create_snapshot_force.assert_called_once_with(
self._context,
fake_vol['id'],
CONF.volume_snapshot_name_template % self.snapshot['id'],
''
)
self._context,
fake_vol['id'],
CONF.volume_snapshot_name_template % self.snapshot['id'],
''
)
def test_delete_snapshot(self):
fake_vol_snap = fake_volume.FakeVolumeSnapshot()
@ -507,8 +514,8 @@ class GenericShareDriverTestCase(test.TestCase):
mock.Mock(return_value=fake_vol_snap))
self.stubs.Set(self._driver.volume_api, 'delete_snapshot', mock.Mock())
self.stubs.Set(self._driver.volume_api, 'get_snapshot',
mock.Mock(side_effect=exception.VolumeSnapshotNotFound(
snapshot_id=fake_vol_snap['id'])))
mock.Mock(side_effect=exception.VolumeSnapshotNotFound(
snapshot_id=fake_vol_snap['id'])))
self._driver.delete_snapshot(self._context, fake_vol_snap,
share_server=self.server)
@ -520,7 +527,7 @@ class GenericShareDriverTestCase(test.TestCase):
def test_ensure_share(self):
self._helper_nfs.create_export.return_value = 'fakelocation'
methods = ('get_service_instance', '_get_volume',
'_attach_volume', '_mount_device')
'_attach_volume', '_mount_device')
for method in methods:
self.stubs.Set(self._driver, method, mock.Mock())
self._driver.ensure_share(self._context, self.share,
@ -603,7 +610,8 @@ class NFSHelperTestCase(test.TestCase):
fake_server = fake_compute.FakeServer(ip='10.254.0.3')
ret = self._helper.create_export(fake_server, 'volume-00001')
expected_location = ':'.join([fake_server['ip'],
os.path.join(CONF.share_mount_path, 'volume-00001')])
os.path.join(CONF.share_mount_path,
'volume-00001')])
self.assertEqual(ret, expected_location)
def test_allow_access(self):
@ -616,7 +624,7 @@ class NFSHelperTestCase(test.TestCase):
mock.call(fake_server, ['sudo', 'exportfs', '-o',
'rw,no_subtree_check',
':'.join(['10.0.0.2', local_path])])
])
])
def test_allow_access_no_ip(self):
self.assertRaises(exception.InvalidShareAccess,

View File

@ -24,10 +24,7 @@ from mock import patch
from oslo.config import cfg
from manila import context
from manila.db.sqlalchemy import models
from manila import exception
from manila.openstack.common import importutils
from manila.openstack.common import log as logging
from manila.share import configuration as config
from manila.share.drivers import glusterfs
from manila import test
@ -98,11 +95,11 @@ class GlusterAddressTestCase(test.TestCase):
# python 2.6 compat thingy
check_output = lambda cmd:\
subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE).\
communicate()[0]
communicate()[0]
# shell unescaping thru echo(1)
self.assertEqual(check_output('echo ' + ' '.join(ret[0]),)[:-1],
'ssh testuser@127.0.0.1 gluster ' +
' '.join(self._gluster_args))
' '.join(self._gluster_args))
self.assertEqual(ret[1], {})
@ -121,8 +118,8 @@ class GlusterfsShareDriverTestCase(test.TestCase):
self.fake_conf = config.Configuration(None)
self._db = Mock()
self._driver = glusterfs.GlusterfsShareDriver(
self._db, execute=self._execute,
configuration=self.fake_conf)
self._db, execute=self._execute,
configuration=self.fake_conf)
self._driver.gluster_address = Mock(**gluster_address_attrs)
self.share = fake_share()
@ -330,7 +327,7 @@ class GlusterfsShareDriverTestCase(test.TestCase):
self.assertEqual(fake_utils.fake_execute_get_log(), expected_exec)
self.assertEqual(ret,
{'foo': ['10.0.0.1', '10.0.0.2'], 'bar': ['10.0.0.1']}
)
)
def test_get_local_share_path(self):
with patch.object(os, 'access', return_value=True):
@ -373,7 +370,7 @@ class GlusterfsShareDriverTestCase(test.TestCase):
some_no = 42
not_some_no = some_no + 1
os_stat = lambda path: Mock(st_dev=some_no) if path == '/mnt/nfs' \
else Mock(st_dev=not_some_no)
else Mock(st_dev=not_some_no)
with patch.object(os, 'statvfs', return_value=test_statvfs):
with patch.object(os, 'stat', os_stat):
ret = self._driver._update_share_stats()
@ -486,8 +483,8 @@ class GlusterfsShareDriverTestCase(test.TestCase):
self.assertEqual(ret, None)
self.assertTrue(self._driver.gluster_address.make_gluster_args.called)
self.assertEqual(
self._driver.gluster_address.make_gluster_args.call_args[0][-1],
'/example.com(10.0.0.1),/fakename(10.0.0.1|10.0.0.2)')
self._driver.gluster_address.make_gluster_args.call_args[0][-1],
'/example.com(10.0.0.1),/fakename(10.0.0.1|10.0.0.2)')
def test_manage_access_adding_entry_cmd_fail(self):
def cbk(d, key, value):
@ -510,8 +507,8 @@ class GlusterfsShareDriverTestCase(test.TestCase):
self.assertEqual(fake_utils.fake_execute_get_log(), expected_exec)
self.assertTrue(self._driver.gluster_address.make_gluster_args.called)
self.assertEqual(
self._driver.gluster_address.make_gluster_args.call_args[0][-1],
'/example.com(10.0.0.1),/fakename(10.0.0.1|10.0.0.2)')
self._driver.gluster_address.make_gluster_args.call_args[0][-1],
'/example.com(10.0.0.1),/fakename(10.0.0.1|10.0.0.2)')
def test_manage_access_removing_last_entry(self):
def cbk(d, key, value):
@ -528,11 +525,11 @@ class GlusterfsShareDriverTestCase(test.TestCase):
self.assertEqual(ret, None)
self.assertTrue(self._driver.gluster_address.make_gluster_args.called)
self.assertEqual(
self._driver.gluster_address.make_gluster_args.call_args[0][1],
'reset')
self._driver.gluster_address.make_gluster_args.call_args[0][1],
'reset')
self.assertEqual(
self._driver.gluster_address.make_gluster_args.call_args[0][-1],
'nfs.export-dir')
self._driver.gluster_address.make_gluster_args.call_args[0][-1],
'nfs.export-dir')
def test_allow_access_with_share_having_noaccess(self):
access = {'access_type': 'ip', 'access_to': '10.0.0.1'}
@ -543,8 +540,8 @@ class GlusterfsShareDriverTestCase(test.TestCase):
self._driver.allow_access(self._context, self.share, access)
self.assertTrue(self._driver.gluster_address.make_gluster_args.called)
self.assertEqual(
self._driver.gluster_address.make_gluster_args.call_args[0][-1],
'/example.com(10.0.0.1),/fakename(10.0.0.1)')
self._driver.gluster_address.make_gluster_args.call_args[0][-1],
'/example.com(10.0.0.1),/fakename(10.0.0.1)')
def test_allow_access_with_share_having_access(self):
access = {'access_type': 'ip', 'access_to': '10.0.0.1'}
@ -582,8 +579,8 @@ class GlusterfsShareDriverTestCase(test.TestCase):
self._driver.deny_access(self._context, self.share, access)
self.assertTrue(self._driver.gluster_address.make_gluster_args.called)
self.assertEqual(
self._driver.gluster_address.make_gluster_args.call_args[0][-1],
'/example.com(10.0.0.1)')
self._driver.gluster_address.make_gluster_args.call_args[0][-1],
'/example.com(10.0.0.1)')
def test_deny_access_can_be_called_with_extra_arg_share_server(self):
access = None

View File

@ -109,7 +109,7 @@ class GetFromPathTestCase(test.TestCase):
input = [{'a': {'b': {'c': 'c_1'}}},
{'a': {'b': {'c': 'c_2'}}}]
self.assertEqual([{'b': {'c': 'c_1'}}, {'b': {'c': 'c_2'}}],
f(input, "a"))
f(input, "a"))
self.assertEqual([{'c': 'c_1'}, {'c': 'c_2'}], f(input, "a/b"))
self.assertEqual(['c_1', 'c_2'], f(input, "a/b/c"))
@ -233,8 +233,9 @@ class GenericUtilsTestCase(test.TestCase):
fake_context_manager = mock.Mock()
fake_context_manager.__enter__ = mock.Mock(return_value=fake_file)
fake_context_manager.__exit__ = mock.Mock()
with mock.patch.object(__builtin__, 'open',
mock.Mock(return_value=fake_context_manager)):
with mock.patch.object(
__builtin__, 'open',
mock.Mock(return_value=fake_context_manager)):
cache_data = {"data": 1123, "mtime": 1}
self.reload_called = False
@ -403,7 +404,7 @@ class GenericUtilsTestCase(test.TestCase):
(socket.AF_INET6, socket.SOCK_STREAM, 0, '', (u'127.0.0.1', 80)),
]
with mock.patch.dict('sys.modules', {
'eventlet.support.greendns': fake_dns}):
'eventlet.support.greendns': fake_dns}):
self.assertFalse(utils.is_eventlet_bug105())
fake_dns.getaddrinfo.assert_called_once()