From 990f108e6831feaab42503922ced9e17caff1bf6 Mon Sep 17 00:00:00 2001 From: vponomaryov Date: Wed, 11 Jun 2014 13:52:09 +0300 Subject: [PATCH] Rewrited mox tests to mock (part 1) Preffered module for unittests is mock, so, this commit is intended to remove mox module from manila's dependencies in modules: manila/tests/api/v1/test_shares.py manila/tests/test_service.py manila/tests/test_share_api.py manila/tests/test_share_lvm.py manila/tests/test_utils.py Change-Id: Ie488b80dea0a1e66837c16d3e6ac8658a2045a2e Partially implements: blueprint replace-mox-with-mock --- manila/tests/api/v1/test_shares.py | 3 - manila/tests/test_service.py | 177 +++++----- manila/tests/test_share_api.py | 526 ++++++++++++++--------------- manila/tests/test_share_lvm.py | 25 +- manila/tests/test_utils.py | 175 +++++----- manila/utils.py | 8 + 6 files changed, 443 insertions(+), 471 deletions(-) diff --git a/manila/tests/api/v1/test_shares.py b/manila/tests/api/v1/test_shares.py index 2b7227ee62..d6e6212fa2 100644 --- a/manila/tests/api/v1/test_shares.py +++ b/manila/tests/api/v1/test_shares.py @@ -32,7 +32,6 @@ class ShareApiTest(test.TestCase): def setUp(self): super(ShareApiTest, self).setUp() self.controller = shares.ShareController() - self.stubs.Set(share_api.API, 'get_all', stubs.stub_get_all_shares) self.stubs.Set(share_api.API, 'get', @@ -276,7 +275,6 @@ class ShareApiTest(test.TestCase): search_opts = {'a': 'a', 'b': 'b', 'c': 'c', 'd': 'd'} expected_opts = {'a': 'a', 'c': 'c'} allowed_opts = ['a', 'c'] - self.mox.ReplayAll() common.remove_invalid_options(ctx, search_opts, allowed_opts) self.assertEqual(search_opts, expected_opts) @@ -285,6 +283,5 @@ class ShareApiTest(test.TestCase): search_opts = {'a': 'a', 'b': 'b', 'c': 'c', 'd': 'd'} expected_opts = {'a': 'a', 'b': 'b', 'c': 'c', 'd': 'd'} allowed_opts = ['a', 'c'] - self.mox.ReplayAll() common.remove_invalid_options(ctx, search_opts, allowed_opts) self.assertEqual(search_opts, expected_opts) diff --git a/manila/tests/test_service.py b/manila/tests/test_service.py index 26c367bf8f..d8fec25dcc 100644 --- a/manila/tests/test_service.py +++ b/manila/tests/test_service.py @@ -1,7 +1,8 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2010 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. +# Copyright 2014 NetApp, Inc. +# Copyright 2014 Mirantis, Inc. +# # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -20,13 +21,12 @@ Unit Tests for remote procedure calls using queue """ -import mox +import mock from oslo.config import cfg from manila import context from manila import db from manila import exception - from manila import manager from manila import service from manila import test @@ -41,7 +41,8 @@ test_service_opts = [ help="Host to bind test service to"), cfg.IntOpt("test_service_listen_port", default=0, - help="Port number to bind test service to"), ] + help="Port number to bind test service to"), +] CONF = cfg.CONF CONF.register_opts(test_service_opts) @@ -49,10 +50,8 @@ CONF.register_opts(test_service_opts) class FakeManager(manager.Manager): """Fake manager for tests""" - def __init__(self, host=None, - db_driver=None, service_name=None): - super(FakeManager, self).__init__(host=host, - db_driver=db_driver) + def __init__(self, host=None, db_driver=None, service_name=None): + super(FakeManager, self).__init__(host=host, db_driver=db_driver) def test_method(self): return 'manager' @@ -67,18 +66,12 @@ class ServiceManagerTestCase(test.TestCase): """Test cases for Services""" def test_message_gets_to_manager(self): - serv = service.Service('test', - 'test', - 'test', - 'manila.tests.test_service.FakeManager') + serv = service.Service('test', 'test', 'test', CONF.fake_manager) serv.start() self.assertEqual(serv.test_method(), 'manager') def test_override_manager_method(self): - serv = ExtendedService('test', - 'test', - 'test', - 'manila.tests.test_service.FakeManager') + serv = ExtendedService('test', 'test', 'test', CONF.fake_manager) serv.start() self.assertEqual(serv.test_method(), 'service') @@ -93,7 +86,7 @@ class ServiceFlagsTestCase(test.TestCase): app.stop() ref = db.service_get(context.get_admin_context(), app.service_id) db.service_destroy(context.get_admin_context(), app.service_id) - self.assertTrue(not ref['disabled']) + self.assertFalse(ref['disabled']) def test_service_disabled_on_create_based_on_flag(self): self.flags(enable_new_services=False) @@ -107,119 +100,105 @@ class ServiceFlagsTestCase(test.TestCase): self.assertTrue(ref['disabled']) +def fake_service_get_by_args(*args, **kwargs): + raise exception.NotFound() + + +def fake_service_get(*args, **kwargs): + raise Exception() + + +host = 'foo' +binary = 'bar' +topic = 'test' +service_create = { + 'host': host, + 'binary': binary, + 'topic': topic, + 'report_count': 0, + 'availability_zone': 'nova', +} +service_ref = { + 'host': host, + 'binary': binary, + 'topic': topic, + 'report_count': 0, + 'availability_zone': 'nova', + 'id': 1, +} + + class ServiceTestCase(test.TestCase): """Test cases for Services""" - def setUp(self): - super(ServiceTestCase, self).setUp() - self.mox.StubOutWithMock(service, 'db') - def test_create(self): - host = 'foo' - binary = 'manila-fake' - topic = 'fake' - - # NOTE(vish): Create was moved out of mox replay to make sure that - # the looping calls are created in StartService. - app = service.Service.create(host=host, binary=binary, topic=topic) - + app = service.Service.create(host='foo', + binary='manila-fake', + topic='fake') self.assertTrue(app) + @mock.patch.object(service.db, 'service_get_by_args', + mock.Mock(side_effect=fake_service_get_by_args)) + @mock.patch.object(service.db, 'service_create', + mock.Mock(return_value=service_ref)) + @mock.patch.object(service.db, 'service_get', + mock.Mock(side_effect=fake_service_get)) def test_report_state_newly_disconnected(self): - host = 'foo' - binary = 'bar' - topic = 'test' - service_create = {'host': host, - 'binary': binary, - 'topic': topic, - 'report_count': 0, - 'availability_zone': 'nova'} - service_ref = {'host': host, - 'binary': binary, - 'topic': topic, - 'report_count': 0, - 'availability_zone': 'nova', - 'id': 1} - - service.db.service_get_by_args(mox.IgnoreArg(), - host, - binary).AndRaise(exception.NotFound()) - service.db.service_create(mox.IgnoreArg(), - service_create).AndReturn(service_ref) - service.db.service_get(mox.IgnoreArg(), - mox.IgnoreArg()).AndRaise(Exception()) - - self.mox.ReplayAll() - serv = service.Service(host, - binary, - topic, - 'manila.tests.test_service.FakeManager') + serv = service.Service(host, binary, topic, CONF.fake_manager) serv.start() serv.report_state() self.assertTrue(serv.model_disconnected) + service.db.service_get_by_args.assert_called_once_with( + mock.ANY, host, binary) + service.db.service_create.assert_called_once_with( + mock.ANY, service_create) + service.db.service_get.assert_called_once_with(mock.ANY, mock.ANY) + @mock.patch.object(service.db, 'service_get_by_args', + mock.Mock(side_effect=fake_service_get_by_args)) + @mock.patch.object(service.db, 'service_create', + mock.Mock(return_value=service_ref)) + @mock.patch.object(service.db, 'service_get', + mock.Mock(return_value=service_ref)) + @mock.patch.object(service.db, 'service_update', + mock.Mock(return_value=service_ref. + update({'report_count': 1}))) def test_report_state_newly_connected(self): - host = 'foo' - binary = 'bar' - topic = 'test' - service_create = {'host': host, - 'binary': binary, - 'topic': topic, - 'report_count': 0, - 'availability_zone': 'nova'} - service_ref = {'host': host, - 'binary': binary, - 'topic': topic, - 'report_count': 0, - 'availability_zone': 'nova', - 'id': 1} - - service.db.service_get_by_args(mox.IgnoreArg(), - host, - binary).AndRaise(exception.NotFound()) - service.db.service_create(mox.IgnoreArg(), - service_create).AndReturn(service_ref) - service.db.service_get(mox.IgnoreArg(), - service_ref['id']).AndReturn(service_ref) - service.db.service_update(mox.IgnoreArg(), service_ref['id'], - mox.ContainsKeyValue('report_count', 1)) - - self.mox.ReplayAll() - serv = service.Service(host, - binary, - topic, - 'manila.tests.test_service.FakeManager') + serv = service.Service(host, binary, topic, CONF.fake_manager) serv.start() serv.model_disconnected = True serv.report_state() - - self.assertTrue(not serv.model_disconnected) + self.assertFalse(serv.model_disconnected) + service.db.service_get_by_args.assert_called_once_with( + mock.ANY, host, binary) + service.db.service_create.assert_called_once_with( + mock.ANY, service_create) + service.db.service_get.assert_called_once_with( + mock.ANY, service_ref['id']) + service.db.service_update.assert_called_once_with( + mock.ANY, service_ref['id'], mock.ANY) class TestWSGIService(test.TestCase): - def setUp(self): - super(TestWSGIService, self).setUp() - self.stubs.Set(wsgi.Loader, "load_app", mox.MockAnything()) - + @mock.patch.object(wsgi.Loader, 'load_app', mock.Mock()) def test_service_random_port(self): test_service = service.WSGIService("test_service") self.assertEqual(0, test_service.port) test_service.start() self.assertNotEqual(0, test_service.port) test_service.stop() + wsgi.Loader.load_app.assert_called_once_with("test_service") class TestLauncher(test.TestCase): - def setUp(self): - super(TestLauncher, self).setUp() - self.stubs.Set(wsgi.Loader, "load_app", mox.MockAnything()) - self.service = service.WSGIService("test_service") - + @mock.patch.object(wsgi.Loader, 'load_app', mock.Mock()) def test_launch_app(self): + self.service = service.WSGIService("test_service") self.assertEqual(0, self.service.port) launcher = service.Launcher() launcher.launch_server(self.service) self.assertEqual(0, self.service.port) launcher.stop() + wsgi.Loader.load_app.assert_called_once_with("test_service") diff --git a/manila/tests/test_share_api.py b/manila/tests/test_share_api.py index 9e315b2b7a..86e54949cc 100644 --- a/manila/tests/test_share_api.py +++ b/manila/tests/test_share_api.py @@ -1,4 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright 2012 NetApp # All Rights Reserved. # @@ -20,7 +19,6 @@ import random import uuid import mock -import mox import suds from manila import context @@ -34,6 +32,7 @@ from manila.share import api as share_api from manila.share import rpcapi as share_rpcapi from manila import test from manila.tests.db import fakes as db_fakes +from manila import utils def fake_share(id, **kwargs): @@ -102,14 +101,13 @@ def fake_access(id, **kwargs): class ShareAPITestCase(test.TestCase): + def setUp(self): super(ShareAPITestCase, self).setUp() self.context = context.get_admin_context() - self.scheduler_rpcapi = self.mox.CreateMock( - scheduler_rpcapi.SchedulerAPI) - self.share_rpcapi = self.mox.CreateMock(share_rpcapi.ShareAPI) + self.scheduler_rpcapi = mock.Mock() + self.share_rpcapi = mock.Mock() self.api = share.API() - self.stubs.Set(self.api, 'scheduler_rpcapi', self.scheduler_rpcapi) self.stubs.Set(self.api, 'share_rpcapi', self.share_rpcapi) self.stubs.Set(quota.QUOTAS, 'reserve', lambda *args, **kwargs: None) @@ -117,10 +115,12 @@ class ShareAPITestCase(test.TestCase): self.patcher = mock.patch.object(timeutils, 'utcnow') self.mock_utcnow = self.patcher.start() self.mock_utcnow.return_value = datetime.datetime.utcnow() + self.addCleanup(self.patcher.stop) - def tearDown(self): - self.patcher.stop() - super(ShareAPITestCase, self).tearDown() + self.policy_patcher = mock.patch.object(share_api.policy, + 'check_policy') + self.policy_patcher.start() + self.addCleanup(self.policy_patcher.stop) def test_create(self): date = datetime.datetime(1, 1, 1, 1, 1, 1) @@ -133,100 +133,103 @@ class ShareAPITestCase(test.TestCase): for name in ('id', 'export_location', 'host', 'launched_at', 'terminated_at'): options.pop(name, None) - request_spec = {'share_properties': options, - 'share_proto': share['share_proto'], - 'share_id': share['id'], - 'snapshot_id': share['snapshot_id'], - 'volume_type': None - } - - self.mox.StubOutWithMock(db_driver, 'share_create') - db_driver.share_create(self.context, options).AndReturn(share) - self.scheduler_rpcapi.create_share(self.context, mox.IgnoreArg(), - share['id'], share['snapshot_id'], - request_spec=request_spec, - filter_properties={}) - self.mox.ReplayAll() - self.api.create(self.context, 'nfs', '1', 'fakename', 'fakedesc', - availability_zone='fakeaz') + request_spec = { + 'share_properties': options, + 'share_proto': share['share_proto'], + 'share_id': share['id'], + 'snapshot_id': share['snapshot_id'], + 'volume_type': None, + } + with mock.patch.object(db_driver, 'share_create', + mock.Mock(return_value=share)): + self.api.create(self.context, 'nfs', '1', 'fakename', 'fakedesc', + availability_zone='fakeaz') + db_driver.share_create.assert_called_once_with( + self.context, options) + @mock.patch.object(quota.QUOTAS, 'reserve', + mock.Mock(return_value='reservation')) + @mock.patch.object(quota.QUOTAS, 'commit', mock.Mock()) def test_create_snapshot(self): date = datetime.datetime(1, 1, 1, 1, 1, 1) self.mock_utcnow.return_value = date - share = fake_share('fakeid', - status='available') + share = fake_share('fakeid', status='available') snapshot = fake_snapshot('fakesnapshotid', share_id=share['id'], status='creating') fake_name = 'fakename' fake_desc = 'fakedesc' - options = {'share_id': share['id'], - 'user_id': self.context.user_id, - 'project_id': self.context.project_id, - 'status': "creating", - 'progress': '0%', - 'share_size': share['size'], - 'size': 1, - 'display_name': fake_name, - 'display_description': fake_desc, - 'share_proto': share['share_proto'], - 'export_location': share['export_location']} - - self.mox.StubOutWithMock(share_api.policy, 'check_policy') - share_api.policy.check_policy(self.context, 'share', - 'create_snapshot', share) - self.mox.StubOutWithMock(quota.QUOTAS, 'reserve') - quota.QUOTAS.reserve(self.context, snapshots=1, gigabytes=1).\ - AndReturn('reservation') - self.mox.StubOutWithMock(db_driver, 'share_snapshot_create') - db_driver.share_snapshot_create(self.context, - options).AndReturn(snapshot) - self.mox.StubOutWithMock(quota.QUOTAS, 'commit') - quota.QUOTAS.commit(self.context, 'reservation') - self.share_rpcapi.create_snapshot(self.context, share, snapshot) - self.mox.ReplayAll() - self.api.create_snapshot(self.context, share, fake_name, fake_desc) + options = { + 'share_id': share['id'], + 'user_id': self.context.user_id, + 'project_id': self.context.project_id, + 'status': "creating", + 'progress': '0%', + 'share_size': share['size'], + 'size': 1, + 'display_name': fake_name, + 'display_description': fake_desc, + 'share_proto': share['share_proto'], + 'export_location': share['export_location'], + } + with mock.patch.object(db_driver, 'share_snapshot_create', + mock.Mock(return_value=snapshot)): + self.api.create_snapshot(self.context, share, fake_name, + fake_desc) + share_api.policy.check_policy.assert_called_once_with( + self.context, 'share', 'create_snapshot', share) + quota.QUOTAS.reserve.assert_called_once_with( + self.context, snapshots=1, gigabytes=1) + quota.QUOTAS.commit.assert_called_once_with( + self.context, 'reservation') + db_driver.share_snapshot_create.assert_called_once_with( + self.context, options) + @mock.patch.object(db_driver, 'share_snapshot_update', mock.Mock()) def test_delete_snapshot(self): date = datetime.datetime(1, 1, 1, 1, 1, 1) self.mock_utcnow.return_value = date share = fake_share('fakeid') - snapshot = fake_snapshot('fakesnapshotid', share_id=share['id'], + snapshot = fake_snapshot('fakesnapshotid', + share_id=share['id'], status='available') - self.mox.StubOutWithMock(share_api.policy, 'check_policy') - share_api.policy.check_policy( - self.context, 'share', 'delete_snapshot', snapshot) - self.mox.StubOutWithMock(db_driver, 'share_snapshot_update') - db_driver.share_snapshot_update(self.context, snapshot['id'], - {'status': 'deleting'}) - self.mox.StubOutWithMock(db_driver, 'share_get') - db_driver.share_get(self.context, - snapshot['share_id']).AndReturn(share) - self.share_rpcapi.delete_snapshot(self.context, snapshot, - share['host']) - self.mox.ReplayAll() - self.api.delete_snapshot(self.context, snapshot) + with mock.patch.object(db_driver, 'share_get', + mock.Mock(return_value=share)): + self.api.delete_snapshot(self.context, snapshot) + self.share_rpcapi.delete_snapshot.assert_called_once_with( + self.context, snapshot, share['host']) + share_api.policy.check_policy.assert_called_once_with( + self.context, 'share', 'delete_snapshot', snapshot) + db_driver.share_snapshot_update.assert_called_once_with( + self.context, snapshot['id'], {'status': 'deleting'}) + db_driver.share_get.assert_called_once_with( + self.context, snapshot['share_id']) def test_delete_snapshot_wrong_status(self): - snapshot = fake_snapshot('fakesnapshotid', share_id='fakeshareid', + snapshot = fake_snapshot('fakesnapshotid', + share_id='fakeshareid', status='creating') - self.mox.StubOutWithMock(share_api.policy, 'check_policy') - share_api.policy.check_policy( - self.context, 'share', 'delete_snapshot', snapshot) - self.mox.ReplayAll() self.assertRaises(exception.InvalidShareSnapshot, - self.api.delete_snapshot, self.context, snapshot) + self.api.delete_snapshot, + self.context, + snapshot) + share_api.policy.check_policy.assert_called_once_with( + self.context, 'share', 'delete_snapshot', snapshot) def test_create_snapshot_if_share_not_available(self): - share = fake_share('fakeid', - status='error') - self.mox.StubOutWithMock(share_api.policy, 'check_policy') - share_api.policy.check_policy(self.context, 'share', - 'create_snapshot', share) - self.mox.ReplayAll() - self.assertRaises(exception.InvalidShare, self.api.create_snapshot, - self.context, share, 'fakename', 'fakedesc') + share = fake_share('fakeid', status='error') + self.assertRaises(exception.InvalidShare, + self.api.create_snapshot, + self.context, + share, + 'fakename', + 'fakedesc') + share_api.policy.check_policy.assert_called_once_with( + self.context, 'share', 'create_snapshot', share) + @mock.patch.object(quota.QUOTAS, 'reserve', + mock.Mock(return_value='reservation')) + @mock.patch.object(quota.QUOTAS, 'commit', mock.Mock()) def test_create_from_snapshot_available(self): date = datetime.datetime(1, 1, 1, 1, 1, 1) self.mock_utcnow.return_value = date @@ -242,39 +245,45 @@ class ShareAPITestCase(test.TestCase): for name in ('id', 'export_location', 'host', 'launched_at', 'terminated_at'): options.pop(name, None) - request_spec = {'share_properties': options, - 'share_proto': share['share_proto'], - 'share_id': share['id'], - 'volume_type': None, - 'snapshot_id': share['snapshot_id'], - } - - self.mox.StubOutWithMock(db_driver, 'share_create') - db_driver.share_create(self.context, options).AndReturn(share) - self.scheduler_rpcapi.create_share(self.context, mox.IgnoreArg(), - share['id'], share['snapshot_id'], - request_spec=request_spec, - filter_properties={}) - self.mox.ReplayAll() - self.api.create(self.context, 'nfs', '1', 'fakename', 'fakedesc', - snapshot=snapshot, availability_zone='fakeaz') + request_spec = { + 'share_properties': options, + 'share_proto': share['share_proto'], + 'share_id': share['id'], + 'volume_type': None, + 'snapshot_id': share['snapshot_id'], + } + with mock.patch.object(db_driver, 'share_create', + mock.Mock(return_value=share)): + self.api.create(self.context, 'nfs', '1', 'fakename', 'fakedesc', + snapshot=snapshot, availability_zone='fakeaz') + self.scheduler_rpcapi.create_share.assert_called_once_with( + self.context, 'manila-share', share['id'], + share['snapshot_id'], request_spec=request_spec, + filter_properties={}) + db_driver.share_create.assert_called_once_with( + self.context, options) + share_api.policy.check_policy.assert_called_once_with( + self.context, 'share', 'create') + quota.QUOTAS.reserve.assert_called_once_with( + self.context, gigabytes=1, shares=1) + quota.QUOTAS.commit.assert_called_once_with( + self.context, 'reservation') def test_get_snapshot(self): fake_get_snap = {'fake_key': 'fake_val'} - self.mox.StubOutWithMock(share_api.policy, 'check_policy') - share_api.policy.check_policy(self.context, 'share', 'get_snapshot') - self.mox.StubOutWithMock(db_driver, 'share_snapshot_get') - db_driver.share_snapshot_get(self.context, - 'fakeid').AndReturn(fake_get_snap) - self.mox.ReplayAll() - rule = self.api.get_snapshot(self.context, 'fakeid') - self.assertEqual(rule, fake_get_snap) + with mock.patch.object(db_driver, 'share_snapshot_get', + mock.Mock(return_value=fake_get_snap)): + rule = self.api.get_snapshot(self.context, 'fakeid') + self.assertEqual(rule, fake_get_snap) + share_api.policy.check_policy.assert_called_once_with( + self.context, 'share', 'get_snapshot') + db_driver.share_snapshot_get.assert_called_once_with( + self.context, 'fakeid') def test_create_from_snapshot_not_available(self): snapshot = fake_snapshot('fakesnapshotid', share_id='fakeshare_id', status='error') - self.mox.ReplayAll() self.assertRaises(exception.InvalidShareSnapshot, self.api.create, self.context, 'nfs', '1', 'fakename', 'fakedesc', snapshot=snapshot, @@ -282,19 +291,16 @@ class ShareAPITestCase(test.TestCase): def test_create_from_snapshot_larger_size(self): snapshot = fake_snapshot(1, size=100, status='available') - self.mox.ReplayAll() self.assertRaises(exception.InvalidInput, self.api.create, self.context, 'nfs', 1, 'fakename', 'fakedesc', availability_zone='fakeaz', snapshot=snapshot) def test_create_wrong_size_0(self): - self.mox.ReplayAll() self.assertRaises(exception.InvalidInput, self.api.create, self.context, 'nfs', 0, 'fakename', 'fakedesc', availability_zone='fakeaz') def test_create_wrong_size_some(self): - self.mox.ReplayAll() self.assertRaises(exception.InvalidInput, self.api.create, self.context, 'nfs', 'some', 'fakename', 'fakedesc', availability_zone='fakeaz') @@ -303,255 +309,251 @@ class ShareAPITestCase(test.TestCase): date = datetime.datetime(2, 2, 2, 2, 2, 2) self.mock_utcnow.return_value = date share = fake_share('fakeid', status='available') - options = {'status': 'deleting', - 'terminated_at': date} + options = {'status': 'deleting', 'terminated_at': date} deleting_share = share.copy() deleting_share.update(options) - - self.mox.StubOutWithMock(db_driver, 'share_update') - db_driver.share_update(self.context, share['id'], options).\ - AndReturn(deleting_share) - self.share_rpcapi.delete_share(self.context, deleting_share) - self.mox.ReplayAll() - self.api.delete(self.context, share) - self.mox.UnsetStubs() - self.mox.VerifyAll() + with mock.patch.object(db_driver, 'share_update', + mock.Mock(return_value=deleting_share)): + self.api.delete(self.context, share) + db_driver.share_update.assert_called_once_with( + self.context, share['id'], options) + self.share_rpcapi.delete_share.assert_called_once_with( + self.context, deleting_share) def test_delete_error(self): date = datetime.datetime(2, 2, 2, 2, 2, 2) self.mock_utcnow.return_value = date share = fake_share('fakeid', status='error') - options = {'status': 'deleting', - 'terminated_at': date} + options = {'status': 'deleting', 'terminated_at': date} deleting_share = share.copy() deleting_share.update(options) - - self.mox.StubOutWithMock(db_driver, 'share_update') - db_driver.share_update(self.context, share['id'], options).\ - AndReturn(deleting_share) - self.share_rpcapi.delete_share(self.context, deleting_share) - self.mox.ReplayAll() - self.api.delete(self.context, share) - self.mox.UnsetStubs() - self.mox.VerifyAll() + with mock.patch.object(db_driver, 'share_update', + mock.Mock(return_value=deleting_share)): + self.api.delete(self.context, share) + db_driver.share_update.assert_called_once_with( + self.context, share['id'], options) + self.share_rpcapi.delete_share.assert_called_once_with( + self.context, deleting_share) def test_delete_wrong_status(self): share = fake_share('fakeid') - self.mox.ReplayAll() self.assertRaises(exception.InvalidShare, self.api.delete, self.context, share) + @mock.patch.object(db_driver, 'share_delete', mock.Mock()) def test_delete_no_host(self): share = fake_share('fakeid') share['host'] = None - - self.mox.StubOutWithMock(db_driver, 'share_delete') - db_driver.share_delete(mox.IsA(context.RequestContext), 'fakeid') - self.mox.ReplayAll() self.api.delete(self.context, share) + db_driver.share_delete.assert_called_once_with( + utils.IsAMatcher(context.RequestContext), 'fakeid') def test_get(self): - self.mox.StubOutWithMock(db_driver, 'share_get') - db_driver.share_get(self.context, 'fakeid').AndReturn('fakeshare') - self.mox.StubOutWithMock(share_api.policy, 'check_policy') - share_api.policy.check_policy(self.context, 'share', 'get', - 'fakeshare') - self.mox.ReplayAll() - result = self.api.get(self.context, 'fakeid') - self.assertEqual(result, 'fakeshare') + with mock.patch.object(db_driver, 'share_get', + mock.Mock(return_value='fakeshare')): + result = self.api.get(self.context, 'fakeid') + self.assertEqual(result, 'fakeshare') + share_api.policy.check_policy.assert_called_once_with( + self.context, 'share', 'get', 'fakeshare') + db_driver.share_get.assert_called_once_with( + self.context, 'fakeid') + @mock.patch.object(db_driver, 'share_get_all_by_project', mock.Mock()) def test_get_all_admin_not_all_tenants(self): ctx = context.RequestContext('fakeuid', 'fakepid', id_admin=True) - self.mox.StubOutWithMock(share_api.policy, 'check_policy') - share_api.policy.check_policy(ctx, 'share', 'get_all') - self.mox.StubOutWithMock(db_driver, 'share_get_all_by_project') - db_driver.share_get_all_by_project(ctx, 'fakepid') - self.mox.ReplayAll() self.api.get_all(ctx) + share_api.policy.check_policy.assert_called_once_with( + ctx, 'share', 'get_all') + db_driver.share_get_all_by_project.assert_called_once_with( + ctx, 'fakepid') + @mock.patch.object(db_driver, 'share_get_all', mock.Mock()) def test_get_all_admin_all_tenants(self): - self.mox.StubOutWithMock(share_api.policy, 'check_policy') - share_api.policy.check_policy(self.context, 'share', 'get_all') - self.mox.StubOutWithMock(db_driver, 'share_get_all') - db_driver.share_get_all(self.context) - self.mox.ReplayAll() self.api.get_all(self.context, search_opts={'all_tenants': 1}) + share_api.policy.check_policy.assert_called_once_with( + self.context, 'share', 'get_all') + db_driver.share_get_all.assert_called_once_with(self.context) + @mock.patch.object(db_driver, 'share_get_all_by_project', mock.Mock()) def test_get_all_not_admin(self): ctx = context.RequestContext('fakeuid', 'fakepid', id_admin=False) - self.mox.StubOutWithMock(share_api.policy, 'check_policy') - share_api.policy.check_policy(ctx, 'share', 'get_all') - self.mox.StubOutWithMock(db_driver, 'share_get_all_by_project') - db_driver.share_get_all_by_project(ctx, 'fakepid') - self.mox.ReplayAll() self.api.get_all(ctx) + share_api.policy.check_policy.assert_called_once_with( + ctx, 'share', 'get_all') + db_driver.share_get_all_by_project.assert_called_once_with( + ctx, 'fakepid') def test_get_all_not_admin_search_opts(self): search_opts = {'size': 'fakesize'} fake_objs = [{'name': 'fakename1'}, search_opts] ctx = context.RequestContext('fakeuid', 'fakepid', id_admin=False) - self.mox.StubOutWithMock(share_api.policy, 'check_policy') - share_api.policy.check_policy(ctx, 'share', 'get_all') - self.mox.StubOutWithMock(db_driver, 'share_get_all_by_project') - db_driver.share_get_all_by_project(ctx, - 'fakepid').AndReturn(fake_objs) - self.mox.ReplayAll() - result = self.api.get_all(ctx, search_opts) - self.assertEqual([search_opts], result) + with mock.patch.object(db_driver, 'share_get_all_by_project', + mock.Mock(return_value=fake_objs)): + result = self.api.get_all(ctx, search_opts) + self.assertEqual([search_opts], result) + share_api.policy.check_policy.assert_called_once_with( + ctx, 'share', 'get_all') + db_driver.share_get_all_by_project.assert_called_once_with( + ctx, 'fakepid') + @mock.patch.object(db_driver, 'share_snapshot_get_all_by_project', + mock.Mock()) def test_get_all_snapshots_admin_not_all_tenants(self): ctx = context.RequestContext('fakeuid', 'fakepid', id_admin=True) - self.mox.StubOutWithMock(share_api.policy, 'check_policy') - share_api.policy.check_policy(ctx, 'share', 'get_all_snapshots') - self.mox.StubOutWithMock(db_driver, - 'share_snapshot_get_all_by_project') - db_driver.share_snapshot_get_all_by_project(ctx, 'fakepid') - self.mox.ReplayAll() self.api.get_all_snapshots(ctx) + share_api.policy.check_policy.assert_called_once_with( + ctx, 'share', 'get_all_snapshots') + db_driver.share_snapshot_get_all_by_project.assert_called_once_with( + ctx, 'fakepid') + @mock.patch.object(db_driver, 'share_snapshot_get_all', mock.Mock()) def test_get_all_snapshots_admin_all_tenants(self): - self.mox.StubOutWithMock(share_api.policy, 'check_policy') - share_api.policy.check_policy(self.context, 'share', - 'get_all_snapshots') - self.mox.StubOutWithMock(db_driver, 'share_snapshot_get_all') - db_driver.share_snapshot_get_all(self.context) - self.mox.ReplayAll() self.api.get_all_snapshots(self.context, search_opts={'all_tenants': 1}) + share_api.policy.check_policy.assert_called_once_with( + self.context, 'share', 'get_all_snapshots') + db_driver.share_snapshot_get_all.assert_called_once_with(self.context) + @mock.patch.object(db_driver, 'share_snapshot_get_all_by_project', + mock.Mock()) def test_get_all_snapshots_not_admin(self): ctx = context.RequestContext('fakeuid', 'fakepid', id_admin=False) - self.mox.StubOutWithMock(share_api.policy, 'check_policy') - share_api.policy.check_policy(ctx, 'share', 'get_all_snapshots') - self.mox.StubOutWithMock(db_driver, - 'share_snapshot_get_all_by_project') - db_driver.share_snapshot_get_all_by_project(ctx, 'fakepid') - self.mox.ReplayAll() self.api.get_all_snapshots(ctx) + share_api.policy.check_policy.assert_called_once_with( + ctx, 'share', 'get_all_snapshots') + db_driver.share_snapshot_get_all_by_project.assert_called_once_with( + ctx, 'fakepid') def test_get_all_snapshots_not_admin_search_opts(self): search_opts = {'size': 'fakesize'} fake_objs = [{'name': 'fakename1'}, search_opts] ctx = context.RequestContext('fakeuid', 'fakepid', id_admin=False) - self.mox.StubOutWithMock(share_api.policy, 'check_policy') - share_api.policy.check_policy(ctx, 'share', 'get_all_snapshots') - self.mox.StubOutWithMock(db_driver, - 'share_snapshot_get_all_by_project') - db_driver.share_snapshot_get_all_by_project(ctx, 'fakepid').\ - AndReturn(fake_objs) - self.mox.ReplayAll() - result = self.api.get_all_snapshots(ctx, search_opts) - self.assertEqual([search_opts], result) + with mock.patch.object(db_driver, + 'share_snapshot_get_all_by_project', + mock.Mock(return_value=fake_objs)): + result = self.api.get_all_snapshots(ctx, search_opts) + self.assertEqual([search_opts], result) + share_api.policy.check_policy.assert_called_once_with( + ctx, 'share', 'get_all_snapshots') + db_driver.share_snapshot_get_all_by_project.\ + assert_called_once_with(ctx, 'fakepid') def test_allow_access(self): share = fake_share('fakeid', status='available') - values = {'share_id': share['id'], - 'access_type': 'fakeacctype', - 'access_to': 'fakeaccto'} - self.mox.StubOutWithMock(share_api.policy, 'check_policy') - share_api.policy.check_policy(self.context, 'share', 'allow_access') - self.mox.StubOutWithMock(db_driver, 'share_access_create') - db_driver.share_access_create(self.context, values).\ - AndReturn('fakeacc') - self.share_rpcapi.allow_access(self.context, share, 'fakeacc') - self.mox.ReplayAll() - access = self.api.allow_access(self.context, share, 'fakeacctype', - 'fakeaccto') - self.assertEqual(access, 'fakeacc') + values = { + 'share_id': share['id'], + 'access_type': 'fakeacctype', + 'access_to': 'fakeaccto', + } + with mock.patch.object(db_driver, 'share_access_create', + mock.Mock(return_value='fakeacc')): + self.share_rpcapi.allow_access(self.context, share, 'fakeacc') + access = self.api.allow_access(self.context, share, 'fakeacctype', + 'fakeaccto') + self.assertEqual(access, 'fakeacc') + db_driver.share_access_create.assert_called_once_with( + self.context, values) + share_api.policy.check_policy.assert_called_once_with( + self.context, 'share', 'allow_access') def test_allow_access_status_not_available(self): share = fake_share('fakeid', status='error') - self.mox.ReplayAll() self.assertRaises(exception.InvalidShare, self.api.allow_access, self.context, share, 'fakeacctype', 'fakeaccto') def test_allow_access_no_host(self): share = fake_share('fakeid', host=None) - self.mox.ReplayAll() self.assertRaises(exception.InvalidShare, self.api.allow_access, self.context, share, 'fakeacctype', 'fakeaccto') + @mock.patch.object(db_driver, 'share_access_delete', mock.Mock()) def test_deny_access_error(self): share = fake_share('fakeid', status='available') access = fake_access('fakaccid', state='fakeerror') - self.mox.StubOutWithMock(share_api.policy, 'check_policy') - share_api.policy.check_policy(self.context, 'share', 'deny_access') - self.mox.StubOutWithMock(db_driver, 'share_access_delete') - db_driver.share_access_delete(self.context, access['id']) - self.mox.ReplayAll() self.api.deny_access(self.context, share, access) + share_api.policy.check_policy.assert_called_once_with( + self.context, 'share', 'deny_access') + db_driver.share_access_delete.assert_called_once_with( + self.context, access['id']) + @mock.patch.object(db_driver, 'share_access_update', mock.Mock()) def test_deny_access_active(self): share = fake_share('fakeid', status='available') access = fake_access('fakaccid', state='fakeactive') - self.mox.StubOutWithMock(share_api.policy, 'check_policy') - share_api.policy.check_policy(self.context, 'share', 'deny_access') - self.mox.StubOutWithMock(db_driver, 'share_access_update') - db_driver.share_access_update(self.context, access['id'], - {'state': 'fakedeleting'}) - self.share_rpcapi.deny_access(self.context, share, access) - self.mox.ReplayAll() self.api.deny_access(self.context, share, access) + db_driver.share_access_update.assert_called_once_with( + self.context, access['id'], {'state': 'fakedeleting'}) + share_api.policy.check_policy.assert_called_once_with( + self.context, 'share', 'deny_access') + self.share_rpcapi.deny_access.assert_called_once_with( + self.context, share, access) def test_deny_access_not_active_not_error(self): share = fake_share('fakeid', status='available') access = fake_access('fakaccid', state='fakenew') - self.mox.StubOutWithMock(share_api.policy, 'check_policy') - share_api.policy.check_policy(self.context, 'share', 'deny_access') - self.mox.ReplayAll() self.assertRaises(exception.InvalidShareAccess, self.api.deny_access, self.context, share, access) + share_api.policy.check_policy.assert_called_once_with( + self.context, 'share', 'deny_access') def test_deny_access_status_not_available(self): share = fake_share('fakeid', status='error') - self.mox.StubOutWithMock(share_api.policy, 'check_policy') - share_api.policy.check_policy(self.context, 'share', 'deny_access') - self.mox.ReplayAll() self.assertRaises(exception.InvalidShare, self.api.deny_access, self.context, share, 'fakeacc') + share_api.policy.check_policy.assert_called_once_with( + self.context, 'share', 'deny_access') def test_deny_access_no_host(self): share = fake_share('fakeid', host=None) - self.mox.StubOutWithMock(share_api.policy, 'check_policy') - share_api.policy.check_policy(self.context, 'share', 'deny_access') - self.mox.ReplayAll() self.assertRaises(exception.InvalidShare, self.api.deny_access, self.context, share, 'fakeacc') + share_api.policy.check_policy.assert_called_once_with( + self.context, 'share', 'deny_access') def test_access_get(self): - self.mox.StubOutWithMock(share_api.policy, 'check_policy') - share_api.policy.check_policy(self.context, 'share', 'access_get') - self.mox.StubOutWithMock(db_driver, 'share_access_get') - db_driver.share_access_get(self.context, 'fakeid').AndReturn('fake') - self.mox.ReplayAll() - rule = self.api.access_get(self.context, 'fakeid') - self.assertEqual(rule, 'fake') + with mock.patch.object(db_driver, 'share_access_get', + mock.Mock(return_value='fake')): + rule = self.api.access_get(self.context, 'fakeid') + self.assertEqual(rule, 'fake') + share_api.policy.check_policy.assert_called_once_with( + self.context, 'share', 'access_get') + db_driver.share_access_get.assert_called_once_with( + self.context, 'fakeid') def test_access_get_all(self): share = fake_share('fakeid') - self.mox.StubOutWithMock(share_api.policy, 'check_policy') - share_api.policy.check_policy(self.context, 'share', 'access_get_all') - self.mox.StubOutWithMock(db_driver, 'share_access_get_all_for_share') - db_driver.share_access_get_all_for_share(self.context, 'fakeid').\ - AndReturn([fake_access('fakeacc0id', state='fakenew'), - fake_access('fakeacc1id', state='fakeerror')]) - self.mox.ReplayAll() - rules = self.api.access_get_all(self.context, share) - self.assertEqual(rules, [{'id': 'fakeacc0id', - 'access_type': 'fakeacctype', - 'access_to': 'fakeaccto', - 'state': 'fakenew'}, - {'id': 'fakeacc1id', - 'access_type': 'fakeacctype', - 'access_to': 'fakeaccto', - 'state': 'fakeerror'}]) + rules = [ + fake_access('fakeacc0id', state='fakenew'), + fake_access('fakeacc1id', state='fakeerror'), + ] + expected = [ + { + 'id': 'fakeacc0id', + 'access_type': 'fakeacctype', + 'access_to': 'fakeaccto', + 'state': 'fakenew', + }, + { + 'id': 'fakeacc1id', + 'access_type': 'fakeacctype', + 'access_to': 'fakeaccto', + 'state': 'fakeerror', + }, + ] + with mock.patch.object(db_driver, 'share_access_get_all_for_share', + mock.Mock(return_value=rules)): + actual = self.api.access_get_all(self.context, share) + self.assertEqual(actual, expected) + share_api.policy.check_policy.assert_called_once_with( + self.context, 'share', 'access_get_all') + db_driver.share_access_get_all_for_share.assert_called_once_with( + self.context, 'fakeid') def test_share_metadata_get(self): metadata = {'a': 'b', 'c': 'd'} share_id = str(uuid.uuid4()) - db_driver.share_create(self.context, {'id': share_id, - 'metadata': metadata}) - + db_driver.share_create(self.context, + {'id': share_id, 'metadata': metadata}) self.assertEqual(metadata, db_driver.share_metadata_get(self.context, share_id)) @@ -559,13 +561,11 @@ class ShareAPITestCase(test.TestCase): metadata1 = {'a': '1', 'c': '2'} metadata2 = {'a': '3', 'd': '5'} should_be = {'a': '3', 'c': '2', 'd': '5'} - share_id = str(uuid.uuid4()) - db_driver.share_create(self.context, {'id': share_id, - 'metadata': metadata1}) + db_driver.share_create(self.context, + {'id': share_id, 'metadata': metadata1}) db_driver.share_metadata_update(self.context, share_id, metadata2, False) - self.assertEqual(should_be, db_driver.share_metadata_get(self.context, share_id)) @@ -573,12 +573,10 @@ class ShareAPITestCase(test.TestCase): metadata1 = {'a': '1', 'c': '2'} metadata2 = {'a': '3', 'd': '4'} should_be = metadata2 - share_id = str(uuid.uuid4()) - db_driver.share_create(self.context, {'id': share_id, - 'metadata': metadata1}) + db_driver.share_create(self.context, + {'id': share_id, 'metadata': metadata1}) db_driver.share_metadata_update(self.context, share_id, metadata2, True) - self.assertEqual(should_be, db_driver.share_metadata_get(self.context, share_id)) diff --git a/manila/tests/test_share_lvm.py b/manila/tests/test_share_lvm.py index 24f07b7412..4a530b5552 100644 --- a/manila/tests/test_share_lvm.py +++ b/manila/tests/test_share_lvm.py @@ -1,4 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright 2012 NetApp # All Rights Reserved. # @@ -15,13 +14,14 @@ # under the License. """Unit tests for the NFS driver module.""" -import mock import os +import mock +from oslo.config import cfg + from manila import context from manila.db.sqlalchemy import models from manila import exception - from manila.openstack.common import importutils from manila.openstack.common import log as logging from manila.share.configuration import Configuration @@ -29,7 +29,6 @@ from manila.share.drivers import lvm from manila import test from manila.tests.db import fakes as db_fakes from manila.tests import fake_utils -from oslo.config import cfg CONF = cfg.CONF @@ -313,14 +312,16 @@ class LVMShareDriverTestCase(test.TestCase): self.assertEqual(fake_utils.fake_execute_get_log(), expected_exec) def test_ensure_share(self): - mount_path = self._get_mount_path(self.share) - self.mox.StubOutWithMock(self._driver, '_mount_device') - self._driver._mount_device(self.share, '/dev/mapper/fakevg-fakename').\ - AndReturn(mount_path) - self._helper_nfs.create_export(mount_path, self.share['name'], - recreate=True).AndReturn('fakelocation') - self.mox.ReplayAll() - self._driver.ensure_share(self._context, self.share) + device_name = '/dev/mapper/fakevg-fakename' + location = 'fake_location' + with mock.patch.object(self._driver, + '_mount_device', + mock.Mock(return_value=location)): + self._driver.ensure_share(self._context, self.share) + self._driver._mount_device.assert_called_with(self.share, + device_name) + self._helper_nfs.create_export.assert_called_once_with( + location, self.share['name'], recreate=True) def test_delete_share(self): mount_path = self._get_mount_path(self.share) diff --git a/manila/tests/test_utils.py b/manila/tests/test_utils.py index 30683ea0c2..c1750c99be 100644 --- a/manila/tests/test_utils.py +++ b/manila/tests/test_utils.py @@ -1,6 +1,6 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2011 Justin Santa Barbara +# Copyright 2014 NetApp, Inc. +# Copyright 2014 Mirantis, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -24,7 +24,6 @@ import tempfile import uuid import mock -import mox from oslo.config import cfg import paramiko @@ -307,43 +306,38 @@ class GenericUtilsTestCase(test.TestCase): self.assertEqual(generated_url, actual_url) def test_read_cached_file(self): - self.mox.StubOutWithMock(os.path, "getmtime") - os.path.getmtime(mox.IgnoreArg()).AndReturn(1) - self.mox.ReplayAll() - cache_data = {"data": 1123, "mtime": 1} - data = utils.read_cached_file("/this/is/a/fake", cache_data) - self.assertEqual(cache_data["data"], data) + with mock.patch.object(os.path, "getmtime", mock.Mock(return_value=1)): + data = utils.read_cached_file("/this/is/a/fake", cache_data) + self.assertEqual(cache_data["data"], data) + os.path.getmtime.assert_called_once_with("/this/is/a/fake") def test_read_modified_cached_file(self): - self.mox.StubOutWithMock(os.path, "getmtime") - self.mox.StubOutWithMock(__builtin__, 'open') - os.path.getmtime(mox.IgnoreArg()).AndReturn(2) + with mock.patch.object(os.path, "getmtime", mock.Mock(return_value=2)): + fake_contents = "lorem ipsum" + fake_file = mock.Mock() + fake_file.read = mock.Mock(return_value=fake_contents) + fake_context_manager = mock.Mock() + fake_context_manager.__enter__ = mock.Mock(return_value=fake_file) + fake_context_manager.__exit__ = mock.Mock() + with mock.patch.object(__builtin__, 'open', + mock.Mock(return_value=fake_context_manager)): + cache_data = {"data": 1123, "mtime": 1} + self.reload_called = False - fake_contents = "lorem ipsum" - fake_file = self.mox.CreateMockAnything() - fake_file.read().AndReturn(fake_contents) - fake_context_manager = self.mox.CreateMockAnything() - fake_context_manager.__enter__().AndReturn(fake_file) - fake_context_manager.__exit__(mox.IgnoreArg(), - mox.IgnoreArg(), - mox.IgnoreArg()) + def test_reload(reloaded_data): + self.assertEqual(reloaded_data, fake_contents) + self.reload_called = True - __builtin__.open(mox.IgnoreArg()).AndReturn(fake_context_manager) - - self.mox.ReplayAll() - cache_data = {"data": 1123, "mtime": 1} - self.reload_called = False - - def test_reload(reloaded_data): - self.assertEqual(reloaded_data, fake_contents) - self.reload_called = True - - data = utils.read_cached_file("/this/is/a/fake", - cache_data, - reload_func=test_reload) - self.assertEqual(data, fake_contents) - self.assertTrue(self.reload_called) + data = utils.read_cached_file("/this/is/a/fake", + cache_data, + reload_func=test_reload) + self.assertEqual(data, fake_contents) + self.assertTrue(self.reload_called) + fake_file.read.assert_called_once_with() + fake_context_manager.__enter__.assert_any_call() + __builtin__.open.assert_called_once_with("/this/is/a/fake") + os.path.getmtime.assert_called_once_with("/this/is/a/fake") def test_generate_password(self): password = utils.generate_password() @@ -385,35 +379,34 @@ class GenericUtilsTestCase(test.TestCase): fts_func = datetime.datetime.fromtimestamp fake_now = 1000 down_time = 5 - self.flags(service_down_time=down_time) - self.mox.StubOutWithMock(timeutils, 'utcnow') + with mock.patch.object(timeutils, 'utcnow', + mock.Mock(return_value=fts_func(fake_now))): - # Up (equal) - timeutils.utcnow().AndReturn(fts_func(fake_now)) - service = {'updated_at': fts_func(fake_now - down_time), - 'created_at': fts_func(fake_now - down_time)} - self.mox.ReplayAll() - result = utils.service_is_up(service) - self.assertTrue(result) + # Up (equal) + service = {'updated_at': fts_func(fake_now - down_time), + 'created_at': fts_func(fake_now - down_time)} + result = utils.service_is_up(service) + self.assertTrue(result) + timeutils.utcnow.assert_called_once_with() - self.mox.ResetAll() - # Up - timeutils.utcnow().AndReturn(fts_func(fake_now)) - service = {'updated_at': fts_func(fake_now - down_time + 1), - 'created_at': fts_func(fake_now - down_time + 1)} - self.mox.ReplayAll() - result = utils.service_is_up(service) - self.assertTrue(result) + with mock.patch.object(timeutils, 'utcnow', + mock.Mock(return_value=fts_func(fake_now))): + # Up + service = {'updated_at': fts_func(fake_now - down_time + 1), + 'created_at': fts_func(fake_now - down_time + 1)} + result = utils.service_is_up(service) + self.assertTrue(result) + timeutils.utcnow.assert_called_once_with() - self.mox.ResetAll() - # Down - timeutils.utcnow().AndReturn(fts_func(fake_now)) - service = {'updated_at': fts_func(fake_now - down_time - 1), - 'created_at': fts_func(fake_now - down_time - 1)} - self.mox.ReplayAll() - result = utils.service_is_up(service) - self.assertFalse(result) + with mock.patch.object(timeutils, 'utcnow', + mock.Mock(return_value=fts_func(fake_now))): + # Down + service = {'updated_at': fts_func(fake_now - down_time - 1), + 'created_at': fts_func(fake_now - down_time - 1)} + result = utils.service_is_up(service) + self.assertFalse(result) + timeutils.utcnow.assert_called_once_with() def test_safe_parse_xml(self): @@ -690,45 +683,41 @@ class FakeTransport(object): class SSHPoolTestCase(test.TestCase): """Unit test for SSH Connection Pool.""" - def setup(self): - self.mox.StubOutWithMock(paramiko, "SSHClient") - paramiko.SSHClient().AndReturn(FakeSSHClient()) - self.mox.ReplayAll() - def test_single_ssh_connect(self): - self.setup() - sshpool = utils.SSHPool("127.0.0.1", 22, 10, "test", password="test", - min_size=1, max_size=1) - with sshpool.item() as ssh: - first_id = ssh.id + with mock.patch.object(paramiko, "SSHClient", + mock.Mock(return_value=FakeSSHClient())): + sshpool = utils.SSHPool("127.0.0.1", 22, 10, "test", + password="test", min_size=1, max_size=1) + with sshpool.item() as ssh: + first_id = ssh.id - with sshpool.item() as ssh: - second_id = ssh.id + with sshpool.item() as ssh: + second_id = ssh.id - self.assertEqual(first_id, second_id) + self.assertEqual(first_id, second_id) + paramiko.SSHClient.assert_called_once_with() def test_closed_reopend_ssh_connections(self): - self.setup() - sshpool = utils.SSHPool("127.0.0.1", 22, 10, "test", password="test", - min_size=1, max_size=2) - with sshpool.item() as ssh: - first_id = ssh.id - with sshpool.item() as ssh: - second_id = ssh.id - # Close the connection and test for a new connection - ssh.get_transport().active = False + with mock.patch.object(paramiko, "SSHClient", + mock.Mock(return_value=FakeSSHClient())): + sshpool = utils.SSHPool("127.0.0.1", 22, 10, "test", + password="test", min_size=1, max_size=2) + with sshpool.item() as ssh: + first_id = ssh.id + with sshpool.item() as ssh: + second_id = ssh.id + # Close the connection and test for a new connection + ssh.get_transport().active = False + self.assertEqual(first_id, second_id) + paramiko.SSHClient.assert_called_once_with() - self.assertEqual(first_id, second_id) - - # The mox items are not getting setup in a new pool connection, - # so had to reset and set again. - self.mox.UnsetStubs() - self.setup() - - with sshpool.item() as ssh: - third_id = ssh.id - - self.assertNotEqual(first_id, third_id) + # Expected new ssh pool + with mock.patch.object(paramiko, "SSHClient", + mock.Mock(return_value=FakeSSHClient())): + with sshpool.item() as ssh: + third_id = ssh.id + self.assertNotEqual(first_id, third_id) + paramiko.SSHClient.assert_called_once_with() class CidrToNetmaskTestCase(test.TestCase): diff --git a/manila/utils.py b/manila/utils.py index 685aa2abb0..b891fc9e33 100644 --- a/manila/utils.py +++ b/manila/utils.py @@ -1221,3 +1221,11 @@ def cidr_to_netmask(cidr): return str(network.netmask) except netaddr.AddrFormatError: raise exception.InvalidInput(_("Invalid cidr supplied %s") % cidr) + + +class IsAMatcher(object): + def __init__(self, expected_value=None): + self.expected_value = expected_value + + def __eq__(self, actual_value): + return isinstance(actual_value, self.expected_value)