# Clustering service for managing homogeneous objects in OpenStack
# You cannot select more than 25 topics. Topics must start with a letter or
# number, can include dashes ('-') and can be up to 35 characters long.
#
# 692 lines
# 26 KiB

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
import time
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import timeutils
from senlin.common import consts
from senlin.common import context as req_context
from senlin.common import exception
from senlin.common import utils
from senlin.engine import dispatcher
from senlin.engine import event as EVENT
from senlin.objects import action as ao
from senlin.objects import cluster_lock as cl
from senlin.objects import cluster_policy as cpo
from senlin.objects import dependency as dobj
from senlin.objects import node_lock as nl
from senlin.policies import base as policy_mod
LOG = logging.getLogger(__name__)

# Wall-clock source (time.time: float seconds since the epoch) used by
# Action.set_status, Action.get_status and Action.is_timeout.
wallclock = time.time


class Action(object):
    """An action can be performed on a cluster or a node of a cluster.

    Instantiating ``Action`` directly returns the concrete subclass that
    matches the action name (see ``__new__``). Action records are persisted
    through ``senlin.objects.action`` so that any worker thread can pick them
    up and execute them on behalf of the initiator.
    """

    # Result codes returned by execute() and translated by set_status().
    RETURNS = (
        RES_OK, RES_ERROR, RES_RETRY, RES_CANCEL, RES_TIMEOUT,
        RES_LIFECYCLE_COMPLETE, RES_LIFECYCLE_HOOK_TIMEOUT,
    ) = (
        'OK', 'ERROR', 'RETRY', 'CANCEL', 'TIMEOUT', 'LIFECYCLE_COMPLETE',
        'LIFECYCLE_HOOK_TIMEOUT',
    )

    # Action status definitions:
    #  INIT:      Not ready to be executed because fields are being modified,
    #             or dependency with other actions are being analyzed.
    #  READY:     Initialized and ready to be executed by a worker.
    #  RUNNING:   Being executed by a worker thread.
    #  SUSPENDED: Execution suspended; the only status SIG_RESUME accepts.
    #  SUCCEEDED: Completed with success.
    #  FAILED:    Completed with failure.
    #  CANCELLED: Action cancelled because worker thread was cancelled.
    STATUSES = (
        INIT, WAITING, READY, RUNNING, SUSPENDED,
        SUCCEEDED, FAILED, CANCELLED, WAITING_LIFECYCLE_COMPLETION,
    ) = (
        'INIT', 'WAITING', 'READY', 'RUNNING', 'SUSPENDED',
        'SUCCEEDED', 'FAILED', 'CANCELLED', 'WAITING_LIFECYCLE_COMPLETION',
    )

    # Signal commands
    COMMANDS = (
        SIG_CANCEL, SIG_SUSPEND, SIG_RESUME,
    ) = (
        'CANCEL', 'SUSPEND', 'RESUME',
    )

    def __new__(cls, target, action, ctx, **kwargs):
        """Instantiate the concrete subclass that handles ``action``.

        ``Action(...)`` dispatches on the action-name prefix (see
        ``_get_action_class``); subclasses are instantiated as themselves.
        """
        if cls is not Action:
            return super(Action, cls).__new__(cls)
        return super(Action, cls).__new__(cls._get_action_class(action))

    @staticmethod
    def _get_action_class(action):
        """Return the Action subclass implementing the named action.

        ``CLUSTER_*`` -> ClusterAction, ``NODE_*`` -> NodeAction, anything
        else -> CustomAction. The imports are deferred, presumably to avoid
        a circular import with the subclass modules.
        """
        target_type = action.split('_')[0]
        if target_type == 'CLUSTER':
            from senlin.engine.actions import cluster_action
            return cluster_action.ClusterAction
        if target_type == 'NODE':
            from senlin.engine.actions import node_action
            return node_action.NodeAction
        from senlin.engine.actions import custom_action
        return custom_action.CustomAction

    def __init__(self, target, action, ctx, **kwargs):
        # context will be persisted into database so that any worker thread
        # can pick the action up and execute it on behalf of the initiator
        self.id = kwargs.get('id', None)
        self.name = kwargs.get('name', '')
        self.cluster_id = kwargs.get('cluster_id', '')

        self.context = ctx
        self.user = ctx.user_id
        self.project = ctx.project_id
        self.domain = ctx.domain_id

        self.action = action
        self.target = target

        # Why this action is fired, it can be a UUID of another action
        self.cause = kwargs.get('cause', '')

        # Owner can be an UUID format ID for the worker that is currently
        # working on the action. It also serves as a lock.
        self.owner = kwargs.get('owner', None)

        # An action may need to be executed repeatedly, interval is the
        # time in seconds between two consecutive execution.
        # A value of -1 indicates that this action is only to be executed once
        self.interval = kwargs.get('interval', -1)

        # Start time can be an absolute time or a time relative to another
        # action. E.g.
        #   - '2014-12-18 08:41:39.908569'
        #   - 'AFTER: 57292917-af90-4c45-9457-34777d939d4d'
        #   - 'WHEN: 0265f93b-b1d7-421f-b5ad-cb83de2f559d'
        # NOTE(review): is_timeout() subtracts start_time from wallclock(),
        # so it must be numeric (epoch seconds) by the time that runs.
        self.start_time = kwargs.get('start_time', None)
        self.end_time = kwargs.get('end_time', None)

        # Timeout is a placeholder in case some actions may linger too long
        self.timeout = kwargs.get('timeout', cfg.CONF.default_action_timeout)

        # Return code, useful when action is not automatically deleted
        # after execution
        self.status = kwargs.get('status', self.INIT)
        self.status_reason = kwargs.get('status_reason', '')

        # All parameters are passed in using keyword arguments which is
        # a dictionary stored as JSON in DB
        self.inputs = kwargs.get('inputs', {})
        self.outputs = kwargs.get('outputs', {})

        self.created_at = kwargs.get('created_at', None)
        self.updated_at = kwargs.get('updated_at', None)

        # Scratch space shared with policies (policy_check results) and the
        # retry counter used by set_status().
        self.data = kwargs.get('data', {})

    def store(self, ctx):
        """Store the action record into database table.

        Updates the existing row when the action already has an ID,
        otherwise creates a new row and records the generated ID.

        :param ctx: An instance of the request context.
        :return: The ID of the stored object.
        """
        timestamp = timeutils.utcnow(True)

        values = {
            'name': self.name,
            'cluster_id': self.cluster_id,
            'context': self.context.to_dict(),
            'target': self.target,
            'action': self.action,
            'cause': self.cause,
            'owner': self.owner,
            'interval': self.interval,
            'start_time': self.start_time,
            'end_time': self.end_time,
            'timeout': self.timeout,
            'status': self.status,
            'status_reason': self.status_reason,
            'inputs': self.inputs,
            'outputs': self.outputs,
            'created_at': self.created_at,
            'updated_at': self.updated_at,
            'data': self.data,
            'user': self.user,
            'project': self.project,
            'domain': self.domain,
        }

        if self.id:
            self.updated_at = timestamp
            values['updated_at'] = timestamp
            ao.Action.update(ctx, self.id, values)
        else:
            self.created_at = timestamp
            values['created_at'] = timestamp
            action = ao.Action.create(ctx, values)
            self.id = action.id

        return self.id

    @classmethod
    def _from_object(cls, obj):
        """Construct an action from database object.

        :param obj: a DB action object that contains all fields.
        :return: An `Action` object deserialized from the DB action object.
        """
        ctx = req_context.RequestContext.from_dict(obj.context)
        kwargs = {
            'id': obj.id,
            'name': obj.name,
            'cluster_id': obj.cluster_id,
            'cause': obj.cause,
            'owner': obj.owner,
            'interval': obj.interval,
            'start_time': obj.start_time,
            'end_time': obj.end_time,
            'timeout': obj.timeout,
            'status': obj.status,
            'status_reason': obj.status_reason,
            'inputs': obj.inputs or {},
            'outputs': obj.outputs or {},
            'created_at': obj.created_at,
            'updated_at': obj.updated_at,
            'data': obj.data,
        }

        action_class = cls._get_action_class(obj.action)
        return action_class(obj.target, obj.action, ctx, **kwargs)

    @classmethod
    def load(cls, ctx, action_id=None, db_action=None, project_safe=True):
        """Retrieve an action from database.

        :param ctx: Instance of request context.
        :param action_id: An UUID for the action to deserialize.
        :param db_action: An action object for the action to deserialize.
        :param project_safe: Forwarded to ``ao.Action.get`` (presumably
                             restricts the lookup to the context's project).
        :return: A `Action` object instance.
        :raises: `ResourceNotFound` if ``db_action`` is not given and no
                 action with ``action_id`` could be fetched.
        """
        if db_action is None:
            db_action = ao.Action.get(ctx, action_id,
                                      project_safe=project_safe)
            if db_action is None:
                raise exception.ResourceNotFound(type='action', id=action_id)

        return cls._from_object(db_action)

    @classmethod
    def create(cls, ctx, target, action, force=False, **kwargs):
        """Create an action object.

        :param ctx: The requesting context.
        :param target: The ID of the target cluster/node.
        :param action: Name of the action.
        :param force: Skip checking locks/conflicts
        :param dict kwargs: Other keyword arguments for the action.
        :return: ID of the action created.
        :raises: `ResourceIsLocked` / `ActionConflict` when ``force`` is
                 False and the target is locked or busy; `ActionConflict` /
                 `ActionCooldown` for scaling actions that fail validation.
        """
        if not force:
            cls._check_action_lock(target, action)
            cls._check_conflicting_actions(ctx, target, action)

        # Persist only a minimal copy of the requester's context.
        params = {
            'user_id': ctx.user_id,
            'project_id': ctx.project_id,
            'domain_id': ctx.domain_id,
            'is_admin': ctx.is_admin,
            'request_id': ctx.request_id,
            'trusts': ctx.trusts,
        }
        c = req_context.RequestContext.from_dict(params)

        if action in consts.CLUSTER_SCALE_ACTIONS:
            Action.validate_scaling_action(c, target, action)

        obj = cls(target, action, c, **kwargs)
        return obj.store(ctx)

    @staticmethod
    def _check_action_lock(target, action):
        """Raise `ResourceIsLocked` if the action's target is locked.

        Actions listed in ``consts.LOCK_BYPASS_ACTIONS`` are never checked.
        """
        if action in consts.LOCK_BYPASS_ACTIONS:
            return
        elif (action in list(consts.CLUSTER_ACTION_NAMES) and
              cl.ClusterLock.is_locked(target)):
            raise exception.ResourceIsLocked(
                action=action, type='cluster', id=target)
        elif (action in list(consts.NODE_ACTION_NAMES) and
              nl.NodeLock.is_locked(target)):
            raise exception.ResourceIsLocked(
                action=action, type='node', id=target)

    @staticmethod
    def _check_conflicting_actions(ctx, target, action):
        """Raise `ActionConflict` if other active actions target ``target``.

        Actions in ``consts.CONFLICT_BYPASS_ACTIONS`` are never rejected.
        """
        conflict_actions = ao.Action.get_all_active_by_target(ctx, target)
        # Ignore conflicting actions on deletes.
        if not conflict_actions or action in consts.CONFLICT_BYPASS_ACTIONS:
            return

        action_ids = [a['id'] for a in conflict_actions]
        raise exception.ActionConflict(
            type=action, target=target, actions=",".join(action_ids))

    @classmethod
    def delete(cls, ctx, action_id):
        """Delete an action from database.

        :param ctx: An instance of the request context.
        :param action_id: The UUID of the target action to be deleted.
        :return: Nothing.
        """
        ao.Action.delete(ctx, action_id)

    def signal(self, cmd):
        """Send a signal to the action.

        Unknown commands are ignored; a command that is not valid for the
        action's current status is logged and ignored.

        :param cmd: One of the command word defined in self.COMMANDS.
        :returns: None
        """
        if cmd not in self.COMMANDS:
            return

        if cmd == self.SIG_CANCEL:
            expected = (self.INIT, self.WAITING, self.READY, self.RUNNING,
                        self.WAITING_LIFECYCLE_COMPLETION)
        elif cmd == self.SIG_SUSPEND:
            # The trailing comma matters: without it this is a plain string
            # and ``in`` below would perform a substring test.
            expected = (self.RUNNING,)
        else:  # SIG_RESUME
            expected = (self.SUSPENDED,)

        if self.status not in expected:
            LOG.info("Action (%(id)s) is in status (%(actual)s) while "
                     "expected status must be one of (%(expected)s).",
                     dict(id=self.id[:8], expected=expected,
                          actual=self.status))
            return

        ao.Action.signal(self.context, self.id, cmd)

    def signal_cancel(self):
        """Signal the action and any depended actions to cancel.

        If the action or any depended actions are in status
        'WAITING_LIFECYCLE_COMPLETION' or 'INIT' update the status to cancelled
        directly.

        :raises: `ActionImmutable` if the action is in an unchangeable state
        """
        expected = (self.INIT, self.WAITING, self.READY, self.RUNNING,
                    self.WAITING_LIFECYCLE_COMPLETION)

        if self.status not in expected:
            raise exception.ActionImmutable(id=self.id[:8], expected=expected,
                                            actual=self.status)

        ao.Action.signal(self.context, self.id, self.SIG_CANCEL)

        if self.status in (self.WAITING_LIFECYCLE_COMPLETION, self.INIT):
            self.set_status(self.RES_CANCEL, 'Action execution cancelled')

        depended = dobj.Dependency.get_depended(self.context, self.id)
        if not depended:
            return

        for child in depended:
            # Try to cancel all dependant actions
            child_action = self.load(self.context, action_id=child)
            if not child_action.is_cancelled():
                ao.Action.signal(self.context, child, self.SIG_CANCEL)
            # If the action is in WAITING_LIFECYCLE_COMPLETION or INIT update
            # the status to CANCELLED immediately.
            if child_action.status in (
                    child_action.WAITING_LIFECYCLE_COMPLETION,
                    child_action.INIT):
                child_action.set_status(child_action.RES_CANCEL,
                                        'Action execution cancelled')

    def force_cancel(self):
        """Force the action and any depended actions to cancel.

        If the action or any depended actions are in status 'INIT', 'WAITING',
        'READY', 'RUNNING' or 'WAITING_LIFECYCLE_COMPLETION', immediately
        update their status to cancelled. This should only be used if an action
        is stuck/dead and has no expectation of ever completing.

        :raises: `ActionImmutable` if the action is in an unchangeable state
        """
        expected = (self.INIT, self.WAITING, self.READY, self.RUNNING,
                    self.WAITING_LIFECYCLE_COMPLETION)
        if self.status not in expected:
            raise exception.ActionImmutable(id=self.id[:8], expected=expected,
                                            actual=self.status)

        LOG.debug('Forcing action %s to cancel.', self.id)
        self.set_status(self.RES_CANCEL, 'Action execution force cancelled')

        depended = dobj.Dependency.get_depended(self.context, self.id)
        if not depended:
            return

        for child in depended:
            # Force cancel all dependant actions
            child_action = self.load(self.context, action_id=child)
            if child_action.status in (child_action.INIT,
                                       child_action.WAITING,
                                       child_action.READY,
                                       child_action.RUNNING,
                                       child_action.WAITING_LIFECYCLE_COMPLETION):
                LOG.debug('Forcing action %s to cancel.', child_action.id)
                child_action.set_status(child_action.RES_CANCEL,
                                        'Action execution force cancelled')

    def execute(self, **kwargs):
        """Execute the action.

        In theory, the action encapsulates all information needed for
        execution. 'kwargs' may specify additional parameters.

        :param kwargs: additional parameters that may override the default
                       properties stored in the action record.
        :return: A ``(result, reason)`` tuple, ``result`` being one of the
                 RES_* codes (this is how ActionProc consumes it).
        """
        raise NotImplementedError

    def release_lock(self):
        """Release the lock associated with the action."""
        raise NotImplementedError

    def set_status(self, result, reason=None):
        """Set action status based on return value from execute.

        :param result: One of the RES_* codes. Any value that is not OK,
                       ERROR, TIMEOUT or CANCEL is handled as RES_RETRY.
        :param reason: Optional explanation; a default derived from
                       ``result`` is used for the DB record / event if unset.
        """
        timestamp = wallclock()

        if result == self.RES_OK:
            status = self.SUCCEEDED
            ao.Action.mark_succeeded(self.context, self.id, timestamp)

        elif result == self.RES_ERROR:
            status = self.FAILED
            ao.Action.mark_failed(self.context, self.id, timestamp,
                                  reason or 'ERROR')

        elif result == self.RES_TIMEOUT:
            status = self.FAILED
            ao.Action.mark_failed(self.context, self.id, timestamp,
                                  reason or 'TIMEOUT')

        elif result == self.RES_CANCEL:
            status = self.CANCELLED
            ao.Action.mark_cancelled(self.context, self.id, timestamp)

        else:  # result == self.RES_RETRY:
            retries = self.data.get('retries', 0)
            # Action failed at the moment, but can be retried
            # retries time is configurable
            if retries < cfg.CONF.lock_retry_times:
                status = self.READY
                retries += 1

                self.data.update({'retries': retries})
                ao.Action.abandon(self.context, self.id, {'data': self.data})
                # sleep for a while before asking for the action to be
                # dispatched again
                eventlet.sleep(cfg.CONF.lock_retry_interval)
                # NOTE(review): passes the action ID positionally; confirm
                # this matches dispatcher.start_action's signature.
                dispatcher.start_action(self.id)
            else:
                # Retries exhausted. The DB row is marked failed, so the
                # in-memory status must be FAILED as well, not the result
                # code RES_ERROR, which is not a member of STATUSES.
                status = self.FAILED
                if not reason:
                    reason = ('Exceeded maximum number of retries (%d)'
                              % cfg.CONF.lock_retry_times)
                ao.Action.mark_failed(self.context, self.id, timestamp, reason)

        if status == self.SUCCEEDED:
            EVENT.info(self, consts.PHASE_END, reason or 'SUCCEEDED')
        elif status == self.READY:
            EVENT.warning(self, consts.PHASE_ERROR, reason or 'RETRY')
        else:
            EVENT.error(self, consts.PHASE_ERROR, reason or 'ERROR')

        self.status = status
        self.status_reason = reason

    def get_status(self):
        """Refresh ``self.status`` from the database and return it."""
        timestamp = wallclock()
        status = ao.Action.check_status(self.context, self.id, timestamp)
        self.status = status
        return status

    def is_timeout(self, timeout=None):
        """Return True if more than ``timeout`` seconds passed since start.

        :param timeout: Seconds to allow; defaults to ``self.timeout``.
        :return: False if the action has no start time yet.
        """
        if timeout is None:
            timeout = self.timeout
        if self.start_time is None:
            return False
        time_elapse = wallclock() - self.start_time
        return time_elapse > timeout

    def _check_signal(self):
        """Return RES_TIMEOUT on timeout, else the pending signal (if any)."""
        # Check timeout first, if true, return timeout message
        if self.timeout is not None and self.is_timeout():
            EVENT.debug(self, consts.PHASE_ERROR, 'TIMEOUT')
            return self.RES_TIMEOUT

        return ao.Action.signal_query(self.context, self.id)

    def is_cancelled(self):
        """Return True if a SIG_CANCEL is pending for this action."""
        return self._check_signal() == self.SIG_CANCEL

    def is_suspended(self):
        """Return True if a SIG_SUSPEND is pending for this action."""
        return self._check_signal() == self.SIG_SUSPEND

    def is_resumed(self):
        """Return True if a SIG_RESUME is pending for this action."""
        return self._check_signal() == self.SIG_RESUME

    def policy_check(self, cluster_id, target):
        """Check all policies attached to cluster and give result.

        The outcome is written to ``self.data['status']`` (one of the
        ``policy_mod.CHECK_*`` values) and ``self.data['reason']``; checking
        stops at the first policy that reports CHECK_ERROR.

        :param cluster_id: The ID of the cluster to which the policies are
                           attached.
        :param target: Either 'BEFORE' (run each policy's ``pre_op``) or
                       'AFTER' (run each policy's ``post_op``). Any other
                       value makes this method a no-op.
        :return: None.
        """
        if target not in ('BEFORE', 'AFTER'):
            return

        bindings = cpo.ClusterPolicy.get_all(self.context, cluster_id,
                                             filters={'enabled': True})
        # default values
        self.data['status'] = policy_mod.CHECK_NONE
        self.data['reason'] = ''

        for pb in bindings:
            policy = policy_mod.Policy.load(self.context, pb.policy_id,
                                            project_safe=False)

            # add last_op as input for the policy so that it can be used
            # during pre_op
            self.inputs['last_op'] = pb.last_op

            if not policy.need_check(target, self):
                continue

            if target == 'BEFORE':
                method = getattr(policy, 'pre_op', None)
            else:  # target == 'AFTER'
                method = getattr(policy, 'post_op', None)

            # call policy check function
            # it will set result in data['status']
            if method is not None:
                method(cluster_id, self)

            # stop policy check if one of them fails
            if self.data['status'] == policy_mod.CHECK_ERROR:
                reason = self.data['reason']
                self.data['reason'] = ("Failed policy '%(name)s': %(reason)s"
                                       % {'name': policy.name,
                                          'reason': reason})
                return

        self.data['status'] = policy_mod.CHECK_OK
        self.data['reason'] = 'Completed policy checking.'

    @staticmethod
    def validate_scaling_action(ctx, cluster_id, action):
        """Validate scaling action against actions table and policy cooldown.

        :param ctx: An instance of the request context.
        :param cluster_id: ID of the cluster the scaling action is targeting.
        :param action: Scaling action being validated.
        :return: None
        :raises: An exception of ``ActionCooldown`` when the action being
                 validated is still in cooldown based off the policy or
                 ``ActionConflict`` when a scaling action is already in the
                 action table.
        """
        # Check for conflicting actions in the actions table.
        conflicting_actions = Action._get_conflicting_scaling_actions(
            ctx, cluster_id)
        if conflicting_actions:
            action_ids = [a.get('id', None) for a in conflicting_actions]
            LOG.info("Unable to process %(action)s for cluster %(cluster_id)s "
                     "the action conflicts with %(conflicts)s",
                     {'action': action,
                      'cluster_id': cluster_id,
                      'conflicts': action_ids})
            raise exception.ActionConflict(
                type=action,
                target=cluster_id,
                actions=",".join(action_ids))

        # Check to see if action cooldown should be observed.
        bindings = cpo.ClusterPolicy.get_all(ctx, cluster_id,
                                             filters={'enabled': True})
        for pb in bindings:
            policy = policy_mod.Policy.load(ctx, pb.policy_id)
            if getattr(policy, 'cooldown', None) and policy.event == action:
                if pb.last_op and not timeutils.is_older_than(
                        pb.last_op, policy.cooldown):
                    LOG.info("Unable to process %(action)s for cluster "
                             "%(cluster_id)s the actions policy %(policy)s "
                             "cooldown still in progress",
                             {'action': action,
                              'cluster_id': cluster_id,
                              'policy': pb.policy_id})
                    raise exception.ActionCooldown(
                        type=action,
                        cluster=cluster_id,
                        policy_id=pb.policy_id)

    @staticmethod
    def _get_conflicting_scaling_actions(ctx, cluster_id):
        """Check actions table for conflicting scaling actions.

        :param ctx: An instance of the request context.
        :param cluster_id: ID of the cluster the scaling action is targeting.
        :return: A list of conflicting actions (as dicts), or None if there
                 are none.
        """
        scaling_actions = ao.Action.action_list_active_scaling(
            ctx, cluster_id)
        if scaling_actions:
            return [a.to_dict() for a in scaling_actions]
        return None

    def to_dict(self):
        """Serialize the action into a plain dictionary.

        Dependency lists are only looked up once the action has an ID.
        """
        if self.id:
            dep_on = dobj.Dependency.get_depended(self.context, self.id)
            dep_by = dobj.Dependency.get_dependents(self.context, self.id)
        else:
            dep_on = []
            dep_by = []

        return {
            'id': self.id,
            'name': self.name,
            'cluster_id': self.cluster_id,
            'action': self.action,
            'target': self.target,
            'cause': self.cause,
            'owner': self.owner,
            'interval': self.interval,
            'start_time': self.start_time,
            'end_time': self.end_time,
            'timeout': self.timeout,
            'status': self.status,
            'status_reason': self.status_reason,
            'inputs': self.inputs,
            'outputs': self.outputs,
            'depends_on': dep_on,
            'depended_by': dep_by,
            'created_at': utils.isotime(self.created_at),
            'updated_at': utils.isotime(self.updated_at),
            'data': self.data,
            'user': self.user,
            'project': self.project,
        }
