Cluster resize operation support

This patch adds a 'CLUSTER_RESIZE' operation to the actions module and
the engine service layer. The operation carries all the data needed for
a resize. With it in place, creating webhooks for scaling policies
becomes much easier.

Change-Id: Id65ad20e1bbac61d07267703cb135952277f7b05
This commit is contained in:
tengqm 2015-05-12 04:11:29 -04:00
parent be286b1ad3
commit 5f76d399f9
8 changed files with 389 additions and 198 deletions

View File

@ -14,6 +14,8 @@ POLICY
------
- healthy policy[Liuh]
- Formalize policy enforcement levels [Qiming]
- Enable placement policy and deletion policy to handle CLUSTER_RESIZE
action.
TEST CASES
----------

View File

@ -213,8 +213,7 @@ class ClusterController(object):
data.validate_for_update()
self.rpc_client.cluster_update(
req.context, cluster_id, data.name, data.desired_capacity,
data.profile, data.min_size, data.max_size, data.parent,
req.context, cluster_id, data.name, data.profile, data.parent,
data.metadata, data.timeout)
raise exc.HTTPAccepted()

View File

@ -0,0 +1,75 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
'''
Utilities for scaling actions and related policies.
'''
import math
from oslo_log import log as logging
from senlin.common import consts
from senlin.common.i18n import _
LOG = logging.getLogger(__name__)
def calculate_desired(current, adj_type, number, min_step):
    '''Calculate desired capacity based on the type and number values.

    :param current: the capacity the adjustment is applied to.
    :param adj_type: one of consts.EXACT_CAPACITY,
                     consts.CHANGE_IN_CAPACITY or
                     consts.CHANGE_IN_PERCENTAGE. Other values are not
                     handled by this function.
    :param number: the new capacity (EXACT_CAPACITY), a signed node count
                   (CHANGE_IN_CAPACITY) or a signed percentage of
                   `current` (CHANGE_IN_PERCENTAGE).
    :param min_step: optional minimum number of nodes to add or remove;
                     only consulted for CHANGE_IN_PERCENTAGE.
    :returns: the resulting capacity. No min/max size constraint is
              applied here.
    '''
    if adj_type == consts.EXACT_CAPACITY:
        desired = number
    elif adj_type == consts.CHANGE_IN_CAPACITY:
        desired = current + number
    elif adj_type == consts.CHANGE_IN_PERCENTAGE:
        delta = (number * current) / 100.0
        # A change smaller than one node is rounded away from zero so that
        # it is not lost; anything larger is truncated toward zero.
        if delta > 0.0:
            rounded = int(math.ceil(delta) if math.fabs(delta) < 1.0
                          else math.floor(delta))
        else:
            rounded = int(math.floor(delta) if math.fabs(delta) < 1.0
                          else math.ceil(delta))

        if min_step is not None and min_step > abs(rounded):
            # Take the direction from the requested percentage: `rounded`
            # is 0 when `current` is 0, which would otherwise turn a
            # growth request into a shrink.
            adjust = min_step if number > 0 else -min_step
            desired = current + adjust
        else:
            # `rounded` is a delta, so it has to be applied to `current`.
            desired = current + rounded

    return desired
def truncate_desired(cluster, desired, min_size, max_size):
    '''Do truncation of desired capacity for non-strict cases.

    A bound given in the request replaces the corresponding bound stored
    on the cluster, so the cluster's own value is only consulted when the
    request does not provide one.

    :param cluster: object exposing `min_size` and `max_size`.
    :param desired: the capacity to be truncated.
    :param min_size: requested lower bound, or None if not specified.
    :param max_size: requested upper bound, or None if not specified;
                     values that are not positive impose no limit here.
    :returns: `desired` clamped into the effective size range.
    '''
    if min_size is not None and desired < min_size:
        desired = min_size
        LOG.debug(_("Truncating shrinkage to specified min_size (%s)."),
                  desired)

    if min_size is None and desired < cluster.min_size:
        desired = cluster.min_size
        LOG.debug(_("Truncating shrinkage to cluster's min_size (%s)."),
                  desired)

    if max_size is not None and max_size > 0 and desired > max_size:
        desired = max_size
        LOG.debug(_("Truncating growth to specified max_size (%s)."),
                  desired)

    if (max_size is None and
            desired > cluster.max_size and cluster.max_size > 0):
        desired = cluster.max_size
        LOG.debug(_("Truncating growth to cluster's max_size (%s)."),
                  desired)

    return desired

View File

