Make NODE_DELETE operation respect grace_period

This patch fixes the logic of the NODE_DELETE operation when it is
originated from an RPC request. If a node is a member of a cluster, we
should respect the grace_period setting of the deletion policy, when
one exists.

This patch also ensures that policy checking is only done when a node
action originates from RPC. For derived actions, we don't do policy
checking twice.

Change-Id: I287512801e1f2b5d8bccd6f26b5bd24cb3c589d6
This commit is contained in:
tengqm 2016-07-21 05:24:15 -04:00
parent 55b6b76c29
commit 69480fc3cf
4 changed files with 180 additions and 69 deletions

View File

@ -70,6 +70,10 @@ class ClusterAction(base.Action):
except Exception:
self.cluster = None
def _sleep(self, period):
    """Pause the current greenthread for the given period.

    :param period: Number of seconds to sleep; a falsy value (``0`` or
                   ``None``) makes this call a no-op.
    """
    if not period:
        return
    eventlet.sleep(period)
def _wait_for_dependents(self):
"""Wait for dependent actions to complete.
@ -300,9 +304,6 @@ class ClusterAction(base.Action):
return self.RES_OK, ''
def _wait_before_deletion(self, period):
    """Sleep for *period* seconds before node deletion proceeds.

    NOTE(review): sleeps unconditionally — presumably callers guard
    against a falsy period before invoking this; confirm at call sites.

    :param period: Grace period, in seconds, to wait.
    """
    eventlet.sleep(period)
def do_delete(self):
"""Handler for the CLUSTER_DELETE action.
@ -448,8 +449,7 @@ class ClusterAction(base.Action):
if len(nodes) == 0:
return self.RES_OK, reason
if grace_period:
self._wait_before_deletion(grace_period)
self._sleep(grace_period)
result, new_reason = self._delete_nodes(nodes)
if result != self.RES_OK:
@ -581,7 +581,7 @@ class ClusterAction(base.Action):
node_list = self.cluster.nodes
current_size = len(node_list)
count, desired, candidates = self._get_action_data(current_size)
grace_period = None
grace_period = 0
# if policy is attached to the cluster, use policy data directly,
# or parse resize params to get action data.
if count == 0:
@ -593,15 +593,14 @@ class ClusterAction(base.Action):
return result, reason
count, desired, candidates = self._get_action_data(current_size)
elif 'deletion' in self.data:
grace_period = self.data['deletion'].get('grace_period', None)
grace_period = self.data['deletion'].get('grace_period', 0)
if candidates is not None and len(candidates) == 0:
# Choose victims randomly
candidates = scaleutils.nodes_by_random(self.cluster.nodes, count)
# delete nodes if necessary
if desired < current_size:
if grace_period is not None:
self._wait_before_deletion(grace_period)
self._sleep(grace_period)
result, reason = self._delete_nodes(candidates)
# Create new nodes if desired_capacity increased
else:
@ -685,8 +684,8 @@ class ClusterAction(base.Action):
# We use policy data if any, deletion policy and scaling policy might
# be attached.
pd = self.data.get('deletion', None)
grace_period = None
if pd is not None:
grace_period = 0
if pd:
grace_period = pd.get('grace_period', 0)
candidates = pd.get('candidates', [])
# if scaling policy is attached, get 'count' from action data
@ -727,8 +726,7 @@ class ClusterAction(base.Action):
if len(candidates) == 0:
candidates = scaleutils.nodes_by_random(self.cluster.nodes, count)
if grace_period is not None:
self._wait_before_deletion(grace_period)
self._sleep(grace_period)
# The policy data may contain destroy flag and grace period option
result, reason = self._delete_nodes(candidates)

View File

