action handler support batch policy

This patch for the action handler layer.
The action will update the cluster by the plan
we created if user bind batch policy to the cluster.

Change-Id: I42fc8cf2d255580c57990d686c24b620abd1cde8
Implements: blueprint support-batching-policy
This commit is contained in:
RUIJIE YUAN 2016-10-12 21:04:20 +08:00
parent 816800ba58
commit d8004994ff
3 changed files with 178 additions and 72 deletions

View File

@ -204,6 +204,65 @@ class ClusterAction(base.Action):
return result, reason
def _update_nodes(self, profile_id, nodes_obj):
# Get batching policy data if any
fmt = _LI("Updating cluster '%(cluster)s': profile='%(profile)s'.")
LOG.info(fmt, {'cluster': self.cluster.id, 'profile': profile_id})
pause_time = 0
plan = []
pd = self.data.get('update', None)
if pd:
pause_time = pd.get('pause_time')
plan = pd.get('plan')
else:
pause_time = 0
nodes_list = []
for node in self.cluster.nodes:
nodes_list.append(node.id)
plan.append(set(nodes_list))
nodes = []
for node_set in plan:
child = []
nodes = list(node_set)
for node in nodes:
kwargs = {
'name': 'node_update_%s' % node[:8],
'cause': base.CAUSE_DERIVED,
'inputs': {
'new_profile_id': profile_id,
},
}
action_id = base.Action.create(self.context, node,
consts.NODE_UPDATE, **kwargs)
child.append(action_id)
if child:
dobj.Dependency.create(self.context, [c for c in child],
self.id)
for cid in child:
ao.Action.update(self.context, cid,
{'status': base.Action.READY})
dispatcher.start_action()
# clear the action list
child = []
result, new_reason = self._wait_for_dependents()
if result != self.RES_OK:
self.cluster.eval_status(self.context, self.CLUSTER_UPDATE)
return result, _('Failed in updating nodes.')
# pause time
if pause_time != 0:
self._sleep(pause_time)
self.cluster.profile_id = profile_id
self.cluster.eval_status(self.context, self.CLUSTER_UPDATE,
profile_id=profile_id,
updated_at=timeutils.utcnow(True))
return self.RES_OK, 'Cluster update completed.'
@profiler.trace('ClusterAction.do_update', hide_args=False)
def do_update(self):
"""Handler for CLUSTER_UPDATE action.
@ -235,38 +294,9 @@ class ClusterAction(base.Action):
updated_at=timeutils.utcnow(True))
return self.RES_OK, reason
fmt = _LI("Updating cluster '%(cluster)s': profile='%(profile)s'.")
LOG.info(fmt, {'cluster': self.cluster.id, 'profile': profile_id})
child = []
for node in self.cluster.nodes:
kwargs = {
'name': 'node_update_%s' % node.id[:8],
'cause': base.CAUSE_DERIVED,
'inputs': {
'new_profile_id': profile_id,
},
}
action_id = base.Action.create(self.context, node.id,
consts.NODE_UPDATE, **kwargs)
child.append(action_id)
if child:
dobj.Dependency.create(self.context, [c for c in child], self.id)
for cid in child:
ao.Action.update(self.context, cid,
{'status': base.Action.READY})
dispatcher.start_action()
result, new_reason = self._wait_for_dependents()
if result != self.RES_OK:
self.cluster.eval_status(self.context, self.CLUSTER_UPDATE)
return result, _('Failed in updating nodes.')
self.cluster.profile_id = profile_id
self.cluster.eval_status(self.context, self.CLUSTER_UPDATE,
profile_id=profile_id,
updated_at=timeutils.utcnow(True))
return self.RES_OK, reason
# Update nodes with new profile
result, reason = self._update_nodes(profile_id, self.cluster.nodes)
return result, reason
def _delete_nodes(self, node_ids):
action_name = consts.NODE_DELETE

View File

@ -360,42 +360,47 @@ class ClusterActionTest(base.SenlinTestCase):
cluster.eval_status.assert_called_once_with(
action.context, action.CLUSTER_CREATE)
@mock.patch.object(ao.Action, 'update')
@mock.patch.object(ab.Action, 'create')
@mock.patch.object(dobj.Dependency, 'create')
@mock.patch.object(dispatcher, 'start_action')
@mock.patch.object(ca.ClusterAction, '_wait_for_dependents')
def test_do_update_multi(self, mock_wait, mock_start, mock_dep,
mock_action, mock_update, mock_load):
@mock.patch.object(ca.ClusterAction, '_update_nodes')
def test_do_update_multi(self, mock_update, mock_load):
node1 = mock.Mock(id='fake id 1')
node2 = mock.Mock(id='fake id 2')
cluster = mock.Mock(id='FAKE_ID', nodes=[node1, node2],
ACTIVE='ACTIVE')
mock_load.return_value = cluster
action = ca.ClusterAction(cluster.id, 'CLUSTER_ACTION', self.ctx)
action.inputs = {'new_profile_id': 'FAKE_PROFILE'}
mock_action.side_effect = ['NODE_ACTION_1', 'NODE_ACTION_2']
mock_wait.return_value = (action.RES_OK, 'OK')
reason = 'Cluster update completed.'
mock_update.return_value = (action.RES_OK, reason)
# do it
res_code, res_msg = action.do_update()
# assertions
self.assertEqual(action.RES_OK, res_code)
self.assertEqual('Cluster update completed.', res_msg)
self.assertEqual(2, mock_action.call_count)
self.assertEqual(1, mock_dep.call_count)
update_calls = [
mock.call(action.context, 'NODE_ACTION_1', {'status': 'READY'}),
mock.call(action.context, 'NODE_ACTION_2', {'status': 'READY'})
]
mock_update.assert_has_calls(update_calls)
self.assertEqual(reason, res_msg)
mock_update.assert_called_once_with('FAKE_PROFILE',
[node1, node2])
mock_start.assert_called_once_with()
cluster.eval_status.assert_called_once_with(
action.context, action.CLUSTER_UPDATE, profile_id='FAKE_PROFILE',
updated_at=mock.ANY)
@mock.patch.object(ca.ClusterAction, '_update_nodes')
def test_do_update_multi_failed(self, mock_update, mock_load):
node1 = mock.Mock(id='fake id 1')
node2 = mock.Mock(id='fake id 2')
cluster = mock.Mock(id='FAKE_ID', nodes=[node1, node2],
ACTIVE='ACTIVE')
mock_load.return_value = cluster
action = ca.ClusterAction(cluster.id, 'CLUSTER_ACTION', self.ctx)
action.inputs = {'new_profile_id': 'FAKE_PROFILE'}
reason = 'Failed in updating nodes.'
mock_update.return_value = (action.RES_ERROR, reason)
# do it
res_code, res_msg = action.do_update()
# assertions
self.assertEqual(action.RES_ERROR, res_code)
self.assertEqual(reason, res_msg)
mock_update.assert_called_once_with('FAKE_PROFILE',
[node1, node2])
def test_do_update_not_profile(self, mock_load):
cluster = mock.Mock(id='FAKE_ID', nodes=[], ACTIVE='ACTIVE')
@ -430,29 +435,99 @@ class ClusterActionTest(base.SenlinTestCase):
@mock.patch.object(dobj.Dependency, 'create')
@mock.patch.object(dispatcher, 'start_action')
@mock.patch.object(ca.ClusterAction, '_wait_for_dependents')
def test_do_update_failed_wait(self, mock_wait, mock_start, mock_dep,
mock_action, mock_update, mock_load):
node = mock.Mock(id='fake node id')
cluster = mock.Mock(id='FAKE_CLUSTER', nodes=[node], ACTIVE='ACTIVE')
def test__update_nodes_no_policy(self, mock_wait, mock_start, mock_dep,
mock_action, mock_update, mock_load):
node1 = mock.Mock(id='node_id1')
node2 = mock.Mock(id='node_id2')
cluster = mock.Mock(id='FAKE_ID', nodes=[node1, node2],
ACTIVE='ACTIVE')
mock_load.return_value = cluster
action = ca.ClusterAction(cluster.id, 'CLUSTER_ACTION', self.ctx)
action.inputs = {'new_profile_id': 'FAKE_PROFILE'}
action.id = 'CLUSTER_ACTION_ID'
mock_wait.return_value = (action.RES_OK, 'All dependents completed')
mock_action.side_effect = ['NODE_ACTION1', 'NODE_ACTION2']
mock_action.return_value = 'NODE_ACTION'
mock_wait.return_value = (action.RES_TIMEOUT, 'Timeout')
# do it
res_code, res_msg = action.do_update()
# assertions
self.assertEqual(action.RES_TIMEOUT, res_code)
self.assertEqual('Failed in updating nodes.', res_msg)
self.assertEqual(1, mock_action.call_count)
res_code, reason = action._update_nodes('FAKE_PROFILE',
[node1, node2])
self.assertEqual(res_code, action.RES_OK)
self.assertEqual(reason, 'Cluster update completed.')
self.assertEqual(2, mock_action.call_count)
self.assertEqual(1, mock_dep.call_count)
mock_update.assert_called_once_with(action.context, 'NODE_ACTION',
{'status': 'READY'})
self.assertEqual(2, mock_update.call_count)
mock_start.assert_called_once_with()
cluster.eval_status.assert_called_once_with(
action.context, action.CLUSTER_UPDATE, profile_id='FAKE_PROFILE',
updated_at=mock.ANY)
@mock.patch.object(ao.Action, 'update')
@mock.patch.object(ab.Action, 'create')
@mock.patch.object(dobj.Dependency, 'create')
@mock.patch.object(dispatcher, 'start_action')
@mock.patch.object(ca.ClusterAction, '_wait_for_dependents')
def test__update_nodes_batch_policy(self, mock_wait, mock_start, mock_dep,
mock_action, mock_update, mock_load):
node1 = mock.Mock(id='node_id1')
node2 = mock.Mock(id='node_id2')
cluster = mock.Mock(id='FAKE_ID', nodes=[node1, node2],
ACTIVE='ACTIVE')
mock_load.return_value = cluster
action = ca.ClusterAction(cluster.id, 'CLUSTER_ACTION', self.ctx)
action.inputs = {'new_profile_id': 'FAKE_PROFILE'}
action.id = 'CLUSTER_ACTION_ID'
action.data = {
'update': {
'pause_time': 3,
'min_in_service': 1,
'plan': [{node1.id}, {node2.id}],
}
}
mock_wait.return_value = (action.RES_OK, 'All dependents completed')
mock_action.side_effect = ['NODE_ACTION1', 'NODE_ACTION2']
res_code, reason = action._update_nodes('FAKE_PROFILE',
[node1, node2])
self.assertEqual(res_code, action.RES_OK)
self.assertEqual(reason, 'Cluster update completed.')
self.assertEqual(2, mock_action.call_count)
self.assertEqual(2, mock_dep.call_count)
self.assertEqual(2, mock_update.call_count)
self.assertEqual(2, mock_start.call_count)
cluster.eval_status.assert_called_once_with(
action.context, action.CLUSTER_UPDATE, profile_id='FAKE_PROFILE',
updated_at=mock.ANY)
@mock.patch.object(ao.Action, 'update')
@mock.patch.object(ab.Action, 'create')
@mock.patch.object(dobj.Dependency, 'create')
@mock.patch.object(dispatcher, 'start_action')
@mock.patch.object(ca.ClusterAction, '_wait_for_dependents')
def test__update_nodes_fail_wait(self, mock_wait, mock_start, mock_dep,
mock_action, mock_update, mock_load):
node1 = mock.Mock(id='node_id1')
node2 = mock.Mock(id='node_id2')
cluster = mock.Mock(id='FAKE_ID', nodes=[node1, node2],
ACTIVE='ACTIVE')
mock_load.return_value = cluster
action = ca.ClusterAction(cluster.id, 'CLUSTER_ACTION', self.ctx)
action.inputs = {'new_profile_id': 'FAKE_PROFILE'}
action.id = 'CLUSTER_ACTION_ID'
mock_wait.return_value = (action.RES_ERROR, 'Oops!')
mock_action.side_effect = ['NODE_ACTION1', 'NODE_ACTION2']
res_code, reason = action._update_nodes('FAKE_PROFILE',
[node1, node2])
self.assertEqual(res_code, action.RES_ERROR)
self.assertEqual(reason, 'Failed in updating nodes.')
self.assertEqual(2, mock_action.call_count)
self.assertEqual(1, mock_dep.call_count)
self.assertEqual(2, mock_update.call_count)
mock_start.assert_called_once_with()
mock_wait.assert_called_once_with()
cluster.eval_status.assert_called_once_with(
action.context, action.CLUSTER_UPDATE)

View File

@ -45,6 +45,7 @@ senlin.policies =
senlin.policy.region_placement-1.0 = senlin.policies.region_placement:RegionPlacementPolicy
senlin.policy.zone_placement-1.0 = senlin.policies.zone_placement:ZonePlacementPolicy
senlin.policy.affinity-1.0 = senlin.policies.affinity_policy:AffinityPolicy
senlin.policy.batch-1.0 = senlin.policies.batch_policy:BatchPolicy
senlin.drivers =
openstack = senlin.drivers.openstack