Shared filesystem management project for OpenStack.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

4235 lines
181 KiB

# Copyright 2012 NetApp. All rights reserved.
# Copyright (c) 2015 Tom Barron. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Unit tests for the Share API module."""
import copy
import datetime
import ddt
import mock
from oslo_config import cfg
from oslo_utils import timeutils
from oslo_utils import uuidutils
from manila.common import constants
from manila import context
from manila.data import rpcapi as data_rpc
from manila import db as db_api
from manila.db.sqlalchemy import models
from manila import exception
from manila import policy
from manila import quota
from manila import share
from manila.share import api as share_api
from manila.share import share_types
from manila import test
from manila.tests import db_utils
from manila.tests import fake_share as fakes
from manila.tests import utils as test_utils
from manila import utils
CONF = cfg.CONF
_FAKE_LIST_OF_ALL_SHARES = [
{
'name': 'foo',
'description': 'ds',
'status': constants.STATUS_AVAILABLE,
'project_id': 'fake_pid_1',
'share_server_id': 'fake_server_1',
},
{
'name': 'bar',
'status': constants.STATUS_ERROR,
'project_id': 'fake_pid_2',
'share_server_id': 'fake_server_2',
},
{
'name': 'foo1',
'description': 'ds1',
'status': constants.STATUS_AVAILABLE,
'project_id': 'fake_pid_2',
'share_server_id': 'fake_server_3',
},
{
'name': 'bar',
'status': constants.STATUS_ERROR,
'project_id': 'fake_pid_2',
'share_server_id': 'fake_server_3',
},
]
_FAKE_LIST_OF_ALL_SNAPSHOTS = [
{
'name': 'foo',
'status': constants.STATUS_AVAILABLE,
'project_id': 'fake_pid_1',
'share_id': 'fake_server_1',
},
{
'name': 'bar',
'status': constants.STATUS_ERROR,
'project_id': 'fake_pid_2',
'share_id': 'fake_server_2',
},
{
'name': 'foo',
'status': constants.STATUS_AVAILABLE,
'project_id': 'fake_pid_2',
'share_id': 'fake_share_id_3',
},
{
'name': 'bar',
'status': constants.STATUS_ERROR,
'project_id': 'fake_pid_2',
'share_id': 'fake_share_id_3',
},
]
@ddt.ddt
class ShareAPITestCase(test.TestCase):
def setUp(self):
super(ShareAPITestCase, self).setUp()
self.context = context.get_admin_context()
self.scheduler_rpcapi = mock.Mock()
self.share_rpcapi = mock.Mock()
self.api = share.API()
self.mock_object(self.api, 'scheduler_rpcapi', self.scheduler_rpcapi)
self.mock_object(self.api, 'share_rpcapi', self.share_rpcapi)
self.mock_object(quota.QUOTAS, 'reserve',
lambda *args, **kwargs: None)
self.dt_utc = datetime.datetime.utcnow()
self.mock_object(timeutils, 'utcnow',
mock.Mock(return_value=self.dt_utc))
self.mock_object(share_api.policy, 'check_policy')
def _setup_create_mocks(self, protocol='nfs', **kwargs):
share = db_utils.create_share(
user_id=self.context.user_id,
project_id=self.context.project_id,
share_type_id=kwargs.pop('share_type_id', 'fake'),
**kwargs
)
share_data = {
'share_proto': protocol,
'size': 1,
'display_name': 'fakename',
'display_description': 'fakedesc',
'availability_zone': 'fakeaz'
}
self.mock_object(db_api, 'share_create', mock.Mock(return_value=share))
self.mock_object(self.api, 'create_instance')
return share, share_data
def _setup_create_instance_mocks(self):
host = 'fake'
share_type_id = "fake_share_type"
share = db_utils.create_share(
user_id=self.context.user_id,
project_id=self.context.project_id,
create_share_instance=False,
)
share_instance = db_utils.create_share_instance(
share_id=share['id'],
share_type_id=share_type_id)
share_type = {'fake': 'fake'}
self.mock_object(db_api, 'share_instance_create',
mock.Mock(return_value=share_instance))
self.mock_object(db_api, 'share_type_get',
mock.Mock(return_value=share_type))
az_mock = mock.Mock()
type(az_mock.return_value).id = mock.PropertyMock(
return_value='fake_id')
self.mock_object(db_api, 'availability_zone_get', az_mock)
self.mock_object(self.api.share_rpcapi, 'create_share_instance')
self.mock_object(self.api.scheduler_rpcapi, 'create_share_instance')
return host, share, share_instance
def _setup_create_from_snapshot_mocks(self, use_scheduler=True, host=None):
CONF.set_default("use_scheduler_creating_share_from_snapshot",
use_scheduler)
share_type = fakes.fake_share_type()
original_share = db_utils.create_share(
user_id=self.context.user_id,
project_id=self.context.project_id,
status=constants.STATUS_AVAILABLE,
host=host if host else 'fake',
size=1,
share_type_id=share_type['id'],
)
snapshot = db_utils.create_snapshot(
share_id=original_share['id'],
status=constants.STATUS_AVAILABLE,
size=1
)
share, share_data = self._setup_create_mocks(
snapshot_id=snapshot['id'], share_type_id=share_type['id'])
request_spec = {
'share_properties': share.to_dict(),
'share_proto': share['share_proto'],
'share_id': share['id'],
'share_type': None,
'snapshot_id': share['snapshot_id'],
}
self.mock_object(quota.QUOTAS, 'reserve',
mock.Mock(return_value='reservation'))
self.mock_object(quota.QUOTAS, 'commit')
self.mock_object(
share_types, 'get_share_type', mock.Mock(return_value=share_type))
return snapshot, share, share_data, request_spec
def _setup_delete_mocks(self, status, snapshots=None, **kwargs):
if snapshots is None:
snapshots = []
share = db_utils.create_share(status=status, **kwargs)
self.mock_object(db_api, 'share_delete')
self.mock_object(db_api, 'share_server_update')
self.mock_object(db_api, 'share_snapshot_get_all_for_share',
mock.Mock(return_value=snapshots))
self.mock_object(self.api, 'delete_instance')
return share
def _setup_delete_share_instance_mocks(self, **kwargs):
share = db_utils.create_share(**kwargs)
self.mock_object(db_api, 'share_instance_update',
mock.Mock(return_value=share.instance))
self.mock_object(self.api.share_rpcapi, 'delete_share_instance')
self.mock_object(db_api, 'share_server_update')
return share.instance
def test_get_all_admin_no_filters(self):
self.mock_object(db_api, 'share_get_all_by_project',
mock.Mock(return_value=_FAKE_LIST_OF_ALL_SHARES[0]))
ctx = context.RequestContext('fake_uid', 'fake_pid_1', is_admin=True)
shares = self.api.get_all(ctx)
share_api.policy.check_policy.assert_called_once_with(
ctx, 'share', 'get_all')
db_api.share_get_all_by_project.assert_called_once_with(
ctx, sort_dir='desc', sort_key='created_at',
project_id='fake_pid_1', filters={}, is_public=False
)
self.assertEqual(_FAKE_LIST_OF_ALL_SHARES[0], shares)
def test_get_all_admin_filter_by_all_tenants(self):
ctx = context.RequestContext('fake_uid', 'fake_pid_1', is_admin=True)
self.mock_object(db_api, 'share_get_all',
mock.Mock(return_value=_FAKE_LIST_OF_ALL_SHARES))
shares = self.api.get_all(ctx, {'all_tenants': 1})
share_api.policy.check_policy.assert_called_once_with(
ctx, 'share', 'get_all')
db_api.share_get_all.assert_called_once_with(
ctx, sort_dir='desc', sort_key='created_at', filters={})
self.assertEqual(_FAKE_LIST_OF_ALL_SHARES, shares)
def test_get_all_admin_filter_by_all_tenants_with_blank(self):
ctx = context.RequestContext('fake_uid', 'fake_pid_1', is_admin=True)
self.mock_object(db_api, 'share_get_all',
mock.Mock(return_value=_FAKE_LIST_OF_ALL_SHARES))
shares = self.api.get_all(ctx, {'all_tenants': ''})
share_api.policy.check_policy.assert_called_once_with(
ctx, 'share', 'get_all')
db_api.share_get_all.assert_called_once_with(
ctx, sort_dir='desc', sort_key='created_at', filters={})
self.assertEqual(_FAKE_LIST_OF_ALL_SHARES, shares)
def test_get_all_admin_filter_by_all_tenants_with_false(self):
ctx = context.RequestContext('fake_uid', 'fake_pid_1', is_admin=True)
self.mock_object(db_api, 'share_get_all_by_project',
mock.Mock(return_value=_FAKE_LIST_OF_ALL_SHARES[0]))
shares = self.api.get_all(ctx, {'all_tenants': 'false'})
share_api.policy.check_policy.assert_called_once_with(
ctx, 'share', 'get_all')
db_api.share_get_all_by_project.assert_called_once_with(
ctx, sort_dir='desc', sort_key='created_at',
project_id='fake_pid_1', filters={}, is_public=False
)
self.assertEqual(_FAKE_LIST_OF_ALL_SHARES[0], shares)
def test_get_all_admin_filter_by_all_tenants_with_invaild_value(self):
ctx = context.RequestContext('fake_uid', 'fake_pid_1', is_admin=True)
self.mock_object(db_api, 'share_get_all')
self.assertRaises(
exception.InvalidInput,
self.api.get_all, ctx, {'all_tenants': 'wonk'})
@ddt.data(
({'share_server_id': 'fake_share_server'}, 'list_by_share_server_id'),
({'host': 'fake_host'}, 'list_by_host'),
)
@ddt.unpack
def test_get_all_by_non_admin_using_admin_filter(self, filters, policy):
def fake_policy_checker(*args, **kwargs):
if policy == args[2] and not args[0].is_admin:
raise exception.NotAuthorized
ctx = context.RequestContext('fake_uid', 'fake_pid_1', is_admin=False)
self.mock_object(
share_api.policy, 'check_policy',
mock.Mock(side_effect=fake_policy_checker))
self.assertRaises(
exception.NotAuthorized,
self.api.get_all, ctx, filters)
share_api.policy.check_policy.assert_has_calls([
mock.call(ctx, 'share', 'get_all'),
mock.call(ctx, 'share', policy),
])
def test_get_all_admin_filter_by_share_server_and_all_tenants(self):
# NOTE(vponomaryov): if share_server_id provided, 'all_tenants' opt
# should not make any influence.
ctx = context.RequestContext('fake_uid', 'fake_pid_1', is_admin=True)
self.mock_object(db_api, 'share_get_all_by_share_server',
mock.Mock(return_value=_FAKE_LIST_OF_ALL_SHARES[2:]))
self.mock_object(db_api, 'share_get_all')
self.mock_object(db_api, 'share_get_all_by_project')
shares = self.api.get_all(
ctx, {'share_server_id': 'fake_server_3', 'all_tenants': 1})
share_api.policy.check_policy.assert_has_calls([
mock.call(ctx, 'share', 'get_all'),
mock.call(ctx, 'share', 'list_by_share_server_id'),
])
db_api.share_get_all_by_share_server.assert_called_once_with(
ctx, 'fake_server_3', sort_dir='desc', sort_key='created_at',
filters={},
)
db_api.share_get_all_by_project.assert_has_calls([])
db_api.share_get_all.assert_has_calls([])
self.assertEqual(_FAKE_LIST_OF_ALL_SHARES[2:], shares)
def test_get_all_admin_filter_by_name(self):
ctx = context.RequestContext('fake_uid', 'fake_pid_2', is_admin=True)
self.mock_object(db_api, 'share_get_all_by_project',
mock.Mock(return_value=_FAKE_LIST_OF_ALL_SHARES[1:]))
shares = self.api.get_all(ctx, {'name': 'bar'})
share_api.policy.check_policy.assert_has_calls([
mock.call(ctx, 'share', 'get_all'),
])
db_api.share_get_all_by_project.assert_called_once_with(
ctx, sort_dir='desc', sort_key='created_at',
project_id='fake_pid_2', filters={}, is_public=False
)
self.assertEqual(_FAKE_LIST_OF_ALL_SHARES[1::2], shares)
@ddt.data(({'name': 'fo'}, 0), ({'description': 'd'}, 0),
({'name': 'foo', 'description': 'd'}, 0),
({'name': 'foo'}, 1), ({'description': 'ds'}, 1),
({'name~': 'foo', 'description~': 'ds'}, 2),
({'name': 'foo', 'description~': 'ds'}, 1),
({'name~': 'foo', 'description': 'ds'}, 1))
@ddt.unpack
def test_get_all_admin_filter_by_name_and_description(
self, search_opts, get_share_number):
ctx = context.RequestContext('fake_uid', 'fake_pid_2', is_admin=True)
self.mock_object(db_api, 'share_get_all_by_project',
mock.Mock(return_value=_FAKE_LIST_OF_ALL_SHARES))
shares = self.api.get_all(ctx, search_opts)
share_api.policy.check_policy.assert_has_calls([
mock.call(ctx, 'share', 'get_all'),
])
db_api.share_get_all_by_project.assert_called_once_with(
ctx, sort_dir='desc', sort_key='created_at',
project_id='fake_pid_2',
filters={}, is_public=False
)
self.assertEqual(get_share_number, len(shares))
if get_share_number == 2:
self.assertEqual(_FAKE_LIST_OF_ALL_SHARES[0::2], shares)
elif get_share_number == 1:
self.assertEqual(_FAKE_LIST_OF_ALL_SHARES[:1], shares)
@ddt.data('id', 'path')
def test_get_all_admin_filter_by_export_location(self, type):
ctx = context.RequestContext('fake_uid', 'fake_pid_2', is_admin=True)
self.mock_object(db_api, 'share_get_all_by_project',
mock.Mock(return_value=_FAKE_LIST_OF_ALL_SHARES[1:]))
shares = self.api.get_all(ctx, {'export_location_' + type: 'test'})
share_api.policy.check_policy.assert_has_calls([
mock.call(ctx, 'share', 'get_all'),
])
db_api.share_get_all_by_project.assert_called_once_with(
ctx, sort_dir='desc', sort_key='created_at',
project_id='fake_pid_2',
filters={'export_location_' + type: 'test'}, is_public=False
)
self.assertEqual(_FAKE_LIST_OF_ALL_SHARES[1:], shares)
def test_get_all_admin_filter_by_name_and_all_tenants(self):
ctx = context.RequestContext('fake_uid', 'fake_pid_2', is_admin=True)
self.mock_object(db_api, 'share_get_all',
mock.Mock(return_value=_FAKE_LIST_OF_ALL_SHARES))
shares = self.api.get_all(ctx, {'name': 'foo', 'all_tenants': 1})
share_api.policy.check_policy.assert_has_calls([
mock.call(ctx, 'share', 'get_all'),
])
db_api.share_get_all.assert_called_once_with(
ctx, sort_dir='desc', sort_key='created_at', filters={})
self.assertEqual(_FAKE_LIST_OF_ALL_SHARES[:1], shares)
def test_get_all_admin_filter_by_status(self):
ctx = context.RequestContext('fake_uid', 'fake_pid_2', is_admin=True)
self.mock_object(db_api, 'share_get_all_by_project',
mock.Mock(return_value=_FAKE_LIST_OF_ALL_SHARES[1:]))
shares = self.api.get_all(ctx, {'status': constants.STATUS_AVAILABLE})
share_api.policy.check_policy.assert_has_calls([
mock.call(ctx, 'share', 'get_all'),
])
db_api.share_get_all_by_project.assert_called_once_with(
ctx, sort_dir='desc', sort_key='created_at',
project_id='fake_pid_2', filters={}, is_public=False
)
self.assertEqual(_FAKE_LIST_OF_ALL_SHARES[2::4], shares)
def test_get_all_admin_filter_by_status_and_all_tenants(self):
ctx = context.RequestContext('fake_uid', 'fake_pid_2', is_admin=True)
self.mock_object(db_api, 'share_get_all',
mock.Mock(return_value=_FAKE_LIST_OF_ALL_SHARES))
shares = self.api.get_all(
ctx, {'status': constants.STATUS_ERROR, 'all_tenants': 1})
share_api.policy.check_policy.assert_has_calls([
mock.call(ctx, 'share', 'get_all'),
])
db_api.share_get_all.assert_called_once_with(
ctx, sort_dir='desc', sort_key='created_at', filters={})
self.assertEqual(_FAKE_LIST_OF_ALL_SHARES[1::2], shares)
def test_get_all_non_admin_filter_by_all_tenants(self):
# Expected share list only by project of non-admin user
ctx = context.RequestContext('fake_uid', 'fake_pid_2', is_admin=False)
self.mock_object(db_api, 'share_get_all_by_project',
mock.Mock(return_value=_FAKE_LIST_OF_ALL_SHARES[1:]))
shares = self.api.get_all(ctx, {'all_tenants': 1})
share_api.policy.check_policy.assert_has_calls([
mock.call(ctx, 'share', 'get_all'),
])
db_api.share_get_all_by_project.assert_called_once_with(
ctx, sort_dir='desc', sort_key='created_at',
project_id='fake_pid_2', filters={}, is_public=False
)
self.assertEqual(_FAKE_LIST_OF_ALL_SHARES[1:], shares)
def test_get_all_non_admin_with_name_and_status_filters(self):
ctx = context.RequestContext('fake_uid', 'fake_pid_2', is_admin=False)
self.mock_object(db_api, 'share_get_all_by_project',
mock.Mock(return_value=_FAKE_LIST_OF_ALL_SHARES[1:]))
shares = self.api.get_all(
ctx, {'name': 'bar', 'status': constants.STATUS_ERROR})
share_api.policy.check_policy.assert_has_calls([
mock.call(ctx, 'share', 'get_all'),
])
db_api.share_get_all_by_project.assert_called_once_with(
ctx, sort_dir='desc', sort_key='created_at',
project_id='fake_pid_2', filters={}, is_public=False
)
# two items expected, one filtered
self.assertEqual(_FAKE_LIST_OF_ALL_SHARES[1::2], shares)
# one item expected, two filtered
shares = self.api.get_all(
ctx, {'name': 'foo1', 'status': constants.STATUS_AVAILABLE})
self.assertEqual(_FAKE_LIST_OF_ALL_SHARES[2::4], shares)
share_api.policy.check_policy.assert_has_calls([
mock.call(ctx, 'share', 'get_all'),
mock.call(ctx, 'share', 'get_all'),
])
db_api.share_get_all_by_project.assert_has_calls([
mock.call(ctx, sort_dir='desc', sort_key='created_at',
project_id='fake_pid_2', filters={}, is_public=False),
mock.call(ctx, sort_dir='desc', sort_key='created_at',
project_id='fake_pid_2', filters={}, is_public=False),
])
@ddt.data('True', 'true', '1', 'yes', 'y', 'on', 't', True)
def test_get_all_non_admin_public(self, is_public):
ctx = context.RequestContext('fake_uid', 'fake_pid_2',
is_admin=False)
self.mock_object(db_api, 'share_get_all_by_project', mock.Mock(
return_value=_FAKE_LIST_OF_ALL_SHARES[1:]))
shares = self.api.get_all(ctx, {'is_public': is_public})
share_api.policy.check_policy.assert_has_calls([
mock.call(ctx, 'share', 'get_all'),
])
db_api.share_get_all_by_project.assert_called_once_with(
ctx, sort_dir='desc', sort_key='created_at',
project_id='fake_pid_2', filters={}, is_public=True
)
self.assertEqual(_FAKE_LIST_OF_ALL_SHARES[1:], shares)
@ddt.data('False', 'false', '0', 'no', 'n', 'off', 'f', False)
def test_get_all_non_admin_not_public(self, is_public):
ctx = context.RequestContext('fake_uid', 'fake_pid_2',
is_admin=False)
self.mock_object(db_api, 'share_get_all_by_project', mock.Mock(
return_value=_FAKE_LIST_OF_ALL_SHARES[1:]))
shares = self.api.get_all(ctx, {'is_public': is_public})
share_api.policy.check_policy.assert_has_calls([
mock.call(ctx, 'share', 'get_all'),
])
db_api.share_get_all_by_project.assert_called_once_with(
ctx, sort_dir='desc', sort_key='created_at',
project_id='fake_pid_2', filters={}, is_public=False
)
self.assertEqual(_FAKE_LIST_OF_ALL_SHARES[1:], shares)
@ddt.data('truefoo', 'bartrue')
def test_get_all_invalid_public_value(self, is_public):
ctx = context.RequestContext('fake_uid', 'fake_pid_2',
is_admin=False)
self.assertRaises(ValueError, self.api.get_all,
ctx, {'is_public': is_public})
share_api.policy.check_policy.assert_has_calls([
mock.call(ctx, 'share', 'get_all'),
])
def test_get_all_with_sorting_valid(self):
self.mock_object(db_api, 'share_get_all_by_project',
mock.Mock(return_value=_FAKE_LIST_OF_ALL_SHARES[0]))
ctx = context.RequestContext('fake_uid', 'fake_pid_1', is_admin=False)
shares = self.api.get_all(ctx, sort_key='status', sort_dir='asc')
share_api.policy.check_policy.assert_called_once_with(
ctx, 'share', 'get_all')
db_api.share_get_all_by_project.assert_called_once_with(
ctx, sort_dir='asc', sort_key='status',
project_id='fake_pid_1', filters={}, is_public=False
)
self.assertEqual(_FAKE_LIST_OF_ALL_SHARES[0], shares)
def test_get_all_sort_key_invalid(self):
self.mock_object(db_api, 'share_get_all_by_project',
mock.Mock(return_value=_FAKE_LIST_OF_ALL_SHARES[0]))
ctx = context.RequestContext('fake_uid', 'fake_pid_1', is_admin=False)
self.assertRaises(
exception.InvalidInput,
self.api.get_all,
ctx,
sort_key=1,
)
share_api.policy.check_policy.assert_called_once_with(
ctx, 'share', 'get_all')
def test_get_all_sort_dir_invalid(self):
self.mock_object(db_api, 'share_get_all_by_project',
mock.Mock(return_value=_FAKE_LIST_OF_ALL_SHARES[0]))
ctx = context.RequestContext('fake_uid', 'fake_pid_1', is_admin=False)
self.assertRaises(
exception.InvalidInput,
self.api.get_all,
ctx,
sort_dir=1,
)
share_api.policy.check_policy.assert_called_once_with(
ctx, 'share', 'get_all')
def _get_all_filter_metadata_or_extra_specs_valid(self, key):
self.mock_object(db_api, 'share_get_all_by_project',
mock.Mock(return_value=_FAKE_LIST_OF_ALL_SHARES[0]))
ctx = context.RequestContext('fake_uid', 'fake_pid_1', is_admin=False)
search_opts = {key: {'foo1': 'bar1', 'foo2': 'bar2'}}
shares = self.api.get_all(ctx, search_opts=search_opts.copy())
if key == 'extra_specs':
share_api.policy.check_policy.assert_has_calls([
mock.call(ctx, 'share', 'get_all'),
mock.call(ctx, 'share_types_extra_spec', 'index'),
])
else:
share_api.policy.check_policy.assert_called_once_with(
ctx, 'share', 'get_all')
db_api.share_get_all_by_project.assert_called_once_with(
ctx, sort_dir='desc', sort_key='created_at',
project_id='fake_pid_1', filters=search_opts, is_public=False)
self.assertEqual(_FAKE_LIST_OF_ALL_SHARES[0], shares)
def test_get_all_filter_by_metadata(self):
self._get_all_filter_metadata_or_extra_specs_valid(key='metadata')
def test_get_all_filter_by_extra_specs(self):
self._get_all_filter_metadata_or_extra_specs_valid(key='extra_specs')
def _get_all_filter_metadata_or_extra_specs_invalid(self, key):
self.mock_object(db_api, 'share_get_all_by_project',
mock.Mock(return_value=_FAKE_LIST_OF_ALL_SHARES[0]))
ctx = context.RequestContext('fake_uid', 'fake_pid_1', is_admin=False)
search_opts = {key: "{'foo': 'bar'}"}
self.assertRaises(exception.InvalidInput, self.api.get_all, ctx,
search_opts=search_opts)
if key == 'extra_specs':
share_api.policy.check_policy.assert_has_calls([
mock.call(ctx, 'share', 'get_all'),
mock.call(ctx, 'share_types_extra_spec', 'index'),
])
else:
share_api.policy.check_policy.assert_called_once_with(
ctx, 'share', 'get_all')
def test_get_all_filter_by_invalid_metadata(self):
self._get_all_filter_metadata_or_extra_specs_invalid(key='metadata')
def test_get_all_filter_by_invalid_extra_specs(self):
self._get_all_filter_metadata_or_extra_specs_invalid(key='extra_specs')
@ddt.data(True, False)
def test_create_public_and_private_share(self, is_public):
share, share_data = self._setup_create_mocks(is_public=is_public)
az = share_data.pop('availability_zone')
self.api.create(
self.context,
share_data['share_proto'],
share_data['size'],
share_data['display_name'],
share_data['display_description'],
availability_zone=az
)
share['status'] = constants.STATUS_CREATING
share['host'] = None
self.assertSubDictMatch(share_data,
db_api.share_create.call_args[0][1])
@ddt.data(
{},
{
constants.ExtraSpecs.SNAPSHOT_SUPPORT: True,
},
{
constants.ExtraSpecs.SNAPSHOT_SUPPORT: False,
constants.ExtraSpecs.CREATE_SHARE_FROM_SNAPSHOT_SUPPORT: False,
},
{
constants.ExtraSpecs.SNAPSHOT_SUPPORT: True,
constants.ExtraSpecs.CREATE_SHARE_FROM_SNAPSHOT_SUPPORT: False,
},
{
constants.ExtraSpecs.SNAPSHOT_SUPPORT: True,
constants.ExtraSpecs.CREATE_SHARE_FROM_SNAPSHOT_SUPPORT: True,
}
)
def test_create_default_snapshot_semantics(self, extra_specs):
share, share_data = self._setup_create_mocks(is_public=False)
az = share_data.pop('availability_zone')
share_type = fakes.fake_share_type(extra_specs=extra_specs)
self.api.create(
self.context,
share_data['share_proto'],
share_data['size'],
share_data['display_name'],
share_data['display_description'],
availability_zone=az,
share_type=share_type
)
share['status'] = constants.STATUS_CREATING
share['host'] = None
share_data.update(extra_specs)
if extra_specs.get('snapshot_support') is None:
share_data['snapshot_support'] = False
if extra_specs.get('create_share_from_snapshot_support') is None:
share_data['create_share_from_snapshot_support'] = False
self.assertSubDictMatch(share_data,
db_api.share_create.call_args[0][1])
@ddt.data(*constants.SUPPORTED_SHARE_PROTOCOLS)
def test_create_share_valid_protocol(self, proto):
share, share_data = self._setup_create_mocks(protocol=proto)
az = share_data.pop('availability_zone')
all_protos = ','.join(
proto for proto in constants.SUPPORTED_SHARE_PROTOCOLS)
data = dict(DEFAULT=dict(enabled_share_protocols=all_protos))
with test_utils.create_temp_config_with_opts(data):
self.api.create(
self.context, proto, share_data['size'],
share_data['display_name'],
share_data['display_description'],
availability_zone=az)
share['status'] = constants.STATUS_CREATING
share['host'] = None
self.assertSubDictMatch(share_data,
db_api.share_create.call_args[0][1])
@ddt.data(
{'get_all_azs_return': [], 'subnet_by_az_side_effect': []},
{'get_all_azs_return': [{'name': 'az1', 'id': 'az_id_1'}],
'subnet_by_az_side_effect': [None]},
{'get_all_azs_return': [{'name': 'az1', 'id': 'az_id_1'}],
'subnet_by_az_side_effect': ['fake_sns_1']},
{'get_all_azs_return': [{'name': 'az1', 'id': 'az_id_1'},
{'name': 'az2', 'id': 'az_id_2'}],
'subnet_by_az_side_effect': [None, 'fake_sns_2']}
)
@ddt.unpack
def test__get_all_availability_zones_with_subnets(
self, get_all_azs_return, subnet_by_az_side_effect):
fake_share_network_id = 'fake_sn_id'
self.mock_object(db_api, 'availability_zone_get_all',
mock.Mock(return_value=get_all_azs_return))
self.mock_object(db_api,
'share_network_subnet_get_by_availability_zone_id',
mock.Mock(side_effect=subnet_by_az_side_effect))
expected_az_names = []
expected_get_az_calls = []
for index, value in enumerate(get_all_azs_return):
expected_get_az_calls.append(mock.call(
self.context, share_network_id=fake_share_network_id,
availability_zone_id=value['id']))
if subnet_by_az_side_effect[index] is not None:
expected_az_names.append(value['name'])
get_all_subnets = self.api._get_all_availability_zones_with_subnets
compatible_azs = get_all_subnets(self.context, fake_share_network_id)
db_api.availability_zone_get_all.assert_called_once_with(
self.context)
db_get_azs_with_subnet = (
db_api.share_network_subnet_get_by_availability_zone_id)
db_get_azs_with_subnet.assert_has_calls(expected_get_az_calls)
self.assertEqual(expected_az_names, compatible_azs)
@ddt.data(
{'availability_zones': None, 'azs_with_subnet': ['fake_az_1']},
{'availability_zones': ['fake_az_2'],
'azs_with_subnet': ['fake_az_2']},
{'availability_zones': ['fake_az_1', 'faze_az_2', 'fake_az_3'],
'azs_with_subnet': ['fake_az_3']}
)
@ddt.unpack
def test_create_share_with_subnets(self, availability_zones,
azs_with_subnet):
share, share_data = self._setup_create_mocks()
reservation = 'fake'
self.mock_object(quota.QUOTAS, 'reserve',
mock.Mock(return_value=reservation))
self.mock_object(self.api, '_get_all_availability_zones_with_subnets',
mock.Mock(return_value=azs_with_subnet))
self.mock_object(quota.QUOTAS, 'commit')
self.mock_object(self.api, 'create_instance')
self.mock_object(db_api, 'share_get')
fake_share_network_id = 'fake_sn_id'
if availability_zones:
expected_azs = (
[az for az in availability_zones if az in azs_with_subnet])
else:
expected_azs = azs_with_subnet
self.api.create(
self.context,
share_data['share_proto'],
share_data['size'],
share_data['display_name'],
share_data['display_description'],
share_network_id=fake_share_network_id,
availability_zones=availability_zones)
share['status'] = constants.STATUS_CREATING
share['host'] = None
quota.QUOTAS.reserve.assert_called_once()
get_all_azs_sns = self.api._get_all_availability_zones_with_subnets
get_all_azs_sns.assert_called_once_with(
self.context, fake_share_network_id)
quota.QUOTAS.commit.assert_called_once()
self.api.create_instance.assert_called_once_with(
self.context, share, share_network_id=fake_share_network_id,
host=None, availability_zone=None, share_group=None,
share_group_snapshot_member=None, share_type_id=None,
availability_zones=expected_azs
)
db_api.share_get.assert_called_once()
@ddt.data(
{'availability_zones': None, 'azs_with_subnet': []},
{'availability_zones': ['fake_az_1'],
'azs_with_subnet': ['fake_az_2']}
)
@ddt.unpack
def test_create_share_with_subnets_invalid_azs(self, availability_zones,
azs_with_subnet):
share, share_data = self._setup_create_mocks()
reservation = 'fake'
self.mock_object(quota.QUOTAS, 'reserve',
mock.Mock(return_value=reservation))
self.mock_object(self.api, '_get_all_availability_zones_with_subnets',
mock.Mock(return_value=azs_with_subnet))
self.mock_object(quota.QUOTAS, 'commit')
self.mock_object(self.api, 'create_instance')
self.mock_object(db_api, 'share_get')
fake_share_network_id = 'fake_sn_id'
self.assertRaises(
exception.InvalidInput,
self.api.create,
self.context, share_data['share_proto'], share_data['size'],
share_data['display_name'], share_data['display_description'],
share_network_id=fake_share_network_id,
availability_zones=availability_zones)
quota.QUOTAS.reserve.assert_called_once()
get_all_azs_sns = self.api._get_all_availability_zones_with_subnets
get_all_azs_sns.assert_called_once_with(
self.context, fake_share_network_id)
@ddt.data(
None, '', 'fake', 'nfsfake', 'cifsfake', 'glusterfsfake', 'hdfsfake')
def test_create_share_invalid_protocol(self, proto):
share, share_data = self._setup_create_mocks(protocol=proto)
all_protos = ','.join(
proto for proto in constants.SUPPORTED_SHARE_PROTOCOLS)
data = dict(DEFAULT=dict(enabled_share_protocols=all_protos))
with test_utils.create_temp_config_with_opts(data):
self.assertRaises(
exception.InvalidInput,
self.api.create,
self.context, proto, share_data['size'],
share_data['display_name'],
share_data['display_description'])
@ddt.data({'overs': {'gigabytes': 'fake'},
'expected_exception': exception.ShareSizeExceedsAvailableQuota},
{'overs': {'shares': 'fake'},
'expected_exception': exception.ShareLimitExceeded})
@ddt.unpack
def test_create_share_over_quota(self, overs, expected_exception):
share, share_data = self._setup_create_mocks()
usages = {'gigabytes': {'reserved': 5, 'in_use': 5},
'shares': {'reserved': 10, 'in_use': 10}}
quotas = {'gigabytes': 5, 'shares': 10}
exc = exception.OverQuota(overs=overs, usages=usages, quotas=quotas)
self.mock_object(quota.QUOTAS, 'reserve', mock.Mock(side_effect=exc))
self.assertRaises(
expected_exception,
self.api.create,
self.context,
share_data['share_proto'],
share_data['size'],
share_data['display_name'],
share_data['display_description']
)
quota.QUOTAS.reserve.assert_called_once_with(
self.context, share_type_id=None,
shares=1, gigabytes=share_data['size'])
@ddt.data(exception.QuotaError, exception.InvalidShare)
def test_create_share_error_on_quota_commit(self, expected_exception):
share, share_data = self._setup_create_mocks()
reservation = 'fake'
self.mock_object(quota.QUOTAS, 'reserve',
mock.Mock(return_value=reservation))
self.mock_object(quota.QUOTAS, 'commit',
mock.Mock(side_effect=expected_exception('fake')))
self.mock_object(quota.QUOTAS, 'rollback')
self.mock_object(db_api, 'share_delete')
self.assertRaises(
expected_exception,
self.api.create,
self.context,
share_data['share_proto'],
share_data['size'],
share_data['display_name'],
share_data['display_description']
)
quota.QUOTAS.rollback.assert_called_once_with(
self.context, reservation, share_type_id=None)
db_api.share_delete.assert_called_once_with(self.context, share['id'])
def test_create_share_instance_with_host_and_az(self):
host, share, share_instance = self._setup_create_instance_mocks()
self.api.create_instance(self.context, share, host=host,
availability_zone='fake',
share_type_id='fake_share_type')
db_api.share_instance_create.assert_called_once_with(
self.context, share['id'],
{
'share_network_id': None,
'status': constants.STATUS_CREATING,
'scheduled_at': self.dt_utc,
'host': host,
'availability_zone_id': 'fake_id',
'share_type_id': 'fake_share_type',
'cast_rules_to_readonly': False,
}
)
db_api.share_type_get.assert_called_once_with(
self.context, share_instance['share_type_id'])
self.api.share_rpcapi.create_share_instance.assert_called_once_with(
self.context,
share_instance,
host,
request_spec=mock.ANY,
filter_properties={},
snapshot_id=share['snapshot_id'],
)
self.assertFalse(
self.api.scheduler_rpcapi.create_share_instance.called)
def test_create_share_instance_without_host(self):
_, share, share_instance = self._setup_create_instance_mocks()
self.api.create_instance(self.context, share)
(self.api.scheduler_rpcapi.create_share_instance.
assert_called_once_with(
self.context, request_spec=mock.ANY, filter_properties={}))
self.assertFalse(self.api.share_rpcapi.create_share_instance.called)
def test_create_instance_share_group_snapshot_member(self):
fake_req_spec = {
'share_properties': 'fake_share_properties',
'share_instance_properties': 'fake_share_instance_properties',
}
share = fakes.fake_share()
member_info = {
'host': 'host',
'share_network_id': 'share_network_id',
'share_server_id': 'share_server_id',
}
fake_instance = fakes.fake_share_instance(
share_id=share['id'], **member_info)
sg_snap_member = {'share_instance': fake_instance}
self.mock_policy_check = self.mock_object(
policy, 'check_policy', mock.Mock(return_value=True))
mock_share_rpcapi_call = self.mock_object(self.share_rpcapi,
'create_share_instance')
mock_scheduler_rpcapi_call = self.mock_object(self.scheduler_rpcapi,
'create_share_instance')
mock_db_share_instance_update = self.mock_object(
db_api, 'share_instance_update')
self.mock_object(
share_api.API, 'create_share_instance_and_get_request_spec',
mock.Mock(return_value=(fake_req_spec, fake_instance)))
retval = self.api.create_instance(
self.context, fakes.fake_share(),
share_group_snapshot_member=sg_snap_member)
self.assertIsNone(retval)
mock_db_share_instance_update.assert_called_once_with(
self.context, fake_instance['id'], member_info)
self.assertFalse(mock_scheduler_rpcapi_call.called)
self.assertFalse(mock_share_rpcapi_call.called)
def test_get_share_attributes_from_share_type(self):
share_type = {
'extra_specs': {
'snapshot_support': True,
'create_share_from_snapshot_support': False,
'revert_to_snapshot_support': False,
'mount_snapshot_support': False,
'replication_type': 'dr',
}
}
result = self.api.get_share_attributes_from_share_type(share_type)
self.assertEqual(share_type['extra_specs'], result)
@ddt.data({}, {'extra_specs': {}}, None)
def test_get_share_attributes_from_share_type_defaults(self, share_type):
result = self.api.get_share_attributes_from_share_type(share_type)
expected = {
'snapshot_support': False,
'create_share_from_snapshot_support': False,
'revert_to_snapshot_support': False,
'mount_snapshot_support': False,
'replication_type': None,
}
self.assertEqual(expected, result)
@ddt.data({'extra_specs': {'snapshot_support': 'fake'}},
{'extra_specs': {'create_share_from_snapshot_support': 'fake'}})
def test_get_share_attributes_from_share_type_invalid(self, share_type):
self.assertRaises(exception.InvalidExtraSpec,
self.api.get_share_attributes_from_share_type,
share_type)
@ddt.data(
{'replication_type': 'dr', 'dhss': False, 'share_server_id': None},
{'replication_type': 'readable', 'dhss': False,
'share_server_id': None},
{'replication_type': None, 'dhss': False, 'share_server_id': None},
{'replication_type': None, 'dhss': True, 'share_server_id': 'fake'}
)
@ddt.unpack
def test_manage_new(self, replication_type, dhss, share_server_id):
share_data = {
'host': 'fake',
'export_location': 'fake',
'share_proto': 'fake',
'share_type_id': 'fake',
}
if dhss:
share_data['share_server_id'] = share_server_id
driver_options = {}
date = datetime.datetime(1, 1, 1, 1, 1, 1)
timeutils.utcnow.return_value = date
fake_subnet = db_utils.create_share_network_subnet(
share_network_id='fake')
share_server = db_utils.create_share_server(
status=constants.STATUS_ACTIVE, id=share_server_id,
share_network_subnet_id=fake_subnet['id'])
fake_share_data = {
'id': 'fakeid',
'status': constants.STATUS_CREATING,
}
fake_type = {
'id': 'fake_type_id',
'extra_specs': {
'snapshot_support': False,
'replication_type': replication_type,
'create_share_from_snapshot_support': False,
'revert_to_snapshot_support': False,
'mount_snapshot_support': False,
'driver_handles_share_servers': dhss,
},
}
share = db_api.share_create(self.context, fake_share_data)
self.mock_object(self.scheduler_rpcapi, 'manage_share')
self.mock_object(db_api, 'share_create',
mock.Mock(return_value=share))
self.mock_object(db_api, 'share_export_locations_update')
self.mock_object(db_api, 'share_get',
mock.Mock(return_value=share))
self.mock_object(share_types, 'get_share_type',
mock.Mock(return_value=fake_type))
self.mock_object(db_api, 'share_server_get',
mock.Mock(return_value=share_server))
self.mock_object(db_api, 'share_network_subnet_get',
mock.Mock(return_value=fake_subnet))
self.mock_object(self.api, 'get_all', mock.Mock(return_value=[]))
self.api.manage(self.context, copy.deepcopy(share_data),
driver_options)
share_data.update({
'user_id': self.context.user_id,
'project_id': self.context.project_id,
'status': constants.STATUS_MANAGING,
'scheduled_at': date,
'snapshot_support': fake_type['extra_specs']['snapshot_support'],
'create_share_from_snapshot_support':
fake_type['extra_specs']['create_share_from_snapshot_support'],
'revert_to_snapshot_support':
fake_type['extra_specs']['revert_to_snapshot_support'],
'mount_snapshot_support':
fake_type['extra_specs']['mount_snapshot_support'],
'replication_type': replication_type,
})
expected_request_spec = self._get_request_spec_dict(
share, fake_type, size=0, share_proto=share_data['share_proto'],
host=share_data['host'])
if dhss:
share_data.update({
'share_network_id': fake_subnet['share_network_id']})
export_location = share_data.pop('export_location')
self.api.get_all.assert_called_once_with(self.context, mock.ANY)
db_api.share_create.assert_called_once_with(self.context, share_data)
db_api.share_get.assert_called_once_with(self.context, share['id'])
db_api.share_export_locations_update.assert_called_once_with(
self.context, share.instance['id'], export_location
)
self.scheduler_rpcapi.manage_share.assert_called_once_with(
self.context, share['id'], driver_options, expected_request_spec)
if dhss:
db_api.share_server_get.assert_called_once_with(
self.context, share_data['share_server_id'])
db_api.share_network_subnet_get.assert_called_once_with(
self.context, share_server['share_network_subnet_id'])
@ddt.data((True, exception.InvalidInput, True),
(True, exception.InvalidInput, False),
(False, exception.InvalidInput, True),
(True, exception.InvalidInput, True))
@ddt.unpack
def test_manage_new_dhss_true_and_false(self, dhss, exception_type,
has_share_server_id):
share_data = {
'host': 'fake',
'export_location': 'fake',
'share_proto': 'fake',
'share_type_id': 'fake',
}
if has_share_server_id:
share_data['share_server_id'] = 'fake'
driver_options = {}
date = datetime.datetime(1, 1, 1, 1, 1, 1)
timeutils.utcnow.return_value = date
fake_type = {
'id': 'fake_type_id',
'extra_specs': {
'snapshot_support': False,
'create_share_from_snapshot_support': False,
'revert_to_snapshot_support': False,
'mount_snapshot_support': False,
'driver_handles_share_servers': dhss,
},
}
self.mock_object(share_types, 'get_share_type',
mock.Mock(return_value=fake_type))
self.mock_object(self.api, 'get_all', mock.Mock(return_value=[]))
self.assertRaises(exception_type,
self.api.manage,
self.context,
share_data=share_data,
driver_options=driver_options
)
share_types.get_share_type.assert_called_once_with(
self.context, share_data['share_type_id']
)
self.api.get_all.assert_called_once_with(
self.context, {
'host': share_data['host'],
'export_location': share_data['export_location'],
'share_proto': share_data['share_proto'],
'share_type_id': share_data['share_type_id']
}
)
def test_manage_new_share_server_not_found(self):
share_data = {
'host': 'fake',
'export_location': 'fake',
'share_proto': 'fake',
'share_type_id': 'fake',
'share_server_id': 'fake'
}
driver_options = {}
date = datetime.datetime(1, 1, 1, 1, 1, 1)
timeutils.utcnow.return_value = date
fake_type = {
'id': 'fake_type_id',
'extra_specs': {
'snapshot_support': False,
'replication_type': 'dr',
'create_share_from_snapshot_support': False,
'revert_to_snapshot_support': False,
'mount_snapshot_support': False,
'driver_handles_share_servers': True,
},
}
self.mock_object(share_types, 'get_share_type',
mock.Mock(return_value=fake_type))
self.mock_object(self.api, 'get_all', mock.Mock(return_value=[]))
self.assertRaises(exception.InvalidInput,
self.api.manage,
self.context,
share_data=share_data,
driver_options=driver_options
)
share_types.get_share_type.assert_called_once_with(
self.context, share_data['share_type_id']
)
self.api.get_all.assert_called_once_with(
self.context, {
'host': share_data['host'],
'export_location': share_data['export_location'],
'share_proto': share_data['share_proto'],
'share_type_id': share_data['share_type_id']
}
)
def test_manage_new_share_server_not_active(self):
share_data = {
'host': 'fake',
'export_location': 'fake',
'share_proto': 'fake',
'share_type_id': 'fake',
'share_server_id': 'fake'
}
fake_share_data = {
'id': 'fakeid',
'status': constants.STATUS_ERROR,
}
driver_options = {}
date = datetime.datetime(1, 1, 1, 1, 1, 1)
timeutils.utcnow.return_value = date
fake_type = {
'id': 'fake_type_id',
'extra_specs': {
'snapshot_support': False,
'replication_type': 'dr',
'create_share_from_snapshot_support': False,
'revert_to_snapshot_support': False,
'mount_snapshot_support': False,
'driver_handles_share_servers': True,
},
}
share = db_api.share_create(self.context, fake_share_data)
self.mock_object(share_types, 'get_share_type',
mock.Mock(return_value=fake_type))
self.mock_object(self.api, 'get_all', mock.Mock(return_value=[]))
self.mock_object(db_api, 'share_server_get',
mock.Mock(return_value=share))
self.assertRaises(exception.InvalidShareServer,
self.api.manage,
self.context,
share_data=share_data,
driver_options=driver_options
)
share_types.get_share_type.assert_called_once_with(
self.context, share_data['share_type_id']
)
self.api.get_all.assert_called_once_with(
self.context, {
'host': share_data['host'],
'export_location': share_data['export_location'],
'share_proto': share_data['share_proto'],
'share_type_id': share_data['share_type_id']
}
)
db_api.share_server_get.assert_called_once_with(
self.context, share_data['share_server_id']
)
@ddt.data(constants.STATUS_MANAGE_ERROR, constants.STATUS_AVAILABLE)
def test_manage_duplicate(self, status):
share_data = {
'host': 'fake',
'export_location': 'fake',
'share_proto': 'fake',
'share_type_id': 'fake',
}
driver_options = {}
fake_type = {
'id': 'fake_type_id',
'extra_specs': {
'snapshot_support': False,
'create_share_from_snapshot_support': False,
'driver_handles_share_servers': False,
},
}
shares = [{'id': 'fake', 'status': status}]
self.mock_object(self.api, 'get_all',
mock.Mock(return_value=shares))
self.mock_object(share_types, 'get_share_type',
mock.Mock(return_value=fake_type))
self.assertRaises(exception.InvalidShare, self.api.manage,
self.context, share_data, driver_options)
def _get_request_spec_dict(self, share, share_type, **kwargs):
if share is None:
share = {'instance': {}}
share_instance = share['instance']
share_properties = {
'size': kwargs.get('size', share.get('size')),
'user_id': kwargs.get('user_id', share.get('user_id')),
'project_id': kwargs.get('project_id', share.get('project_id')),
'snapshot_support': kwargs.get(
'snapshot_support',
share_type['extra_specs']['snapshot_support']),
'create_share_from_snapshot_support': kwargs.get(
'create_share_from_snapshot_support',
share_type['extra_specs'].get(
'create_share_from_snapshot_support')),
'revert_to_snapshot_support': kwargs.get(
'revert_to_snapshot_support',
share_type['extra_specs'].get('revert_to_snapshot_support')),
'mount_snapshot_support': kwargs.get(
'mount_snapshot_support',
share_type['extra_specs'].get('mount_snapshot_support')),
'share_proto': kwargs.get('share_proto', share.get('share_proto')),
'share_type_id': share_type['id'],
'is_public': kwargs.get('is_public', share.get('is_public')),
'share_group_id': kwargs.get(
'share_group_id', share.get('share_group_id')),
'source_share_group_snapshot_member_id': kwargs.get(
'source_share_group_snapshot_member_id',
share.get('source_share_group_snapshot_member_id')),
'snapshot_id': kwargs.get('snapshot_id', share.get('snapshot_id')),
}
share_instance_properties = {
'availability_zone_id': kwargs.get(
'availability_zone_id',
share_instance.get('availability_zone_id')),
'share_network_id': kwargs.get(
'share_network_id', share_instance.get('share_network_id')),
'share_server_id': kwargs.get(
'share_server_id', share_instance.get('share_server_id')),
'share_id': kwargs.get('share_id', share_instance.get('share_id')),
'host': kwargs.get('host', share_instance.get('host')),
'status': kwargs.get('status', share_instance.get('status')),
}
request_spec = {
'share_properties': share_properties,
'share_instance_properties': share_instance_properties,
'share_type': share_type,
'share_id': share.get('id'),
}
return request_spec
def test_unmanage(self):
share = db_utils.create_share(
id='fakeid',
host='fake',
size='1',
status=constants.STATUS_AVAILABLE,
user_id=self.context.user_id,
project_id=self.context.project_id,
task_state=None)
self.mock_object(db_api, 'share_update', mock.Mock())
self.api.unmanage(self.context, share)
self.share_rpcapi.unmanage_share.assert_called_once_with(
self.context, mock.ANY)
db_api.share_update.assert_called_once_with(
mock.ANY, share['id'], mock.ANY)
def test_unmanage_task_state_busy(self):
share = db_utils.create_share(
id='fakeid',
host='fake',
size='1',
status=constants.STATUS_AVAILABLE,
user_id=self.context.user_id,
project_id=self.context.project_id,
task_state=constants.TASK_STATE_MIGRATION_IN_PROGRESS)
self.assertRaises(exception.ShareBusyException, self.api.unmanage,
self.context, share)
@mock.patch.object(quota.QUOTAS, 'reserve',
mock.Mock(return_value='reservation'))
@mock.patch.object(quota.QUOTAS, 'commit', mock.Mock())
def test_create_snapshot(self):
snapshot = db_utils.create_snapshot(
with_share=True, status=constants.STATUS_CREATING, size=1)
share = snapshot['share']
fake_name = 'fakename'
fake_desc = 'fakedesc'
options = {
'share_id': share['id'],
'user_id': self.context.user_id,
'project_id': self.context.project_id,
'status': constants.STATUS_CREATING,
'progress': '0%',
'share_size': share['size'],
'size': 1,
'display_name': fake_name,
'display_description': fake_desc,
'share_proto': share['share_proto'],
}
with mock.patch.object(db_api, 'share_snapshot_create',
mock.Mock(return_value=snapshot)):
self.api.create_snapshot(self.context, share, fake_name,
fake_desc)
share_api.policy.check_policy.assert_called_once_with(
self.context, 'share', 'create_snapshot', share)
quota.QUOTAS.reserve.assert_called_once_with(
self.context, share_type_id=None,
snapshot_gigabytes=1, snapshots=1)
quota.QUOTAS.commit.assert_called_once_with(
self.context, 'reservation', share_type_id=None)
db_api.share_snapshot_create.assert_called_once_with(
self.context, options)
def test_create_snapshot_space_quota_exceeded(self):
share = fakes.fake_share(
id=uuidutils.generate_uuid(), size=1, project_id='fake_project',
user_id='fake_user', has_replicas=False, status='available')
usages = {'snapshot_gigabytes': {'reserved': 10, 'in_use': 0}}
quotas = {'snapshot_gigabytes': 10}
side_effect = exception.OverQuota(
overs='snapshot_gigabytes', usages=usages, quotas=quotas)
self.mock_object(
quota.QUOTAS, 'reserve', mock.Mock(side_effect=side_effect))
mock_snap_create = self.mock_object(db_api, 'share_snapshot_create')
self.assertRaises(exception.SnapshotSizeExceedsAvailableQuota,
self.api.create_snapshot,
self.context,
share,
'fake_name',
'fake_description')
mock_snap_create.assert_not_called()
def test_create_snapshot_count_quota_exceeded(self):
share = fakes.fake_share(
id=uuidutils.generate_uuid(), size=1, project_id='fake_project',
user_id='fake_user', has_replicas=False, status='available')
usages = {'snapshots': {'reserved': 10, 'in_use': 0}}
quotas = {'snapshots': 10}
side_effect = exception.OverQuota(
overs='snapshots', usages=usages, quotas=quotas)
self.mock_object(
quota.QUOTAS, 'reserve', mock.Mock(side_effect=side_effect))
mock_snap_create = self.mock_object(db_api, 'share_snapshot_create')
self.assertRaises(exception.SnapshotLimitExceeded,
self.api.create_snapshot,
self.context,
share,
'fake_name',
'fake_description')
mock_snap_create.assert_not_called()
def test_manage_snapshot_share_not_found(self):
snapshot = fakes.fake_snapshot(share_id='fake_share',
as_primitive=True)
mock_share_get_call = self.mock_object(
db_api, 'share_get', mock.Mock(side_effect=exception.NotFound))
mock_db_snapshot_call = self.mock_object(
db_api, 'share_snapshot_get_all_for_share')
self.assertRaises(exception.ShareNotFound,
self.api.manage_snapshot,
self.context,
snapshot,
{})
self.assertFalse(mock_db_snapshot_call.called)
mock_share_get_call.assert_called_once_with(
self.context, snapshot['share_id'])
def test_manage_snapshot_share_has_replicas(self):
share_ref = fakes.fake_share(
has_replicas=True, status=constants.STATUS_AVAILABLE)
self.mock_object(
db_api, 'share_get', mock.Mock(return_value=share_ref))
snapshot = fakes.fake_snapshot(create_instance=True, as_primitive=True)
mock_db_snapshot_get_all_for_share_call = self.mock_object(
db_api, 'share_snapshot_get_all_for_share')
self.assertRaises(exception.InvalidShare,
self.api.manage_snapshot,
context,
snapshot,
{})
self.assertFalse(mock_db_snapshot_get_all_for_share_call.called)
def test_manage_snapshot_already_managed(self):
share_ref = fakes.fake_share(
has_replicas=False, status=constants.STATUS_AVAILABLE)
snapshot = fakes.fake_snapshot(create_instance=True, as_primitive=True)
self.mock_object(
db_api, 'share_get', mock.Mock(return_value=share_ref))
mock_db_snapshot_call = self.mock_object(
db_api, 'share_snapshot_get_all_for_share', mock.Mock(
return_value=[snapshot]))
mock_db_snapshot_create_call = self.mock_object(
db_api, 'share_snapshot_create')
self.assertRaises(exception.ManageInvalidShareSnapshot,
self.api.manage_snapshot,
self.context,
snapshot,
{})
mock_db_snapshot_call.assert_called_once_with(
self.context, snapshot['share_id'])
self.assertFalse(mock_db_snapshot_create_call.called)
def test_manage_snapshot(self):
share_ref = fakes.fake_share(
has_replicas=False, status=constants.STATUS_AVAILABLE,
host='fake_host')
existing_snapshot = fakes.fake_snapshot(
create_instance=True, share_id=share_ref['id'])
self.mock_object(db_api, 'share_snapshot_get_all_for_share',
mock.Mock(return_value=[existing_snapshot]))
snapshot_data = {
'share_id': share_ref['id'],
'provider_location': 'someproviderlocation',
}
expected_snapshot_data = {
'user_id': self.context.user_id,
'project_id': self.context.project_id,
'status': constants.STATUS_MANAGING,
'share_size': share_ref['size'],
'progress': '0%',
'share_proto': share_ref['share_proto'],
}
expected_snapshot_data.update(**snapshot_data)
snapshot = fakes.fake_snapshot(
create_instance=True, **expected_snapshot_data)
self.mock_object(
db_api, 'share_get', mock.Mock(return_value=share_ref))
mock_db_snapshot_create_call = self.mock_object(
db_api, 'share_snapshot_create', mock.Mock(return_value=snapshot))
mock_rpc_call = self.mock_object(self.share_rpcapi, 'manage_snapshot',
mock.Mock(return_value=snapshot))
new_snap = self.api.manage_snapshot(
self.context, snapshot_data, {})
self.assertEqual(new_snap, snapshot)
mock_db_snapshot_create_call.assert_called_once_with(
self.context, expected_snapshot_data)
mock_rpc_call.assert_called_once_with(
self.context, snapshot, share_ref['host'], {})
def test_manage_share_server(self):
"""Tests manage share server"""
host = 'fake_host'
fake_share_network = {
'id': 'fake_net_id'
}
fake_share_net_subnet = {
'id': 'fake_subnet_id',
'share_network_id': fake_share_network['id']
}
identifier = 'fake_identifier'
values = {
'host': host,
'share_network_subnet_id': fake_share_net_subnet['id'],
'status': constants.STATUS_MANAGING,
'is_auto_deletable': False,
'identifier': identifier,
}
server_managing = {
'id': 'fake_server_id',
'status': constants.STATUS_MANAGING,
'host': host,
'share_network_subnet_id': fake_share_net_subnet['id'],
'is_auto_deletable': False,
'identifier': identifier,
}
mock_share_server_search = self.mock_object(
db_api, 'share_server_search_by_identifier',
mock.Mock(side_effect=exception.ShareServerNotFound('fake')))
mock_share_server_get = self.mock_object(
db_api, 'share_server_get',
mock.Mock(
return_value=server_managing)
)
mock_share_server_create = self.mock_object(
db_api, 'share_server_create',
mock.Mock(return_value=server_managing)
)
result = self.api.manage_share_server(
self.context, 'fake_identifier', host, fake_share_net_subnet,
{'opt1': 'val1', 'opt2': 'val2'}
)
mock_share_server_create.assert_called_once_with(
self.context, values)
mock_share_server_get.assert_called_once_with(
self.context, 'fake_server_id')
mock_share_server_search.assert_called_once_with(
self.context, 'fake_identifier')
result_dict = {
'host': result['host'],
'share_network_subnet_id': result['share_network_subnet_id'],
'status': result['status'],
'is_auto_deletable': result['is_auto_deletable'],
'identifier': result['identifier'],
}
self.assertEqual(values, result_dict)
def test_manage_share_server_invalid(self):
server = {'identifier': 'fake_server'}
mock_share_server_search = self.mock_object(
db_api, 'share_server_search_by_identifier',
mock.Mock(return_value=[server]))
self.assertRaises(
exception.InvalidInput, self.api.manage_share_server,
self.context, 'invalid_identifier', 'fake_host', 'fake_share_net',
{})
mock_share_server_search.assert_called_once_with(
self.context, 'invalid_identifier')
def test_unmanage_snapshot(self):
fake_host = 'fake_host'
snapshot_data = {
'status': constants.STATUS_UNMANAGING,
'terminated_at': timeutils.utcnow(),
}
snapshot = fakes.fake_snapshot(
create_instance=True, share_instance_id='id2', **snapshot_data)
mock_db_snap_update_call = self.mock_object(
db_api, 'share_snapshot_update', mock.Mock(return_value=snapshot))
mock_rpc_call = self.mock_object(
self.share_rpcapi, 'unmanage_snapshot')
retval = self.api.unmanage_snapshot(
self.context, snapshot, fake_host)
self.assertIsNone(retval)
mock_db_snap_update_call.assert_called_once_with(
self.context, snapshot['id'], snapshot_data)
mock_rpc_call.assert_called_once_with(
self.context, snapshot, fake_host)
def test_unmanage_share_server(self):
shr1 = {}
share_server = db_utils.create_share_server(**shr1)
update_data = {'status': constants.STATUS_UNMANAGING,
'terminated_at': timeutils.utcnow()}
mock_share_instances_get_all = self.mock_object(
db_api, 'share_instances_get_all_by_share_server',
mock.Mock(return_value={}))
mock_share_group_get_all = self.mock_object(
db_api, 'share_group_get_all_by_share_server',
mock.Mock(return_value={}))
mock_share_server_update = self.mock_object(
db_api, 'share_server_update',
mock.Mock(return_value=share_server))
mock_rpc = self.mock_object(
self.api.share_rpcapi, 'unmanage_share_server')
self.api.unmanage_share_server(self.context, share_server, True)
mock_share_instances_get_all.assert_called_once_with(
self.context, share_server['id']
)
mock_share_group_get_all.assert_called_once_with(
self.context, share_server['id']
)
mock_share_server_update.assert_called_once_with(
self.context, share_server['id'], update_data
)
mock_rpc.assert_called_once_with(
self.context, share_server, force=True)
def test_unmanage_share_server_in_use(self):
fake_share = db_utils.create_share()
fake_share_server = db_utils.create_share_server()
fake_share_instance = db_utils.create_share_instance(
share_id=fake_share['id'])
share_instance_get_all_mock = self.mock_object(
db_api, 'share_instances_get_all_by_share_server',
mock.Mock(return_value=fake_share_instance)
)
self.assertRaises(exception.ShareServerInUse,
self.api.unmanage_share_server,
self.context,
fake_share_server, True)
share_instance_get_all_mock.assert_called_once_with(
self.context, fake_share_server['id']
)
def test_unmanage_share_server_in_use_share_groups(self):
fake_share_server = db_utils.create_share_server()
fake_share_groups = db_utils.create_share_group()
share_instance_get_all_mock = self.mock_object(
db_api, 'share_instances_get_all_by_share_server',
mock.Mock(return_value={})
)
group_get_all_mock = self.mock_object(
db_api, 'share_group_get_all_by_share_server',
mock.Mock(return_value=fake_share_groups)
)
self.assertRaises(exception.ShareServerInUse,
self.api.unmanage_share_server,
self.context,
fake_share_server, True)
share_instance_get_all_mock.assert_called_once_with(
self.context, fake_share_server['id']
)
group_get_all_mock.assert_called_once_with(
self.context, fake_share_server['id']
)
@ddt.data(True, False)
def test_revert_to_snapshot(self, has_replicas):
share = fakes.fake_share(id=uuidutils.generate_uuid(),
has_replicas=has_replicas)
self.mock_object(db_api, 'share_get', mock.Mock(return_value=share))
mock_handle_revert_to_snapshot_quotas = self.mock_object(
self.api, '_handle_revert_to_snapshot_quotas',
mock.Mock(return_value='fake_reservations'))
mock_revert_to_replicated_snapshot = self.mock_object(
self.api, '_revert_to_replicated_snapshot')
mock_revert_to_snapshot = self.mock_object(
self.api, '_revert_to_snapshot')
snapshot = fakes.fake_snapshot(share_id=share['id'])
self.api.revert_to_snapshot(self.context, share, snapshot)
mock_handle_revert_to_snapshot_quotas.assert_called_once_with(
self.context, share, snapshot)
if not has_replicas:
self.assertFalse(mock_revert_to_replicated_snapshot.called)
mock_revert_to_snapshot.assert_called_once_with(
self.context, share, snapshot, 'fake_reservations')
else:
mock_revert_to_replicated_snapshot.assert_called_once_with(
self.context, share, snapshot, 'fake_reservations')
self.assertFalse(mock_revert_to_snapshot.called)
@ddt.data(None, 'fake_reservations')
def test_revert_to_snapshot_exception(self, reservations):
share = fakes.fake_share(id=uuidutils.generate_uuid(),
has_replicas=False)
self.mock_object(db_api, 'share_get', mock.Mock(return_value=share))
self.mock_object(
self.api, '_handle_revert_to_snapshot_quotas',
mock.Mock(return_value=reservations))
side_effect = exception.ReplicationException(reason='error')
self.mock_object(
self.api, '_revert_to_snapshot',
mock.Mock(side_effect=side_effect))
mock_quotas_rollback = self.mock_object(quota.QUOTAS, 'rollback')
snapshot = fakes.fake_snapshot(share_id=share['id'])
self.assertRaises(exception.ReplicationException,
self.api.revert_to_snapshot,
self.context,
share,
snapshot)
if reservations is not None:
mock_quotas_rollback.assert_called_once_with(
self.context, reservations,
share_type_id=share['instance']['share_type_id'])
else:
self.assertFalse(mock_quotas_rollback.called)
def test_handle_revert_to_snapshot_quotas(self):
share = fakes.fake_share(
id=uuidutils.generate_uuid(), size=1, project_id='fake_project',
user_id='fake_user', has_replicas=False)
snapshot = fakes.fake_snapshot(
id=uuidutils.generate_uuid(), share_id=share['id'], size=1)
mock_quotas_reserve = self.mock_object(quota.QUOTAS, 'reserve')
result = self.api._handle_revert_to_snapshot_quotas(
self.context, share, snapshot)
self.assertIsNone(result)
self.assertFalse(mock_quotas_reserve.called)
def test_handle_revert_to_snapshot_quotas_different_size(self):
share = fakes.fake_share(
id=uuidutils.generate_uuid(), size=1, project_id='fake_project',
user_id='fake_user', has_replicas=False)
snapshot = fakes.fake_snapshot(
id=uuidutils.generate_uuid(), share_id=share['id'], size=2)
mock_quotas_reserve = self.mock_object(
quota.QUOTAS, 'reserve',
mock.Mock(return_value='fake_reservations'))
result = self.api._handle_revert_to_snapshot_quotas(
self.context, share, snapshot)
self.assertEqual('fake_reservations', result)
mock_quotas_reserve.assert_called_once_with(
self.context, project_id='fake_project', gigabytes=1,
share_type_id=share['instance']['share_type_id'],
user_id='fake_user')
def test_handle_revert_to_snapshot_quotas_quota_exceeded(self):
share = fakes.fake_share(
id=uuidutils.generate_uuid(), size=1, project_id='fake_project',
user_id='fake_user', has_replicas=False)
snapshot = fakes.fake_snapshot(
id=uuidutils.generate_uuid(), share_id=share['id'], size=2)
usages = {'gigabytes': {'reserved': 10, 'in_use': 0}}
quotas = {'gigabytes': 10}
side_effect = exception.OverQuota(
overs='fake', usages=usages, quotas=quotas)
self.mock_object(
quota.QUOTAS, 'reserve', mock.Mock(side_effect=side_effect))
self.assertRaises(exception.ShareSizeExceedsAvailableQuota,
self.api._handle_revert_to_snapshot_quotas,
self.context,
share,
snapshot)
def test__revert_to_snapshot(self):
share = fakes.fake_share(
id=uuidutils.generate_uuid(), size=1, project_id='fake_project',
user_id='fake_user', has_replicas=False)
snapshot = fakes.fake_snapshot(
id=uuidutils.generate_uuid(), share_id=share['id'], size=2)
mock_share_update = self.mock_object(db_api, 'share_update')
mock_share_snapshot_update = self.mock_object(
db_api, 'share_snapshot_update')
mock_revert_rpc_call = self.mock_object(
self.share_rpcapi, 'revert_to_snapshot')
self.api._revert_to_snapshot(
self.context, share, snapshot, 'fake_reservations')
mock_share_update.assert_called_once_with(
self.context, share['id'], {'status': constants.STATUS_REVERTING})
mock_share_snapshot_update.assert_called_once_with(
self.context, snapshot['id'],
{'status': constants.STATUS_RESTORING})
mock_revert_rpc_call.assert_called_once_with(
self.context, share, snapshot, share['instance']['host'],
'fake_reservations')
def test_revert_to_replicated_snapshot(self):
share = fakes.fake_share(
has_replicas=True, status=constants.STATUS_AVAILABLE)
snapshot = fakes.fake_snapshot(share_instance_id='id1')
snapshot_instance = fakes.fake_snapshot_instance(
base_snapshot=snapshot, id='sid1')
replicas = [
fakes.fake_replica(
id='rid1', replica_state=constants.REPLICA_STATE_ACTIVE),
fakes.fake_replica(
id='rid2', replica_state=constants.REPLICA_STATE_IN_SYNC),
]
self.mock_object(
db_api, 'share_replicas_get_available_active_replica',
mock.Mock(return_value=replicas[0]))
self.mock_object(
db_api, 'share_snapshot_instance_get_all_with_filters',
mock.Mock(return_value=[snapshot_instance]))
mock_share_replica_update = self.mock_object(
db_api, 'share_replica_update')
mock_share_snapshot_instance_update = self.mock_object(
db_api, 'share_snapshot_instance_update')
mock_revert_rpc_call = self.mock_object(
self.share_rpcapi, 'revert_to_snapshot')
self.api._revert_to_replicated_snapshot(
self.context, share, snapshot, 'fake_reservations')
mock_share_replica_update.assert_called_once_with(
self.context, 'rid1', {'status': constants.STATUS_REVERTING})
mock_share_snapshot_instance_update.assert_called_once_with(
self.context, 'sid1', {'status': constants.STATUS_RESTORING})
mock_revert_rpc_call.assert_called_once_with(
self.context, share, snapshot, replicas[0]['host'],
'fake_reservations')
def test_revert_to_replicated_snapshot_no_active_replica(self):
share = fakes.fake_share(
has_replicas=True, status=constants.STATUS_AVAILABLE)
snapshot = fakes.fake_snapshot(share_instance_id='id1')
self.mock_object(
db_api, 'share_replicas_get_available_active_replica',
mock.Mock(return_value=None))
self.assertRaises(exception.ReplicationException,
self.api._revert_to_replicated_snapshot,
self.context,
share,
snapshot,
'fake_reservations')
def test_revert_to_replicated_snapshot_no_snapshot_instance(self):
share = fakes.fake_share(
has_replicas=True, status=constants.STATUS_AVAILABLE)
snapshot = fakes.fake_snapshot(share_instance_id='id1')
replicas = [
fakes.fake_replica(
id='rid1', replica_state=constants.REPLICA_STATE_ACTIVE),
fakes.fake_replica(
id='rid2', replica_state=constants.REPLICA_STATE_IN_SYNC),
]
self.mock_object(
db_api, 'share_replicas_get_available_active_replica',
mock.Mock(return_value=replicas[0]))
self.mock_object(
db_api, 'share_snapshot_instance_get_all_with_filters',
mock.Mock(return_value=[None]))
self.assertRaises(exception.ReplicationException,
self.api._revert_to_replicated_snapshot,
self.context,
share,
snapshot,
'fake_reservations')
def test_create_snapshot_for_replicated_share(self):
share = fakes.fake_share(
has_replicas=True, status=constants.STATUS_AVAILABLE)
snapshot = fakes.fake_snapshot(
create_instance=True, share_instance_id='id2')
replicas = [
fakes.fake_replica(
id='id1', replica_state=constants.REPLICA_STATE_ACTIVE),
fakes.fake_replica(
id='id2', replica_state=constants.REPLICA_STATE_IN_SYNC)
]
self.mock_object(share_api.policy, 'check_policy')
self.mock_object(quota.QUOTAS, 'reserve',
mock.Mock(return_value='reservation'))
self.mock_object(
db_api, 'share_snapshot_create', mock.Mock(return_value=snapshot))
self.mock_object(db_api, 'share_replicas_get_all_by_share',
mock.Mock(return_value=replicas))
self.mock_object(
db_api, 'share_snapshot_get', mock.Mock(return_value=snapshot))
self.mock_object(quota.QUOTAS, 'commit')
mock_instance_create_call = self.mock_object(
db_api, 'share_snapshot_instance_create')
mock_snapshot_rpc_call = self.mock_object(
self.share_rpcapi, 'create_snapshot')
mock_replicated_snapshot_rpc_call = self.mock_object(
self.share_rpcapi, 'create_replicated_snapshot')
snapshot_instance_args = {
'status': constants.STATUS_CREATING,
'progress': '0%',
'share_instance_id': 'id1',
}
retval = self.api.create_snapshot(
self.context, share, 'fake_name', 'fake_description')
self.assertEqual(snapshot['id'], retval['id'])
mock_instance_create_call.assert_called_once_with(
self.context, snapshot['id'], snapshot_instance_args)
self.assertFalse(mock_snapshot_rpc_call.called)
self.assertTrue(mock_replicated_snapshot_rpc_call.called)
@mock.patch.object(db_api, 'share_instances_get_all_by_share_server',
mock.Mock(return_value=[]))
@mock.patch.object(db_api, 'share_group_get_all_by_share_server',
mock.Mock(return_value=[]))
def test_delete_share_server_no_dependent_shares(self):
server = {'id': 'fake_share_server_id'}
server_returned = {
'id': 'fake_share_server_id',
}
self.mock_object(db_api, 'share_server_update',
mock.Mock(return_value=server_returned))
self.api.delete_share_server(self.context, server)
db_api.share_instances_get_all_by_share_server.assert_called_once_with(
self.context, server['id'])
(db_api.share_group_get_all_by_share_server.
assert_called_once_with(self.context, server['id']))
self.share_rpcapi.delete_share_server.assert_called_once_with(
self.context, server_returned)
@mock.patch.object(db_api, 'share_instances_get_all_by_share_server',
mock.Mock(return_value=['fake_share', ]))
@mock.patch.object(db_api, 'share_group_get_all_by_share_server',
mock.Mock(return_value=[]))
def test_delete_share_server_dependent_share_exists(self):
server = {'id': 'fake_share_server_id'}
self.assertRaises(exception.ShareServerInUse,
self.api.delete_share_server,
self.context,
server)
db_api.share_instances_get_all_by_share_server.assert_called_once_with(
self.context, server['id'])
@mock.patch.object(db_api, 'share_instances_get_all_by_share_server',
mock.Mock(return_value=[]))
@mock.patch.object(db_api, 'share_group_get_all_by_share_server',
mock.Mock(return_value=['fake_group', ]))
def test_delete_share_server_dependent_group_exists(self):
server = {'id': 'fake_share_server_id'}
self.assertRaises(exception.ShareServerInUse,
self.api.delete_share_server,
self.context,
server)
db_api.share_instances_get_all_by_share_server.assert_called_once_with(
self.context, server['id'])
(db_api.share_group_get_all_by_share_server.
assert_called_once_with(self.context, server['id']))
@mock.patch.object(db_api, 'share_snapshot_instance_update', mock.Mock())
def test_delete_snapshot(self):
snapshot = db_utils.create_snapshot(
with_share=True, status=constants.STATUS_AVAILABLE)
share = snapshot['share']
with mock.patch.object(db_api, 'share_get',
mock.Mock(return_value=share)):
self.api.delete_snapshot(self.context, snapshot)
self.share_rpcapi.delete_snapshot.assert_called_once_with(
self.context, snapshot, share['host'], force=False)
share_api.policy.check_policy.assert_called_once_with(
self.context, 'share', 'delete_snapshot', snapshot)
db_api.share_snapshot_instance_update.assert_called_once_with(
self.context,
snapshot['instance']['id'],
{'status': constants.STATUS_DELETING})
db_api.share_get.assert_called_once_with(
self.context, snapshot['share_id'])
def test_delete_snapshot_wrong_status(self):
snapshot = db_utils.create_snapshot(
with_share=True, status=constants.STATUS_CREATING)
self.assertRaises(exception.InvalidShareSnapshot,
self.api.delete_snapshot,
self.context,
snapshot)
share_api.policy.check_policy.assert_called_once_with(
self.context, 'share', 'delete_snapshot', snapshot)
@ddt.data(constants.STATUS_MANAGING, constants.STATUS_ERROR_DELETING,
constants.STATUS_CREATING, constants.STATUS_AVAILABLE)
def test_delete_snapshot_force_delete(self, status):
share = fakes.fake_share(id=uuidutils.generate_uuid(),
has_replicas=False)
snapshot = fakes.fake_snapshot(aggregate_status=status, share=share)
snapshot_instance = fakes.fake_snapshot_instance(
base_snapshot=snapshot)
self.mock_object(db_api, 'share_get', mock.Mock(return_value=share))
self.mock_object(
db_api, 'share_snapshot_instance_get_all_with_filters',
mock.Mock(return_value=[snapshot_instance]))
mock_instance_update_call = self.mock_object(
db_api, 'share_snapshot_instance_update')
mock_rpc_call = self.mock_object(self.share_rpcapi, 'delete_snapshot')
retval = self.api.delete_snapshot(self.context, snapshot, force=True)
self.assertIsNone(retval)
mock_instance_update_call.assert_called_once_with(
self.context, snapshot_instance['id'],
{'status': constants.STATUS_DELETING})
mock_rpc_call.assert_called_once_with(
self.context, snapshot, share['instance']['host'], force=True)
@ddt.data(True, False)
def test_delete_snapshot_replicated_snapshot(self, force):
share = fakes.fake_share(has_replicas=True)
snapshot = fakes.fake_snapshot(
create_instance=True, share_id=share['id'],
status=constants.STATUS_ERROR)
snapshot_instance = fakes.fake_snapshot_instance(
base_snapshot=snapshot)
expected_update_calls = [
mock.call(self.context, x, {'status': constants.STATUS_DELETING})
for x in (snapshot['instance']['id'], snapshot_instance['id'])
]
self.mock_object(db_api, 'share_get', mock.Mock(return_value=share))
self.mock_object(
db_api, 'share_snapshot_instance_get_all_with_filters',
mock.Mock(return_value=[snapshot['instance'], snapshot_instance]))
mock_db_update_call = self.mock_object(
db_api, 'share_snapshot_instance_update')
mock_snapshot_rpc_call = self.mock_object(
self.share_rpcapi, 'delete_snapshot')
mock_replicated_snapshot_rpc_call = self.mock_object(
self.share_rpcapi, 'delete_replicated_snapshot')
retval = self.api.delete_snapshot(self.context, snapshot, force=force)
self.assertIsNone(retval)
self.assertEqual(2, mock_db_update_call.call_count)
mock_db_update_call.assert_has_calls(expected_update_calls)
mock_replicated_snapshot_rpc_call.assert_called_once_with(
self.context, snapshot, share['instance']['host'],
share_id=share['id'], force=force)
self.assertFalse(mock_snapshot_rpc_call.called)
def test_create_snapshot_if_share_not_available(self):
share = db_utils.create_share(status=constants.STATUS_ERROR)
self.assertRaises(exception.InvalidShare,
self.api.create_snapshot,
self.context,
share,
'fakename',
'fakedesc')
share_api.policy.check_policy.assert_called_once_with(
self.context, 'share', 'create_snapshot', share)
def test_create_snapshot_invalid_task_state(self):
share = db_utils.create_share(
status=constants.STATUS_AVAILABLE,
task_state=constants.TASK_STATE_MIGRATION_IN_PROGRESS)
self.assertRaises(exception.ShareBusyException,
self.api.create_snapshot,
self.context,
share,
'fakename',
'fakedesc')
share_api.policy.check_policy.assert_called_once_with(
self.context, 'share', 'create_snapshot', share)
@ddt.data({'use_scheduler': False, 'valid_host': 'fake'},
{'use_scheduler': True, 'valid_host': None})
@ddt.unpack
def test_create_from_snapshot(self, use_scheduler, valid_host):
snapshot, share, share_data, request_spec = (
self._setup_create_from_snapshot_mocks(
use_scheduler=use_scheduler, host=valid_host)
)
share_type = fakes.fake_share_type()
mock_get_share_type_call = self.mock_object(
share_types, 'get_share_type', mock.Mock(return_value=share_type))
az = share_data.pop('availability_zone')
self.api.create(
self.context,
share_data['share_proto'],
None, # NOTE(u_glide): Get share size from snapshot
share_data['display_name'],
share_data['display_description'],
snapshot_id=snapshot['id'],
availability_zone=az
)
mock_get_share_type_call.assert_called_once_with(
self.context, share['share_type_id'])
self.assertSubDictMatch(share_data,
db_api.share_create.