Rewrited mox tests to mock (part 1)

Preffered module for unittests is mock,
so, this commit is intended to remove mox module
from manila's dependencies in modules:
manila/tests/api/v1/test_shares.py
manila/tests/test_service.py
manila/tests/test_share_api.py
manila/tests/test_share_lvm.py
manila/tests/test_utils.py

Change-Id: Ie488b80dea0a1e66837c16d3e6ac8658a2045a2e
Partially implements: blueprint replace-mox-with-mock
This commit is contained in:
vponomaryov 2014-06-11 13:52:09 +03:00
parent 149af8946f
commit 990f108e68
6 changed files with 443 additions and 471 deletions

View File

@ -32,7 +32,6 @@ class ShareApiTest(test.TestCase):
def setUp(self):
super(ShareApiTest, self).setUp()
self.controller = shares.ShareController()
self.stubs.Set(share_api.API, 'get_all',
stubs.stub_get_all_shares)
self.stubs.Set(share_api.API, 'get',
@ -276,7 +275,6 @@ class ShareApiTest(test.TestCase):
search_opts = {'a': 'a', 'b': 'b', 'c': 'c', 'd': 'd'}
expected_opts = {'a': 'a', 'c': 'c'}
allowed_opts = ['a', 'c']
self.mox.ReplayAll()
common.remove_invalid_options(ctx, search_opts, allowed_opts)
self.assertEqual(search_opts, expected_opts)
@ -285,6 +283,5 @@ class ShareApiTest(test.TestCase):
search_opts = {'a': 'a', 'b': 'b', 'c': 'c', 'd': 'd'}
expected_opts = {'a': 'a', 'b': 'b', 'c': 'c', 'd': 'd'}
allowed_opts = ['a', 'c']
self.mox.ReplayAll()
common.remove_invalid_options(ctx, search_opts, allowed_opts)
self.assertEqual(search_opts, expected_opts)

View File

@ -1,7 +1,8 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2014 NetApp, Inc.
# Copyright 2014 Mirantis, Inc.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -20,13 +21,12 @@
Unit Tests for remote procedure calls using queue
"""
import mox
import mock
from oslo.config import cfg
from manila import context
from manila import db
from manila import exception
from manila import manager
from manila import service
from manila import test
@ -41,7 +41,8 @@ test_service_opts = [
help="Host to bind test service to"),
cfg.IntOpt("test_service_listen_port",
default=0,
help="Port number to bind test service to"), ]
help="Port number to bind test service to"),
]
CONF = cfg.CONF
CONF.register_opts(test_service_opts)
@ -49,10 +50,8 @@ CONF.register_opts(test_service_opts)
class FakeManager(manager.Manager):
"""Fake manager for tests"""
def __init__(self, host=None,
db_driver=None, service_name=None):
super(FakeManager, self).__init__(host=host,
db_driver=db_driver)
def __init__(self, host=None, db_driver=None, service_name=None):
super(FakeManager, self).__init__(host=host, db_driver=db_driver)
def test_method(self):
return 'manager'
@ -67,18 +66,12 @@ class ServiceManagerTestCase(test.TestCase):
"""Test cases for Services"""
def test_message_gets_to_manager(self):
serv = service.Service('test',
'test',
'test',
'manila.tests.test_service.FakeManager')
serv = service.Service('test', 'test', 'test', CONF.fake_manager)
serv.start()
self.assertEqual(serv.test_method(), 'manager')
def test_override_manager_method(self):
serv = ExtendedService('test',
'test',
'test',
'manila.tests.test_service.FakeManager')
serv = ExtendedService('test', 'test', 'test', CONF.fake_manager)
serv.start()
self.assertEqual(serv.test_method(), 'service')
@ -93,7 +86,7 @@ class ServiceFlagsTestCase(test.TestCase):
app.stop()
ref = db.service_get(context.get_admin_context(), app.service_id)
db.service_destroy(context.get_admin_context(), app.service_id)
self.assertTrue(not ref['disabled'])
self.assertFalse(ref['disabled'])
def test_service_disabled_on_create_based_on_flag(self):
self.flags(enable_new_services=False)
@ -107,119 +100,105 @@ class ServiceFlagsTestCase(test.TestCase):
self.assertTrue(ref['disabled'])
def fake_service_get_by_args(*args, **kwargs):
raise exception.NotFound()
def fake_service_get(*args, **kwargs):
raise Exception()
host = 'foo'
binary = 'bar'
topic = 'test'
service_create = {
'host': host,
'binary': binary,
'topic': topic,
'report_count': 0,
'availability_zone': 'nova',
}
service_ref = {
'host': host,
'binary': binary,
'topic': topic,
'report_count': 0,
'availability_zone': 'nova',
'id': 1,
}
class ServiceTestCase(test.TestCase):
"""Test cases for Services"""
def setUp(self):
super(ServiceTestCase, self).setUp()
self.mox.StubOutWithMock(service, 'db')
def test_create(self):
host = 'foo'
binary = 'manila-fake'
topic = 'fake'
# NOTE(vish): Create was moved out of mox replay to make sure that
# the looping calls are created in StartService.
app = service.Service.create(host=host, binary=binary, topic=topic)
app = service.Service.create(host='foo',
binary='manila-fake',
topic='fake')
self.assertTrue(app)
@mock.patch.object(service.db, 'service_get_by_args',
mock.Mock(side_effect=fake_service_get_by_args))
@mock.patch.object(service.db, 'service_create',
mock.Mock(return_value=service_ref))
@mock.patch.object(service.db, 'service_get',
mock.Mock(side_effect=fake_service_get))
def test_report_state_newly_disconnected(self):
host = 'foo'
binary = 'bar'
topic = 'test'
service_create = {'host': host,
'binary': binary,
'topic': topic,
'report_count': 0,
'availability_zone': 'nova'}
service_ref = {'host': host,
'binary': binary,
'topic': topic,
'report_count': 0,
'availability_zone': 'nova',
'id': 1}
service.db.service_get_by_args(mox.IgnoreArg(),
host,
binary).AndRaise(exception.NotFound())
service.db.service_create(mox.IgnoreArg(),
service_create).AndReturn(service_ref)
service.db.service_get(mox.IgnoreArg(),
mox.IgnoreArg()).AndRaise(Exception())
self.mox.ReplayAll()
serv = service.Service(host,
binary,
topic,
'manila.tests.test_service.FakeManager')
serv = service.Service(host, binary, topic, CONF.fake_manager)
serv.start()
serv.report_state()
self.assertTrue(serv.model_disconnected)
service.db.service_get_by_args.assert_called_once_with(
mock.ANY, host, binary)
service.db.service_create.assert_called_once_with(
mock.ANY, service_create)
service.db.service_get.assert_called_once_with(mock.ANY, mock.ANY)
@mock.patch.object(service.db, 'service_get_by_args',
mock.Mock(side_effect=fake_service_get_by_args))
@mock.patch.object(service.db, 'service_create',
mock.Mock(return_value=service_ref))
@mock.patch.object(service.db, 'service_get',
mock.Mock(return_value=service_ref))
@mock.patch.object(service.db, 'service_update',
mock.Mock(return_value=service_ref.
update({'report_count': 1})))
def test_report_state_newly_connected(self):
host = 'foo'
binary = 'bar'
topic = 'test'
service_create = {'host': host,
'binary': binary,
'topic': topic,
'report_count': 0,
'availability_zone': 'nova'}
service_ref = {'host': host,
'binary': binary,
'topic': topic,
'report_count': 0,
'availability_zone': 'nova',
'id': 1}
service.db.service_get_by_args(mox.IgnoreArg(),
host,
binary).AndRaise(exception.NotFound())
service.db.service_create(mox.IgnoreArg(),
service_create).AndReturn(service_ref)
service.db.service_get(mox.IgnoreArg(),
service_ref['id']).AndReturn(service_ref)
service.db.service_update(mox.IgnoreArg(), service_ref['id'],
mox.ContainsKeyValue('report_count', 1))
self.mox.ReplayAll()
serv = service.Service(host,
binary,
topic,
'manila.tests.test_service.FakeManager')
serv = service.Service(host, binary, topic, CONF.fake_manager)
serv.start()
serv.model_disconnected = True
serv.report_state()
self.assertTrue(not serv.model_disconnected)
self.assertFalse(serv.model_disconnected)
service.db.service_get_by_args.assert_called_once_with(
mock.ANY, host, binary)
service.db.service_create.assert_called_once_with(
mock.ANY, service_create)
service.db.service_get.assert_called_once_with(
mock.ANY, service_ref['id'])
service.db.service_update.assert_called_once_with(
mock.ANY, service_ref['id'], mock.ANY)
class TestWSGIService(test.TestCase):
def setUp(self):
super(TestWSGIService, self).setUp()
self.stubs.Set(wsgi.Loader, "load_app", mox.MockAnything())
@mock.patch.object(wsgi.Loader, 'load_app', mock.Mock())
def test_service_random_port(self):
test_service = service.WSGIService("test_service")
self.assertEqual(0, test_service.port)
test_service.start()
self.assertNotEqual(0, test_service.port)
test_service.stop()
wsgi.Loader.load_app.assert_called_once_with("test_service")
class TestLauncher(test.TestCase):
def setUp(self):
super(TestLauncher, self).setUp()
self.stubs.Set(wsgi.Loader, "load_app", mox.MockAnything())
self.service = service.WSGIService("test_service")
@mock.patch.object(wsgi.Loader, 'load_app', mock.Mock())
def test_launch_app(self):
self.service = service.WSGIService("test_service")
self.assertEqual(0, self.service.port)
launcher = service.Launcher()
launcher.launch_server(self.service)
self.assertEqual(0, self.service.port)
launcher.stop()
wsgi.Loader.load_app.assert_called_once_with("test_service")

View File

@ -1,4 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 NetApp
# All Rights Reserved.
#
@ -20,7 +19,6 @@ import random
import uuid
import mock
import mox
import suds
from manila import context
@ -34,6 +32,7 @@ from manila.share import api as share_api
from manila.share import rpcapi as share_rpcapi
from manila import test
from manila.tests.db import fakes as db_fakes
from manila import utils
def fake_share(id, **kwargs):
@ -102,14 +101,13 @@ def fake_access(id, **kwargs):
class ShareAPITestCase(test.TestCase):
def setUp(self):
super(ShareAPITestCase, self).setUp()
self.context = context.get_admin_context()
self.scheduler_rpcapi = self.mox.CreateMock(
scheduler_rpcapi.SchedulerAPI)
self.share_rpcapi = self.mox.CreateMock(share_rpcapi.ShareAPI)
self.scheduler_rpcapi = mock.Mock()
self.share_rpcapi = mock.Mock()
self.api = share.API()
self.stubs.Set(self.api, 'scheduler_rpcapi', self.scheduler_rpcapi)
self.stubs.Set(self.api, 'share_rpcapi', self.share_rpcapi)
self.stubs.Set(quota.QUOTAS, 'reserve', lambda *args, **kwargs: None)
@ -117,10 +115,12 @@ class ShareAPITestCase(test.TestCase):
self.patcher = mock.patch.object(timeutils, 'utcnow')
self.mock_utcnow = self.patcher.start()
self.mock_utcnow.return_value = datetime.datetime.utcnow()
self.addCleanup(self.patcher.stop)
def tearDown(self):
self.patcher.stop()
super(ShareAPITestCase, self).tearDown()
self.policy_patcher = mock.patch.object(share_api.policy,
'check_policy')
self.policy_patcher.start()
self.addCleanup(self.policy_patcher.stop)
def test_create(self):
date = datetime.datetime(1, 1, 1, 1, 1, 1)
@ -133,100 +133,103 @@ class ShareAPITestCase(test.TestCase):
for name in ('id', 'export_location', 'host', 'launched_at',
'terminated_at'):
options.pop(name, None)
request_spec = {'share_properties': options,
'share_proto': share['share_proto'],
'share_id': share['id'],
'snapshot_id': share['snapshot_id'],
'volume_type': None
}
self.mox.StubOutWithMock(db_driver, 'share_create')
db_driver.share_create(self.context, options).AndReturn(share)
self.scheduler_rpcapi.create_share(self.context, mox.IgnoreArg(),
share['id'], share['snapshot_id'],
request_spec=request_spec,
filter_properties={})
self.mox.ReplayAll()
self.api.create(self.context, 'nfs', '1', 'fakename', 'fakedesc',
availability_zone='fakeaz')
request_spec = {
'share_properties': options,
'share_proto': share['share_proto'],
'share_id': share['id'],
'snapshot_id': share['snapshot_id'],
'volume_type': None,
}
with mock.patch.object(db_driver, 'share_create',
mock.Mock(return_value=share)):
self.api.create(self.context, 'nfs', '1', 'fakename', 'fakedesc',
availability_zone='fakeaz')
db_driver.share_create.assert_called_once_with(
self.context, options)
@mock.patch.object(quota.QUOTAS, 'reserve',
mock.Mock(return_value='reservation'))
@mock.patch.object(quota.QUOTAS, 'commit', mock.Mock())
def test_create_snapshot(self):
date = datetime.datetime(1, 1, 1, 1, 1, 1)
self.mock_utcnow.return_value = date
share = fake_share('fakeid',
status='available')
share = fake_share('fakeid', status='available')
snapshot = fake_snapshot('fakesnapshotid',
share_id=share['id'],
status='creating')
fake_name = 'fakename'
fake_desc = 'fakedesc'
options = {'share_id': share['id'],
'user_id': self.context.user_id,
'project_id': self.context.project_id,
'status': "creating",
'progress': '0%',
'share_size': share['size'],
'size': 1,
'display_name': fake_name,
'display_description': fake_desc,
'share_proto': share['share_proto'],
'export_location': share['export_location']}
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(self.context, 'share',
'create_snapshot', share)
self.mox.StubOutWithMock(quota.QUOTAS, 'reserve')
quota.QUOTAS.reserve(self.context, snapshots=1, gigabytes=1).\
AndReturn('reservation')
self.mox.StubOutWithMock(db_driver, 'share_snapshot_create')
db_driver.share_snapshot_create(self.context,
options).AndReturn(snapshot)
self.mox.StubOutWithMock(quota.QUOTAS, 'commit')
quota.QUOTAS.commit(self.context, 'reservation')
self.share_rpcapi.create_snapshot(self.context, share, snapshot)
self.mox.ReplayAll()
self.api.create_snapshot(self.context, share, fake_name, fake_desc)
options = {
'share_id': share['id'],
'user_id': self.context.user_id,
'project_id': self.context.project_id,
'status': "creating",
'progress': '0%',
'share_size': share['size'],
'size': 1,
'display_name': fake_name,
'display_description': fake_desc,
'share_proto': share['share_proto'],
'export_location': share['export_location'],
}
with mock.patch.object(db_driver, 'share_snapshot_create',
mock.Mock(return_value=snapshot)):
self.api.create_snapshot(self.context, share, fake_name,
fake_desc)
share_api.policy.check_policy.assert_called_once_with(
self.context, 'share', 'create_snapshot', share)
quota.QUOTAS.reserve.assert_called_once_with(
self.context, snapshots=1, gigabytes=1)
quota.QUOTAS.commit.assert_called_once_with(
self.context, 'reservation')
db_driver.share_snapshot_create.assert_called_once_with(
self.context, options)
@mock.patch.object(db_driver, 'share_snapshot_update', mock.Mock())
def test_delete_snapshot(self):
date = datetime.datetime(1, 1, 1, 1, 1, 1)
self.mock_utcnow.return_value = date
share = fake_share('fakeid')
snapshot = fake_snapshot('fakesnapshotid', share_id=share['id'],
snapshot = fake_snapshot('fakesnapshotid',
share_id=share['id'],
status='available')
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(
self.context, 'share', 'delete_snapshot', snapshot)
self.mox.StubOutWithMock(db_driver, 'share_snapshot_update')
db_driver.share_snapshot_update(self.context, snapshot['id'],
{'status': 'deleting'})
self.mox.StubOutWithMock(db_driver, 'share_get')
db_driver.share_get(self.context,
snapshot['share_id']).AndReturn(share)
self.share_rpcapi.delete_snapshot(self.context, snapshot,
share['host'])
self.mox.ReplayAll()
self.api.delete_snapshot(self.context, snapshot)
with mock.patch.object(db_driver, 'share_get',
mock.Mock(return_value=share)):
self.api.delete_snapshot(self.context, snapshot)
self.share_rpcapi.delete_snapshot.assert_called_once_with(
self.context, snapshot, share['host'])
share_api.policy.check_policy.assert_called_once_with(
self.context, 'share', 'delete_snapshot', snapshot)
db_driver.share_snapshot_update.assert_called_once_with(
self.context, snapshot['id'], {'status': 'deleting'})
db_driver.share_get.assert_called_once_with(
self.context, snapshot['share_id'])
def test_delete_snapshot_wrong_status(self):
snapshot = fake_snapshot('fakesnapshotid', share_id='fakeshareid',
snapshot = fake_snapshot('fakesnapshotid',
share_id='fakeshareid',
status='creating')
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(
self.context, 'share', 'delete_snapshot', snapshot)
self.mox.ReplayAll()
self.assertRaises(exception.InvalidShareSnapshot,
self.api.delete_snapshot, self.context, snapshot)
self.api.delete_snapshot,
self.context,
snapshot)
share_api.policy.check_policy.assert_called_once_with(
self.context, 'share', 'delete_snapshot', snapshot)
def test_create_snapshot_if_share_not_available(self):
share = fake_share('fakeid',
status='error')
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(self.context, 'share',
'create_snapshot', share)
self.mox.ReplayAll()
self.assertRaises(exception.InvalidShare, self.api.create_snapshot,
self.context, share, 'fakename', 'fakedesc')
share = fake_share('fakeid', status='error')
self.assertRaises(exception.InvalidShare,
self.api.create_snapshot,
self.context,
share,
'fakename',
'fakedesc')
share_api.policy.check_policy.assert_called_once_with(
self.context, 'share', 'create_snapshot', share)
@mock.patch.object(quota.QUOTAS, 'reserve',
mock.Mock(return_value='reservation'))
@mock.patch.object(quota.QUOTAS, 'commit', mock.Mock())
def test_create_from_snapshot_available(self):
date = datetime.datetime(1, 1, 1, 1, 1, 1)
self.mock_utcnow.return_value = date
@ -242,39 +245,45 @@ class ShareAPITestCase(test.TestCase):
for name in ('id', 'export_location', 'host', 'launched_at',
'terminated_at'):
options.pop(name, None)
request_spec = {'share_properties': options,
'share_proto': share['share_proto'],
'share_id': share['id'],
'volume_type': None,
'snapshot_id': share['snapshot_id'],
}
self.mox.StubOutWithMock(db_driver, 'share_create')
db_driver.share_create(self.context, options).AndReturn(share)
self.scheduler_rpcapi.create_share(self.context, mox.IgnoreArg(),
share['id'], share['snapshot_id'],
request_spec=request_spec,
filter_properties={})
self.mox.ReplayAll()
self.api.create(self.context, 'nfs', '1', 'fakename', 'fakedesc',
snapshot=snapshot, availability_zone='fakeaz')
request_spec = {
'share_properties': options,
'share_proto': share['share_proto'],
'share_id': share['id'],
'volume_type': None,
'snapshot_id': share['snapshot_id'],
}
with mock.patch.object(db_driver, 'share_create',
mock.Mock(return_value=share)):
self.api.create(self.context, 'nfs', '1', 'fakename', 'fakedesc',
snapshot=snapshot, availability_zone='fakeaz')
self.scheduler_rpcapi.create_share.assert_called_once_with(
self.context, 'manila-share', share['id'],
share['snapshot_id'], request_spec=request_spec,
filter_properties={})
db_driver.share_create.assert_called_once_with(
self.context, options)
share_api.policy.check_policy.assert_called_once_with(
self.context, 'share', 'create')
quota.QUOTAS.reserve.assert_called_once_with(
self.context, gigabytes=1, shares=1)
quota.QUOTAS.commit.assert_called_once_with(
self.context, 'reservation')
def test_get_snapshot(self):
fake_get_snap = {'fake_key': 'fake_val'}
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(self.context, 'share', 'get_snapshot')
self.mox.StubOutWithMock(db_driver, 'share_snapshot_get')
db_driver.share_snapshot_get(self.context,
'fakeid').AndReturn(fake_get_snap)
self.mox.ReplayAll()
rule = self.api.get_snapshot(self.context, 'fakeid')
self.assertEqual(rule, fake_get_snap)
with mock.patch.object(db_driver, 'share_snapshot_get',
mock.Mock(return_value=fake_get_snap)):
rule = self.api.get_snapshot(self.context, 'fakeid')
self.assertEqual(rule, fake_get_snap)
share_api.policy.check_policy.assert_called_once_with(
self.context, 'share', 'get_snapshot')
db_driver.share_snapshot_get.assert_called_once_with(
self.context, 'fakeid')
def test_create_from_snapshot_not_available(self):
snapshot = fake_snapshot('fakesnapshotid',
share_id='fakeshare_id',
status='error')
self.mox.ReplayAll()
self.assertRaises(exception.InvalidShareSnapshot, self.api.create,
self.context, 'nfs', '1', 'fakename',
'fakedesc', snapshot=snapshot,
@ -282,19 +291,16 @@ class ShareAPITestCase(test.TestCase):
def test_create_from_snapshot_larger_size(self):
snapshot = fake_snapshot(1, size=100, status='available')
self.mox.ReplayAll()
self.assertRaises(exception.InvalidInput, self.api.create,
self.context, 'nfs', 1, 'fakename', 'fakedesc',
availability_zone='fakeaz', snapshot=snapshot)
def test_create_wrong_size_0(self):
self.mox.ReplayAll()
self.assertRaises(exception.InvalidInput, self.api.create,
self.context, 'nfs', 0, 'fakename', 'fakedesc',
availability_zone='fakeaz')
def test_create_wrong_size_some(self):
self.mox.ReplayAll()
self.assertRaises(exception.InvalidInput, self.api.create,
self.context, 'nfs', 'some', 'fakename',
'fakedesc', availability_zone='fakeaz')
@ -303,255 +309,251 @@ class ShareAPITestCase(test.TestCase):
date = datetime.datetime(2, 2, 2, 2, 2, 2)
self.mock_utcnow.return_value = date
share = fake_share('fakeid', status='available')
options = {'status': 'deleting',
'terminated_at': date}
options = {'status': 'deleting', 'terminated_at': date}
deleting_share = share.copy()
deleting_share.update(options)
self.mox.StubOutWithMock(db_driver, 'share_update')
db_driver.share_update(self.context, share['id'], options).\
AndReturn(deleting_share)
self.share_rpcapi.delete_share(self.context, deleting_share)
self.mox.ReplayAll()
self.api.delete(self.context, share)
self.mox.UnsetStubs()
self.mox.VerifyAll()
with mock.patch.object(db_driver, 'share_update',
mock.Mock(return_value=deleting_share)):
self.api.delete(self.context, share)
db_driver.share_update.assert_called_once_with(
self.context, share['id'], options)
self.share_rpcapi.delete_share.assert_called_once_with(
self.context, deleting_share)
def test_delete_error(self):
date = datetime.datetime(2, 2, 2, 2, 2, 2)
self.mock_utcnow.return_value = date
share = fake_share('fakeid', status='error')
options = {'status': 'deleting',
'terminated_at': date}
options = {'status': 'deleting', 'terminated_at': date}
deleting_share = share.copy()
deleting_share.update(options)
self.mox.StubOutWithMock(db_driver, 'share_update')
db_driver.share_update(self.context, share['id'], options).\
AndReturn(deleting_share)
self.share_rpcapi.delete_share(self.context, deleting_share)
self.mox.ReplayAll()
self.api.delete(self.context, share)
self.mox.UnsetStubs()
self.mox.VerifyAll()
with mock.patch.object(db_driver, 'share_update',
mock.Mock(return_value=deleting_share)):
self.api.delete(self.context, share)
db_driver.share_update.assert_called_once_with(
self.context, share['id'], options)
self.share_rpcapi.delete_share.assert_called_once_with(
self.context, deleting_share)
def test_delete_wrong_status(self):
share = fake_share('fakeid')
self.mox.ReplayAll()
self.assertRaises(exception.InvalidShare, self.api.delete,
self.context, share)
@mock.patch.object(db_driver, 'share_delete', mock.Mock())
def test_delete_no_host(self):
share = fake_share('fakeid')
share['host'] = None
self.mox.StubOutWithMock(db_driver, 'share_delete')
db_driver.share_delete(mox.IsA(context.RequestContext), 'fakeid')
self.mox.ReplayAll()
self.api.delete(self.context, share)
db_driver.share_delete.assert_called_once_with(
utils.IsAMatcher(context.RequestContext), 'fakeid')
def test_get(self):
self.mox.StubOutWithMock(db_driver, 'share_get')
db_driver.share_get(self.context, 'fakeid').AndReturn('fakeshare')
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(self.context, 'share', 'get',
'fakeshare')
self.mox.ReplayAll()
result = self.api.get(self.context, 'fakeid')
self.assertEqual(result, 'fakeshare')
with mock.patch.object(db_driver, 'share_get',
mock.Mock(return_value='fakeshare')):
result = self.api.get(self.context, 'fakeid')
self.assertEqual(result, 'fakeshare')
share_api.policy.check_policy.assert_called_once_with(
self.context, 'share', 'get', 'fakeshare')
db_driver.share_get.assert_called_once_with(
self.context, 'fakeid')
@mock.patch.object(db_driver, 'share_get_all_by_project', mock.Mock())
def test_get_all_admin_not_all_tenants(self):
ctx = context.RequestContext('fakeuid', 'fakepid', id_admin=True)
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(ctx, 'share', 'get_all')
self.mox.StubOutWithMock(db_driver, 'share_get_all_by_project')
db_driver.share_get_all_by_project(ctx, 'fakepid')
self.mox.ReplayAll()
self.api.get_all(ctx)
share_api.policy.check_policy.assert_called_once_with(
ctx, 'share', 'get_all')
db_driver.share_get_all_by_project.assert_called_once_with(
ctx, 'fakepid')
@mock.patch.object(db_driver, 'share_get_all', mock.Mock())
def test_get_all_admin_all_tenants(self):
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(self.context, 'share', 'get_all')
self.mox.StubOutWithMock(db_driver, 'share_get_all')
db_driver.share_get_all(self.context)
self.mox.ReplayAll()
self.api.get_all(self.context, search_opts={'all_tenants': 1})
share_api.policy.check_policy.assert_called_once_with(
self.context, 'share', 'get_all')
db_driver.share_get_all.assert_called_once_with(self.context)
@mock.patch.object(db_driver, 'share_get_all_by_project', mock.Mock())
def test_get_all_not_admin(self):
ctx = context.RequestContext('fakeuid', 'fakepid', id_admin=False)
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(ctx, 'share', 'get_all')
self.mox.StubOutWithMock(db_driver, 'share_get_all_by_project')
db_driver.share_get_all_by_project(ctx, 'fakepid')
self.mox.ReplayAll()
self.api.get_all(ctx)
share_api.policy.check_policy.assert_called_once_with(
ctx, 'share', 'get_all')
db_driver.share_get_all_by_project.assert_called_once_with(
ctx, 'fakepid')
def test_get_all_not_admin_search_opts(self):
search_opts = {'size': 'fakesize'}
fake_objs = [{'name': 'fakename1'}, search_opts]
ctx = context.RequestContext('fakeuid', 'fakepid', id_admin=False)
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(ctx, 'share', 'get_all')
self.mox.StubOutWithMock(db_driver, 'share_get_all_by_project')
db_driver.share_get_all_by_project(ctx,
'fakepid').AndReturn(fake_objs)
self.mox.ReplayAll()
result = self.api.get_all(ctx, search_opts)
self.assertEqual([search_opts], result)
with mock.patch.object(db_driver, 'share_get_all_by_project',
mock.Mock(return_value=fake_objs)):
result = self.api.get_all(ctx, search_opts)
self.assertEqual([search_opts], result)
share_api.policy.check_policy.assert_called_once_with(
ctx, 'share', 'get_all')
db_driver.share_get_all_by_project.assert_called_once_with(
ctx, 'fakepid')
@mock.patch.object(db_driver, 'share_snapshot_get_all_by_project',
mock.Mock())
def test_get_all_snapshots_admin_not_all_tenants(self):
ctx = context.RequestContext('fakeuid', 'fakepid', id_admin=True)
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(ctx, 'share', 'get_all_snapshots')
self.mox.StubOutWithMock(db_driver,
'share_snapshot_get_all_by_project')
db_driver.share_snapshot_get_all_by_project(ctx, 'fakepid')
self.mox.ReplayAll()
self.api.get_all_snapshots(ctx)
share_api.policy.check_policy.assert_called_once_with(
ctx, 'share', 'get_all_snapshots')
db_driver.share_snapshot_get_all_by_project.assert_called_once_with(
ctx, 'fakepid')
@mock.patch.object(db_driver, 'share_snapshot_get_all', mock.Mock())
def test_get_all_snapshots_admin_all_tenants(self):
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(self.context, 'share',
'get_all_snapshots')
self.mox.StubOutWithMock(db_driver, 'share_snapshot_get_all')
db_driver.share_snapshot_get_all(self.context)
self.mox.ReplayAll()
self.api.get_all_snapshots(self.context,
search_opts={'all_tenants': 1})
share_api.policy.check_policy.assert_called_once_with(
self.context, 'share', 'get_all_snapshots')
db_driver.share_snapshot_get_all.assert_called_once_with(self.context)
@mock.patch.object(db_driver, 'share_snapshot_get_all_by_project',
mock.Mock())
def test_get_all_snapshots_not_admin(self):
ctx = context.RequestContext('fakeuid', 'fakepid', id_admin=False)
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(ctx, 'share', 'get_all_snapshots')
self.mox.StubOutWithMock(db_driver,
'share_snapshot_get_all_by_project')
db_driver.share_snapshot_get_all_by_project(ctx, 'fakepid')
self.mox.ReplayAll()
self.api.get_all_snapshots(ctx)
share_api.policy.check_policy.assert_called_once_with(
ctx, 'share', 'get_all_snapshots')
db_driver.share_snapshot_get_all_by_project.assert_called_once_with(
ctx, 'fakepid')
def test_get_all_snapshots_not_admin_search_opts(self):
search_opts = {'size': 'fakesize'}
fake_objs = [{'name': 'fakename1'}, search_opts]
ctx = context.RequestContext('fakeuid', 'fakepid', id_admin=False)
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(ctx, 'share', 'get_all_snapshots')
self.mox.StubOutWithMock(db_driver,
'share_snapshot_get_all_by_project')
db_driver.share_snapshot_get_all_by_project(ctx, 'fakepid').\
AndReturn(fake_objs)
self.mox.ReplayAll()
result = self.api.get_all_snapshots(ctx, search_opts)
self.assertEqual([search_opts], result)
with mock.patch.object(db_driver,
'share_snapshot_get_all_by_project',
mock.Mock(return_value=fake_objs)):
result = self.api.get_all_snapshots(ctx, search_opts)
self.assertEqual([search_opts], result)
share_api.policy.check_policy.assert_called_once_with(
ctx, 'share', 'get_all_snapshots')
db_driver.share_snapshot_get_all_by_project.\
assert_called_once_with(ctx, 'fakepid')
def test_allow_access(self):
share = fake_share('fakeid', status='available')
values = {'share_id': share['id'],
'access_type': 'fakeacctype',
'access_to': 'fakeaccto'}
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(self.context, 'share', 'allow_access')
self.mox.StubOutWithMock(db_driver, 'share_access_create')
db_driver.share_access_create(self.context, values).\
AndReturn('fakeacc')
self.share_rpcapi.allow_access(self.context, share, 'fakeacc')
self.mox.ReplayAll()
access = self.api.allow_access(self.context, share, 'fakeacctype',
'fakeaccto')
self.assertEqual(access, 'fakeacc')
values = {
'share_id': share['id'],
'access_type': 'fakeacctype',
'access_to': 'fakeaccto',
}
with mock.patch.object(db_driver, 'share_access_create',
mock.Mock(return_value='fakeacc')):
self.share_rpcapi.allow_access(self.context, share, 'fakeacc')
access = self.api.allow_access(self.context, share, 'fakeacctype',
'fakeaccto')
self.assertEqual(access, 'fakeacc')
db_driver.share_access_create.assert_called_once_with(
self.context, values)
share_api.policy.check_policy.assert_called_once_with(
self.context, 'share', 'allow_access')
def test_allow_access_status_not_available(self):
share = fake_share('fakeid', status='error')
self.mox.ReplayAll()
self.assertRaises(exception.InvalidShare, self.api.allow_access,
self.context, share, 'fakeacctype', 'fakeaccto')
def test_allow_access_no_host(self):
share = fake_share('fakeid', host=None)
self.mox.ReplayAll()
self.assertRaises(exception.InvalidShare, self.api.allow_access,
self.context, share, 'fakeacctype', 'fakeaccto')
@mock.patch.object(db_driver, 'share_access_delete', mock.Mock())
def test_deny_access_error(self):
share = fake_share('fakeid', status='available')
access = fake_access('fakaccid', state='fakeerror')
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(self.context, 'share', 'deny_access')
self.mox.StubOutWithMock(db_driver, 'share_access_delete')
db_driver.share_access_delete(self.context, access['id'])
self.mox.ReplayAll()
self.api.deny_access(self.context, share, access)
share_api.policy.check_policy.assert_called_once_with(
self.context, 'share', 'deny_access')
db_driver.share_access_delete.assert_called_once_with(
self.context, access['id'])
@mock.patch.object(db_driver, 'share_access_update', mock.Mock())
def test_deny_access_active(self):
share = fake_share('fakeid', status='available')
access = fake_access('fakaccid', state='fakeactive')
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(self.context, 'share', 'deny_access')
self.mox.StubOutWithMock(db_driver, 'share_access_update')
db_driver.share_access_update(self.context, access['id'],
{'state': 'fakedeleting'})
self.share_rpcapi.deny_access(self.context, share, access)
self.mox.ReplayAll()
self.api.deny_access(self.context, share, access)
db_driver.share_access_update.assert_called_once_with(
self.context, access['id'], {'state': 'fakedeleting'})
share_api.policy.check_policy.assert_called_once_with(
self.context, 'share', 'deny_access')
self.share_rpcapi.deny_access.assert_called_once_with(
self.context, share, access)
def test_deny_access_not_active_not_error(self):
share = fake_share('fakeid', status='available')
access = fake_access('fakaccid', state='fakenew')
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(self.context, 'share', 'deny_access')
self.mox.ReplayAll()
self.assertRaises(exception.InvalidShareAccess, self.api.deny_access,
self.context, share, access)
share_api.policy.check_policy.assert_called_once_with(
self.context, 'share', 'deny_access')
def test_deny_access_status_not_available(self):
share = fake_share('fakeid', status='error')
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(self.context, 'share', 'deny_access')
self.mox.ReplayAll()
self.assertRaises(exception.InvalidShare, self.api.deny_access,
self.context, share, 'fakeacc')
share_api.policy.check_policy.assert_called_once_with(
self.context, 'share', 'deny_access')
def test_deny_access_no_host(self):
share = fake_share('fakeid', host=None)
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(self.context, 'share', 'deny_access')
self.mox.ReplayAll()
self.assertRaises(exception.InvalidShare, self.api.deny_access,
self.context, share, 'fakeacc')
share_api.policy.check_policy.assert_called_once_with(
self.context, 'share', 'deny_access')
def test_access_get(self):
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(self.context, 'share', 'access_get')
self.mox.StubOutWithMock(db_driver, 'share_access_get')
db_driver.share_access_get(self.context, 'fakeid').AndReturn('fake')
self.mox.ReplayAll()
rule = self.api.access_get(self.context, 'fakeid')
self.assertEqual(rule, 'fake')
with mock.patch.object(db_driver, 'share_access_get',
mock.Mock(return_value='fake')):
rule = self.api.access_get(self.context, 'fakeid')
self.assertEqual(rule, 'fake')
share_api.policy.check_policy.assert_called_once_with(
self.context, 'share', 'access_get')
db_driver.share_access_get.assert_called_once_with(
self.context, 'fakeid')
def test_access_get_all(self):
share = fake_share('fakeid')
self.mox.StubOutWithMock(share_api.policy, 'check_policy')
share_api.policy.check_policy(self.context, 'share', 'access_get_all')
self.mox.StubOutWithMock(db_driver, 'share_access_get_all_for_share')
db_driver.share_access_get_all_for_share(self.context, 'fakeid').\
AndReturn([fake_access('fakeacc0id', state='fakenew'),
fake_access('fakeacc1id', state='fakeerror')])
self.mox.ReplayAll()
rules = self.api.access_get_all(self.context, share)
self.assertEqual(rules, [{'id': 'fakeacc0id',
'access_type': 'fakeacctype',
'access_to': 'fakeaccto',
'state': 'fakenew'},
{'id': 'fakeacc1id',
'access_type': 'fakeacctype',
'access_to': 'fakeaccto',
'state': 'fakeerror'}])
rules = [
fake_access('fakeacc0id', state='fakenew'),
fake_access('fakeacc1id', state='fakeerror'),
]
expected = [
{
'id': 'fakeacc0id',
'access_type': 'fakeacctype',
'access_to': 'fakeaccto',
'state': 'fakenew',
},
{
'id': 'fakeacc1id',
'access_type': 'fakeacctype',
'access_to': 'fakeaccto',
'state': 'fakeerror',
},
]
with mock.patch.object(db_driver, 'share_access_get_all_for_share',
mock.Mock(return_value=rules)):
actual = self.api.access_get_all(self.context, share)
self.assertEqual(actual, expected)
share_api.policy.check_policy.assert_called_once_with(
self.context, 'share', 'access_get_all')
db_driver.share_access_get_all_for_share.assert_called_once_with(
self.context, 'fakeid')
def test_share_metadata_get(self):
metadata = {'a': 'b', 'c': 'd'}
share_id = str(uuid.uuid4())
db_driver.share_create(self.context, {'id': share_id,
'metadata': metadata})
db_driver.share_create(self.context,
{'id': share_id, 'metadata': metadata})
self.assertEqual(metadata,
db_driver.share_metadata_get(self.context, share_id))
@ -559,13 +561,11 @@ class ShareAPITestCase(test.TestCase):
metadata1 = {'a': '1', 'c': '2'}
metadata2 = {'a': '3', 'd': '5'}
should_be = {'a': '3', 'c': '2', 'd': '5'}
share_id = str(uuid.uuid4())
db_driver.share_create(self.context, {'id': share_id,
'metadata': metadata1})
db_driver.share_create(self.context,
{'id': share_id, 'metadata': metadata1})
db_driver.share_metadata_update(self.context, share_id,
metadata2, False)
self.assertEqual(should_be,
db_driver.share_metadata_get(self.context, share_id))
@ -573,12 +573,10 @@ class ShareAPITestCase(test.TestCase):
metadata1 = {'a': '1', 'c': '2'}
metadata2 = {'a': '3', 'd': '4'}
should_be = metadata2
share_id = str(uuid.uuid4())
db_driver.share_create(self.context, {'id': share_id,
'metadata': metadata1})
db_driver.share_create(self.context,
{'id': share_id, 'metadata': metadata1})
db_driver.share_metadata_update(self.context, share_id,
metadata2, True)
self.assertEqual(should_be,
db_driver.share_metadata_get(self.context, share_id))

View File

@ -1,4 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 NetApp
# All Rights Reserved.
#
@ -15,13 +14,14 @@
# under the License.
"""Unit tests for the NFS driver module."""
import mock
import os
import mock
from oslo.config import cfg
from manila import context
from manila.db.sqlalchemy import models
from manila import exception
from manila.openstack.common import importutils
from manila.openstack.common import log as logging
from manila.share.configuration import Configuration
@ -29,7 +29,6 @@ from manila.share.drivers import lvm
from manila import test
from manila.tests.db import fakes as db_fakes
from manila.tests import fake_utils
from oslo.config import cfg
CONF = cfg.CONF
@ -313,14 +312,16 @@ class LVMShareDriverTestCase(test.TestCase):
self.assertEqual(fake_utils.fake_execute_get_log(), expected_exec)
def test_ensure_share(self):
mount_path = self._get_mount_path(self.share)
self.mox.StubOutWithMock(self._driver, '_mount_device')
self._driver._mount_device(self.share, '/dev/mapper/fakevg-fakename').\
AndReturn(mount_path)
self._helper_nfs.create_export(mount_path, self.share['name'],
recreate=True).AndReturn('fakelocation')
self.mox.ReplayAll()
self._driver.ensure_share(self._context, self.share)
device_name = '/dev/mapper/fakevg-fakename'
location = 'fake_location'
with mock.patch.object(self._driver,
'_mount_device',
mock.Mock(return_value=location)):
self._driver.ensure_share(self._context, self.share)
self._driver._mount_device.assert_called_with(self.share,
device_name)
self._helper_nfs.create_export.assert_called_once_with(
location, self.share['name'], recreate=True)
def test_delete_share(self):
mount_path = self._get_mount_path(self.share)

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Justin Santa Barbara
# Copyright 2014 NetApp, Inc.
# Copyright 2014 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -24,7 +24,6 @@ import tempfile
import uuid
import mock
import mox
from oslo.config import cfg
import paramiko
@ -307,43 +306,38 @@ class GenericUtilsTestCase(test.TestCase):
self.assertEqual(generated_url, actual_url)
def test_read_cached_file(self):
self.mox.StubOutWithMock(os.path, "getmtime")
os.path.getmtime(mox.IgnoreArg()).AndReturn(1)
self.mox.ReplayAll()
cache_data = {"data": 1123, "mtime": 1}
data = utils.read_cached_file("/this/is/a/fake", cache_data)
self.assertEqual(cache_data["data"], data)
with mock.patch.object(os.path, "getmtime", mock.Mock(return_value=1)):
data = utils.read_cached_file("/this/is/a/fake", cache_data)
self.assertEqual(cache_data["data"], data)
os.path.getmtime.assert_called_once_with("/this/is/a/fake")
def test_read_modified_cached_file(self):
self.mox.StubOutWithMock(os.path, "getmtime")
self.mox.StubOutWithMock(__builtin__, 'open')
os.path.getmtime(mox.IgnoreArg()).AndReturn(2)
with mock.patch.object(os.path, "getmtime", mock.Mock(return_value=2)):
fake_contents = "lorem ipsum"
fake_file = mock.Mock()
fake_file.read = mock.Mock(return_value=fake_contents)
fake_context_manager = mock.Mock()
fake_context_manager.__enter__ = mock.Mock(return_value=fake_file)
fake_context_manager.__exit__ = mock.Mock()
with mock.patch.object(__builtin__, 'open',
mock.Mock(return_value=fake_context_manager)):
cache_data = {"data": 1123, "mtime": 1}
self.reload_called = False
fake_contents = "lorem ipsum"
fake_file = self.mox.CreateMockAnything()
fake_file.read().AndReturn(fake_contents)
fake_context_manager = self.mox.CreateMockAnything()
fake_context_manager.__enter__().AndReturn(fake_file)
fake_context_manager.__exit__(mox.IgnoreArg(),
mox.IgnoreArg(),
mox.IgnoreArg())
def test_reload(reloaded_data):
self.assertEqual(reloaded_data, fake_contents)
self.reload_called = True
__builtin__.open(mox.IgnoreArg()).AndReturn(fake_context_manager)
self.mox.ReplayAll()
cache_data = {"data": 1123, "mtime": 1}
self.reload_called = False
def test_reload(reloaded_data):
self.assertEqual(reloaded_data, fake_contents)
self.reload_called = True
data = utils.read_cached_file("/this/is/a/fake",
cache_data,
reload_func=test_reload)
self.assertEqual(data, fake_contents)
self.assertTrue(self.reload_called)
data = utils.read_cached_file("/this/is/a/fake",
cache_data,
reload_func=test_reload)
self.assertEqual(data, fake_contents)
self.assertTrue(self.reload_called)
fake_file.read.assert_called_once_with()
fake_context_manager.__enter__.assert_any_call()
__builtin__.open.assert_called_once_with("/this/is/a/fake")
os.path.getmtime.assert_called_once_with("/this/is/a/fake")
def test_generate_password(self):
password = utils.generate_password()
@ -385,35 +379,34 @@ class GenericUtilsTestCase(test.TestCase):
fts_func = datetime.datetime.fromtimestamp
fake_now = 1000
down_time = 5
self.flags(service_down_time=down_time)
self.mox.StubOutWithMock(timeutils, 'utcnow')
with mock.patch.object(timeutils, 'utcnow',
mock.Mock(return_value=fts_func(fake_now))):
# Up (equal)
timeutils.utcnow().AndReturn(fts_func(fake_now))
service = {'updated_at': fts_func(fake_now - down_time),
'created_at': fts_func(fake_now - down_time)}
self.mox.ReplayAll()
result = utils.service_is_up(service)
self.assertTrue(result)
# Up (equal)
service = {'updated_at': fts_func(fake_now - down_time),
'created_at': fts_func(fake_now - down_time)}
result = utils.service_is_up(service)
self.assertTrue(result)
timeutils.utcnow.assert_called_once_with()
self.mox.ResetAll()
# Up
timeutils.utcnow().AndReturn(fts_func(fake_now))
service = {'updated_at': fts_func(fake_now - down_time + 1),
'created_at': fts_func(fake_now - down_time + 1)}
self.mox.ReplayAll()
result = utils.service_is_up(service)
self.assertTrue(result)
with mock.patch.object(timeutils, 'utcnow',
mock.Mock(return_value=fts_func(fake_now))):
# Up
service = {'updated_at': fts_func(fake_now - down_time + 1),
'created_at': fts_func(fake_now - down_time + 1)}
result = utils.service_is_up(service)
self.assertTrue(result)
timeutils.utcnow.assert_called_once_with()
self.mox.ResetAll()
# Down
timeutils.utcnow().AndReturn(fts_func(fake_now))
service = {'updated_at': fts_func(fake_now - down_time - 1),
'created_at': fts_func(fake_now - down_time - 1)}
self.mox.ReplayAll()
result = utils.service_is_up(service)
self.assertFalse(result)
with mock.patch.object(timeutils, 'utcnow',
mock.Mock(return_value=fts_func(fake_now))):
# Down
service = {'updated_at': fts_func(fake_now - down_time - 1),
'created_at': fts_func(fake_now - down_time - 1)}
result = utils.service_is_up(service)
self.assertFalse(result)
timeutils.utcnow.assert_called_once_with()
def test_safe_parse_xml(self):
@ -690,45 +683,41 @@ class FakeTransport(object):
class SSHPoolTestCase(test.TestCase):
"""Unit test for SSH Connection Pool."""
def setup(self):
self.mox.StubOutWithMock(paramiko, "SSHClient")
paramiko.SSHClient().AndReturn(FakeSSHClient())
self.mox.ReplayAll()
def test_single_ssh_connect(self):
self.setup()
sshpool = utils.SSHPool("127.0.0.1", 22, 10, "test", password="test",
min_size=1, max_size=1)
with sshpool.item() as ssh:
first_id = ssh.id
with mock.patch.object(paramiko, "SSHClient",
mock.Mock(return_value=FakeSSHClient())):
sshpool = utils.SSHPool("127.0.0.1", 22, 10, "test",
password="test", min_size=1, max_size=1)
with sshpool.item() as ssh:
first_id = ssh.id
with sshpool.item() as ssh:
second_id = ssh.id
with sshpool.item() as ssh:
second_id = ssh.id
self.assertEqual(first_id, second_id)
self.assertEqual(first_id, second_id)
paramiko.SSHClient.assert_called_once_with()
def test_closed_reopend_ssh_connections(self):
self.setup()
sshpool = utils.SSHPool("127.0.0.1", 22, 10, "test", password="test",
min_size=1, max_size=2)
with sshpool.item() as ssh:
first_id = ssh.id
with sshpool.item() as ssh:
second_id = ssh.id
# Close the connection and test for a new connection
ssh.get_transport().active = False
with mock.patch.object(paramiko, "SSHClient",
mock.Mock(return_value=FakeSSHClient())):
sshpool = utils.SSHPool("127.0.0.1", 22, 10, "test",
password="test", min_size=1, max_size=2)
with sshpool.item() as ssh:
first_id = ssh.id
with sshpool.item() as ssh:
second_id = ssh.id
# Close the connection and test for a new connection
ssh.get_transport().active = False
self.assertEqual(first_id, second_id)
paramiko.SSHClient.assert_called_once_with()
self.assertEqual(first_id, second_id)
# The mox items are not getting setup in a new pool connection,
# so had to reset and set again.
self.mox.UnsetStubs()
self.setup()
with sshpool.item() as ssh:
third_id = ssh.id
self.assertNotEqual(first_id, third_id)
# Expected new ssh pool
with mock.patch.object(paramiko, "SSHClient",
mock.Mock(return_value=FakeSSHClient())):
with sshpool.item() as ssh:
third_id = ssh.id
self.assertNotEqual(first_id, third_id)
paramiko.SSHClient.assert_called_once_with()
class CidrToNetmaskTestCase(test.TestCase):

View File

@ -1221,3 +1221,11 @@ def cidr_to_netmask(cidr):
return str(network.netmask)
except netaddr.AddrFormatError:
raise exception.InvalidInput(_("Invalid cidr supplied %s") % cidr)
class IsAMatcher(object):
def __init__(self, expected_value=None):
self.expected_value = expected_value
def __eq__(self, actual_value):
return isinstance(actual_value, self.expected_value)