@ -10,14 +10,16 @@
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
from senlin.common.i18n import _
from senlin.common import scaleutils
from senlin.engine.actions import base
from senlin.engine import cluster as cluster_mod
from senlin.engine import cluster as cm
from senlin.engine import event as EVENT
from senlin.engine import node as node_mod
from senlin.engine import senlin_lock
from senlin.policies import base as policy_mod
from senlin.policies import base as pb
class NodeAction(base.Action):
@ -58,8 +60,7 @@ class NodeAction(base.Action):
if self.node.cluster_id and self.cause == base.CAUSE_RPC:
# If node is created with target cluster specified,
# check cluster size constraint
cluster = cluster_mod.Cluster.load(self.context,
self.node.cluster_id)
cluster = cm.Cluster.load(self.context, self.node.cluster_id)
result = scaleutils.check_size_params(
cluster, cluster.desired_capacity + 1, None, None, True)
@ -96,14 +97,20 @@ class NodeAction(base.Action):
if self.node.cluster_id and self.cause == base.CAUSE_RPC:
# If node belongs to a cluster, check size constraint
# before deleting it
cluster = cluster_mod.Cluster.load(self.context,
self.node.cluster_id)
cluster = cm.Cluster.load(self.context, self.node.cluster_id)
result = scaleutils.check_size_params(cluster,
cluster.desired_capacity - 1,
None, None, True)
if result:
return self.RES_ERROR, result
# handle grace_period
pd = self.data.get('deletion', None)
if pd:
grace_period = pd.get('grace_period', 0)
if grace_period:
eventlet.sleep(grace_period)
res = self.node.do_delete(self.context)
if not res:
return self.RES_ERROR, _('Node deletion failed.')
@ -139,7 +146,7 @@ class NodeAction(base.Action):
"""
cluster_id = self.inputs.get('cluster_id')
# Check the size constraint of parent cluster
cluster = cluster_mod.Cluster.load(self.context, cluster_id)
cluster = cm.Cluster.load(self.context, cluster_id)
new_capacity = cluster.desired_capacity + 1
result = scaleutils.check_size_params(cluster, new_capacity,
None, None, True)
@ -159,8 +166,7 @@ class NodeAction(base.Action):
:returns: A tuple containing the result and the corresponding reason.
"""
# Check the size constraint of parent cluster
cluster = cluster_mod.Cluster.load(self.context,
self.node.cluster_id)
cluster = cm.Cluster.load(self.context, self.node.cluster_id)
new_capacity = cluster.desired_capacity - 1
result = scaleutils.check_size_params(cluster, new_capacity,
None, None, True)
@ -208,22 +214,19 @@ class NodeAction(base.Action):
# Since node.cluster_id could be reset to '' during action execution,
# we record it here for policy check and cluster lock release.
saved_cluster_id = self.node.cluster_id
if self.node.cluster_id:
if self.cause == base.CAUSE_RPC:
res = senlin_lock.cluster_lock_acquire(
self.context,
self.node.cluster_id, self.id, self.owner,
senlin_lock.NODE_SCOPE, False)
if not res:
return self.RES_RETRY, _('Failed in locking cluster')
if (saved_cluster_id and self.cause == base.CAUSE_RPC):
res = senlin_lock.cluster_lock_acquire(
self.context, self.node.cluster_id, self.id, self.owner,
senlin_lock.NODE_SCOPE, False)
if not res:
return self.RES_RETRY, _('Failed in locking cluster')
self.policy_check(self.node.cluster_id, 'BEFORE')
if self.data['status'] != policy_mod.CHECK_OK:
if self.data['status'] != pb.CHECK_OK:
# Don't emit message since policy_check should have done it
if self.cause == base.CAUSE_RPC:
senlin_lock.cluster_lock_release(
self.node.cluster_id, self.id, senlin_lock.NODE_SCOPE)
senlin_lock.cluster_lock_release(saved_cluster_id, self.id,
senlin_lock.NODE_SCOPE)
return self.RES_ERROR, 'Policy check: ' + self.data['reason']
reason = ''
@ -235,17 +238,17 @@ class NodeAction(base.Action):
reason = _('Failed in locking node')
else:
res, reason = self._execute()
if (self.cause == base.CAUSE_RPC and res == self.RES_OK and
saved_cluster_id):
if (res == self.RES_OK and saved_cluster_id and
self.cause == base.CAUSE_RPC):
self.policy_check(saved_cluster_id, 'AFTER')
if self.data['status'] != policy_mod.CHECK_OK:
if self.data['status'] != pb.CHECK_OK:
res = self.RES_ERROR
reason = 'Policy check: ' + self.data['reason']
else:
res = self.RES_OK
finally:
senlin_lock.node_lock_release(self.node.id, self.id)
if self.cause == base.CAUSE_RPC and saved_cluster_id:
if saved_cluster_id and self.cause == base.CAUSE_RPC:
senlin_lock.cluster_lock_release(saved_cluster_id, self.id,
senlin_lock.NODE_SCOPE)
return res, reason

View File

