Fix snapshot export locations incorrectly handled

Snapshot export locations should only be created for shares
with property mount_snapshot_support=True. Otherwise, they
should not be created, even if returned by drivers.

Change-Id: Ie0ce3c4f2e7e29bf82f5c09a80a3fb132b0481a8
Closes-bug: #1660686
This commit is contained in:
Rodrigo Barbieri 2017-01-31 15:46:11 -02:00
parent 595a2bd73c
commit 4b6f1bf449
4 changed files with 98 additions and 33 deletions

View File

@ -337,7 +337,8 @@ class ShareInstance(BASE, ManilaBase):
'display_name', 'display_description',
'snapshot_id', 'share_proto', 'is_public',
'share_group_id', 'replication_type',
'source_share_group_snapshot_member_id')
'source_share_group_snapshot_member_id',
'mount_snapshot_support')
def set_share_data(self, share):
for share_property in self._proxified_properties:

View File

@ -2267,15 +2267,18 @@ class ShareManager(manager.SchedulerDependentManager):
snapshot_export_locations = snapshot_update.pop(
'export_locations', [])
for el in snapshot_export_locations:
values = {
'share_snapshot_instance_id': snapshot_instance['id'],
'path': el['path'],
'is_admin_only': el['is_admin_only'],
}
if snapshot_instance['share']['mount_snapshot_support']:
for el in snapshot_export_locations:
values = {
'share_snapshot_instance_id': snapshot_instance['id'],
'path': el['path'],
'is_admin_only': el['is_admin_only'],
}
self.db.share_snapshot_instance_export_location_create(
context, values)
self.db.share_snapshot_instance_export_location_create(context,
values)
snapshot_update.update({
'status': constants.STATUS_AVAILABLE,
'progress': '100%',
@ -2620,15 +2623,17 @@ class ShareManager(manager.SchedulerDependentManager):
snapshot_export_locations = model_update.pop('export_locations', [])
for el in snapshot_export_locations:
values = {
'share_snapshot_instance_id': snapshot_instance_id,
'path': el['path'],
'is_admin_only': el['is_admin_only'],
}
if snapshot_instance['share']['mount_snapshot_support']:
self.db.share_snapshot_instance_export_location_create(context,
values)
for el in snapshot_export_locations:
values = {
'share_snapshot_instance_id': snapshot_instance_id,
'path': el['path'],
'is_admin_only': el['is_admin_only'],
}
self.db.share_snapshot_instance_export_location_create(context,
values)
if model_update.get('status') in (None, constants.STATUS_AVAILABLE):
model_update['status'] = constants.STATUS_AVAILABLE

View File

@ -1460,9 +1460,24 @@ class ShareManagerTestCase(test.TestCase):
self.share_manager.driver.create_snapshot.assert_called_once_with(
self.context, expected_snapshot_instance_dict, share_server=None)
def test_create_snapshot(self):
@ddt.data({'model_update': {}, 'mount_snapshot_support': True},
{'model_update': {}, 'mount_snapshot_support': False},
{'model_update': {'export_locations': [
{'path': '/path1', 'is_admin_only': True},
{'path': '/path2', 'is_admin_only': False}
]}, 'mount_snapshot_support': True},
{'model_update': {'export_locations': [
{'path': '/path1', 'is_admin_only': True},
{'path': '/path2', 'is_admin_only': False}
]}, 'mount_snapshot_support': False})
@ddt.unpack
def test_create_snapshot(self, model_update, mount_snapshot_support):
export_locations = model_update.get('export_locations')
share_id = 'FAKE_SHARE_ID'
share = fakes.fake_share(id=share_id, instance={'id': 'fake_id'})
share = fakes.fake_share(
id=share_id,
instance={'id': 'fake_id'},
mount_snapshot_support=mount_snapshot_support)
snapshot_instance = fakes.fake_snapshot_instance(
share_id=share_id, share=share, name='fake_snapshot')
snapshot = fakes.fake_snapshot(
@ -1474,6 +1489,9 @@ class ShareManagerTestCase(test.TestCase):
mock.Mock(return_value=snapshot_instance))
self.mock_object(self.share_manager, '_get_share_server',
mock.Mock(return_value=None))
mock_export_update = self.mock_object(
self.share_manager.db,
'share_snapshot_instance_export_location_create')
expected_update_calls = [
mock.call(self.context, snapshot_instance['id'],
{'status': constants.STATUS_AVAILABLE,
@ -1482,8 +1500,10 @@ class ShareManagerTestCase(test.TestCase):
expected_snapshot_instance_dict = self._get_snapshot_instance_dict(
snapshot_instance, share)
self.mock_object(self.share_manager.driver, 'create_snapshot',
mock.Mock(return_value=None))
self.mock_object(
self.share_manager.driver, 'create_snapshot',
mock.Mock(return_value=model_update))
db_update = self.mock_object(
self.share_manager.db, 'share_snapshot_instance_update')
@ -1494,6 +1514,18 @@ class ShareManagerTestCase(test.TestCase):
self.share_manager.driver.create_snapshot.assert_called_once_with(
self.context, expected_snapshot_instance_dict, share_server=None)
db_update.assert_has_calls(expected_update_calls, any_order=True)
if mount_snapshot_support and export_locations:
snap_ins_id = snapshot.instance['id']
for i in range(0, 2):
export_locations[i]['share_snapshot_instance_id'] = snap_ins_id
mock_export_update.assert_has_calls([
mock.call(utils.IsAMatcher(context.RequestContext),
export_locations[0]),
mock.call(utils.IsAMatcher(context.RequestContext),
export_locations[1]),
])
else:
mock_export_update.assert_not_called()
@ddt.data(exception.ShareSnapshotIsBusy(snapshot_name='fake_name'),
exception.NotFound())
@ -5011,15 +5043,21 @@ class ShareManagerTestCase(test.TestCase):
mock_get_share_server.assert_called_once_with(
utils.IsAMatcher(context.RequestContext), snapshot['share'])
@ddt.data(
{'size': 1},
{'size': 2, 'name': 'fake'},
{'size': 3},
{'size': 3, 'export_locations': [{'path': '/path1',
'is_admin_only': True},
{'path': '/path2',
'is_admin_only': False}]})
def test_manage_snapshot_valid_snapshot(self, driver_data):
@ddt.data({'driver_data': {'size': 1}, 'mount_snapshot_support': False},
{'driver_data': {'size': 2, 'name': 'fake'},
'mount_snapshot_support': False},
{'driver_data': {'size': 3}, 'mount_snapshot_support': False},
{'driver_data': {'size': 3, 'export_locations': [
{'path': '/path1', 'is_admin_only': True},
{'path': '/path2', 'is_admin_only': False}
]}, 'mount_snapshot_support': False},
{'driver_data': {'size': 3, 'export_locations': [
{'path': '/path1', 'is_admin_only': True},
{'path': '/path2', 'is_admin_only': False}
]}, 'mount_snapshot_support': True})
@ddt.unpack
def test_manage_snapshot_valid_snapshot(
self, driver_data, mount_snapshot_support):
mock_get_share_server = self.mock_object(self.share_manager,
'_get_share_server',
mock.Mock(return_value=None))
@ -5032,15 +5070,19 @@ class ShareManagerTestCase(test.TestCase):
"manage_existing_snapshot",
mock.Mock(return_value=driver_data))
size = driver_data['size']
share = db_utils.create_share(size=size)
export_locations = driver_data.get('export_locations')
share = db_utils.create_share(
size=size,
mount_snapshot_support=mount_snapshot_support)
snapshot = db_utils.create_snapshot(share_id=share['id'], size=size)
snapshot_id = snapshot['id']
driver_options = {}
mock_get = self.mock_object(self.share_manager.db,
'share_snapshot_get',
mock.Mock(return_value=snapshot))
self.mock_object(self.share_manager.db,
'share_snapshot_instance_export_location_create')
mock_export_update = self.mock_object(
self.share_manager.db,
'share_snapshot_instance_export_location_create')
self.share_manager.manage_snapshot(self.context, snapshot_id,
driver_options)
@ -5056,6 +5098,18 @@ class ShareManagerTestCase(test.TestCase):
utils.IsAMatcher(context.RequestContext), snapshot['share'])
mock_get.assert_called_once_with(
utils.IsAMatcher(context.RequestContext), snapshot_id)
if mount_snapshot_support and export_locations:
snap_ins_id = snapshot.instance['id']
for i in range(0, 2):
export_locations[i]['share_snapshot_instance_id'] = snap_ins_id
mock_export_update.assert_has_calls([
mock.call(utils.IsAMatcher(context.RequestContext),
export_locations[0]),
mock.call(utils.IsAMatcher(context.RequestContext),
export_locations[1]),
])
else:
mock_export_update.assert_not_called()
def test_unmanage_snapshot_invalid_driver_mode(self):
self.mock_object(self.share_manager, 'driver')

View File

@ -0,0 +1,5 @@
---
fixes:
- Fixed snapshot export locations being created for
shares with property mount_snapshot_support=False.