Added some required methods
Logics for cluster actions and node actions are partially added.
This commit is contained in:
parent
384cf67330
commit
f8e19680cc
@ -17,7 +17,7 @@ from oslo.config import cfg
|
||||
|
||||
from senlin.common import exception
|
||||
from senlin.db import api as db_api
|
||||
from senlin.engine import cluster as clusters
|
||||
from senlin.engine import environment
|
||||
from senlin.engine import node as nodes
|
||||
from senlin.engine import scheduler
|
||||
|
||||
@ -67,6 +67,10 @@ class Action(object):
|
||||
def __init__(self, context, action, **kwargs):
|
||||
# context will be persisted into database so that any worker thread
|
||||
# can pick the action up and execute it on behalf of the initiator
|
||||
if action not in self.ACTIONS:
|
||||
raise exception.ActionNotSupported(
|
||||
action=action, object=_('target %s') % self.target)
|
||||
|
||||
self.context = copy.deepcopy(context)
|
||||
|
||||
self.description = kwargs.get('description', '')
|
||||
@ -115,16 +119,88 @@ class Action(object):
|
||||
self.depends_on = kwargs.get('depends_on', [])
|
||||
self.depended_by = kwargs.get('depended_by', [])
|
||||
|
||||
def store(self):
|
||||
'''
|
||||
Store the action record into database table.
|
||||
'''
|
||||
values = {
|
||||
'name': self.name,
|
||||
'context': self.context,
|
||||
'target': self.target,
|
||||
'action': self.action,
|
||||
'cause': self.cause,
|
||||
'owner': self.owner,
|
||||
'interval': self.interval,
|
||||
'start_time': self.start_time,
|
||||
'end_time': self.end_time,
|
||||
'timeout': self.timeout,
|
||||
'status': self.status,
|
||||
'status_reason': self.status_reason,
|
||||
'inputs': self.inputs,
|
||||
'outputs': self.outputs,
|
||||
'depends_on': self.depends_on,
|
||||
'depended_by': self.depended_by,
|
||||
'deleted_time': self.deleted_time,
|
||||
}
|
||||
|
||||
action = db_api.action_create(self.context, self.id, values)
|
||||
self.id = action.id
|
||||
return self.id
|
||||
|
||||
@classmethod
|
||||
def from_db_record(cls, context, record):
|
||||
'''
|
||||
Construct a action object from database record.
|
||||
:param context: the context used for DB operations;
|
||||
:param record: a DB action object that contains all fields.
|
||||
'''
|
||||
kwargs = {
|
||||
'id': record.id,
|
||||
'name': record.name,
|
||||
'context': record.context,
|
||||
'target': record.target,
|
||||
'cause': record.cause,
|
||||
'owner': record.owner,
|
||||
'interval': record.interval,
|
||||
'start_time': record.start_time,
|
||||
'end_time': record.end_time,
|
||||
'timeout': record.timeout,
|
||||
'status': record.status,
|
||||
'status_reason': record.status_reason,
|
||||
'inputs': record.inputs,
|
||||
'outputs': record.outputs,
|
||||
'depends_on': record.depends_on,
|
||||
'depended_by': record.depended_by,
|
||||
'deleted_time': record.deleted_time,
|
||||
}
|
||||
|
||||
return cls(context, record.action, **kwargs)
|
||||
|
||||
@classmethod
|
||||
def load(cls, context, action_id):
|
||||
'''
|
||||
Retrieve an action from database.
|
||||
'''
|
||||
action = db_api.action_get(context, action_id)
|
||||
if action is None:
|
||||
msg = _('No action with id "%s" exists') % action_id
|
||||
raise exception.NotFound(msg)
|
||||
|
||||
return cls.from_db_record(context, action)
|
||||
|
||||
def execute(self, **kwargs):
|
||||
'''
|
||||
Execute the action.
|
||||
In theory, the action encapsulates all information needed for
|
||||
execution. 'kwargs' may specify additional parameters.
|
||||
:param kwargs: additional parameters that may override the default
|
||||
properties stored in the action record.
|
||||
'''
|
||||
return NotImplemented
|
||||
|
||||
def cancel(self):
|
||||
return NotImplemented
|
||||
|
||||
def store(self):
|
||||
#db_api.action_update(self.id)
|
||||
return
|
||||
|
||||
def set_status(self, status):
|
||||
'''
|
||||
Set action status.
|
||||
@ -161,57 +237,102 @@ class ClusterAction(Action):
|
||||
|
||||
def __init__(self, context, action, **kwargs):
|
||||
super(ClusterAction, self).__init__(context, action, **kwargs)
|
||||
if action not in self.ACTIONS:
|
||||
raise exception.ActionNotSupported(
|
||||
action=action, object=_('cluster %s') % self.target)
|
||||
|
||||
def do_cluster_create(self, cluster):
|
||||
# TODO(Yanyan): Check if cluster lock is needed
|
||||
res = cluster.do_create()
|
||||
if res is False:
|
||||
return self.RES_ERROR
|
||||
|
||||
for m in range(cluster.size):
|
||||
name = 'node-%003d' % m
|
||||
node = nodes.Node(name, cluster.profile_id, cluster.id)
|
||||
node.store()
|
||||
kwargs = {
|
||||
'name': 'node-create-%003d' % m,
|
||||
'context': self.context,
|
||||
'target': node.id,
|
||||
'cause': 'Cluster creation',
|
||||
}
|
||||
|
||||
action = Action(self.context, 'NODE_CREATE', **kwargs)
|
||||
action.set_status(self.READY)
|
||||
|
||||
scheduler.notify()
|
||||
|
||||
def do_update(self, cluster):
|
||||
# TODO(Yanyan): Check if cluster lock is needed
|
||||
cluster.set_status(self.UPDATING)
|
||||
node_list = cluster.get_nodes()
|
||||
for node_id in node_list:
|
||||
kwargs = {
|
||||
'name': 'node-update-%s' % node_id,
|
||||
'context': self.context,
|
||||
'target': node_id,
|
||||
'cause': 'Cluster update',
|
||||
}
|
||||
action = Action(self.context, 'NODE_UPDATE', **kwargs)
|
||||
action.set_status(self.READY)
|
||||
|
||||
scheduler.notify()
|
||||
# TODO(Yanyan): release lock
|
||||
cluster.set_status(self.ACTIVE)
|
||||
|
||||
return self.RES_OK
|
||||
|
||||
def do_delete(self, cluster):
|
||||
# TODO(Yanyan): Check if cluster lock is needed
|
||||
node_list = cluster.get_nodes()
|
||||
for node_id in node_list:
|
||||
kwargs = {
|
||||
'name': 'node-delete-%s' % node_id,
|
||||
'context': self.context,
|
||||
'target': node_id,
|
||||
'cause': 'Cluster update',
|
||||
}
|
||||
action = Action(self.context, 'NODE_UPDATE', **kwargs)
|
||||
action.set_status(self.READY)
|
||||
|
||||
scheduler.notify()
|
||||
|
||||
return self.RES_OK
|
||||
|
||||
def do_add_nodes(self, cluster):
|
||||
return self.RES_OK
|
||||
|
||||
def do_del_nodes(self, cluster):
|
||||
return self.RES_OK
|
||||
|
||||
def do_resize(self, cluster):
|
||||
return self.RES_OK
|
||||
|
||||
def do_attach_policy(self, cluster):
|
||||
return self.RES_OK
|
||||
|
||||
def do_detach_policy(self, cluster):
|
||||
return self.RES_OK
|
||||
|
||||
def execute(self, **kwargs):
|
||||
'''
|
||||
Execute the action.
|
||||
In theory, the action encapsulates all information needed for
|
||||
execution. 'kwargs' may specify additional parameters.
|
||||
:param kwargs: additional parameters that may override the default
|
||||
properties stored in the action record.
|
||||
'''
|
||||
cluster = db_api.cluster_get(self.context, self.target)
|
||||
if not cluster:
|
||||
return self.RES_ERROR
|
||||
|
||||
if self.action == self.CLUSTER_CREATE:
|
||||
# TODO(Qiming):
|
||||
# We should query the lock of cluster here and wrap
|
||||
# cluster.do_create, and then let threadgroupmanager
|
||||
# to start a thread for this progress.
|
||||
cluster.do_create()
|
||||
|
||||
for m in range(cluster.size):
|
||||
name = 'node-%003s' % m
|
||||
node = nodes.Node(name, cluster.profile_id, cluster.id)
|
||||
node.store()
|
||||
kwargs = {
|
||||
'target': node.id,
|
||||
}
|
||||
|
||||
action = NodeAction(context, 'NODE_CREATE', **kwargs)
|
||||
action.set_status(self.READY)
|
||||
|
||||
scheduler.notify()
|
||||
|
||||
return self.do_create(cluster)
|
||||
elif self.action == self.CLUSTER_UPDATE:
|
||||
# TODO(Yanyan): grab lock
|
||||
cluster._set_status(self.UPDATING)
|
||||
node_list = cluster.get_nodes()
|
||||
for node_id in node_list:
|
||||
node = db_api.node_get(node_id)
|
||||
action = actions.Action(context, node, 'NODE_UPDATE', **kwargs)
|
||||
|
||||
# start a thread asynchronously
|
||||
handle = scheduler.runAction(action)
|
||||
scheduler.wait(handle)
|
||||
# TODO(Yanyan): release lock
|
||||
cluster._set_status(self.ACTIVE)
|
||||
|
||||
return self.RES_ERROR
|
||||
return self.do_update(cluster)
|
||||
elif self.action == self.CLUSTER_DELETE:
|
||||
return self.do_delete(cluster)
|
||||
elif self.action == self.CLUSTER_ADD_NODES:
|
||||
return self.do_add_nodes(cluster)
|
||||
elif self.action == self.CLUSTER_DEL_NODES:
|
||||
return self.do_del_nodes(cluster)
|
||||
elif self.action == self.CLUSTER_RESIZE:
|
||||
return self.do_resize(cluster)
|
||||
elif self.action == self.CLUSTER_ATTACH_POLICY:
|
||||
return self.do_attach_policy(cluster)
|
||||
elif self.action == self.CLUSTER_DETACH_POLICY:
|
||||
return self.do_detach_policy(cluster)
|
||||
|
||||
return self.RES_OK
|
||||
|
||||
@ -234,27 +355,83 @@ class NodeAction(Action):
|
||||
def __init__(self, context, action, **kwargs):
|
||||
super(NodeAction, self).__init__(context, action, **kwargs)
|
||||
|
||||
if action not in self.ACTIONS:
|
||||
return self.RES_ERROR
|
||||
|
||||
# get cluster of this node
|
||||
# get policies associated with the cluster
|
||||
|
||||
def _get_profile_cls(self, context, profile_id):
|
||||
profile = db_api.profile_get(context, profile_id)
|
||||
cls = environment.global_env().get_profile(profile.type)
|
||||
if not cls:
|
||||
raise exception.ProfileNotFound(profile=profile.name)
|
||||
return cls
|
||||
|
||||
def do_create(self, node):
|
||||
cls = self._get_profile_cls(self.context, node.profile_id)
|
||||
node = cls.create_object(node.name, node.profile_id)
|
||||
if not node:
|
||||
return self.RES_ERROR
|
||||
|
||||
return self.RES_OK
|
||||
|
||||
def do_update(self, node):
|
||||
new_profile_id = self.inputs.get('new_profile', None)
|
||||
if not new_profile_id:
|
||||
raise exception.ProfileNotSpecified()
|
||||
|
||||
if new_profile_id == node.profile_id:
|
||||
return self.RES_OK
|
||||
|
||||
if not node.physical_id:
|
||||
return self.RES_ERROR
|
||||
|
||||
cls = self._get_profile_cls(self.context, node.profile_id)
|
||||
node = cls.update_object(node.physical_id, new_profile_id)
|
||||
if not node:
|
||||
return self.RES_ERROR
|
||||
|
||||
return self.RES_OK
|
||||
|
||||
def do_delete(self, node):
|
||||
if not node.physical_id:
|
||||
return self.RES_OK
|
||||
|
||||
cls = self._get_profile_cls(self.context, node.profile_id)
|
||||
node = cls.delete_object(node.physical_id)
|
||||
if not node:
|
||||
return self.RES_ERROR
|
||||
|
||||
return self.RES_OK
|
||||
|
||||
def do_join(self, node):
|
||||
new_cluster_id = self.inputs.get('cluster_id', None)
|
||||
if not new_cluster_id:
|
||||
raise exception.ClusterNotSpecified()
|
||||
|
||||
# TODO(Qiming): complete this
|
||||
return self.RES_OK
|
||||
|
||||
def do_leave(self, node):
|
||||
# cluster_id = node.cluster_id
|
||||
# TODO(Qiming): complete this
|
||||
|
||||
return self.RES_OK
|
||||
|
||||
def execute(self, **kwargs):
|
||||
node = db_api.node_get(self.context, self.target)
|
||||
if not node:
|
||||
msg = _('Node with id (%s) is not found') % self.target
|
||||
raise exception.NotFound(msg)
|
||||
|
||||
if self.action == self.NODE_CREATE:
|
||||
profile_id = kwargs.get('profile_id')
|
||||
name = kwargs.get('name')
|
||||
profile = db_api.profile_get(self.context, profile_id)
|
||||
node = profile.create_object(name, profile_id)
|
||||
if not node:
|
||||
return self.RES_ERROR
|
||||
return self.do_create(node)
|
||||
elif self.action == self.NODE_DELETE:
|
||||
node_id = self.target
|
||||
profile.delete_object(node_id)
|
||||
return self.do_delete(node)
|
||||
elif self.action == self.NODE_UPDATE:
|
||||
node_id = self.target
|
||||
profile_id = kwargs.get('profile_id')
|
||||
profile.update_object(node_id, profile_id)
|
||||
return self.do_update(node)
|
||||
elif self.action == self.NODE_JOIN_CLUSTER:
|
||||
return self.do_join(node)
|
||||
elif self.action == self.NODE_LEAVE_CLUSTER:
|
||||
return self.do_leave(node)
|
||||
else:
|
||||
return self.RES_ERROR
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user