@ -981,10 +981,10 @@ class ClusterActionTest(base.SenlinTestCase):
self.assertEqual('Timeout!', res_msg)
self.assertEqual({}, action.data)
@mock.patch.object(ca.ClusterAction, '_wait_before_deletion')
@mock.patch.object(ca.ClusterAction, '_sleep')
@mock.patch.object(no.Node, 'get')
@mock.patch.object(ca.ClusterAction, '_delete_nodes')
def test_do_del_nodes(self, mock_delete, mock_get, mock_wait, mock_load):
def test_do_del_nodes(self, mock_delete, mock_get, mock_sleep, mock_load):
cluster = mock.Mock()
cluster.id = 'FAKE_CLUSTER'
cluster.desired_capacity = 4
@ -1025,21 +1025,47 @@ class ClusterActionTest(base.SenlinTestCase):
mock_delete.assert_called_once_with(['NODE_1', 'NODE_2'])
self.assertEqual(2, cluster.desired_capacity)
# deletion policy is attached to the action
@mock.patch.object(ca.ClusterAction, '_sleep')
@mock.patch.object(no.Node, 'get')
@mock.patch.object(ca.ClusterAction, '_delete_nodes')
def test_do_del_nodes_with_deletion_policy(self, mock_delete, mock_get,
mock_sleep, mock_load):
cid = 'FAKE_CLUSTER'
cluster = mock.Mock(id=cid, desired_capacity=4)
mock_load.return_value = cluster
action = ca.ClusterAction(cluster.id, 'CLUSTER_ACTION', self.ctx)
action.id = 'CLUSTER_ACTION_ID'
action.inputs = {'candidates': ['NODE_1', 'NODE_2']}
action.data = {
'deletion': {
'count': 2,
'grace_period': 2,
'candidates': ['NODE_1', 'NODE_2'],
'destroy_after_deletion': True,
'candidates': ['NODE_1', 'NODE_2']
'grace_period': 2,
'reduce_desired_capacity': False,
}
}
node1 = mock.Mock(id='NODE_1', cluster_id=cid)
node2 = mock.Mock(id='NODE_2', cluster_id=cid)
mock_get.side_effect = [node1, node2]
mock_delete.return_value = (action.RES_OK, 'Good to go!')
# do it
res_code, res_msg = action.do_del_nodes()
# assertions
self.assertEqual(action.RES_OK, res_code)
self.assertEqual('Completed deleting nodes.', res_msg)
mock_get.assert_has_calls([
mock.call(action.context, 'NODE_1'),
mock.call(action.context, 'NODE_2')])
mock_delete.assert_called_once_with(['NODE_1', 'NODE_2'])
self.assertTrue(action.data['deletion']['destroy_after_deletion'])
mock_wait.assert_called_once_with(2)
self.assertEqual(0, cluster.desired_capacity)
mock_sleep.assert_called_once_with(2)
# Note: desired_capacity not decreased due to policy enforcement
self.assertEqual(4, cluster.desired_capacity)
cluster.store.has_calls([
mock.call(action.context),
mock.call(action.context)])
@ -1532,12 +1558,12 @@ class ClusterActionTest(base.SenlinTestCase):
mock.call(action.context, 'WARNING', 'Things out of control.')])
@mock.patch.object(scaleutils, 'nodes_by_random')
@mock.patch.object(ca.ClusterAction, '_wait_before_deletion')
@mock.patch.object(ca.ClusterAction, '_sleep')
@mock.patch.object(ca.ClusterAction, '_get_action_data')
@mock.patch.object(scaleutils, 'parse_resize_params')
@mock.patch.object(ca.ClusterAction, '_delete_nodes')
def test_do_resize_shrink(self, mock_delete, mock_parse, mock_get,
mock_wait, mock_select, mock_load):
mock_sleep, mock_select, mock_load):
cluster = mock.Mock()
cluster.id = 'CID'
cluster.desired_capacity = 10
@ -1583,14 +1609,30 @@ class ClusterActionTest(base.SenlinTestCase):
mock.call(action.context, 'RESIZING', 'Cluster resize started.'),
mock.call(action.context, 'ACTIVE', 'Cluster resize succeeded.',
desired_capacity=8)])
mock_sleep.assert_called_once_with(0)
# deletion policy is attached to the action
cluster.nodes = []
@mock.patch.object(scaleutils, 'nodes_by_random')
@mock.patch.object(ca.ClusterAction, '_sleep')
@mock.patch.object(ca.ClusterAction, '_get_action_data')
@mock.patch.object(scaleutils, 'parse_resize_params')
@mock.patch.object(ca.ClusterAction, '_delete_nodes')
def test_do_resize_shrink_with_deletion_policy(self, mock_delete,
mock_parse, mock_get,
mock_sleep, mock_select,
mock_load):
cluster = mock.Mock(id='CID', desired_capacity=10, nodes=[],
ACTIVE='ACTIVE', RESIZING='RESIZING')
for n in range(10):
node = mock.Mock()
node.id = 'NODE-ID-%s' % (n + 1)
cluster.nodes.append(node)
mock_parse.return_value = 'OK', ''
mock_load.return_value = cluster
mock_get.return_value = (2, 9, [cluster.nodes[0]])
action = ca.ClusterAction(cluster.id, 'CLUSTER_ACTION', self.ctx)
action.data = {
'deletion': {
'count': 1,
@ -1598,11 +1640,16 @@ class ClusterActionTest(base.SenlinTestCase):
'destroy_after_deletion': True
}
}
mock_delete.return_value = (action.RES_OK, 'All dependents completed.')
# deletion policy is attached to the action
res_code, res_msg = action.do_resize()
self.assertEqual({'deletion': {'count': 1, 'grace_period': 2,
'destroy_after_deletion': True}},
action.data)
mock_wait.assert_called_once_with(2)
mock_sleep.assert_called_once_with(2)
def test_do_resize_failed_checking(self, mock_load):
cluster = mock.Mock()
@ -1907,9 +1954,9 @@ class ClusterActionTest(base.SenlinTestCase):
mock.call(action.context, cluster.ACTIVE,
'Cluster scaling succeeded.', desired_capacity=9)])
@mock.patch.object(ca.ClusterAction, '_wait_before_deletion')
@mock.patch.object(ca.ClusterAction, '_sleep')
@mock.patch.object(ca.ClusterAction, '_delete_nodes')
def test_do_scale_in_with_pd_no_input(self, mock_delete, mock_wait,
def test_do_scale_in_with_pd_no_input(self, mock_delete, mock_sleep,
mock_load):
cluster = mock.Mock()
cluster.id = 'CID'
@ -1949,7 +1996,7 @@ class ClusterActionTest(base.SenlinTestCase):
'Cluster scale in started.'),
mock.call(action.context, cluster.ACTIVE,
'Cluster scaling succeeded.', desired_capacity=3)])
mock_wait.assert_called_once_with(2)
mock_sleep.assert_called_once_with(2)
@mock.patch.object(scaleutils, 'nodes_by_random')
@mock.patch.object(ca.ClusterAction, '_delete_nodes')

