Merge "Reject actions if target resource is locked"

This commit is contained in:
Zuul 2018-10-15 21:37:59 +00:00 committed by Gerrit Code Review
commit b83f22a22f
12 changed files with 274 additions and 2 deletions

View File

@ -56,6 +56,8 @@ class FaultWrapper(wsgi.Middleware):
'ProfileOperationFailed': webob.exc.HTTPInternalServerError, 'ProfileOperationFailed': webob.exc.HTTPInternalServerError,
'RequestLimitExceeded': webob.exc.HTTPBadRequest, 'RequestLimitExceeded': webob.exc.HTTPBadRequest,
'ResourceInUse': webob.exc.HTTPConflict, 'ResourceInUse': webob.exc.HTTPConflict,
'ResourceIsLocked': webob.exc.HTTPConflict,
'ActionConflict': webob.exc.HTTPConflict,
'ResourceNotFound': webob.exc.HTTPNotFound, 'ResourceNotFound': webob.exc.HTTPNotFound,
} }

View File

@ -128,6 +128,15 @@ class ResourceInUse(SenlinException):
msg_fmt = _("The %(type)s '%(id)s' cannot be deleted: %(reason)s.") msg_fmt = _("The %(type)s '%(id)s' cannot be deleted: %(reason)s.")
class ResourceIsLocked(SenlinException):
"""Generic exception for resource in use.
The resource type here can be 'cluster', 'node'.
"""
msg_fmt = _("%(action)s for %(type)s '%(id)s' cannot be completed "
"because it is already locked.")
class ProfileNotSpecified(SenlinException): class ProfileNotSpecified(SenlinException):
msg_fmt = _("Profile not specified.") msg_fmt = _("Profile not specified.")
@ -180,6 +189,11 @@ class ActionInProgress(SenlinException):
msg_fmt = _("The %(type)s '%(id)s' is in status %(status)s.") msg_fmt = _("The %(type)s '%(id)s' is in status %(status)s.")
class ActionConflict(SenlinException):
msg_fmt = _("The %(type)s action for target %(target)s conflicts with "
"the following action(s): %(actions)s")
class NodeNotOrphan(SenlinException): class NodeNotOrphan(SenlinException):
msg_fmt = _("%(message)s") msg_fmt = _("%(message)s")

View File

