Update scaling policy logic to be applied before action acceptance
This patch alters the cluster_scale_in and cluster_scale_out actions to no longer place the action into the actions table when a conflict is detected. This behavior is an improvement on the old way actions are processed as the requester will now receive immediate feedback from the API when an action cannot be processed. This change also honors the scaling action cooldown in the same manner by erring via the API when a scaling action cannot be processed due to cooldown. Depends-On: https://review.openstack.org/#/c/602460/ Implements: blueprint scaling-action-acceptance Change-Id: If0dcf5e427d3d6973d2c5e52fada8a6c925240d5
This commit is contained in:
parent
a34e45b696
commit
364b1402a1
|
@ -437,6 +437,7 @@ Response Codes
|
|||
- 401
|
||||
- 403
|
||||
- 404
|
||||
- 409
|
||||
- 503
|
||||
|
||||
Request Parameters
|
||||
|
@ -494,6 +495,7 @@ Response Codes
|
|||
- 401
|
||||
- 403
|
||||
- 404
|
||||
- 409
|
||||
- 503
|
||||
|
||||
Request Parameters
|
||||
|
|
|
@ -22,7 +22,7 @@ Concept
|
|||
~~~~~~~
|
||||
|
||||
A :term:`Policy Type` is an abstract specification of the rules to be checked
|
||||
and/or enforced when certain :term:`Action` is performed on a cluster that
|
||||
and/or enforced when an :term:`Action` is performed on a cluster that
|
||||
contains nodes of certain :term:`Profile Type`.
|
||||
|
||||
A registry of policy types is built in memory when the Senlin engine
|
||||
|
|
|
@ -49,14 +49,16 @@ scaling behavior in both directions using the same policy.
|
|||
|
||||
Senlin has carefully designed the builtin policy types so that for scaling
|
||||
policies, you can attach more than one instance of the same policy type but
|
||||
you may get an error when you are attempting to attach two policies of anther
|
||||
you may get an error when you are attempting to attach two policies of another
|
||||
type (say ``senlin.policy.deletion``) to the same cluster.
|
||||
|
||||
The value of the ``event`` property indicates when the policy will be checked.
|
||||
A policy with ``event`` set to "``CLUSTER_SCALE_IN``" will be checked when and
|
||||
only when a corresponding action is triggered on the cluster. A policy with
|
||||
``event`` set to "``CLUSTER_SCALE_OUT``" will be checked when and only when
|
||||
a corresponding action is triggered.
|
||||
a corresponding action is triggered. If the cluster is currently processing a
|
||||
scaling action it will not accept another scaling action until the current
|
||||
action has been processed and cooldown has been observed.
|
||||
|
||||
For both types of actions that can triggered the scaling policy, there are
|
||||
always three types of adjustments to choose from as listed below. The type
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
---
|
||||
prelude: >
|
||||
This release alters the cluster_scale_in and cluster_scale_out actions to
|
||||
no longer place the action into the actions table when a conflict is
|
||||
detected. This behavior is an improvement on the old way actions are
|
||||
processed as the requester will now receive immediate feedback from the
|
||||
API when an action cannot be processed. This release also honors the
|
||||
scaling action cooldown in the same manner by erring via the API when a
|
||||
scaling action cannot be processed due to cooldown.
|
||||
features:
|
||||
- |
|
||||
Scaling actions (IN or OUT) now validate that there is no conflicting
|
||||
action already being processed and will return an error via the API
|
||||
informing the end user if a conflict is detected. A conflicting action is
|
||||
detected when new action of either `CLUSTER_SCALE_IN` or
|
||||
`CLUSTER_SCALE_OUT` is attempted while there is already cluster scaling
|
||||
action in the action table in a pending status (READY, RUNNING, WAITING,
|
||||
ACTION_WAITING_LIFECYCLE_COMPLETION).
|
||||
Additinally the cooldown will be checked and enforced when a scaling
|
||||
action is requested. If the cooldown is being observed the requester will
|
||||
be informed of this when submitting the action via an error.
|
|
@ -42,6 +42,8 @@ class FaultWrapper(wsgi.Middleware):
|
|||
"""Replace error body with something the client can parse."""
|
||||
|
||||
error_map = {
|
||||
'ActionConflict': webob.exc.HTTPConflict,
|
||||
'ActionCooldown': webob.exc.HTTPConflict,
|
||||
'ActionInProgress': webob.exc.HTTPConflict,
|
||||
'BadRequest': webob.exc.HTTPBadRequest,
|
||||
'FeatureNotSupported': webob.exc.HTTPConflict,
|
||||
|
@ -57,7 +59,6 @@ class FaultWrapper(wsgi.Middleware):
|
|||
'RequestLimitExceeded': webob.exc.HTTPBadRequest,
|
||||
'ResourceInUse': webob.exc.HTTPConflict,
|
||||
'ResourceIsLocked': webob.exc.HTTPConflict,
|
||||
'ActionConflict': webob.exc.HTTPConflict,
|
||||
'ResourceNotFound': webob.exc.HTTPNotFound,
|
||||
}
|
||||
|
||||
|
|
|
@ -111,3 +111,9 @@ it can be used by both users and developers.
|
|||
are now sent directly in the query body rather than in the params
|
||||
field.
|
||||
|
||||
1.11
|
||||
----
|
||||
- Modified the ``cluster_action`` API. The API now responds with
|
||||
response code 409 when a scaling action conflicts with one already
|
||||
being processed or a cooldown for a scaling action is encountered.
|
||||
|
||||
|
|
|
@ -24,7 +24,7 @@ class VersionController(object):
|
|||
# This includes any semantic changes which may not affect the input or
|
||||
# output formats or even originate in the API code layer.
|
||||
_MIN_API_VERSION = "1.0"
|
||||
_MAX_API_VERSION = "1.10"
|
||||
_MAX_API_VERSION = "1.11"
|
||||
|
||||
DEFAULT_API_VERSION = _MIN_API_VERSION
|
||||
|
||||
|
|
|
@ -64,6 +64,8 @@ CLUSTER_ACTION_NAMES = (
|
|||
'CLUSTER_OPERATION',
|
||||
)
|
||||
|
||||
CLUSTER_SCALE_ACTIONS = [CLUSTER_SCALE_IN, CLUSTER_SCALE_OUT]
|
||||
|
||||
NODE_ACTION_NAMES = (
|
||||
NODE_CREATE, NODE_DELETE, NODE_UPDATE,
|
||||
NODE_JOIN, NODE_LEAVE,
|
||||
|
|
|
@ -194,6 +194,12 @@ class ActionConflict(SenlinException):
|
|||
"the following action(s): %(actions)s")
|
||||
|
||||
|
||||
class ActionCooldown(SenlinException):
|
||||
msg_fmt = _("The %(type)s action for cluster %(cluster)s cannot be "
|
||||
"processed due to Policy %(policy_id)s cooldown still in "
|
||||
"progress")
|
||||
|
||||
|
||||
class NodeNotOrphan(SenlinException):
|
||||
msg_fmt = _("%(message)s")
|
||||
|
||||
|
@ -202,8 +208,8 @@ class InternalError(SenlinException):
|
|||
"""A base class for internal exceptions in senlin.
|
||||
|
||||
The internal exception classes which inherit from :class:`SenlinException`
|
||||
class should be translated to a user facing exception type if need to be
|
||||
made user visible.
|
||||
class should be translated to a user facing exception type if they need to
|
||||
be made user visible.
|
||||
"""
|
||||
msg_fmt = _("%(message)s")
|
||||
message = _('Internal error happened')
|
||||
|
|
|
@ -329,6 +329,11 @@ def action_get(context, action_id, project_safe=True, refresh=False):
|
|||
refresh=refresh)
|
||||
|
||||
|
||||
def action_list_active_scaling(context, cluster_id, project_safe=True):
|
||||
return IMPL.action_list_active_scaling(context, cluster_id,
|
||||
project_safe=project_safe)
|
||||
|
||||
|
||||
def action_get_by_name(context, name, project_safe=True):
|
||||
return IMPL.action_get_by_name(context, name, project_safe=project_safe)
|
||||
|
||||
|
|
|
@ -1067,6 +1067,25 @@ def action_get(context, action_id, project_safe=True, refresh=False):
|
|||
return action
|
||||
|
||||
|
||||
def action_list_active_scaling(context, cluster_id=None, project_safe=True):
|
||||
with session_for_read() as session:
|
||||
query = session.query(models.Action)
|
||||
if project_safe:
|
||||
query = query.filter_by(project=context.project_id)
|
||||
if cluster_id:
|
||||
query = query.filter_by(target=cluster_id)
|
||||
query = query.filter(
|
||||
models.Action.status.in_(
|
||||
[consts.ACTION_READY,
|
||||
consts.ACTION_WAITING,
|
||||
consts.ACTION_RUNNING,
|
||||
consts.ACTION_WAITING_LIFECYCLE_COMPLETION]))
|
||||
query = query.filter(
|
||||
models.Action.action.in_(consts.CLUSTER_SCALE_ACTIONS))
|
||||
scaling_actions = query.all()
|
||||
return scaling_actions
|
||||
|
||||
|
||||
def action_get_by_name(context, name, project_safe=True):
|
||||
return query_by_name(context, models.Action, name,
|
||||
project_safe=project_safe)
|
||||
|
|
|
@ -274,6 +274,9 @@ class Action(object):
|
|||
}
|
||||
c = req_context.RequestContext.from_dict(params)
|
||||
|
||||
if action in consts.CLUSTER_SCALE_ACTIONS:
|
||||
Action.validate_scaling_action(c, target, action)
|
||||
|
||||
obj = cls(target, action, c, **kwargs)
|
||||
return obj.store(ctx)
|
||||
|
||||
|
@ -466,6 +469,70 @@ class Action(object):
|
|||
return
|
||||
return
|
||||
|
||||
@staticmethod
|
||||
def validate_scaling_action(ctx, cluster_id, action):
|
||||
"""Validate scaling action against actions table and policy cooldown.
|
||||
|
||||
:param ctx: An instance of the request context.
|
||||
:param cluster_id: ID of the cluster the scaling action is targeting.
|
||||
:param action: Scaling action being validated.
|
||||
:return: None
|
||||
:raises: An exception of ``ActionCooldown`` when the action being
|
||||
validated is still in cooldown based off the policy or
|
||||
``ActionConflict`` when a scaling action is already in the action
|
||||
table.
|
||||
"""
|
||||
# Check for conflicting actions in the actions table.
|
||||
conflicting_actions = Action._get_conflicting_scaling_actions(
|
||||
ctx, cluster_id)
|
||||
if conflicting_actions:
|
||||
action_ids = [a.get('id', None) for a in conflicting_actions]
|
||||
LOG.info("Unable to process %(action)s for cluster %(cluster_id)s "
|
||||
"the action conflicts with %(conflicts)s",
|
||||
{'action': action,
|
||||
'cluster_id': cluster_id,
|
||||
'conflicts': action_ids})
|
||||
raise exception.ActionConflict(
|
||||
type=action,
|
||||
target=cluster_id,
|
||||
actions=",".join(action_ids))
|
||||
|
||||
# Check to see if action cooldown should be observed.
|
||||
bindings = cpo.ClusterPolicy.get_all(ctx, cluster_id,
|
||||
sort='priority',
|
||||
filters={'enabled': True})
|
||||
for pb in bindings:
|
||||
policy = policy_mod.Policy.load(ctx, pb.policy_id)
|
||||
if getattr(policy, 'cooldown', None) and policy.event == action:
|
||||
if pb.last_op and not timeutils.is_older_than(
|
||||
pb.last_op, policy.cooldown):
|
||||
LOG.info("Unable to process %(action)s for cluster "
|
||||
"%(cluster_id)s the actions policy %(policy)s "
|
||||
"cooldown still in progress",
|
||||
{'action': action,
|
||||
'cluster_id': cluster_id,
|
||||
'policy': pb.policy_id})
|
||||
raise exception.ActionCooldown(
|
||||
type=action,
|
||||
cluster=cluster_id,
|
||||
policy_id=pb.policy_id)
|
||||
return
|
||||
|
||||
@staticmethod
|
||||
def _get_conflicting_scaling_actions(ctx, cluster_id):
|
||||
"""Check actions table for conflicting scaling actions.
|
||||
|
||||
:param ctx: An instance of the request context.
|
||||
:param cluster_id: ID of the cluster the scaling action is targeting.
|
||||
:return: A list of conflicting actions.
|
||||
"""
|
||||
scaling_actions = ao.Action.action_list_active_scaling(
|
||||
ctx, cluster_id)
|
||||
if scaling_actions:
|
||||
return [a.to_dict() for a in scaling_actions]
|
||||
else:
|
||||
return None
|
||||
|
||||
def to_dict(self):
|
||||
if self.id:
|
||||
dep_on = dobj.Dependency.get_depended(self.context, self.id)
|
||||
|
|
|
@ -2511,7 +2511,7 @@ class EngineService(service.Service):
|
|||
receiver = receiver_obj.Receiver.find(ctx, identity)
|
||||
|
||||
try:
|
||||
cluster = co.Cluster.find(ctx, receiver.cluster_id)
|
||||
db_cluster = co.Cluster.find(ctx, receiver.cluster_id)
|
||||
except (exception.ResourceNotFound, exception.MultipleChoices) as ex:
|
||||
msg = ex.enhance_msg('referenced', ex)
|
||||
raise exception.BadRequest(msg=msg)
|
||||
|
@ -2527,7 +2527,7 @@ class EngineService(service.Service):
|
|||
'inputs': data
|
||||
}
|
||||
|
||||
action_id = action_mod.Action.create(ctx, cluster.id,
|
||||
action_id = action_mod.Action.create(ctx, db_cluster.id,
|
||||
receiver.action, **kwargs)
|
||||
dispatcher.start_action()
|
||||
LOG.info("Webhook %(w)s triggered with action queued: %(a)s.",
|
||||
|
|
|
@ -95,6 +95,11 @@ class Action(base.SenlinObject, base.VersionedObjectDictCompat):
|
|||
obj = db_api.action_get_by_short_id(context, short_id, **kwargs)
|
||||
return cls._from_db_object(context, cls(), obj)
|
||||
|
||||
@classmethod
|
||||
def action_list_active_scaling(cls, context, cluster_id, **kwargs):
|
||||
objs = db_api.action_list_active_scaling(context, cluster_id, **kwargs)
|
||||
return [cls._from_db_object(context, cls(), obj) for obj in objs]
|
||||
|
||||
@classmethod
|
||||
def get_all(cls, context, **kwargs):
|
||||
objs = db_api.action_get_all(context, **kwargs)
|
||||
|
|
|
@ -94,6 +94,11 @@ class ActionBaseTest(base.SenlinTestCase):
|
|||
self.assertIsNone(obj.updated_at)
|
||||
self.assertEqual({}, obj.data)
|
||||
|
||||
def _create_cp_binding(self, cluster_id, policy_id):
|
||||
return cpo.ClusterPolicy(cluster_id=cluster_id, policy_id=policy_id,
|
||||
enabled=True, id=uuidutils.generate_uuid(),
|
||||
last_op=None)
|
||||
|
||||
@mock.patch.object(cluster_mod.Cluster, 'load')
|
||||
def test_action_new_cluster(self, mock_load):
|
||||
fake_cluster = mock.Mock(timeout=cfg.CONF.default_action_timeout)
|
||||
|
@ -337,6 +342,43 @@ class ActionBaseTest(base.SenlinTestCase):
|
|||
mock_store.assert_not_called()
|
||||
mock_active.assert_called_once_with(mock.ANY, OBJID)
|
||||
|
||||
@mock.patch.object(timeutils, 'is_older_than')
|
||||
@mock.patch.object(cpo.ClusterPolicy, 'get_all')
|
||||
@mock.patch.object(policy_mod.Policy, 'load')
|
||||
@mock.patch.object(ab.Action, 'store')
|
||||
def test_action_create_scaling_cooldown_in_progress(self, mock_store,
|
||||
mock_load,
|
||||
mock_load_all,
|
||||
mock_time_util):
|
||||
cluster_id = CLUSTER_ID
|
||||
# Note: policy is mocked
|
||||
policy_id = uuidutils.generate_uuid()
|
||||
policy = mock.Mock(id=policy_id,
|
||||
TARGET=[('AFTER', 'CLUSTER_SCALE_OUT')],
|
||||
event='CLUSTER_SCALE_OUT',
|
||||
cooldown=240)
|
||||
pb = self._create_cp_binding(cluster_id, policy.id)
|
||||
pb.last_op = timeutils.utcnow(True)
|
||||
mock_load_all.return_value = [pb]
|
||||
mock_load.return_value = policy
|
||||
mock_time_util.return_value = False
|
||||
self.assertRaises(exception.ActionCooldown, ab.Action.create, self.ctx,
|
||||
cluster_id, 'CLUSTER_SCALE_OUT')
|
||||
self.assertEqual(0, mock_store.call_count)
|
||||
|
||||
@mock.patch.object(ao.Action, 'action_list_active_scaling')
|
||||
@mock.patch.object(ab.Action, 'store')
|
||||
def test_action_create_scaling_conflict(self, mock_store,
|
||||
mock_list_active):
|
||||
cluster_id = CLUSTER_ID
|
||||
|
||||
mock_action = mock.Mock()
|
||||
mock_action.to_dict.return_value = {'id': 'fake_action_id'}
|
||||
mock_list_active.return_value = [mock_action]
|
||||
self.assertRaises(exception.ActionConflict, ab.Action.create, self.ctx,
|
||||
cluster_id, 'CLUSTER_SCALE_IN')
|
||||
self.assertEqual(0, mock_store.call_count)
|
||||
|
||||
def test_action_delete(self):
|
||||
result = ab.Action.delete(self.ctx, 'non-existent')
|
||||
self.assertIsNone(result)
|
||||
|
|
Loading…
Reference in New Issue