View File

@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
import mock
from senlin.common import scaleutils
@ -182,21 +183,18 @@ class NodeActionTest(base.SenlinTestCase):
self.assertEqual(1, cluster.desired_capacity)
cluster.remove_node.assert_called_once_with(node.id)
@mock.patch.object(eventlet, 'sleep')
@mock.patch.object(scaleutils, 'check_size_params')
@mock.patch.object(cluster_mod.Cluster, 'load')
def test_do_delete_with_forced_changing_capacity(self, mock_c_load,
mock_check, mock_load):
cluster = mock.Mock()
cluster.id = 'CID'
cluster.desired_capacity = 1
def test_do_delete_with_forced_changing_capacity(
self, mock_c_load, mock_check, mock_sleep, mock_load):
cluster = mock.Mock(id='CID', desired_capacity=1)
mock_c_load.return_value = cluster
node = mock.Mock()
node.id = 'NID'
node = mock.Mock(id='NID', cluster_id='CID')
node.do_delete = mock.Mock(return_value=None)
node.cluster_id = cluster.id
mock_load.return_value = node
mock_check.return_value = None
action = node_action.NodeAction(node.id, 'ACTION', self.ctx,
action = node_action.NodeAction('NID', 'ACTION', self.ctx,
cause=base_action.CAUSE_RPC)
action.data = {
'deletion': {
@ -214,19 +212,49 @@ class NodeActionTest(base.SenlinTestCase):
cluster.store.assert_called_once_with(action.context)
self.assertEqual(0, cluster.desired_capacity)
cluster.remove_node.assert_called_once_with(node.id)
self.assertEqual(0, mock_sleep.call_count)
@mock.patch.object(eventlet, 'sleep')
@mock.patch.object(scaleutils, 'check_size_params')
@mock.patch.object(cluster_mod.Cluster, 'load')
def test_do_delete_for_derived_action(self, mock_c_load,
mock_check, mock_load):
node = mock.Mock()
node.id = 'NID'
def test_do_delete_with_grace_period(
self, mock_c_load, mock_check, mock_sleep, mock_load):
cluster = mock.Mock(id='CID', desired_capacity=1)
mock_c_load.return_value = cluster
node = mock.Mock(id='NID', cluster_id='CID')
node.do_delete = mock.Mock(return_value=None)
mock_load.return_value = node
mock_check.return_value = None
action = node_action.NodeAction('NID', 'ACTION', self.ctx,
cause=base_action.CAUSE_RPC)
action.data = {
'deletion': {
'grace_period': 123,
}
}
node.do_delete = mock.Mock(return_value=mock.Mock())
res_code, res_msg = action.do_delete()
self.assertEqual(action.RES_OK, res_code)
mock_check.assert_called_once_with(cluster, 0, None, None, True)
mock_c_load.assert_called_once_with(action.context, 'CID')
cluster.store.assert_called_once_with(action.context)
self.assertEqual(0, cluster.desired_capacity)
cluster.remove_node.assert_called_once_with(node.id)
mock_sleep.assert_called_once_with(123)
@mock.patch.object(eventlet, 'sleep')
@mock.patch.object(scaleutils, 'check_size_params')
@mock.patch.object(cluster_mod.Cluster, 'load')
def test_do_delete_for_derived_action(self, mock_c_load, mock_check,
mock_sleep, mock_load):
node = mock.Mock(id='NID', cluster_id='CLUSTER_ID')
node.do_delete = mock.Mock(return_value=None)
node.cluster_id = 'CLUSTER_ID'
mock_load.return_value = node
action = node_action.NodeAction(node.id, 'ACTION', self.ctx,
cause=base_action.CAUSE_DERIVED)
node.do_delete = mock.Mock(return_value=mock.Mock())
res_code, res_msg = action.do_delete()
@ -235,6 +263,7 @@ class NodeActionTest(base.SenlinTestCase):
self.assertEqual(0, mock_check.call_count)
self.assertEqual(0, mock_c_load.call_count)
mock_load.assert_called_once_with(action.context, node_id='NID')
self.assertEqual(0, mock_sleep.call_count)
def test_do_update(self, mock_load):
node = mock.Mock()
@ -507,6 +536,40 @@ class NodeActionTest(base.SenlinTestCase):
lock.NODE_SCOPE)
mock_check.assert_called_once_with('FAKE_CLUSTER', 'BEFORE')
@mock.patch.object(lock, 'cluster_lock_acquire')
@mock.patch.object(lock, 'cluster_lock_release')
@mock.patch.object(lock, 'node_lock_acquire')
@mock.patch.object(lock, 'node_lock_release')
@mock.patch.object(base_action.Action, 'policy_check')
def test_execute_no_policy_check(self, mock_check,
                                 mock_nl_release, mock_nl_acquire,
                                 mock_cl_release, mock_cl_acquire,
                                 mock_load):
    """A DERIVED-cause node action skips policy checks and cluster locks."""
    node_id = 'NODE_ID'
    node = mock.Mock(id=node_id, cluster_id='FAKE_CLUSTER')
    mock_load.return_value = node
    # CAUSE_DERIVED marks the action as spawned by another action,
    # not by an RPC request.
    action = node_action.NodeAction(node_id, 'NODE_FLY', self.ctx,
                                    cause=base_action.CAUSE_DERIVED)
    action.id = 'ACTION_ID'
    action.owner = 'OWNER'
    mock_exec = self.patchobject(action, '_execute',
                                 return_value=(action.RES_OK, 'Good'))
    mock_nl_acquire.return_value = action.id
    res_code, res_msg = action.execute()
    self.assertEqual(action.RES_OK, res_code)
    self.assertEqual('Good', res_msg)
    mock_load.assert_called_once_with(action.context, node_id=node_id)
    # Cluster-scope locking must be bypassed entirely for derived actions.
    self.assertEqual(0, mock_cl_acquire.call_count)
    self.assertEqual(0, mock_cl_release.call_count)
    # Only the node itself is locked, then released.
    mock_nl_acquire.assert_called_once_with(self.ctx, node_id,
                                            action.id, action.owner,
                                            False)
    mock_nl_release.assert_called_once_with(node_id, action.id)
    mock_exec.assert_called_once_with()
    # Policy checking (BEFORE/AFTER) must not run at all.
    self.assertEqual(0, mock_check.call_count)
@mock.patch.object(lock, 'cluster_lock_acquire')
@mock.patch.object(lock, 'cluster_lock_release')
@mock.patch.object(base_action.Action, 'policy_check')