def ActionProc(ctx, action_id):
    """Load an action from the database and run it to completion.

    :param ctx: An instance of the request context.
    :param action_id: The UUID of the action to execute.
    :return: True if the action completed or was found already cancelled;
             False if it could not be loaded, asked to be retried, or raised
             an exception during execution.
    """
    # Step 1: materialize the action object. Action.load raises
    # ResourceNotFound for an unknown ID rather than returning None, so
    # translate that into the "not found" path below.
    try:
        action = Action.load(ctx, action_id=action_id, project_safe=False)
    except exception.ResourceNotFound:
        action = None
    if action is None:
        LOG.error('Action "%s" could not be found.', action_id)
        return False

    if action.is_cancelled():
        reason = '%(action)s [%(id)s] cancelled' % {
            'action': action.action, 'id': action.id[:8]}
        action.set_status(action.RES_CANCEL, reason)
        LOG.info(reason)
        return True

    EVENT.info(action, consts.PHASE_START, action_id[:8])

    # Pre-set the outcome so the ``finally`` clause always has a result to
    # record, even if execute() is interrupted by a BaseException (e.g.
    # SystemExit/KeyboardInterrupt) that ``except Exception`` does not catch.
    result = action.RES_ERROR
    reason = 'Action execution interrupted'
    success = True
    try:
        # Step 2: execute the action
        result, reason = action.execute()
        if result == action.RES_RETRY:
            success = False
    except Exception as ex:
        # We catch exception here to make sure the following logics are
        # executed.
        result = action.RES_ERROR
        reason = str(ex)
        LOG.exception('Unexpected exception occurred during action '
                      '%(action)s (%(id)s) execution: %(reason)s',
                      {'action': action.action, 'id': action.id,
                       'reason': reason})
        success = False
    finally:
        # NOTE: locks on action is eventually released here by status update
        action.set_status(result, reason)

    return success