Merge branch 'master' of 9.186.106.172:/opt/git/senlin

This commit is contained in:
tengqm 2014-12-25 18:50:14 +08:00
commit 6df19843d0
6 changed files with 196 additions and 37 deletions

View File

@ -247,8 +247,32 @@ def action_get_all(context):
return IMPL.action_get_all(context)
def action_mark_complete(context, action_id):
return IMPL.action_mark_complete(context, action_id)
def action_add_depends_on(context, action_id, *actions):
return IMPL.action_add_depends_on(context, action_id, *actions)
def action_del_depends_on(context, action_id, *actions):
return IMPL.action_del_depends_on(context, action_id, *actions)
def action_add_depended_by(context, action_id, *actions):
return IMPL.action_add_depended_by(context, action_id, *actions)
def action_del_depended_by(context, action_id, *actions):
return IMPL.action_del_depended_by(context, action_id, *actions)
def action_mark_succeeded(context, action_id):
return IMPL.action_mark_succeeded(context, action_id)
def action_mark_failed(context, action_id):
return IMPL.action_mark_failed(context, action_id)
def action_mark_cancelled(context, action_id):
return IMPL.action_mark_cancelled(context, action_id)
def action_start_work_on(context, action_id, owner):

View File

@ -23,7 +23,7 @@ from oslo.db.sqlalchemy import utils
from sqlalchemy.orm import session as orm_session
from senlin.common import exception
from senlin.common import i18n
from senlin.common.i18n import _
from senlin.db.sqlalchemy import filters as db_filters
from senlin.db.sqlalchemy import migration
from senlin.db.sqlalchemy import models
@ -32,6 +32,22 @@ from senlin.rpc import api as rpc_api
CONF = cfg.CONF
CONF.import_opt('max_events_per_cluster', 'senlin.common.config')
# Action status definitions:
# ACTION_INIT: Not ready to be executed because fields are being modified,
# or dependency with other actions are being analyzed.
# ACTION_READY: Initialized and ready to be executed by a worker.
# ACTION_RUNNING: Being executed by a worker thread.
# ACTION_SUCCEEDED: Completed with success.
# ACTION_FAILED: Completed with failure.
# ACTION_CANCELLED: Action cancelled because worker thread was cancelled.
ACTION_STATUSES = (
ACTION_INIT, ACTION_WAITING, ACTION_READY, ACTION_RUNNING,
ACTION_SUCCEEDED, ACTION_FAILED, ACTION_CANCELED
) = (
'INIT', 'WAITING', 'READY', 'RUNNING',
'SUCCEEDED', 'FAILED', 'CANCELLED',
)
_facade = None
@ -212,7 +228,7 @@ def cluster_update(context, cluster_id, values):
if not cluster:
raise exception.NotFound(
i18n._('Attempt to update a cluster with id "%s" that does not'
_('Attempt to update a cluster with id "%s" that does not'
' exist') % cluster_id)
cluster.update(values)
@ -223,7 +239,7 @@ def cluster_delete(context, cluster_id):
cluster = cluster_get(context, cluster_id)
if not cluster:
raise exception.NotFound(
i18n._('Attempt to delete a cluster with id "%s" that does not'
_('Attempt to delete a cluster with id "%s" that does not'
' exist') % cluster_id)
session = orm_session.Session.object_session(cluster)
@ -249,7 +265,7 @@ def node_create(context, values):
def node_get(context, node_id):
node = model_query(context, models.Node).get(node_id)
if not node:
msg = i18n._('Node with id "%s" not found') % node_id
msg = _('Node with id "%s" not found') % node_id
raise exception.NotFound(msg)
return node
@ -257,7 +273,7 @@ def node_get(context, node_id):
def node_get_all(context):
nodes = model_query(context, models.Node).all()
if not nodes:
raise exception.NotFound(i18n._('No nodes were found'))
raise exception.NotFound(_('No nodes were found'))
return nodes
@ -265,7 +281,7 @@ def node_get_all_by_cluster(context, cluster_id):
query = model_query(context, models.Node).filter_by(cluster_id=cluster_id)
nodes = query.all()
if not nodes:
msg = i18n._("No nodes for cluster %s were found") % cluster_id
msg = _("No nodes for cluster %s were found") % cluster_id
raise exception.NotFound(msg)
return dict((node.name, node) for node in nodes)
@ -361,7 +377,7 @@ def policy_get(context, policy_id, show_deleted=False):
show_deleted=show_deleted)
policy = policy.filter_by(id=policy_id).first()
if not policy:
msg = i18n._('Policy with id "%s" not found') % policy_id
msg = _('Policy with id "%s" not found') % policy_id
raise exception.NotFound(msg)
return policy
@ -378,7 +394,7 @@ def policy_update(context, policy_id, values):
policy = policy_get(context, policy_id)
if not policy:
msg = i18n._('Attempt to update a policy with id: %(id)s that does not'
msg = _('Attempt to update a policy with id: %(id)s that does not'
' exist') % policy_id
raise exception.NotFound(msg)
@ -391,7 +407,7 @@ def policy_delete(context, policy_id, force=False):
policy = policy_get(context, policy_id)
if not policy:
msg = i18n._('Attempt to delete a policy with id "%s" that does not'
msg = _('Attempt to delete a policy with id "%s" that does not'
' exist') % policy_id
raise exception.NotFound(msg)
@ -422,7 +438,7 @@ def cluster_detach_policy(context, cluster_id, policy_id):
filter(cluster_id=cluster_id, policy_id=policy_id)
if not binding:
msg = i18n._('Failed detach policy "%(policy)s" from cluster '
msg = _('Failed detach policy "%(policy)s" from cluster '
'"%(cluster)s"') % {'policy': policy_id,
'cluster': cluster_id}
raise exception.NotFound(msg)
@ -437,7 +453,7 @@ def cluster_enable_policy(context, cluster_id, policy_id):
filter(cluster_id=cluster_id, policy_id=policy_id)
if not binding:
msg = i18n._('Failed enabling policy "%(policy)s" on cluster '
msg = _('Failed enabling policy "%(policy)s" on cluster '
'"%(cluster)s"') % {'policy': policy_id,
'cluster': cluster_id}
@ -453,7 +469,7 @@ def cluster_disable_policy(context, cluster_id, policy_id):
filter(cluster_id=cluster_id, policy_id=policy_id)
if not binding:
msg = i18n._('Failed disabling policy "%(policy)s" on cluster '
msg = _('Failed disabling policy "%(policy)s" on cluster '
'"%(cluster)s"') % {'policy': policy_id,
'cluster': cluster_id}
raise exception.NotFound(msg)
@ -474,7 +490,7 @@ def profile_create(context, values):
def profile_get(context, profile_id):
profile = model_query(context, models.Profile).get(profile_id)
if not profile:
msg = i18n._('Profile with id "%s" not found') % profile_id
msg = _('Profile with id "%s" not found') % profile_id
raise exception.NotFound(msg)
return profile
@ -660,21 +676,26 @@ def action_create(context, values):
def action_get(context, action_id):
action = model_query(context, models.Action).get(action_id)
if not action:
msg = i18n._('Action with id "%s" not found') % action_id
msg = _('Action with id "%s" not found') % action_id
raise exception.NotFound(msg)
return action
def action_get_1st_ready(context):
pass
query = model_query(context, models.Action).\
filter_by(status == ACTION_READY)
return query.first()
def action_get_all_ready(context):
pass
query = model_query(context, models.Action)
return query.all()
def action_get_all_by_owner(context, owner):
pass
def action_get_all_by_owner(context, owner_id):
query = model_query(context, models.Action).\
filter_by(owner == owner_id)
return query.all()
def action_get_all(context):
@ -685,14 +706,91 @@ def action_get_all(context):
return actions
def action_mark_complete(context, action_id):
#TODO(liuh):update dependencies, add more actions if needed
def action_add_depends_on(context, action_id, *actions):
action = model_query(context, models.Action).get(action_id)
if not action:
raise exception.NotFound(
_('Action with id "%s" not found') % action_id)
action.depends_on = list(set(actions).union(set(action.depends_on)))
action.save(_session(context))
return action
def action_del_depends_on(context, action_id, *actions):
action = model_query(context, models.Action).get(action_id)
if not action:
raise exception.NotFound(
_('Action with id "%s" not found') % action_id)
action.depends_on = list(set(action.depends_on).different(set(actions)))
action.save(_session(context))
return action
def action_add_depended_by(context, action_id, *actions):
action = model_query(context, models.Action).get(action_id)
if not action:
raise exception.NotFound(
_('Action with id "%s" not found') % action_id)
action.depended_by = list(set(actions).union(set(action.depended_by)))
action.save(_session(context))
return action
def action_del_depended_by(context, action_id, *actions):
action = model_query(context, models.Action).get(action_id)
if not action:
raise exception.NotFound(
_('Action with id "%s" not found') % action_id)
action.depended_by = list(set(action.depended_by).different(set(actions)))
action.save(_session(context))
return action
def action_mark_succeeded(context, action_id):
action = model_query(context, models.Action).get(action_id)
if not action:
raise exception.NotFound(
_('Action with id "%s" not found') % action_id)
session = query.session
session.begin()
action.status = ACTION_SUCCEEDED
for a in action.depended_by
action_del_depends_on(context, a, action_id)
action.depended_by = []
session.commit()
return action
def action_mark_failed(context, action_id):
#TODO(liuh): Failed processing to be added
pass
def action_mark_cancelled(context, action_id):
#TODO(liuh): Cancel processing to be added
pass
def action_start_work_on(context, action_id, owner):
#TODO(liuh):Set 'owner' field to owner
pass
action = model_query(context, models.Action).get(action_id)
if not action:
raise exception.NotFound(
_('Action with id "%s" not found') % action_id)
action.owner = owner
action.status = ACTION_RUNNING
action.status_reason = _('The action is being processing.')
action.save(_session(context))
return action
def action_update(context, action_id, values):

View File

@ -118,12 +118,17 @@ class ClusterAction(Action):
def __init__(self, context, cluster):
super(ClusterAction, self).__init__(context)
self.target = cluster
def execute(self, action, **kwargs):
if action not in self.ACTIONS:
return self.FAILED
if action == self.CREATE:
# TODO:
# We should query the lock of cluster here and wrap
# cluster.do_create, and then let threadgroupmanager
# to start a thread for this progress.
cluster.do_create(kwargs)
else:
return self.FAILED
@ -144,13 +149,13 @@ class NodeAction(Action):
'CREATE', 'DELETE', 'UPDATE', 'JOIN', 'LEAVE',
)
def __init__(self, context, node_id):
def __init__(self, context, node):
super(NodeAction, self).__init__(context)
# get cluster of this node
# get policies associated with the cluster
def execute(self, action):
def execute(self, action, **kwargs):
if action not in self.ACTIONS:
return self.FAILED
return self.OK

View File

@ -14,7 +14,9 @@ import uuid
from datetime import datetime
from senlin.db import api as db_api
from senlin.engine import node
from senlin.engine import action as actions
from senlin.engine import node as nodes
from senlin.engine import scheduler
from senlin.rpc import api as rpc_api
@ -81,10 +83,12 @@ class Cluster(object):
A routine to be called from an action by a thread.
'''
for m in range[self.size]:
action = MemberAction(cluster_id, profile, 'CREATE', **kwargs)
# start a thread asynchnously
handle = scheduler.runAction(action)
scheduler.wait(handle)
node = nodes.Node(None, profile_id, cluster_id)
action = actions.NodeAction(context, node, 'CREATE', **kwargs)
# start a thread asynchnously
handle = scheduler.runAction(action)
# add subthread to the waiting list of main thread
scheduler.wait(handle)
self._set_status(self.ACTIVE)
@ -103,11 +107,14 @@ class Cluster(object):
self._set_status(self.UPDATING)
for m in range[self.size]:
action = MemberAction(cluster_id, profile, 'CREATE', **kwargs)
# start a thread asynchronously
handle = scheduler.runAction(action)
scheduler.wait(handle)
node_list = self.get_nodes()
for n in node_list:
node = nodes.Node(None, profile_id, cluster_id)
action = actions.NodeAction(context, node, 'UPDATE', **kwargs)
# start a thread asynchronously
handle = scheduler.runAction(action)
scheduler.wait(handle)
self._set_status(self.ACTIVE)

View File

@ -32,7 +32,12 @@ class Node(object):
)
def __init__(self, name, profile_id, cluster_id=None, **kwargs):
self.name = name
if name:
self.name = name
else:
# TODO
# Using self.physical_resource_name() to generate a unique name
self.name = 'node-name-tmp'
self.physical_id = None
self.cluster_id = cluster_id
self.profile_id = profile_id

View File

@ -23,6 +23,7 @@ import six
from senlin.common.i18n import _
from senlin.common.i18n import _LI
from senlin.engine import action as actions
from senlin.openstack.common import log as logging
LOG = logging.getLogger(__name__)
@ -524,3 +525,22 @@ class PollingTaskGroup(object):
with excutils.save_and_reraise_exception():
for r in runners:
r.cancel()
def runAction(action):
"""
Start a thread to run action until finished
"""
# TODO
# Query lock for this action
# call action.execute with args in subthread
pass
def wait(handle):
"""
Wait an action to finish
"""
# TODO
# Make the subthread join the main thread
pass