@ -129,6 +129,10 @@ def cluster_lock_acquire(cluster_id, action_id, scope):
return IMPL.cluster_lock_acquire(cluster_id, action_id, scope) return IMPL.cluster_lock_acquire(cluster_id, action_id, scope)
def cluster_is_locked(cluster_id):
return IMPL.cluster_is_locked(cluster_id)
def cluster_lock_release(cluster_id, action_id, scope): def cluster_lock_release(cluster_id, action_id, scope):
return IMPL.cluster_lock_release(cluster_id, action_id, scope) return IMPL.cluster_lock_release(cluster_id, action_id, scope)
@ -141,6 +145,10 @@ def node_lock_acquire(node_id, action_id):
return IMPL.node_lock_acquire(node_id, action_id) return IMPL.node_lock_acquire(node_id, action_id)
def node_is_locked(node_id):
return IMPL.node_is_locked(node_id)
def node_lock_release(node_id, action_id): def node_lock_release(node_id, action_id):
return IMPL.node_lock_release(node_id, action_id) return IMPL.node_lock_release(node_id, action_id)
@ -334,6 +342,11 @@ def action_get_all_by_owner(context, owner):
return IMPL.action_get_all_by_owner(context, owner) return IMPL.action_get_all_by_owner(context, owner)
def action_get_all_active_by_target(context, target_id, project_safe=True):
return IMPL.action_get_all_active_by_target(context, target_id,
project_safe=project_safe)
def action_get_all(context, filters=None, limit=None, marker=None, sort=None, def action_get_all(context, filters=None, limit=None, marker=None, sort=None,
project_safe=True): project_safe=True):
return IMPL.action_get_all(context, filters=filters, sort=sort, return IMPL.action_get_all(context, filters=filters, sort=sort,

View File

@ -462,6 +462,14 @@ def cluster_lock_acquire(cluster_id, action_id, scope):
return lock.action_ids return lock.action_ids
@retry_on_deadlock
def cluster_is_locked(cluster_id):
with session_for_read() as session:
query = session.query(models.ClusterLock)
lock = query.get(cluster_id)
return lock is not None
@retry_on_deadlock @retry_on_deadlock
def _release_cluster_lock(session, lock, action_id, scope): def _release_cluster_lock(session, lock, action_id, scope):
success = False success = False
@ -529,6 +537,15 @@ def node_lock_acquire(node_id, action_id):
return lock.action_id return lock.action_id
@retry_on_deadlock
def node_is_locked(node_id):
with session_for_read() as session:
query = session.query(models.NodeLock)
lock = query.get(node_id)
return lock is not None
@retry_on_deadlock @retry_on_deadlock
def node_lock_release(node_id, action_id): def node_lock_release(node_id, action_id):
with session_for_write() as session: with session_for_write() as session:
@ -1065,6 +1082,22 @@ def action_get_all_by_owner(context, owner_id):
return query.all() return query.all()
def action_get_all_active_by_target(context, target_id, project_safe=True):
with session_for_read() as session:
query = session.query(models.Action)
if project_safe:
query = query.filter_by(project=context.project_id)
query = query.filter_by(target=target_id)
query = query.filter(
models.Action.status.in_(
[consts.ACTION_READY,
consts.ACTION_WAITING,
consts.ACTION_RUNNING,
consts.ACTION_WAITING_LIFECYCLE_COMPLETION]))
actions = query.all()
return actions
def action_get_all(context, filters=None, limit=None, marker=None, sort=None, def action_get_all(context, filters=None, limit=None, marker=None, sort=None,
project_safe=True): project_safe=True):

View File

@ -11,6 +11,7 @@
# under the License. # under the License.
import copy import copy
import time
from oslo_utils import uuidutils from oslo_utils import uuidutils
@ -161,6 +162,8 @@ class NovaClient(base.DriverBase):
'zoneName': 'nova', 'zoneName': 'nova',
} }
self.simulated_waits = {}
def flavor_find(self, name_or_id, ignore_missing=False): def flavor_find(self, name_or_id, ignore_missing=False):
return sdk.FakeResourceObject(self.fake_flavor) return sdk.FakeResourceObject(self.fake_flavor)
@ -180,17 +183,33 @@ class NovaClient(base.DriverBase):
return sdk.FakeResourceObject(self.keypair) return sdk.FakeResourceObject(self.keypair)
def server_create(self, **attrs): def server_create(self, **attrs):
self.fake_server_create['id'] = uuidutils.generate_uuid() server_id = uuidutils.generate_uuid()
self.fake_server_get['id'] = self.fake_server_create['id'] self.fake_server_create['id'] = server_id
self.fake_server_get['id'] = server_id
# save simulated wait time if it was set in metadata
if ('metadata' in attrs and
'simulated_wait_time' in attrs['metadata']):
simulated_wait = attrs['metadata']['simulated_wait_time']
if (isinstance(simulated_wait, int) and simulated_wait > 0):
self.simulated_waits[server_id] = simulated_wait
return sdk.FakeResourceObject(self.fake_server_create) return sdk.FakeResourceObject(self.fake_server_create)
def server_get(self, server): def server_get(self, server):
return sdk.FakeResourceObject(self.fake_server_get) return sdk.FakeResourceObject(self.fake_server_get)
def wait_for_server(self, server, timeout=None): def wait_for_server(self, server, timeout=None):
# sleep for simulated wait time if it was supplied during server_create
if server in self.simulated_waits:
time.sleep(self.simulated_waits[server])
return return
def wait_for_server_delete(self, server, timeout=None): def wait_for_server_delete(self, server, timeout=None):
# sleep for simulated wait time if it was supplied during server_create
if server in self.simulated_waits:
time.sleep(self.simulated_waits[server])
del self.simulated_waits[server]
return return
def server_update(self, server, **attrs): def server_update(self, server, **attrs):

View File

