A bunch of changes to Action implementation.
- No target object is passed as positional argument; - Added CustomAction as a placeholder for future extension; - Reworked intialization logic; - Renamed default action sets to make them more explicit; - Added some pseudo logics to some methods, to be completed yet.
This commit is contained in:
parent
4ab3e21bc1
commit
27a0cf7556
@ -10,9 +10,16 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
import datetime
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from senlin.common import exception
|
||||
from senlin.db import api as db_api
|
||||
from senlin.engine import cluster as clusters
|
||||
from senlin.engine import node as nodes
|
||||
from senlin.engine import scheduler
|
||||
|
||||
|
||||
class Action(object):
|
||||
@ -20,9 +27,9 @@ class Action(object):
|
||||
An action can be performed on a cluster or a node of a cluster.
|
||||
'''
|
||||
RETURNS = (
|
||||
OK, FAILED, RETRY,
|
||||
RES_OK, RES_ERROR, RES_RETRY,
|
||||
) = (
|
||||
'OK', 'FAILED', 'RETRY',
|
||||
'OK', 'ERROR', 'RETRY',
|
||||
)
|
||||
|
||||
# Action status definitions:
|
||||
@ -41,57 +48,72 @@ class Action(object):
|
||||
'SUCCEEDED', 'FAILED', 'CANCELLED',
|
||||
)
|
||||
|
||||
def __init__(self, context):
|
||||
self.id = None
|
||||
def __new__(cls, context, action, **kwargs):
|
||||
if (cls != Action):
|
||||
return super(Action, cls).__new__(cls)
|
||||
|
||||
target_type = action.split('_')[0]
|
||||
if target_type == 'CLUSTER':
|
||||
ActionClass = ClusterAction
|
||||
elif target_type == 'NODE':
|
||||
ActionClass = NodeAction
|
||||
elif target_type == 'POLICY':
|
||||
ActionClass = PolicyAction
|
||||
else:
|
||||
ActionClass = CustomAction
|
||||
|
||||
return super(Action, cls).__new__(ActionClass)
|
||||
|
||||
def __init__(self, context, action, **kwargs):
|
||||
# context will be persisted into database so that any worker thread
|
||||
# can pick the action up and execute it on behalf of the initiator
|
||||
self.context = context
|
||||
self.context = copy.deepcopy(context)
|
||||
|
||||
self.description = ''
|
||||
self.description = kwargs.get('description', '')
|
||||
|
||||
# Target is the ID of a cluster, a node, a profile
|
||||
self.target = ''
|
||||
self.target = kwargs.get('target', None)
|
||||
if self.target is None:
|
||||
raise exception.ActionMissingTarget(action=action)
|
||||
|
||||
# An action
|
||||
self.action = ''
|
||||
self.action = action
|
||||
|
||||
# Why this action is fired, it can be a UUID of another action
|
||||
self.cause = ''
|
||||
self.cause = kwargs.get('cause', '')
|
||||
|
||||
# Owner can be an UUID format ID for the worker that is currently
|
||||
# working on the action. It also serves as a lock.
|
||||
self.owner = ''
|
||||
self.owner = kwargs.get('owner', None)
|
||||
|
||||
# An action may need to be executed repeatitively, interval is the
|
||||
# time in seconds between two consequtive execution.
|
||||
# A value of -1 indicates that this action is only to be executed once
|
||||
self.interval = -1
|
||||
self.interval = kwargs.get('interval', -1)
|
||||
|
||||
# Start time can be an absolute time or a time relative to another
|
||||
# action. E.g.
|
||||
# - '2014-12-18 08:41:39.908569'
|
||||
# - 'AFTER: 57292917-af90-4c45-9457-34777d939d4d'
|
||||
# - 'WHEN: 0265f93b-b1d7-421f-b5ad-cb83de2f559d'
|
||||
self.start_time = ''
|
||||
self.end_time = ''
|
||||
# - 'WHEN: 0265f93b-b1d7-421f-b5ad-cb83de2f559d'
|
||||
self.start_time = kwargs.get('start_time', None)
|
||||
self.end_time = kwargs.get('end_time', None)
|
||||
|
||||
# Timeout is a placeholder in case some actions may linger too long
|
||||
self.timeout = cfg.CONF.default_action_timeout
|
||||
self.timeout = kwargs.get('timeout', cfg.CONF.default_action_timeout)
|
||||
|
||||
# Return code, useful when action is not automatically deleted
|
||||
# after execution
|
||||
self.status = ''
|
||||
self.status_reason = ''
|
||||
self.status = kwargs.get('status', self.INIT)
|
||||
self.status_reason = kwargs.get('status_reason', '')
|
||||
|
||||
# All parameters are passed in using keyward arguments which is
|
||||
# a list stored as JSON in DB
|
||||
self.inputs = {}
|
||||
self.outputs = {}
|
||||
# All parameters are passed in using keyword arguments which is
|
||||
# a dictionary stored as JSON in DB
|
||||
self.inputs = kwargs.get('inputs', {})
|
||||
self.outputs = kwargs.get('outputs', {})
|
||||
|
||||
# Dependency with other actions
|
||||
self.depends_on = []
|
||||
self.depended_by = []
|
||||
|
||||
self.depends_on = kwargs.get('depends_on', [])
|
||||
self.depended_by = kwargs.get('depended_by', [])
|
||||
|
||||
def execute(self, **kwargs):
|
||||
return NotImplemented
|
||||
@ -100,43 +122,101 @@ class Action(object):
|
||||
return NotImplemented
|
||||
|
||||
def store(self):
|
||||
#db_api.action_update(self.id)
|
||||
#db_api.action_update(self.id)
|
||||
return
|
||||
|
||||
def set_status(self, status):
|
||||
'''
|
||||
Set action status.
|
||||
This is not merely about a db record update.
|
||||
'''
|
||||
if status == self.SUCCEEDED:
|
||||
db_api.action_mark_succeeded(self.context, self.id)
|
||||
elif status == self.FAILED:
|
||||
db_api.action_mark_failed(self.context, self.id)
|
||||
elif status == self.CANCELLED:
|
||||
db_api.action_mark_cancelled(self.context, self.id)
|
||||
|
||||
self.status = status
|
||||
|
||||
def get_status(self):
|
||||
action = db_api.action_get(self.context, self.id)
|
||||
self.status = action.status
|
||||
return action.status
|
||||
|
||||
|
||||
class ClusterAction(Action):
|
||||
'''
|
||||
An action performed on a cluster.
|
||||
'''
|
||||
ACTIONS = (
|
||||
CREATE, DELETE, ADD_NODE, DEL_NODE, UPDATE,
|
||||
ATTACH_POLICY, DETACH_POLICY,
|
||||
CLUSTER_CREATE, CLUSTER_DELETE, CLUSTER_UPDATE,
|
||||
CLUSTER_ADD_NODES, CLUSTER_DEL_NODES, CLUSTER_RESIZE,
|
||||
CLUSTER_ATTACH_POLICY, CLUSTER_DETACH_POLICY,
|
||||
) = (
|
||||
'CREATE', 'DELETE', 'ADD_NODE', 'DEL_NODE', 'UPDATE',
|
||||
'ATTACH_POLICY', 'DETACH_POLICY',
|
||||
'CLUSTER_CREATE', 'CLUSTER_DELETE', 'CLUSTER_UPDATE',
|
||||
'CLUSTER_ADD_NODES', 'CLUSTER_DEL_NODES', 'CLUSTER_RESIZE',
|
||||
'CLUSTER_ATTACH_POLICY', 'CLUSTER_DETACH_POLICY',
|
||||
)
|
||||
|
||||
def __init__(self, context, cluster):
|
||||
super(ClusterAction, self).__init__(context)
|
||||
self.target = cluster
|
||||
|
||||
def execute(self, action, **kwargs):
|
||||
def __init__(self, context, action, **kwargs):
|
||||
super(ClusterAction, self).__init__(context, action, **kwargs)
|
||||
if action not in self.ACTIONS:
|
||||
return self.FAILED
|
||||
raise exception.ActionNotSupported(
|
||||
action=action, object=_('cluster %s') % self.target)
|
||||
|
||||
if action == self.CREATE:
|
||||
# TODO:
|
||||
def execute(self, **kwargs):
|
||||
'''
|
||||
Execute the action.
|
||||
In theory, the action encapsulates all information needed for
|
||||
execution. 'kwargs' may specify additional parameters.
|
||||
:param kwargs: additional parameters that may override the default
|
||||
properties stored in the action record.
|
||||
'''
|
||||
cluster = db_api.cluster_get(self.context, self.target)
|
||||
if not cluster:
|
||||
return self.RES_ERROR
|
||||
|
||||
if self.action == self.CLUSTER_CREATE:
|
||||
# TODO(Qiming):
|
||||
# We should query the lock of cluster here and wrap
|
||||
# cluster.do_create, and then let threadgroupmanager
|
||||
# to start a thread for this progress.
|
||||
cluster.do_create(kwargs)
|
||||
else:
|
||||
return self.FAILED
|
||||
cluster.do_create()
|
||||
|
||||
return self.OK
|
||||
for m in range(cluster.size):
|
||||
name = 'node-%003s' % m
|
||||
node = nodes.Node(name, cluster.profile_id, cluster.id)
|
||||
node.store()
|
||||
kwargs = {
|
||||
'target': node.id,
|
||||
}
|
||||
|
||||
action = NodeAction(context, 'NODE_CREATE', **kwargs)
|
||||
action.set_status(self.READY)
|
||||
|
||||
scheduler.notify()
|
||||
|
||||
elif self.action == self.CLUSTER_UPDATE:
|
||||
# TODO(Yanyan): grab lock
|
||||
cluster._set_status(self.UPDATING)
|
||||
node_list = cluster.get_nodes()
|
||||
for node_id in node_list:
|
||||
node = db_api.node_get(node_id)
|
||||
action = actions.Action(context, node, 'NODE_UPDATE', **kwargs)
|
||||
|
||||
# start a thread asynchronously
|
||||
handle = scheduler.runAction(action)
|
||||
scheduler.wait(handle)
|
||||
# TODO(Yanyan): release lock
|
||||
cluster._set_status(self.ACTIVE)
|
||||
|
||||
return self.RES_ERROR
|
||||
|
||||
return self.RES_OK
|
||||
|
||||
def cancel(self):
|
||||
return self.FAILED
|
||||
return self.RES_OK
|
||||
|
||||
|
||||
class NodeAction(Action):
|
||||
@ -144,24 +224,44 @@ class NodeAction(Action):
|
||||
An action performed on a cluster member.
|
||||
'''
|
||||
ACTIONS = (
|
||||
CREATE, DELETE, UPDATE, JOIN, LEAVE,
|
||||
NODE_CREATE, NODE_DELETE, NODE_UPDATE,
|
||||
NODE_JOIN_CLUSTER, NODE_LEAVE_CLUSTER,
|
||||
) = (
|
||||
'CREATE', 'DELETE', 'UPDATE', 'JOIN', 'LEAVE',
|
||||
'NODE_CREATE', 'NODE_DELETE', 'NODE_UPDATE',
|
||||
'NODE_JOIN_CLUSTER', 'NODE_LEAVE_CLUSTER',
|
||||
)
|
||||
|
||||
def __init__(self, context, node):
|
||||
super(NodeAction, self).__init__(context)
|
||||
def __init__(self, context, action, **kwargs):
|
||||
super(NodeAction, self).__init__(context, action, **kwargs)
|
||||
|
||||
# get cluster of this node
|
||||
if action not in self.ACTIONS:
|
||||
return self.RES_ERROR
|
||||
|
||||
# get cluster of this node
|
||||
# get policies associated with the cluster
|
||||
|
||||
def execute(self, action, **kwargs):
|
||||
if action not in self.ACTIONS:
|
||||
return self.FAILED
|
||||
return self.OK
|
||||
def execute(self, **kwargs):
|
||||
if self.action == self.NODE_CREATE:
|
||||
profile_id = kwargs.get('profile_id')
|
||||
name = kwargs.get('name')
|
||||
profile = db_api.profile_get(self.context, profile_id)
|
||||
node = profile.create_object(name, profile_id)
|
||||
if not node:
|
||||
return self.RES_ERROR
|
||||
elif self.action == self.NODE_DELETE:
|
||||
node_id = self.target
|
||||
profile.delete_object(node_id)
|
||||
elif self.action == self.NODE_UPDATE:
|
||||
node_id = self.target
|
||||
profile_id = kwargs.get('profile_id')
|
||||
profile.update_object(node_id, profile_id)
|
||||
else:
|
||||
return self.RES_ERROR
|
||||
|
||||
return self.RES_OK
|
||||
|
||||
def cancel(self):
|
||||
return self.OK
|
||||
return self.RES_OK
|
||||
|
||||
|
||||
class PolicyAction(Action):
|
||||
@ -173,18 +273,26 @@ class PolicyAction(Action):
|
||||
'''
|
||||
|
||||
ACTIONS = (
|
||||
ENABLE, DISABLE, UPDATE,
|
||||
POLICY_ENABLE, POLICY_DISABLE, POLICY_UPDATE,
|
||||
) = (
|
||||
'ENABLE', 'DISABLE', 'UPDATE',
|
||||
'POLICY_ENABLE', 'POLICY_DISABLE', 'POLICY_UPDATE',
|
||||
)
|
||||
|
||||
def __init__(self, context, cluster_id, policy_id):
|
||||
super(PolicyAction, self).__init__(context)
|
||||
def __init__(self, context, action, **kwargs):
|
||||
super(PolicyAction, self).__init__(context, action, **kwargs)
|
||||
self.cluster_id = kwargs.get('cluster_id', None)
|
||||
if self.cluster_id is None:
|
||||
raise exception.ActionMissingTarget(action)
|
||||
|
||||
self.policy_id = kwargs.get('policy_id', None)
|
||||
if self.policy_id is None:
|
||||
raise exception.ActionMissingPolicy(action)
|
||||
|
||||
# get policy associaton using the cluster id and policy id
|
||||
|
||||
def execute(self, action, **kwargs):
|
||||
if action not in self.ACTIONS:
|
||||
return self.FAILED
|
||||
def execute(self, **kwargs):
|
||||
if self.action not in self.ACTIONS:
|
||||
return self.RES_ERROR
|
||||
|
||||
self.store(start_time=datetime.datetime.utcnow(),
|
||||
status=self.RUNNING)
|
||||
@ -193,23 +301,41 @@ class PolicyAction(Action):
|
||||
policy_id = kwargs.get('policy_id')
|
||||
|
||||
# an ENABLE/DISABLE action only changes the database table
|
||||
if action == self.ENABLE:
|
||||
if self.action == self.POLICY_ENABLE:
|
||||
db_api.cluster_enable_policy(cluster_id, policy_id)
|
||||
elif action == self.DISABLE:
|
||||
elif self.action == self.POLICY_DISABLE:
|
||||
db_api.cluster_disable_policy(cluster_id, policy_id)
|
||||
else: # action == self.UPDATE:
|
||||
else: # self.action == self.UPDATE:
|
||||
# There is not direct way to update a policy because the policy
|
||||
# might be shared with another cluster, instead, we clone a new
|
||||
# policy and replace the cluster-policy entry.
|
||||
pass
|
||||
|
||||
# TODO(Qiming): Add DB API complete this.
|
||||
|
||||
|
||||
self.store(end_time=datetime.datetime.utcnow(),
|
||||
status=self.SUCCEEDED)
|
||||
return self.OK
|
||||
|
||||
return self.RES_OK
|
||||
|
||||
def cancel(self):
|
||||
self.store(end_time=datetime.datetime.utcnow(),
|
||||
status=self.CANCELLED)
|
||||
return self.OK
|
||||
return self.RES_OK
|
||||
|
||||
|
||||
class CustomAction(Action):
|
||||
ACTIONS = (
|
||||
ACTION_EXECUTE,
|
||||
) = (
|
||||
'ACTION_EXECUTE',
|
||||
)
|
||||
|
||||
def __init__(self, context, action, **kwargs):
|
||||
super(CustomAction, self).__init__(context, action, **kwargs)
|
||||
|
||||
def execute(self, **kwargs):
|
||||
return self.RES_OK
|
||||
|
||||
def cancel(self):
|
||||
return self.RES_OK
|
||||
|
Loading…
Reference in New Issue
Block a user