@ -19,6 +19,7 @@ from senlin.common import consts
from senlin.common import exception
from senlin.common.i18n import _
from senlin.common.i18n import _LE
from senlin.common import scaleutils
from senlin.db import api as db_api
from senlin.engine.actions import base
from senlin.engine import cluster as cluster_mod
@ -37,11 +38,13 @@ class ClusterAction(base.Action):
ACTIONS = (
CLUSTER_CREATE, CLUSTER_DELETE, CLUSTER_UPDATE,
CLUSTER_ADD_NODES, CLUSTER_DEL_NODES,
CLUSTER_RESIZE,
CLUSTER_SCALE_OUT, CLUSTER_SCALE_IN,
CLUSTER_ATTACH_POLICY, CLUSTER_DETACH_POLICY, CLUSTER_UPDATE_POLICY
) = (
consts.CLUSTER_CREATE, consts.CLUSTER_DELETE, consts.CLUSTER_UPDATE,
consts.CLUSTER_ADD_NODES, consts.CLUSTER_DEL_NODES,
consts.CLUSTER_RESIZE,
consts.CLUSTER_SCALE_OUT, consts.CLUSTER_SCALE_IN,
consts.CLUSTER_ATTACH_POLICY, consts.CLUSTER_DETACH_POLICY,
consts.CLUSTER_UPDATE_POLICY,
@ -151,150 +154,39 @@ class ClusterAction(base.Action):
return result, reason
def _check_size_params(self, cluster, desired, min_size, max_size):
# sanity checking: the desired_capacity must be within the existing
# range of the cluster, if new range is not provided
if desired is not None:
if min_size is None and desired < cluster.min_size:
reason = _("The specified desired_capacity is less than the "
"min_size of the cluster.")
return self.RES_ERROR, reason
if max_size is None and cluster.max_size >= 0:
if (desired > cluster.max_size):
reason = _("The specified desired_capacity is greater "
"than the max_size of the cluster.")
return self.RES_ERROR, reason
if min_size is not None:
if max_size is None and min_size > cluster.max_size:
reason = _("The specified min_size is greater than the "
"current max_size of the cluster.")
return self.RES_ERROR, reason
if desired is None and min_size > cluster.desired_capacity:
reason = _("The specified min_size is greater than the "
"current desired_capacity of the cluster.")
return self.RES_ERROR, reason
if max_size is not None:
if (min_size is None and max_size >= 0
and max_size < cluster.min_size):
reason = _("The specified max_size is less than the "
"current min_size of the cluster.")
return self.RES_ERROR, reason
if desired is None and max_size < cluster.desired_capacity:
reason = _("The specified max_size is less than the "
"current desired_capacity of the cluster.")
return self.RES_ERROR, reason
return self.RES_OK, ''
def _update_cluster_properties(self, cluster, desired, min_size, max_size):
# update cluster properties related to size and profile
need_store = False
if min_size is not None and min_size != cluster.min_size:
cluster.min_size = min_size
need_store = True
if max_size is not None and max_size != cluster.max_size:
cluster.max_size = max_size
need_store = True
if desired is not None and desired != cluster.desired_capacity:
cluster.desired_capacity = desired
need_store = True
if need_store is False:
# ensure node list is up to date
cluster._load_runtime_data(self.context)
return self.RES_OK, ''
cluster.updated_time = datetime.datetime.utcnow()
cluster.status_reason = 'Cluster properties updated'
res = cluster.store(self.context)
if not res:
reason = 'Cluster object cannot be updated.'
# Reset status to active
cluster.set_status(self.context, cluster.ACTIVE, reason)
return self.RES_ERROR, reason
return self.RES_OK, ''
def do_update(self, cluster, policy_data):
profile_id = self.inputs.get('new_profile_id')
min_size = self.inputs.get('min_size')
max_size = self.inputs.get('max_size')
desired = self.inputs.get('desired_capacity')
# check provided params against current properties
result, reason = self._check_size_params(
cluster, desired, min_size, max_size)
if result != self.RES_OK:
return result, reason
# save sanitized properties
result, reason = self._update_cluster_properties(
cluster, desired, min_size, max_size)
if result != self.RES_OK:
return result, reason
node_list = cluster.get_nodes()
current_size = len(node_list)
desired = cluster.desired_capacity
# delete nodes if necessary
if desired < current_size:
adjustment = current_size - desired
candidates = []
# Choose victims randomly
i = adjustment
while i > 0:
r = random.randrange(len(node_list))
candidates.append(node_list[r].id)
node_list.remove(node_list[r])
i = i - 1
result, reason = self._delete_nodes(cluster, candidates,
policy_data)
if result != self.RES_OK:
return result, reason
# update profile for nodes left if needed
if profile_id is not None and profile_id != cluster.profile_id:
for node in node_list:
kwargs = {
'name': 'node_update_%s' % node.id[:8],
'target': node.id,
'cause': base.CAUSE_DERIVED,
'inputs': {
'new_profile_id': profile_id,
}
for node in node_list:
kwargs = {
'name': 'node_update_%s' % node.id[:8],
'target': node.id,
'cause': base.CAUSE_DERIVED,
'inputs': {
'new_profile_id': profile_id,
}
action = base.Action(self.context, 'NODE_UPDATE', **kwargs)
action.store(self.context)
}
action = base.Action(self.context, 'NODE_UPDATE', **kwargs)
action.store(self.context)
db_api.action_add_dependency(self.context, action.id, self.id)
action.set_status(self.READY)
dispatcher.notify(self.context,
dispatcher.Dispatcher.NEW_ACTION,
None, action_id=action.id)
db_api.action_add_dependency(self.context, action.id, self.id)
action.set_status(self.READY)
dispatcher.notify(self.context,
dispatcher.Dispatcher.NEW_ACTION,
None, action_id=action.id)
# Wait for nodes to complete update
result = self.RES_OK
if current_size > 0:
result, reason = self._wait_for_dependents()
# Wait for nodes to complete update
result = self.RES_OK
if len(node_list) > 0:
result, reason = self._wait_for_dependents()
if result != self.RES_OK:
return result, reason
if result != self.RES_OK:
return result, reason
cluster.profile_id = profile_id
cluster.updated_time = datetime.datetime.utcnow()
cluster.store()
# Create new nodes if desired_capacity increased
if desired > current_size:
delta = desired - current_size
result, reason = self._create_nodes(cluster, delta, policy_data)
if result != self.RES_OK:
return result, reason
# TODO(anyone): this seems an overhead
cluster.profile_id = profile_id
cluster.store(self.context)
cluster.set_status(self.context, cluster.ACTIVE, reason)
return self.RES_OK, _('Cluster update succeeded')
@ -444,6 +336,165 @@ class ClusterAction(base.Action):
reason = new_reason
return result, reason
def _check_size_params(self, cluster, desired, min_size, max_size, strict):
'''Validate provided arguments with cluster properties.
Sanity Checking 1: the desired, min_size, max_size parameters must
form a reasonable relationship among themselves,
if specified.
Sanity Checking 2: the desired_capacity must be within the existing
range of the cluster, if new range is not provided.
'''
if desired is not None and strict is True:
# recalculate/validate desired based on strict setting
if (min_size is not None and desired < min_size):
v = {'d': desired, 'm': min_size}
reason = _("The target capacity (%(d)s) is less than "
"the specified min_size (%(m)s).") % v
return self.RES_ERROR, reason
if min_size is None and desired < cluster.min_size:
v = {'d': desired, 'm': cluster.min_size}
reason = _("The target capacity (%(d)s) is less than "
"the cluster's min_size (%(m)s).") % v
return self.RES_ERROR, reason
if (max_size is not None and desired > max_size and
max_size >= 0):
v = {'d': desired, 'm': max_size}
reason = _("The target capacity (%(d)s) is greater "
"than the specified max_size (%(m)s).") % v
return self.RES_ERROR, reason
if (max_size is None and
desired > cluster.max_size and cluster.max_size >= 0):
v = {'d': desired, 'm': cluster.max_size}
reason = _("The target capacity (%(d)s) is greater "
"than the cluster's max_size (%(m)s).") % v
return self.RES_ERROR, reason
if min_size is not None:
if max_size is not None and max_size >= 0 and min_size > max_size:
reason = _("The specified min_size is greater than the "
"specified max_size.")
return self.RES_ERROR, reason
if (max_size is None and cluster.max_size >= 0 and
min_size > cluster.max_size):
reason = _("The specified min_size is greater than the "
"current max_size of the cluster.")
return self.RES_ERROR, reason
if desired is None and min_size > cluster.desired_capacity:
reason = _("The specified min_size is greater than the "
"current desired_capacity of the cluster.")
return self.RES_ERROR, reason
if max_size is not None:
if (min_size is None and max_size >= 0
and max_size < cluster.min_size):
reason = _("The specified max_size is less than the "
"current min_size of the cluster.")
return self.RES_ERROR, reason
if desired is None and max_size < cluster.desired_capacity:
reason = _("The specified max_size is less than the "
"current desired_capacity of the cluster.")
return self.RES_ERROR, reason
return self.RES_OK, ''
def _update_cluster_properties(self, cluster, desired, min_size, max_size):
# update cluster properties related to size and profile
need_store = False
if min_size is not None and min_size != cluster.min_size:
cluster.min_size = min_size
need_store = True
if max_size is not None and max_size != cluster.max_size:
cluster.max_size = max_size
need_store = True
if desired is not None and desired != cluster.desired_capacity:
cluster.desired_capacity = desired
need_store = True
if need_store is False:
# ensure node list is up to date
cluster._load_runtime_data(self.context)
return self.RES_OK, ''
cluster.updated_time = datetime.datetime.utcnow()
cluster.status_reason = _('Cluster properties updated.')
res = cluster.store(self.context)
if not res:
reason = _('Cluster object cannot be updated.')
# Reset status to active
cluster.set_status(self.context, cluster.ACTIVE, reason)
return self.RES_ERROR, reason
return self.RES_OK, ''
def do_resize(self, cluster, policy_data):
    '''Resize the cluster according to the inputs of this action.

    The inputs read are 'adjustment_type', 'number', 'min_size',
    'max_size', 'min_step' and 'strict'. The size properties are
    validated and stored first; nodes are then deleted or created so the
    node count matches the cluster's desired capacity.

    :param cluster: the cluster object to operate on.
    :param policy_data: passed through to the node deletion/creation
                        helpers.
    :returns: a tuple of (result, reason).
    '''
    inputs = self.inputs
    adj_type = inputs.get('adjustment_type')
    number = inputs.get('number')
    min_size = inputs.get('min_size')
    max_size = inputs.get('max_size')
    min_step = inputs.get('min_step')
    strict = inputs.get('strict', False)

    desired = None
    if adj_type is not None:
        # number is expected to be set whenever adj_type is set
        desired = scaleutils.calculate_desired(
            cluster.desired_capacity, adj_type, number, min_step)

        # best-effort mode: clamp the target instead of rejecting it
        if strict is False:
            desired = scaleutils.truncate_desired(
                cluster, desired, min_size, max_size)

    # validate the parameters against the current properties; the target
    # capacity itself is only checked in strict mode
    result, reason = self._check_size_params(cluster, desired, min_size,
                                             max_size, strict)
    if result != self.RES_OK:
        return result, reason

    # persist the sanitized properties
    result, reason = self._update_cluster_properties(
        cluster, desired, min_size, max_size)
    if result != self.RES_OK:
        return result, reason

    node_list = cluster.get_nodes()
    current_size = len(node_list)
    desired = cluster.desired_capacity

    if desired < current_size:
        # too many nodes: pick the surplus at random and delete them
        candidates = []
        for _round in range(current_size - desired):
            victim = node_list[random.randrange(len(node_list))]
            candidates.append(victim.id)
            node_list.remove(victim)

        result, reason = self._delete_nodes(cluster, candidates,
                                            policy_data)
        if result != self.RES_OK:
            return result, reason
    elif desired > current_size:
        # too few nodes: create the missing ones
        result, reason = self._create_nodes(
            cluster, desired - current_size, policy_data)
        if result != self.RES_OK:
            return result, reason

    cluster.set_status(self.context, cluster.ACTIVE, reason)
    return self.RES_OK, _('Cluster resize succeeded')
def do_scale_out(self, cluster, policy_data):
# We use policy output if any, or else the count is
# set to 1 as default.

View File

@ -493,9 +493,7 @@ class EngineService(service.Service):
return result
@request_context
def cluster_update(self, context, identity, name=None,
desired_capacity=None, profile_id=None,
min_size=None, max_size=None,
def cluster_update(self, context, identity, name=None, profile_id=None,
parent=None, metadata=None, timeout=None):
def update_cluster_properties(cluster):
@ -524,36 +522,13 @@ class EngineService(service.Service):
if changed is True:
cluster.store(context)
def may_need_resize(cluster, new_size, min_size, max_size):
# Check if cluster may need a resize
# This is a rough guess, the action will do a more accurate
# evaluation when cluster is locked
if min_size is not None and min_size != cluster.min_size:
return True
if max_size is not None and max_size != cluster.max_size:
return True
if new_size is not None and new_size != cluster.desired_capacity:
return True
return False
def profile_is_new(cluster):
return profile_id is not None and profile_id != cluster.profile_id
(new_size, min_size, max_size) = self._validate_cluster_size_params(
desired_capacity, min_size, max_size)
# Get the database representation of the existing cluster
db_cluster = self.cluster_find(context, identity)
cluster = cluster_mod.Cluster.load(context, cluster=db_cluster)
update_cluster_properties(cluster)
if not any((may_need_resize(cluster, new_size, min_size, max_size),
profile_is_new(cluster))):
# return if neither profile nor size needs an update
if profile_id is None or profile_id == cluster.profile_id:
# return if profile update is not needed
return cluster.to_dict()
if cluster.status == cluster.ERROR:
@ -561,29 +536,18 @@ class EngineService(service.Service):
raise exception.NotSupported(feature=msg)
old_profile = self.profile_find(context, cluster.profile_id)
if profile_id is not None:
new_profile = self.profile_find(context, profile_id)
if new_profile.type != old_profile.type:
msg = _('Cannot update a cluster to a different profile type, '
'operation aborted.')
raise exception.ProfileTypeNotMatch(message=msg)
new_profile = self.profile_find(context, profile_id)
if new_profile.type != old_profile.type:
msg = _('Cannot update a cluster to a different profile type, '
'operation aborted.')
raise exception.ProfileTypeNotMatch(message=msg)
profile_id = new_profile.id
profile_id = new_profile.id
fmt = _LI("Updating cluster '%(cluster)s': profile='%(profile)s', "
"desired_capacity=%(new_size)s, min_size=%(min_size)s, "
"max_size=%(max_size)s.")
LOG.info(fmt % {'cluster': identity, 'profile': profile_id,
'new_size': new_size, 'min_size': min_size,
'max_size': max_size})
inputs = {
'new_profile_id': profile_id,
'min_size': min_size,
'max_size': max_size,
'desired_capacity': new_size
}
fmt = _LI("Updating cluster '%(cluster)s': profile='%(profile)s'.")
LOG.info(fmt % {'cluster': identity, 'profile': profile_id})
inputs = {'new_profile_id': profile_id}
action = action_mod.Action(context, 'CLUSTER_UPDATE',
name='cluster_update_%s' % cluster.id[:8],
target=cluster.id,
@ -684,6 +648,105 @@ class EngineService(service.Service):
return {'action': action.id}
@request_context
def cluster_resize(self, context, identity, adj_type=None, number=None,
                   min_size=None, max_size=None, min_step=None,
                   strict=True):
    '''Adjust cluster size parameters.

    :param context: the request context.
    :param identity: cluster identity which can be name, id or short ID;
    :param adj_type: optional; if specified, must be one of the strings
                     defined in consts.ADJUSTMENT_TYPES;
    :param number: number for adjustment. It is interpreted as the new
                   desired_capacity of the cluster if `adj_type` is set
                   to `EXACT_CAPACITY`; it is interpreted as the relative
                   number of nodes to add/remove when `adj_type` is set
                   to `CHANGE_IN_CAPACITY`; it is treated as a percentage
                   when `adj_type` is set to `CHANGE_IN_PERCENTAGE`.
                   It must be given if and only if `adj_type` is given.
    :param min_size: new lower bound of the cluster size, if specified.
                     This parameter is optional.
    :param max_size: new upper bound of the cluster size, if specified;
                     A negative value means no upper limit is imposed.
                     This parameter is optional.
    :param min_step: optional. It specifies the number of nodes to be
                     added or removed when `adj_type` is set to value
                     `CHANGE_IN_PERCENTAGE` and the number calculated is
                     less than 1 or so.
    :param strict: optional boolean value. It specifies whether Senlin
                   should try a best-effort style resizing or just
                   reject the request when scaling beyond its current
                   size constraint.
    :returns: the dict form of the cluster, with the ID of the created
              CLUSTER_RESIZE action stored under the 'action' key.
    '''

    # adj_type and number must be provided together or not at all
    if adj_type is not None:
        if adj_type not in consts.ADJUSTMENT_TYPES:
            raise exception.InvalidParameter(
                name=consts.ADJUSTMENT_TYPE, value=adj_type)
        if number is None:
            msg = _('Missing number value for size adjustment.')
            raise exception.SenlinBadRequest(msg=msg)
    else:
        if number is not None:
            msg = _('Missing adjustment_type value for size adjustment.')
            raise exception.SenlinBadRequest(msg=msg)

    if adj_type == consts.EXACT_CAPACITY:
        number = utils.parse_int_param(consts.ADJUSTMENT_NUMBER, number)
    elif adj_type == consts.CHANGE_IN_CAPACITY:
        number = utils.parse_int_param(consts.ADJUSTMENT_NUMBER, number,
                                       allow_negative=True)
    elif adj_type == consts.CHANGE_IN_PERCENTAGE:
        try:
            number = float(number)
        except (TypeError, ValueError):
            # float() raises TypeError for non-scalar input such as a
            # list or dict; report that as an invalid parameter as well
            raise exception.InvalidParameter(name=consts.ADJUSTMENT_NUMBER,
                                             value=number)
        # min_step is only used (so checked) for this case
        if min_step is not None:
            min_step = utils.parse_int_param(consts.ADJUSTMENT_MIN_STEP,
                                             min_step)

    # validate min_size and max_size
    (_d, min_size, max_size) = self._validate_cluster_size_params(
        None, min_size, max_size)

    # Get the database representation of the existing cluster
    db_cluster = self.cluster_find(context, identity)
    cluster = cluster_mod.Cluster.load(context, cluster=db_cluster)

    fmt = _LI("Resizing cluster '%(cluster)s': type=%(adj_type)s, "
              "number=%(number)s, min_size=%(min_size)s, "
              "max_size=%(max_size)s, min_step=%(min_step)s, "
              "strict=%(strict)s.")
    LOG.info(fmt % {'cluster': identity, 'adj_type': adj_type,
                    'number': number, 'min_size': min_size,
                    'max_size': max_size, 'min_step': min_step,
                    'strict': strict})

    inputs = {
        'adjustment_type': adj_type,
        'number': number,
        'min_size': min_size,
        'max_size': max_size,
        'min_step': min_step,
        'strict': strict
    }

    # the resize itself is carried out by the action created here
    action = action_mod.Action(context, 'CLUSTER_RESIZE',
                               name='cluster_resize_%s' % cluster.id[:8],
                               target=cluster.id,
                               cause=action_mod.CAUSE_RPC,
                               inputs=inputs)
    action.store(context)
    dispatcher.notify(context, self.dispatcher.NEW_ACTION,
                      None, action_id=action.id)

    result = cluster.to_dict()
    result['action'] = action.id
    return result
@request_context
def cluster_scale_out(self, context, identity, count=None):
# Validation

View File

@ -189,6 +189,18 @@ class EngineClient(object):
identity=identity,
nodes=nodes))
def cluster_resize(self, ctxt, identity, adj_type=None, number=None,
                   min_size=None, max_size=None, min_step=None,
                   strict=True):
    '''Send a 'cluster_resize' request and return the reply.

    All arguments other than `ctxt` are forwarded unchanged as keyword
    arguments of the message.
    '''
    params = {
        'identity': identity,
        'adj_type': adj_type,
        'number': number,
        'min_size': min_size,
        'max_size': max_size,
        'min_step': min_step,
        'strict': strict,
    }
    message = self.make_msg('cluster_resize', **params)
    return self.call(ctxt, message)
def cluster_scale_out(self, ctxt, identity, count=None):
return self.call(ctxt, self.make_msg('cluster_scale_out',
identity=identity,
@ -199,16 +211,11 @@ class EngineClient(object):
identity=identity,
count=count))
def cluster_update(self, ctxt, identity, name=None, desired_capacity=None,
profile_id=None, min_size=None, max_size=None,
def cluster_update(self, ctxt, identity, name=None, profile_id=None,
parent=None, metadata=None, timeout=None):
return self.call(ctxt, self.make_msg('cluster_update',
identity=identity,
name=name,
desired_capacity=desired_capacity,
identity=identity, name=name,
profile_id=profile_id,
min_size=min_size,
max_size=max_size,
parent=parent, metadata=metadata,
timeout=timeout))

View File

@ -729,10 +729,7 @@ class ClusterControllerTest(shared.ControllerTest, base.SenlinTestCase):
'name': None,
'parent': None,
'metadata': None,
'desired_capacity': 0,
'profile_id': 'xxxx-yyyy-zzzz',
'min_size': 0,
'max_size': 0,
'timeout': cfg.CONF.default_action_timeout,
'identity': cid,
}

View File

@ -244,10 +244,7 @@ class EngineRpcAPITestCase(base.SenlinTestCase):
kwargs = {
'identity': 'a-cluster',
'name': 'new-name',
'desired_capacity': 0,
'profile_id': 'new_profile',
'min_size': 0,
'max_size': 0,
'parent': 'another-cluster',
'metadata': {'key': 'value'},
'timeout': 120