@ -25,8 +25,10 @@ from senlin.common import utils
from senlin.engine import dispatcher from senlin.engine import dispatcher
from senlin.engine import event as EVENT from senlin.engine import event as EVENT
from senlin.objects import action as ao from senlin.objects import action as ao
from senlin.objects import cluster_lock as cl
from senlin.objects import cluster_policy as cpo from senlin.objects import cluster_policy as cpo
from senlin.objects import dependency as dobj from senlin.objects import dependency as dobj
from senlin.objects import node_lock as nl
from senlin.policies import base as policy_mod from senlin.policies import base as policy_mod
wallclock = time.time wallclock = time.time
@ -247,6 +249,21 @@ class Action(object):
:param dict kwargs: Other keyword arguments for the action. :param dict kwargs: Other keyword arguments for the action.
:return: ID of the action created. :return: ID of the action created.
""" """
if (action in list(consts.CLUSTER_ACTION_NAMES) and
cl.ClusterLock.is_locked(target)):
raise exception.ResourceIsLocked(
action=action, type='cluster', id=target)
elif (action in list(consts.NODE_ACTION_NAMES) and
nl.NodeLock.is_locked(target)):
raise exception.ResourceIsLocked(
action=action, type='node', id=target)
conflict_actions = ao.Action.get_all_active_by_target(ctx, target)
if conflict_actions:
action_ids = [a['id'] for a in conflict_actions]
raise exception.ActionConflict(
type=action, target=target, actions=",".join(action_ids))
params = { params = {
'user_id': ctx.user_id, 'user_id': ctx.user_id,
'project_id': ctx.project_id, 'project_id': ctx.project_id,
@ -256,6 +273,7 @@ class Action(object):
'trusts': ctx.trusts, 'trusts': ctx.trusts,
} }
c = req_context.RequestContext.from_dict(params) c = req_context.RequestContext.from_dict(params)
obj = cls(target, action, c, **kwargs) obj = cls(target, action, c, **kwargs)
return obj.store(ctx) return obj.store(ctx)

View File

@ -105,6 +105,11 @@ class Action(base.SenlinObject, base.VersionedObjectDictCompat):
objs = db_api.action_get_all_by_owner(context, owner) objs = db_api.action_get_all_by_owner(context, owner)
return [cls._from_db_object(context, cls(), obj) for obj in objs] return [cls._from_db_object(context, cls(), obj) for obj in objs]
@classmethod
def get_all_active_by_target(cls, context, target):
objs = db_api.action_get_all_active_by_target(context, target)
return [cls._from_db_object(context, cls(), obj) for obj in objs]
@classmethod @classmethod
def check_status(cls, context, action_id, timestamp): def check_status(cls, context, action_id, timestamp):
return db_api.action_check_status(context, action_id, timestamp) return db_api.action_check_status(context, action_id, timestamp)

View File

@ -31,6 +31,10 @@ class ClusterLock(base.SenlinObject, base.VersionedObjectDictCompat):
def acquire(cls, cluster_id, action_id, scope): def acquire(cls, cluster_id, action_id, scope):
return db_api.cluster_lock_acquire(cluster_id, action_id, scope) return db_api.cluster_lock_acquire(cluster_id, action_id, scope)
@classmethod
def is_locked(cls, cluster_id):
return db_api.cluster_is_locked(cluster_id)
@classmethod @classmethod
def release(cls, cluster_id, action_id, scope): def release(cls, cluster_id, action_id, scope):
return db_api.cluster_lock_release(cluster_id, action_id, scope) return db_api.cluster_lock_release(cluster_id, action_id, scope)

View File

@ -30,6 +30,10 @@ class NodeLock(base.SenlinObject, base.VersionedObjectDictCompat):
def acquire(cls, node_id, action_id): def acquire(cls, node_id, action_id):
return db_api.node_lock_acquire(node_id, action_id) return db_api.node_lock_acquire(node_id, action_id)
@classmethod
def is_locked(cls, cluster_id):
return db_api.node_is_locked(cluster_id)
@classmethod @classmethod
def release(cls, node_id, action_id): def release(cls, node_id, action_id):
return db_api.node_lock_release(node_id, action_id) return db_api.node_lock_release(node_id, action_id)

View File

