Fix update_access concurrency issue
With the new update_access interface, concurrent requests to allow access might get lost when the driver takes a long time to process the new rules. This patch fixes this issue by verifying for the existence of new unprocessed rules at the end of the update_access method. APIImpact access_allow action on shares with access_rules_status 'out_of_sync' was previously disallowed with HTTPBadRequest. Now, the action is disallowed only for shares with access_rules_status set to 'error'. Change-Id: I0f863cbae4d8af0660114161deda7bf7aa60d71d Closes-bug: #1550295
This commit is contained in:
parent
f5a537f039
commit
70f59f7b7c
@ -23,6 +23,8 @@ STATUS_AVAILABLE = 'available'
|
||||
STATUS_ACTIVE = 'active'
|
||||
STATUS_INACTIVE = 'inactive'
|
||||
STATUS_OUT_OF_SYNC = 'out_of_sync'
|
||||
STATUS_UPDATING = 'updating'
|
||||
STATUS_UPDATING_MULTIPLE = 'updating_multiple'
|
||||
STATUS_MANAGING = 'manage_starting'
|
||||
STATUS_MANAGE_ERROR = 'manage_error'
|
||||
STATUS_UNMANAGING = 'unmanage_starting'
|
||||
@ -79,6 +81,11 @@ TRANSITIONAL_STATUSES = (
|
||||
STATUS_MIGRATING, STATUS_MIGRATING_TO,
|
||||
)
|
||||
|
||||
UPDATING_RULES_STATUSES = (
|
||||
STATUS_UPDATING,
|
||||
STATUS_UPDATING_MULTIPLE,
|
||||
)
|
||||
|
||||
SUPPORTED_SHARE_PROTOCOLS = (
|
||||
'NFS', 'CIFS', 'GLUSTERFS', 'HDFS', 'CEPHFS')
|
||||
|
||||
|
@ -370,11 +370,15 @@ class ShareInstance(BASE, ManilaBase):
|
||||
ACCESS_STATUS_PRIORITIES = {
|
||||
constants.STATUS_ACTIVE: 0,
|
||||
constants.STATUS_OUT_OF_SYNC: 1,
|
||||
constants.STATUS_ERROR: 2,
|
||||
constants.STATUS_UPDATING: 2,
|
||||
constants.STATUS_UPDATING_MULTIPLE: 3,
|
||||
constants.STATUS_ERROR: 4,
|
||||
}
|
||||
|
||||
access_rules_status = Column(Enum(constants.STATUS_ACTIVE,
|
||||
constants.STATUS_OUT_OF_SYNC,
|
||||
constants.STATUS_UPDATING,
|
||||
constants.STATUS_UPDATING_MULTIPLE,
|
||||
constants.STATUS_ERROR),
|
||||
default=constants.STATUS_ACTIVE)
|
||||
|
||||
@ -554,7 +558,10 @@ class ShareAccessMapping(BASE, ManilaBase):
|
||||
instances = [im.instance for im in self.instance_mappings]
|
||||
access_rules_status = get_access_rules_status(instances)
|
||||
|
||||
if access_rules_status == constants.STATUS_OUT_OF_SYNC:
|
||||
if access_rules_status in (
|
||||
constants.STATUS_OUT_OF_SYNC,
|
||||
constants.STATUS_UPDATING,
|
||||
constants.STATUS_UPDATING_MULTIPLE):
|
||||
return constants.STATUS_NEW
|
||||
else:
|
||||
return access_rules_status
|
||||
|
@ -41,6 +41,12 @@ class ShareInstanceAccess(object):
|
||||
be deleted.
|
||||
:param share_server: Share server model or None
|
||||
"""
|
||||
self.db.share_instance_update_access_status(
|
||||
context,
|
||||
share_instance_id,
|
||||
constants.STATUS_UPDATING
|
||||
)
|
||||
|
||||
share_instance = self.db.share_instance_get(
|
||||
context, share_instance_id, with_share_data=True)
|
||||
|
||||
@ -56,8 +62,8 @@ class ShareInstanceAccess(object):
|
||||
context, share_instance['id'])
|
||||
rules = []
|
||||
else:
|
||||
rules = self.db.share_access_get_all_for_share(
|
||||
context, share_instance['share_id'])
|
||||
rules = self.db.share_access_get_all_for_instance(
|
||||
context, share_instance['id'])
|
||||
if delete_rules:
|
||||
delete_ids = [rule['id'] for rule in delete_rules]
|
||||
rules = list(filter(lambda r: r['id'] not in delete_ids,
|
||||
@ -98,15 +104,33 @@ class ShareInstanceAccess(object):
|
||||
|
||||
self._remove_access_rules(context, delete_rules, share_instance['id'])
|
||||
|
||||
self.db.share_instance_update_access_status(
|
||||
context,
|
||||
share_instance['id'],
|
||||
constants.STATUS_ACTIVE
|
||||
)
|
||||
share_instance = self.db.share_instance_get(context, share_instance_id,
|
||||
with_share_data=True)
|
||||
|
||||
LOG.info(_LI("Access rules were successfully applied for "
|
||||
"share instance: %s"),
|
||||
share_instance['id'])
|
||||
if self._check_needs_refresh(context, rules, share_instance):
|
||||
self.update_access_rules(context, share_instance_id,
|
||||
share_server=share_server)
|
||||
else:
|
||||
self.db.share_instance_update_access_status(
|
||||
context,
|
||||
share_instance['id'],
|
||||
constants.STATUS_ACTIVE
|
||||
)
|
||||
|
||||
LOG.info(_LI("Access rules were successfully applied for "
|
||||
"share instance: %s"),
|
||||
share_instance['id'])
|
||||
|
||||
def _check_needs_refresh(self, context, rules, share_instance):
|
||||
rule_ids = set([rule['id'] for rule in rules])
|
||||
queried_rules = self.db.share_access_get_all_for_instance(
|
||||
context, share_instance['id'])
|
||||
queried_ids = set([rule['id'] for rule in queried_rules])
|
||||
|
||||
access_rules_status = share_instance['access_rules_status']
|
||||
|
||||
return (access_rules_status == constants.STATUS_UPDATING_MULTIPLE or
|
||||
rule_ids != queried_ids)
|
||||
|
||||
def _update_access_fallback(self, add_rules, context, delete_rules,
|
||||
remove_rules, share_instance, share_server):
|
||||
|
@ -1122,23 +1122,32 @@ class API(base.Base):
|
||||
msg = _("Invalid share instance host: %s") % share_instance['host']
|
||||
raise exception.InvalidShareInstance(reason=msg)
|
||||
|
||||
if share_instance['access_rules_status'] != constants.STATUS_ACTIVE:
|
||||
status = share_instance['access_rules_status']
|
||||
msg = _("Share instance should have '%(valid_status)s' "
|
||||
"access rules status, but current status is: "
|
||||
"%(status)s.") % {
|
||||
'valid_status': constants.STATUS_ACTIVE,
|
||||
status = share_instance['access_rules_status']
|
||||
|
||||
if status == constants.STATUS_ERROR:
|
||||
values = {
|
||||
'instance_id': share_instance['id'],
|
||||
'status': status,
|
||||
'valid_status': constants.STATUS_ACTIVE
|
||||
}
|
||||
msg = _("Share instance %(instance_id)s access rules status is: "
|
||||
"%(status)s. Please remove any incorrect rules to get it "
|
||||
"back to %(valid_status)s.") % values
|
||||
|
||||
raise exception.InvalidShareInstance(reason=msg)
|
||||
else:
|
||||
if status == constants.STATUS_ACTIVE:
|
||||
self.db.share_instance_update_access_status(
|
||||
context, share_instance['id'],
|
||||
constants.STATUS_OUT_OF_SYNC
|
||||
)
|
||||
elif status == constants.STATUS_UPDATING:
|
||||
self.db.share_instance_update_access_status(
|
||||
context, share_instance['id'],
|
||||
constants.STATUS_UPDATING_MULTIPLE
|
||||
)
|
||||
|
||||
self.db.share_instance_update_access_status(
|
||||
context, share_instance['id'],
|
||||
constants.STATUS_OUT_OF_SYNC
|
||||
)
|
||||
|
||||
self.share_rpcapi.allow_access(context, share_instance, access)
|
||||
self.share_rpcapi.allow_access(context, share_instance, access)
|
||||
|
||||
def deny_access(self, ctx, share, access):
|
||||
"""Deny access to share."""
|
||||
@ -1168,10 +1177,17 @@ class API(base.Base):
|
||||
msg = _("Invalid share instance host: %s") % share_instance['host']
|
||||
raise exception.InvalidShareInstance(reason=msg)
|
||||
|
||||
if share_instance['access_rules_status'] != constants.STATUS_ERROR:
|
||||
status = share_instance['access_rules_status']
|
||||
|
||||
if status != constants.STATUS_ERROR:
|
||||
new_status = constants.STATUS_OUT_OF_SYNC
|
||||
|
||||
if status in constants.UPDATING_RULES_STATUSES:
|
||||
new_status = constants.STATUS_UPDATING_MULTIPLE
|
||||
|
||||
self.db.share_instance_update_access_status(
|
||||
context, share_instance['id'],
|
||||
constants.STATUS_OUT_OF_SYNC)
|
||||
new_status)
|
||||
|
||||
self.share_rpcapi.deny_access(context, share_instance, access)
|
||||
|
||||
|
@ -1849,18 +1849,23 @@ class ShareManager(manager.SchedulerDependentManager):
|
||||
@utils.require_driver_initialized
|
||||
def allow_access(self, context, share_instance_id, access_rules):
|
||||
"""Allow access to some share instance."""
|
||||
add_rules = [self.db.share_access_get(context, rule_id)
|
||||
for rule_id in access_rules]
|
||||
|
||||
share_instance = self._get_share_instance(context, share_instance_id)
|
||||
share_server = self._get_share_server(context, share_instance)
|
||||
status = share_instance['access_rules_status']
|
||||
|
||||
return self.access_helper.update_access_rules(
|
||||
context,
|
||||
share_instance_id,
|
||||
add_rules=add_rules,
|
||||
share_server=share_server
|
||||
)
|
||||
if status not in (constants.STATUS_UPDATING,
|
||||
constants.STATUS_UPDATING_MULTIPLE,
|
||||
constants.STATUS_ACTIVE):
|
||||
add_rules = [self.db.share_access_get(context, rule_id)
|
||||
for rule_id in access_rules]
|
||||
|
||||
share_server = self._get_share_server(context, share_instance)
|
||||
|
||||
return self.access_helper.update_access_rules(
|
||||
context,
|
||||
share_instance_id,
|
||||
add_rules=add_rules,
|
||||
share_server=share_server
|
||||
)
|
||||
|
||||
@add_hooks
|
||||
@utils.require_driver_initialized
|
||||
|
@ -65,6 +65,8 @@ class ShareInstanceAccessTestCase(test.TestCase):
|
||||
return_value=self.share_instance))
|
||||
self.mock_object(db, "share_access_get_all_for_share",
|
||||
mock.Mock(return_value=original_rules))
|
||||
self.mock_object(db, "share_access_get_all_for_instance",
|
||||
mock.Mock(return_value=original_rules))
|
||||
self.mock_object(db, "share_instance_update_access_status",
|
||||
mock.Mock())
|
||||
self.mock_object(self.driver, "update_access",
|
||||
@ -117,3 +119,30 @@ class ShareInstanceAccessTestCase(test.TestCase):
|
||||
|
||||
db.share_instance_update_access_status.assert_called_with(
|
||||
self.context, self.share_instance['id'], constants.STATUS_ERROR)
|
||||
|
||||
def test_update_access_rules_recursive_call(self):
|
||||
share_instance = db_utils.create_share_instance(
|
||||
access_rules_status=constants.STATUS_ACTIVE,
|
||||
share_id=self.share['id'])
|
||||
add_rules = [db_utils.create_access(
|
||||
share_id=self.share['id'])]
|
||||
original_rules = []
|
||||
|
||||
self.mock_object(db, "share_instance_get", mock.Mock(
|
||||
return_value=share_instance))
|
||||
self.mock_object(db, "share_access_get_all_for_instance",
|
||||
mock.Mock(return_value=original_rules))
|
||||
mock_update_access = self.mock_object(self.driver, "update_access")
|
||||
self.mock_object(self.share_access_helper, '_check_needs_refresh',
|
||||
mock.Mock(side_effect=[True, False]))
|
||||
|
||||
self.share_access_helper.update_access_rules(self.context,
|
||||
share_instance['id'],
|
||||
add_rules=add_rules)
|
||||
|
||||
mock_update_access.assert_has_calls([
|
||||
mock.call(self.context, share_instance, original_rules,
|
||||
add_rules=add_rules, delete_rules=[], share_server=None),
|
||||
mock.call(self.context, share_instance, original_rules,
|
||||
add_rules=[], delete_rules=[], share_server=None)
|
||||
])
|
||||
|
@ -1378,15 +1378,18 @@ class ShareAPITestCase(test.TestCase):
|
||||
self.assertRaises(exception.InvalidShare, self.api.allow_access,
|
||||
self.context, share, 'fakeacctype', 'fakeaccto')
|
||||
|
||||
def test_allow_access_to_instance(self):
|
||||
@ddt.data(constants.STATUS_ACTIVE, constants.STATUS_UPDATING)
|
||||
def test_allow_access_to_instance(self, status):
|
||||
share = db_utils.create_share(host='fake')
|
||||
share_instance = db_utils.create_share_instance(
|
||||
share_id=share['id'], access_rules_status=status, host='fake')
|
||||
access = db_utils.create_access(share_id=share['id'])
|
||||
rpc_method = self.mock_object(self.api.share_rpcapi, 'allow_access')
|
||||
|
||||
self.api.allow_access_to_instance(self.context, share.instance, access)
|
||||
self.api.allow_access_to_instance(self.context, share_instance, access)
|
||||
|
||||
rpc_method.assert_called_once_with(
|
||||
self.context, share.instance, access)
|
||||
self.context, share_instance, access)
|
||||
|
||||
def test_allow_access_to_instance_exception(self):
|
||||
share = db_utils.create_share(host='fake')
|
||||
@ -1398,22 +1401,42 @@ class ShareAPITestCase(test.TestCase):
|
||||
self.api.allow_access_to_instance, self.context,
|
||||
share.instance, access)
|
||||
|
||||
def test_deny_access_to_instance(self):
|
||||
def test_allow_access_to_instance_out_of_sync(self):
|
||||
share = db_utils.create_share(host='fake')
|
||||
access = db_utils.create_access(share_id=share['id'])
|
||||
rpc_method = self.mock_object(self.api.share_rpcapi, 'allow_access')
|
||||
|
||||
share.instance['access_rules_status'] = constants.STATUS_OUT_OF_SYNC
|
||||
|
||||
self.api.allow_access_to_instance(self.context, share.instance, access)
|
||||
rpc_method.assert_called_once_with(
|
||||
self.context, share.instance, access)
|
||||
|
||||
@ddt.data(constants.STATUS_ACTIVE, constants.STATUS_UPDATING,
|
||||
constants.STATUS_UPDATING_MULTIPLE)
|
||||
def test_deny_access_to_instance(self, status):
|
||||
share = db_utils.create_share(host='fake')
|
||||
share_instance = db_utils.create_share_instance(
|
||||
share_id=share['id'], access_rules_status=status, host='fake')
|
||||
access = db_utils.create_access(share_id=share['id'])
|
||||
rpc_method = self.mock_object(self.api.share_rpcapi, 'deny_access')
|
||||
self.mock_object(db_api, 'share_instance_access_get',
|
||||
mock.Mock(return_value=access.instance_mappings[0]))
|
||||
self.mock_object(db_api, 'share_instance_update_access_status')
|
||||
|
||||
self.api.deny_access_to_instance(self.context, share.instance, access)
|
||||
self.api.deny_access_to_instance(self.context, share_instance, access)
|
||||
|
||||
if status == constants.STATUS_ACTIVE:
|
||||
expected_new_status = constants.STATUS_OUT_OF_SYNC
|
||||
else:
|
||||
expected_new_status = constants.STATUS_UPDATING_MULTIPLE
|
||||
|
||||
rpc_method.assert_called_once_with(
|
||||
self.context, share.instance, access)
|
||||
self.context, share_instance, access)
|
||||
db_api.share_instance_update_access_status.assert_called_once_with(
|
||||
self.context,
|
||||
share.instance['id'],
|
||||
constants.STATUS_OUT_OF_SYNC
|
||||
share_instance['id'],
|
||||
expected_new_status
|
||||
)
|
||||
|
||||
@ddt.data('allow_access_to_instance', 'deny_access_to_instance')
|
||||
|
@ -2195,6 +2195,10 @@ class ShareManagerTestCase(test.TestCase):
|
||||
self.share_manager.driver, 'delete_share',
|
||||
mock.Mock(side_effect=exception.ShareResourceNotFound(
|
||||
share_id=share['id'])))
|
||||
self.mock_object(
|
||||
self.share_manager.access_helper, '_check_needs_refresh',
|
||||
mock.Mock(return_value=False)
|
||||
)
|
||||
|
||||
self.mock_object(manager.LOG, 'warning')
|
||||
|
||||
@ -2215,22 +2219,28 @@ class ShareManagerTestCase(test.TestCase):
|
||||
|
||||
share = db_utils.create_share()
|
||||
share_id = share['id']
|
||||
access = db_utils.create_access(share_id=share_id)
|
||||
share_instance = db_utils.create_share_instance(
|
||||
share_id=share_id,
|
||||
access_rules_status=constants.STATUS_OUT_OF_SYNC)
|
||||
share_instance_id = share_instance['id']
|
||||
access = db_utils.create_access(share_id=share_id,
|
||||
share_instance_id=share_instance_id)
|
||||
access_id = access['id']
|
||||
self.share_manager.allow_access(self.context, share.instance['id'],
|
||||
|
||||
self.share_manager.allow_access(self.context, share_instance_id,
|
||||
[access_id])
|
||||
self.assertEqual('active', db.share_instance_get(
|
||||
self.context, share.instance['id']).access_rules_status)
|
||||
self.context, share_instance_id).access_rules_status)
|
||||
|
||||
share_access.LOG.info.assert_called_with(mock.ANY,
|
||||
share.instance['id'])
|
||||
share_instance_id)
|
||||
share_access.LOG.info.reset_mock()
|
||||
|
||||
self.share_manager.deny_access(self.context, share.instance['id'],
|
||||
self.share_manager.deny_access(self.context, share_instance_id,
|
||||
[access_id])
|
||||
|
||||
share_access.LOG.info.assert_called_with(mock.ANY,
|
||||
share.instance['id'])
|
||||
share_instance_id)
|
||||
share_access.LOG.info.reset_mock()
|
||||
|
||||
def test_allow_deny_access_error(self):
|
||||
@ -2249,14 +2259,19 @@ class ShareManagerTestCase(test.TestCase):
|
||||
|
||||
share = db_utils.create_share()
|
||||
share_id = share['id']
|
||||
access = db_utils.create_access(share_id=share_id)
|
||||
share_instance = db_utils.create_share_instance(
|
||||
share_id=share_id,
|
||||
access_rules_status=constants.STATUS_OUT_OF_SYNC)
|
||||
share_instance_id = share_instance['id']
|
||||
access = db_utils.create_access(share_id=share_id,
|
||||
share_instance_id=share_instance_id)
|
||||
access_id = access['id']
|
||||
|
||||
def validate(method):
|
||||
self.assertRaises(exception.ManilaException, method, self.context,
|
||||
share.instance['id'], [access_id])
|
||||
share_instance_id, [access_id])
|
||||
|
||||
inst = db.share_instance_get(self.context, share.instance['id'])
|
||||
inst = db.share_instance_get(self.context, share_instance_id)
|
||||
self.assertEqual(constants.STATUS_ERROR,
|
||||
inst['access_rules_status'])
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user