From 27a0cf7556d3abe3cffeab35902bc51875d3e030 Mon Sep 17 00:00:00 2001 From: tengqm Date: Wed, 31 Dec 2014 17:43:18 +0800 Subject: [PATCH] A bunch of changes to Action implementation. - No target object is passed as positional argument; - Added CustomAction as a placeholder for future extension; - Reworked intialization logic; - Renamed default action sets to make them more explicit; - Added some pseudo logics to some methods, to be completed yet. --- senlin/engine/action.py | 258 ++++++++++++++++++++++++++++++---------- 1 file changed, 192 insertions(+), 66 deletions(-) diff --git a/senlin/engine/action.py b/senlin/engine/action.py index cb6369b06..51f8a2cea 100644 --- a/senlin/engine/action.py +++ b/senlin/engine/action.py @@ -10,9 +10,16 @@ # License for the specific language governing permissions and limitations # under the License. +import copy +import datetime + from oslo.config import cfg +from senlin.common import exception from senlin.db import api as db_api +from senlin.engine import cluster as clusters +from senlin.engine import node as nodes +from senlin.engine import scheduler class Action(object): @@ -20,9 +27,9 @@ class Action(object): An action can be performed on a cluster or a node of a cluster. ''' RETURNS = ( - OK, FAILED, RETRY, + RES_OK, RES_ERROR, RES_RETRY, ) = ( - 'OK', 'FAILED', 'RETRY', + 'OK', 'ERROR', 'RETRY', ) # Action status definitions: @@ -41,57 +48,72 @@ class Action(object): 'SUCCEEDED', 'FAILED', 'CANCELLED', ) - def __init__(self, context): - self.id = None + def __new__(cls, context, action, **kwargs): + if (cls != Action): + return super(Action, cls).__new__(cls) + + target_type = action.split('_')[0] + if target_type == 'CLUSTER': + ActionClass = ClusterAction + elif target_type == 'NODE': + ActionClass = NodeAction + elif target_type == 'POLICY': + ActionClass = PolicyAction + else: + ActionClass = CustomAction + + return super(Action, cls).__new__(ActionClass) + + def __init__(self, context, action, **kwargs): # context will be persisted into database so that any worker thread # can pick the action up and execute it on behalf of the initiator - self.context = context + self.context = copy.deepcopy(context) - self.description = '' + self.description = kwargs.get('description', '') # Target is the ID of a cluster, a node, a profile - self.target = '' + self.target = kwargs.get('target', None) + if self.target is None: + raise exception.ActionMissingTarget(action=action) - # An action - self.action = '' + self.action = action # Why this action is fired, it can be a UUID of another action - self.cause = '' + self.cause = kwargs.get('cause', '') # Owner can be an UUID format ID for the worker that is currently # working on the action. It also serves as a lock. - self.owner = '' + self.owner = kwargs.get('owner', None) # An action may need to be executed repeatitively, interval is the # time in seconds between two consequtive execution. # A value of -1 indicates that this action is only to be executed once - self.interval = -1 + self.interval = kwargs.get('interval', -1) # Start time can be an absolute time or a time relative to another # action. E.g. # - '2014-12-18 08:41:39.908569' # - 'AFTER: 57292917-af90-4c45-9457-34777d939d4d' - # - 'WHEN: 0265f93b-b1d7-421f-b5ad-cb83de2f559d' - self.start_time = '' - self.end_time = '' + # - 'WHEN: 0265f93b-b1d7-421f-b5ad-cb83de2f559d' + self.start_time = kwargs.get('start_time', None) + self.end_time = kwargs.get('end_time', None) # Timeout is a placeholder in case some actions may linger too long - self.timeout = cfg.CONF.default_action_timeout + self.timeout = kwargs.get('timeout', cfg.CONF.default_action_timeout) # Return code, useful when action is not automatically deleted # after execution - self.status = '' - self.status_reason = '' + self.status = kwargs.get('status', self.INIT) + self.status_reason = kwargs.get('status_reason', '') - # All parameters are passed in using keyward arguments which is - # a list stored as JSON in DB - self.inputs = {} - self.outputs = {} + # All parameters are passed in using keyword arguments which is + # a dictionary stored as JSON in DB + self.inputs = kwargs.get('inputs', {}) + self.outputs = kwargs.get('outputs', {}) # Dependency with other actions - self.depends_on = [] - self.depended_by = [] - + self.depends_on = kwargs.get('depends_on', []) + self.depended_by = kwargs.get('depended_by', []) def execute(self, **kwargs): return NotImplemented @@ -100,43 +122,101 @@ class Action(object): return NotImplemented def store(self): - #db_api.action_update(self.id) + #db_api.action_update(self.id) return + def set_status(self, status): + ''' + Set action status. + This is not merely about a db record update. + ''' + if status == self.SUCCEEDED: + db_api.action_mark_succeeded(self.context, self.id) + elif status == self.FAILED: + db_api.action_mark_failed(self.context, self.id) + elif status == self.CANCELLED: + db_api.action_mark_cancelled(self.context, self.id) + + self.status = status + + def get_status(self): + action = db_api.action_get(self.context, self.id) + self.status = action.status + return action.status + class ClusterAction(Action): ''' An action performed on a cluster. ''' ACTIONS = ( - CREATE, DELETE, ADD_NODE, DEL_NODE, UPDATE, - ATTACH_POLICY, DETACH_POLICY, + CLUSTER_CREATE, CLUSTER_DELETE, CLUSTER_UPDATE, + CLUSTER_ADD_NODES, CLUSTER_DEL_NODES, CLUSTER_RESIZE, + CLUSTER_ATTACH_POLICY, CLUSTER_DETACH_POLICY, ) = ( - 'CREATE', 'DELETE', 'ADD_NODE', 'DEL_NODE', 'UPDATE', - 'ATTACH_POLICY', 'DETACH_POLICY', + 'CLUSTER_CREATE', 'CLUSTER_DELETE', 'CLUSTER_UPDATE', + 'CLUSTER_ADD_NODES', 'CLUSTER_DEL_NODES', 'CLUSTER_RESIZE', + 'CLUSTER_ATTACH_POLICY', 'CLUSTER_DETACH_POLICY', ) - def __init__(self, context, cluster): - super(ClusterAction, self).__init__(context) - self.target = cluster - - def execute(self, action, **kwargs): + def __init__(self, context, action, **kwargs): + super(ClusterAction, self).__init__(context, action, **kwargs) if action not in self.ACTIONS: - return self.FAILED + raise exception.ActionNotSupported( + action=action, object=_('cluster %s') % self.target) - if action == self.CREATE: - # TODO: + def execute(self, **kwargs): + ''' + Execute the action. + In theory, the action encapsulates all information needed for + execution. 'kwargs' may specify additional parameters. + :param kwargs: additional parameters that may override the default + properties stored in the action record. + ''' + cluster = db_api.cluster_get(self.context, self.target) + if not cluster: + return self.RES_ERROR + + if self.action == self.CLUSTER_CREATE: + # TODO(Qiming): # We should query the lock of cluster here and wrap # cluster.do_create, and then let threadgroupmanager # to start a thread for this progress. - cluster.do_create(kwargs) - else: - return self.FAILED + cluster.do_create() - return self.OK + for m in range(cluster.size): + name = 'node-%003s' % m + node = nodes.Node(name, cluster.profile_id, cluster.id) + node.store() + kwargs = { + 'target': node.id, + } + + action = NodeAction(context, 'NODE_CREATE', **kwargs) + action.set_status(self.READY) + + scheduler.notify() + + elif self.action == self.CLUSTER_UPDATE: + # TODO(Yanyan): grab lock + cluster._set_status(self.UPDATING) + node_list = cluster.get_nodes() + for node_id in node_list: + node = db_api.node_get(node_id) + action = actions.Action(context, node, 'NODE_UPDATE', **kwargs) + + # start a thread asynchronously + handle = scheduler.runAction(action) + scheduler.wait(handle) + # TODO(Yanyan): release lock + cluster._set_status(self.ACTIVE) + + return self.RES_ERROR + + return self.RES_OK def cancel(self): - return self.FAILED + return self.RES_OK class NodeAction(Action): @@ -144,24 +224,44 @@ class NodeAction(Action): An action performed on a cluster member. ''' ACTIONS = ( - CREATE, DELETE, UPDATE, JOIN, LEAVE, + NODE_CREATE, NODE_DELETE, NODE_UPDATE, + NODE_JOIN_CLUSTER, NODE_LEAVE_CLUSTER, ) = ( - 'CREATE', 'DELETE', 'UPDATE', 'JOIN', 'LEAVE', + 'NODE_CREATE', 'NODE_DELETE', 'NODE_UPDATE', + 'NODE_JOIN_CLUSTER', 'NODE_LEAVE_CLUSTER', ) - def __init__(self, context, node): - super(NodeAction, self).__init__(context) + def __init__(self, context, action, **kwargs): + super(NodeAction, self).__init__(context, action, **kwargs) - # get cluster of this node + if action not in self.ACTIONS: + return self.RES_ERROR + + # get cluster of this node # get policies associated with the cluster - def execute(self, action, **kwargs): - if action not in self.ACTIONS: - return self.FAILED - return self.OK + def execute(self, **kwargs): + if self.action == self.NODE_CREATE: + profile_id = kwargs.get('profile_id') + name = kwargs.get('name') + profile = db_api.profile_get(self.context, profile_id) + node = profile.create_object(name, profile_id) + if not node: + return self.RES_ERROR + elif self.action == self.NODE_DELETE: + node_id = self.target + profile.delete_object(node_id) + elif self.action == self.NODE_UPDATE: + node_id = self.target + profile_id = kwargs.get('profile_id') + profile.update_object(node_id, profile_id) + else: + return self.RES_ERROR + + return self.RES_OK def cancel(self): - return self.OK + return self.RES_OK class PolicyAction(Action): @@ -173,18 +273,26 @@ class PolicyAction(Action): ''' ACTIONS = ( - ENABLE, DISABLE, UPDATE, + POLICY_ENABLE, POLICY_DISABLE, POLICY_UPDATE, ) = ( - 'ENABLE', 'DISABLE', 'UPDATE', + 'POLICY_ENABLE', 'POLICY_DISABLE', 'POLICY_UPDATE', ) - def __init__(self, context, cluster_id, policy_id): - super(PolicyAction, self).__init__(context) + def __init__(self, context, action, **kwargs): + super(PolicyAction, self).__init__(context, action, **kwargs) + self.cluster_id = kwargs.get('cluster_id', None) + if self.cluster_id is None: + raise exception.ActionMissingTarget(action) + + self.policy_id = kwargs.get('policy_id', None) + if self.policy_id is None: + raise exception.ActionMissingPolicy(action) + # get policy associaton using the cluster id and policy id - def execute(self, action, **kwargs): - if action not in self.ACTIONS: - return self.FAILED + def execute(self, **kwargs): + if self.action not in self.ACTIONS: + return self.RES_ERROR self.store(start_time=datetime.datetime.utcnow(), status=self.RUNNING) @@ -193,23 +301,41 @@ class PolicyAction(Action): policy_id = kwargs.get('policy_id') # an ENABLE/DISABLE action only changes the database table - if action == self.ENABLE: + if self.action == self.POLICY_ENABLE: db_api.cluster_enable_policy(cluster_id, policy_id) - elif action == self.DISABLE: + elif self.action == self.POLICY_DISABLE: db_api.cluster_disable_policy(cluster_id, policy_id) - else: # action == self.UPDATE: + else: # self.action == self.UPDATE: # There is not direct way to update a policy because the policy # might be shared with another cluster, instead, we clone a new # policy and replace the cluster-policy entry. pass # TODO(Qiming): Add DB API complete this. - + self.store(end_time=datetime.datetime.utcnow(), status=self.SUCCEEDED) - return self.OK + + return self.RES_OK def cancel(self): self.store(end_time=datetime.datetime.utcnow(), status=self.CANCELLED) - return self.OK + return self.RES_OK + + +class CustomAction(Action): + ACTIONS = ( + ACTION_EXECUTE, + ) = ( + 'ACTION_EXECUTE', + ) + + def __init__(self, context, action, **kwargs): + super(CustomAction, self).__init__(context, action, **kwargs) + + def execute(self, **kwargs): + return self.RES_OK + + def cancel(self): + return self.RES_OK