@ -244,6 +244,32 @@ class DBAPIActionTest(base.SenlinTestCase):
for spec in specs: for spec in specs:
self.assertIn(spec['name'], names) self.assertIn(spec['name'], names)
def test_action_get_all_active_by_target(self):
specs = [
{'name': 'A01', 'target': 'cluster_001', 'status': 'READY'},
{'name': 'A02', 'target': 'node_001'},
{'name': 'A03', 'target': 'cluster_001', 'status': 'INIT'},
{'name': 'A04', 'target': 'cluster_001', 'status': 'WAITING'},
{'name': 'A05', 'target': 'cluster_001', 'status': 'READY'},
{'name': 'A06', 'target': 'cluster_001', 'status': 'RUNNING'},
{'name': 'A07', 'target': 'cluster_001', 'status': 'SUCCEEDED'},
{'name': 'A08', 'target': 'cluster_001', 'status': 'FAILED'},
{'name': 'A09', 'target': 'cluster_001', 'status': 'CANCELLED'},
{'name': 'A10', 'target': 'cluster_001',
'status': 'WAITING_LIFECYCLE_COMPLETION'},
{'name': 'A11', 'target': 'cluster_001', 'status': 'SUSPENDED'},
]
for spec in specs:
_create_action(self.ctx, **spec)
actions = db_api.action_get_all_active_by_target(self.ctx,
'cluster_001')
self.assertEqual(5, len(actions))
names = [p.name for p in actions]
for name in names:
self.assertIn(name, ['A01', 'A04', 'A05', 'A06', 'A10'])
def test_action_get_all_project_safe(self): def test_action_get_all_project_safe(self):
parser.simple_parse(shared.sample_action) parser.simple_parse(shared.sample_action)
_create_action(self.ctx) _create_action(self.ctx)

View File

@ -174,6 +174,27 @@ class DBAPILockTest(base.SenlinTestCase):
observed = db_api.cluster_lock_release(self.cluster.id, UUID1, -1) observed = db_api.cluster_lock_release(self.cluster.id, UUID1, -1)
self.assertTrue(observed) self.assertTrue(observed)
def test_cluster_is_locked(self):
# newly created cluster should not be locked
observed = db_api.cluster_is_locked(self.cluster.id)
self.assertFalse(observed)
# lock cluster
observed = db_api.cluster_lock_acquire(self.cluster.id, UUID1, -1)
self.assertIn(UUID1, observed)
# cluster should be locked
observed = db_api.cluster_is_locked(self.cluster.id)
self.assertTrue(observed)
# release cluster lock
observed = db_api.cluster_lock_release(self.cluster.id, UUID1, -1)
self.assertTrue(observed)
# cluster should not be locked anymore
observed = db_api.cluster_is_locked(self.cluster.id)
self.assertFalse(observed)
def test_node_lock_acquire_release(self): def test_node_lock_acquire_release(self):
observed = db_api.node_lock_acquire(self.node.id, UUID1) observed = db_api.node_lock_acquire(self.node.id, UUID1)
self.assertEqual(UUID1, observed) self.assertEqual(UUID1, observed)
@ -221,6 +242,27 @@ class DBAPILockTest(base.SenlinTestCase):
observed = db_api.node_lock_release(self.node.id, UUID2) observed = db_api.node_lock_release(self.node.id, UUID2)
self.assertTrue(observed) self.assertTrue(observed)
def test_node_is_locked(self):
# newly created node should not be locked
observed = db_api.node_is_locked(self.node.id)
self.assertFalse(observed)
# lock node
observed = db_api.node_lock_acquire(self.node.id, UUID1)
self.assertIn(UUID1, observed)
# node should be locked
observed = db_api.node_is_locked(self.node.id)
self.assertTrue(observed)
# release node lock
observed = db_api.node_lock_release(self.node.id, UUID1)
self.assertTrue(observed)
# node should not be locked anymore
observed = db_api.node_is_locked(self.node.id)
self.assertFalse(observed)
class GCByEngineTest(base.SenlinTestCase): class GCByEngineTest(base.SenlinTestCase):

View File

