Fix concurrent usage of update_access method for share instances

In Mitaka, update_access was merged with a known concurrency issue.
This concurrency issue has been causing random failures in our CIs.
This change adds a lock mechanism so that, while rules are being
removed by one thread, a parallel thread cannot add back or handle the
same rules that are being removed.

Also, a late Mitaka update_access patch [1] broke the consistency of
access rules during share migration, leaving stale access rule data on
the share server that hosted the share prior to its migration. This
patch addresses this by preventing the refresh mechanism from adding
back rules that have been removed.

[1] I0f863cbae4d8af0660114161deda7bf7aa60d71d

Change-Id: Ief3b15eefc0fc325a2a5418fc7ac2724c315cc21
Co-Authored-By: Rodrigo Barbieri <rodrigo.barbieri2010@gmail.com>
Co-Authored-By: Goutham Pacha Ravi <gouthamr@netapp.com>
Closes-Bug: #1566815
Closes-Bug: #1609414
This commit is contained in:
Valeriy Ponomaryov 2016-08-02 20:27:46 +03:00 committed by Rodrigo Barbieri
parent 426646ef63
commit d878826cf3
4 changed files with 38 additions and 13 deletions

View File

@ -38,6 +38,7 @@ from oslo_log import log
from oslo_utils import timeutils
from oslo_utils import uuidutils
import six
from sqlalchemy import and_
from sqlalchemy import or_
from sqlalchemy.orm import joinedload
from sqlalchemy.sql.expression import true
@ -1666,7 +1667,7 @@ def _share_access_get_query(context, session, values, read_deleted='no'):
def _share_instance_access_query(context, session, access_id=None,
instance_id=None):
filters = {}
filters = {'deleted': 'False'}
if access_id is not None:
filters.update({'access_id': access_id})
@ -1765,9 +1766,10 @@ def share_access_get_all_for_instance(context, instance_id, session=None):
return _share_access_get_query(context, session, {}).join(
models.ShareInstanceAccessMapping,
models.ShareInstanceAccessMapping.access_id ==
models.ShareAccessMapping.id).filter(
models.ShareInstanceAccessMapping.share_instance_id ==
instance_id).all()
models.ShareAccessMapping.id).filter(and_(
models.ShareInstanceAccessMapping.share_instance_id ==
instance_id, models.ShareInstanceAccessMapping.deleted ==
"False")).all()
@require_context

View File

@ -18,6 +18,7 @@ import six
from manila.common import constants
from manila.i18n import _LI
from manila import utils
LOG = log.getLogger(__name__)
@ -41,6 +42,26 @@ class ShareInstanceAccess(object):
be deleted.
:param share_server: Share server model or None
"""
share_instance = self.db.share_instance_get(
context, share_instance_id, with_share_data=True)
share_id = share_instance["share_id"]
@utils.synchronized(
"update_access_rules_for_share_%s" % share_id, external=True)
def _update_access_rules_locked(*args, **kwargs):
return self._update_access_rules(*args, **kwargs)
_update_access_rules_locked(
context=context,
share_instance_id=share_instance_id,
add_rules=add_rules,
delete_rules=delete_rules,
share_server=share_server,
)
def _update_access_rules(self, context, share_instance_id, add_rules=None,
delete_rules=None, share_server=None):
# Reget share instance
share_instance = self.db.share_instance_get(
context, share_instance_id, with_share_data=True)
@ -63,8 +84,9 @@ class ShareInstanceAccess(object):
context, share_instance['id'])
rules = []
else:
rules = self.db.share_access_get_all_for_instance(
_rules = self.db.share_access_get_all_for_instance(
context, share_instance['id'])
rules = _rules
if delete_rules:
delete_ids = [rule['id'] for rule in delete_rules]
rules = list(filter(lambda r: r['id'] not in delete_ids,
@ -72,7 +94,9 @@ class ShareInstanceAccess(object):
# NOTE(ganso): trigger maintenance mode
if share_instance['access_rules_status'] == (
constants.STATUS_ERROR):
remove_rules = delete_rules
remove_rules = [
rule for rule in _rules
if rule["id"] in delete_ids]
delete_rules = []
try:
@ -109,8 +133,8 @@ class ShareInstanceAccess(object):
with_share_data=True)
if self._check_needs_refresh(context, rules, share_instance):
self.update_access_rules(context, share_instance_id,
share_server=share_server)
self._update_access_rules(context, share_instance_id,
share_server=share_server)
else:
self.db.share_instance_update_access_status(
context,

View File

@ -136,7 +136,9 @@ def locked_share_replica_operation(operation):
def wrapped(*args, **kwargs):
share_id = kwargs.get('share_id')
@utils.synchronized("%s" % share_id, external=True)
@utils.synchronized(
"locked_share_replica_operation_by_share_%s" % share_id,
external=True)
def locked_operation(*_args, **_kwargs):
return operation(*_args, **_kwargs)
return locked_operation(*args, **kwargs)

View File

@ -116,10 +116,7 @@ class ShareInstanceAccessTestCase(test.TestCase):
self.share_instance,
add_rules[0],
share_server=None)
self.driver.deny_access.assert_called_with(self.context,
self.share_instance,
delete_rules[0],
share_server=None)
self.assertFalse(self.driver.deny_access.called)
db.share_instance_update_access_status.assert_called_with(
self.context, self.share_instance['id'], constants.STATUS_ACTIVE)