@ -29,8 +29,10 @@ from senlin.engine import environment
from senlin.engine import event as EVENT from senlin.engine import event as EVENT
from senlin.engine import node as node_mod from senlin.engine import node as node_mod
from senlin.objects import action as ao from senlin.objects import action as ao
from senlin.objects import cluster_lock as cl
from senlin.objects import cluster_policy as cpo from senlin.objects import cluster_policy as cpo
from senlin.objects import dependency as dobj from senlin.objects import dependency as dobj
from senlin.objects import node_lock as nl
from senlin.policies import base as policy_mod from senlin.policies import base as policy_mod
from senlin.tests.unit.common import base from senlin.tests.unit.common import base
from senlin.tests.unit.common import utils from senlin.tests.unit.common import utils
@ -245,6 +247,96 @@ class ActionBaseTest(base.SenlinTestCase):
self.assertEqual('FAKE_ID', result) self.assertEqual('FAKE_ID', result)
mock_store.assert_called_once_with(self.ctx) mock_store.assert_called_once_with(self.ctx)
@mock.patch.object(ab.Action, 'store')
@mock.patch.object(ao.Action, 'get_all_active_by_target')
@mock.patch.object(cl.ClusterLock, 'is_locked')
def test_action_create_lock_cluster_false(self, mock_lock,
mock_active, mock_store):
mock_store.return_value = 'FAKE_ID'
mock_active.return_value = None
mock_lock.return_value = False
result = ab.Action.create(self.ctx, OBJID, 'CLUSTER_CREATE',
name='test')
self.assertEqual('FAKE_ID', result)
mock_store.assert_called_once_with(self.ctx)
mock_active.assert_called_once_with(mock.ANY, OBJID)
@mock.patch.object(ab.Action, 'store')
@mock.patch.object(ao.Action, 'get_all_active_by_target')
@mock.patch.object(cl.ClusterLock, 'is_locked')
def test_action_create_lock_cluster_true(self, mock_lock,
mock_active, mock_store):
mock_store.return_value = 'FAKE_ID'
mock_active.return_value = None
mock_lock.return_value = True
error_message = (
'CLUSTER_CREATE for cluster \'{}\' cannot be completed because '
'it is already locked.').format(OBJID)
with self.assertRaisesRegexp(exception.ResourceIsLocked,
error_message):
ab.Action.create(self.ctx, OBJID, 'CLUSTER_CREATE', name='test')
mock_store.assert_not_called()
mock_active.assert_not_called()
@mock.patch.object(ab.Action, 'store')
@mock.patch.object(ao.Action, 'get_all_active_by_target')
@mock.patch.object(nl.NodeLock, 'is_locked')
def test_action_create_lock_node_false(self, mock_lock,
mock_active, mock_store):
mock_store.return_value = 'FAKE_ID'
mock_active.return_value = None
mock_lock.return_value = False
result = ab.Action.create(self.ctx, OBJID, 'NODE_CREATE',
name='test')
self.assertEqual('FAKE_ID', result)
mock_store.assert_called_once_with(self.ctx)
mock_active.assert_called_once_with(mock.ANY, OBJID)
@mock.patch.object(ab.Action, 'store')
@mock.patch.object(ao.Action, 'get_all_active_by_target')
@mock.patch.object(nl.NodeLock, 'is_locked')
def test_action_create_lock_node_true(self, mock_lock, mock_active,
mock_store):
mock_store.return_value = 'FAKE_ID'
mock_active.return_value = None
mock_lock.return_value = True
error_message = (
'NODE_CREATE for node \'{}\' cannot be completed because '
'it is already locked.').format(OBJID)
with self.assertRaisesRegexp(exception.ResourceIsLocked,
error_message):
ab.Action.create(self.ctx, OBJID, 'NODE_CREATE', name='test')
mock_store.assert_not_called()
mock_active.assert_not_called()
@mock.patch.object(ab.Action, 'store')
@mock.patch.object(ao.Action, 'get_all_active_by_target')
@mock.patch.object(cl.ClusterLock, 'is_locked')
def test_action_create_conflict(self, mock_lock, mock_active, mock_store):
mock_store.return_value = 'FAKE_ID'
uuid1 = 'ce982cd5-26da-4e2c-84e5-be8f720b7478'
uuid2 = 'ce982cd5-26da-4e2c-84e5-be8f720b7479'
mock_active.return_value = [ao.Action(id=uuid1), ao.Action(id=uuid2)]
mock_lock.return_value = False
error_message = (
'The NODE_CREATE action for target {} conflicts with the following'
' action\(s\): {},{}').format(OBJID, uuid1, uuid2)
with self.assertRaisesRegexp(exception.ActionConflict,
error_message):
ab.Action.create(self.ctx, OBJID, 'NODE_CREATE', name='test')
mock_store.assert_not_called()
mock_active.assert_called_once_with(mock.ANY, OBJID)
def test_action_delete(self): def test_action_delete(self):
result = ab.Action.delete(self.ctx, 'non-existent') result = ab.Action.delete(self.ctx, 'non-existent')
self.assertIsNone(result) self.assertIsNone(result)