Merge branch 'master' of github.com:tengqm/senlin

Conflicts:
	TODO
	senlin/db/api.py
	senlin/db/sqlalchemy/api.py
	senlin/engine/scheduler.py
This commit is contained in:
Hang Liu 2015-01-02 20:54:31 +08:00
commit e9f73c67af
36 changed files with 2002 additions and 799 deletions

Changelog (new file, 20 lines)

@ -0,0 +1,20 @@
2014-12-30 tengqm <tengqim@cn.ibm.com>
* db/api.py: remove action_update interface function to allow
stricter checking of action status changes.
2014-12-29 tengqm <tengqim@cn.ibm.com>
* db/sqlalchemy/models.py: added 'node_count' to cluster class.
* db/sqlalchemy/migrate_repo/versions/001_first_version.py:
added 'node_count' to cluster class.
2014-12-29 tengqm <tengqim@cn.ibm.com>
* TODO: Added some test case jobs.
2015-01-02 liuhang <hangliu@cn.ibm.com>
* TODO: Remove DB action APIs task.
* db/api.py:
add 'action_add_dependency', 'action_del_dependency'
remove dependency api without transaction.
* db/sqlalchemy/api.py:
add 'action_add_dependency', 'action_del_dependency'
remove dependency api without transaction.


@ -1,22 +0,0 @@
======
SENLIN
======
Senlin is a service to orchestrate multiple composite cloud applications using
templates.
Getting Started
---------------
If you'd like to run from the master branch, you can clone the git repo:
git clone git@github.com:openstack/senlin.git
* Wiki: http://wiki.openstack.org/Senlin
* Developer docs: http://docs.openstack.org/developer/senlin
Python client
-------------
https://github.com/openstack/python-senlinclient

TODO (24 lines changed)

@ -12,7 +12,11 @@ DB
ENGINE
------
- cleanse scheduler module
- complete parser logic
- complete parser logic, construct profile/policy objects there?
DRIVER
------
- complete Heat stack driver [Qiming]
Middle Priority
@ -20,15 +24,19 @@ Middle Priority
DB
--
- Add test cases for policy_delete with 'force' set to True
ENGINE
------
- Design and implement dynamic plugin loading mechanism that allows
loading plugins from any paths
OSLO
----
- Migrate to oslo.log
- Migrate to oslo.context
Low Priority
@ -36,6 +44,8 @@ Low Priority
TEST
----
- Add test case in db cluster to test that cluster-policy association is
deleted when we delete a cluster
- Add test case to engine/parser
- Add test case to engine/registry
- Add test case to engine/environment


@ -0,0 +1,77 @@
..
Licensed under the Apache License, Version 2.0 (the "License"); you may
not use this file except in compliance with the License. You may obtain
a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
Senlin Architecture
===================
Senlin is a service to create and manage clusters of homogeneous resources in
an OpenStack cloud. Senlin provides an OpenStack-native ReST API.
--------------------
Detailed Description
--------------------
What is the purpose of the project and vision for it?
*Senlin provides a clustering service for OpenStack that manages a collection
of nodes that are of the same type.*
Describe the relevance of the project to other OpenStack projects and the
OpenStack mission to provide a ubiquitous cloud computing platform:
*The Senlin service aggregates resources exposed by other components of
OpenStack into a cluster. Such a cluster can be associated with different
policies that can be checked/enforced at varying enforcement levels. Through
service APIs, a user can dynamically add nodes to and remove nodes from a
cluster, attach and detach policies, such as creation policy, deletion policy,
load-balancing policy, scaling policy, health checking policy etc. Through
integration with other OpenStack projects, users will be able to manage
deployments and orchestrations of large-scale resource pools much more easily.*
*Currently no other clustering service exists for OpenStack. The developers
believe cloud developers have a strong desire to create and operate resource
clusters on OpenStack deployments. The Heat project provides preliminary
support for resource groups, but Heat developers have reached a consensus that
such a service should stand on its own feet.*
---------------
Senlin Services
---------------
The developers are focusing on creating an OpenStack style project using
OpenStack design tenets, implemented in Python. We have started with a close
interaction with the Heat project.
As the developers have only started development in December 2014, the
architecture is evolving rapidly.
senlin
------
The senlin tool is a CLI which communicates with the senlin-api to manage
clusters, nodes, profiles, policies and events. End developers could also use
the Senlin REST API directly.
senlin-api
----------
The senlin-api component provides an OpenStack-native REST API that processes
API requests by sending them to the senlin-engine over RPC.
senlin-engine
-------------
The senlin engine's main responsibility is to orchestrate the clusters, nodes,
profiles and policies.
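The senlin-api to senlin-engine hand-off described above can be sketched in plain Python. The real services talk over oslo.messaging RPC; here an in-process queue stands in for the transport, and every class and method name (`FakeEngine`, `do_cluster_create`, etc.) is illustrative rather than Senlin's actual API:

```python
import queue
import threading

class FakeEngine:
    """Stand-in for senlin-engine: serves requests that 'senlin-api' enqueues."""
    def __init__(self):
        self._rpc = queue.Queue()      # plays the role of the RPC transport
        self._results = {}

    def call(self, request_id, method, **kwargs):
        # senlin-api would serialize this over RPC; we just enqueue and wait.
        done = threading.Event()
        self._rpc.put((request_id, method, kwargs, done))
        done.wait()
        return self._results[request_id]

    def serve_one(self):
        # The engine side: pop one request and dispatch it to a handler.
        request_id, method, kwargs, done = self._rpc.get()
        handler = getattr(self, 'do_%s' % method)
        self._results[request_id] = handler(**kwargs)
        done.set()

    def do_cluster_create(self, name, size):
        # A trivial handler that just records the requested cluster.
        return {'name': name, 'size': size, 'status': 'INIT'}

engine = FakeEngine()
worker = threading.Thread(target=engine.serve_one)
worker.start()
result = engine.call('req-1', 'cluster_create', name='web', size=2)
worker.join()
```

The point of the indirection is that any engine worker can serve the queued request; the API process never executes cluster operations itself.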

doc/source/glossary.rst (new file, 133 lines)

@ -0,0 +1,133 @@
..
Licensed under the Apache License, Version 2.0 (the "License"); you may
not use this file except in compliance with the License. You may obtain
a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
==========
Glossary
==========
.. glossary::
:sorted:
Action
An action is an operation that can be performed on a :term:`Cluster`, a
:term:`Node`, a :term:`Policy`, etc. Different types of objects support
different sets of actions. An action is executed by a :term:`Worker` thread
when the action becomes ready. Most Senlin APIs create actions in the
database for worker threads to execute asynchronously. An action, when
executed, will check and enforce the :term:`Policy` objects associated with
the cluster. An action can be triggered via a :term:`Webhook`.
API server
HTTP REST API service for Senlin.
Cluster
A cluster is a group of homogeneous objects (i.e. :term:`Node`s). A
cluster consists of 0 or more nodes and it can be associated with 0 or
more :term:`Policy` objects. It is associated with a :term:`Profile Type`
when created.
Dependency
The :term:`Action` objects are stored in the database for execution. These
actions may have dependencies among them.
Driver
A driver is a Senlin internal module that enables Senlin :term:`Engine` to
interact with other :term:`OpenStack` services. The interactions here are
usually used to create, destroy, update the objects exposed by those
services.
Environment
Used to specify user-provided :term:`Plugin` modules that implement a
:term:`Profile Type` or a :term:`Policy Type`. Users can provide plugins
that override the default plugins by customizing an environment.
Event
An event is a record left in the Senlin database when something that
matters to users happened. An event can be of different criticality levels.
Index
An integer property of a :term:`Node` when it is a member of a
:term:`Cluster`. Each node has an auto-generated index value that is
unique in the cluster.
Nested Cluster
A cluster that serves as a member of another :term:`Cluster`.
Node
A node is an object that belongs to at most one :term:`Cluster`. A node
can become an 'orphaned node' when it is not a member of any clusters.
All nodes in a cluster must be of the same :term:`Profile Type` as the
owning cluster. In general, a node represents a physical object exposed
by other OpenStack services. A node has a unique :term:`Index` value
scoped to the cluster that owns it.
Permission
A string dictating which user (role or group) has what permissions on a
given object (i.e. :term:`Cluster`, :term:`Node`, :term:`Profile` and
:term:`Policy` etc.)
Plugin
A plugin is an implementation of a :term:`Policy Type` or :term:`Profile
Type` that can be dynamically loaded and registered to Senlin engine.
Senlin engine comes with a set of builtin plugins. Users can add their own
plugins by customizing the :term:`Environment` configuration.
Policy
A policy is a set of rules that can be checked and/or enforced when an
:term:`Action` is performed on a :term:`Cluster`. A policy is an instance
of a particular :term:`Policy Type`. Users can specify the enforcement
level when creating a policy object. Such a policy object can be attached
to and detached from a cluster.
Policy Type
A policy type is an abstraction of :term:`Policy` objects. The
implementation of a policy type specifies when the policy should be
checked and/or enforced, what profile types are supported, what operations
are to be done before, during and after each :term:`Action`. All policy
types are provided as Senlin plugins.
Profile
A profile is a mould used for creating objects (i.e. :term:`Node`). A
profile is an instance of a :term:`Profile Type` with all required
information specified. Each profile has a unique ID. As a guideline, a
profile cannot be updated once created. To change a profile, you have to
create a new instance.
Profile Type
A profile type is an abstraction of objects that are backed by some
:term:`Driver`s. The implementation of a profile type calls the driver(s)
to create objects that are managed by Senlin. The implementation also
serves as a factory that can 'produce' objects given a profile. All profile
types are provided as Senlin plugins.
Role
A role is a string property that can be assigned to a :term:`Node`.
Nodes in the same cluster may assume a role for certain reasons, such as
application configuration. The default role for a node is empty.
OpenStack
Open source software for building private and public clouds.
Webhook
A webhook is an encoded URI (Uniform Resource Identifier) that
encapsulates a tuple (user, object, action), where the user is a Keystone
entity that initiates an action and the object is a specific
:term:`Cluster`, a :term:`Node` or a :term:`Policy` etc. The action item
specifies an :term:`Action` to be triggered. Such a Webhook is the only
thing one needs to know to trigger an action on an object in Senlin.
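The (user, object, action) tuple described above can be packed into a single opaque URI. This is only a sketch of the idea; the encoding, URL layout, and host name below are assumptions, not Senlin's actual webhook format:

```python
import base64
import json

def make_webhook(user, obj_id, action):
    # Pack the (user, object, action) tuple into one opaque token.
    payload = json.dumps({'user': user, 'object': obj_id, 'action': action})
    token = base64.urlsafe_b64encode(payload.encode()).decode()
    return 'http://senlin.example.com/webhooks/%s/trigger' % token

def parse_webhook(uri):
    # Recover the tuple from the URI; the token is the second-to-last segment.
    token = uri.rsplit('/', 2)[-2]
    data = json.loads(base64.urlsafe_b64decode(token.encode()))
    return data['user'], data['object'], data['action']

uri = make_webhook('alice', 'cluster-1', 'CLUSTER_SCALE_UP')
```

Holding the URI is then enough to trigger the action, which is exactly the property the glossary entry describes.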
Worker
A worker is the thread created and managed by Senlin engine to execute
an :term:`Action` that becomes ready. When the current action completes
(with a success or failure), a worker will check the database to find
another action for execution.
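The worker behavior described above (run a ready action, then look for the next one) can be sketched with an in-memory action list. The dict-based "database" and helper names are stand-ins for Senlin's real DB API:

```python
ACTION_READY, ACTION_RUNNING, ACTION_SUCCEEDED = 'READY', 'RUNNING', 'SUCCEEDED'

def get_first_ready(db):
    # Stand-in for action_get_1st_ready(): find any action in READY state.
    for action in db:
        if action['status'] == ACTION_READY:
            return action
    return None

def worker_loop(db):
    # When the current action completes, check the database for another.
    while True:
        action = get_first_ready(db)
        if action is None:
            break
        action['status'] = ACTION_RUNNING
        action['result'] = action['execute']()   # run the action body
        action['status'] = ACTION_SUCCEEDED

db = [
    {'status': ACTION_READY, 'execute': lambda: 'node created'},
    {'status': ACTION_READY, 'execute': lambda: 'policy attached'},
]
worker_loop(db)
```

In the real engine each worker is a thread and the READY check is a database query that also serves as a lock; this sketch only shows the control flow.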

doc/source/index.rst (new file, 96 lines)

@ -0,0 +1,96 @@
..
Licensed under the Apache License, Version 2.0 (the "License"); you may
not use this file except in compliance with the License. You may obtain
a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
==============================================
Welcome to the Senlin developer documentation!
==============================================
Senlin is a service to create and manage a :term:`cluster` of multiple cloud
resources. Senlin provides an OpenStack-native ReST API; an AWS
AutoScaling-compatible Query API is planned.
What is the purpose of the project and vision for it?
=====================================================
* Senlin provides a clustering solution for the :term:`OpenStack` cloud. A user
can create clusters of :term:`node` objects and associate a :term:`policy`
with such a cluster.
* The software interacts with other components of OpenStack so that clusters
of resources exposed by those components can be created and operated.
* The software and the Heat project complement each other: Senlin can create
and manage clusters of Heat stacks while Heat can invoke Senlin APIs to
orchestrate collections of homogeneous resources.
* Senlin provides policies as plugins that can be used to specify how clusters
operate. Example policies include creation policy, placement policy,
deletion policy, load-balancing policy, scaling policy etc.
* Senlin can interact with all other OpenStack components via :term:`profile`
plugins. Each profile type implementation enables Senlin to create resources
provided by a corresponding OpenStack service.
This documentation offers information on how Senlin works and how to
contribute to the project.
Getting Started
===============
.. toctree::
:maxdepth: 1
getting_started/index
policies/index
profiles/index
glossary
Man Pages
=========
.. toctree::
:maxdepth: 2
man/index
Developers Documentation
========================
.. toctree::
:maxdepth: 1
architecture
pluginguide
API Documentation
========================
- `Senlin REST API Reference (OpenStack API Complete Reference - Clustering)`_
.. _`Senlin REST API Reference (OpenStack API Complete Reference - Clustering)`: http://api.openstack.org/api-ref-clustering-v1.html
Operations Documentation
========================
.. toctree::
:maxdepth: 1
scale_deployment
Code Documentation
==================
.. toctree::
:maxdepth: 3
sourcecode/autoindex
Indices and tables
==================
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`


@ -1,12 +1,12 @@
senlin_profile_version: 2014-10-16
type: os.heat.stack
spec:
template:
heat_template_version: 2014-10-16
resources:
random:
type: OS::Heat::RandomString
properties:
length: 64
outputs:
result:
value: {get_attr: [random, value]}
!include node_stack.yaml
context: default
parameters:
len: 16
outputs:
name: node-%index%
timeout: 60
enable_rollback: True


@ -0,0 +1,12 @@
heat_template_version: 2014-10-16
parameters:
len:
type: integer
resources:
random:
type: OS::Heat::RandomString
properties:
length: 64
outputs:
result:
value: {get_attr: [random, value]}


@ -1,7 +1,6 @@
[DEFAULT]
# The list of modules to copy from oslo-incubator
module=context
module=eventlet_backdoor
module=local
module=log
@ -9,7 +8,6 @@ module=loopingcall
module=policy
module=service
module=threadgroup
module=uuidutils
module=middleware.request_id
# The base module to hold the copy of openstack.common


@ -14,6 +14,7 @@ kombu>=2.5.0
lxml>=2.3
netaddr>=0.7.12
oslo.config>=1.4.0 # Apache-2.0
oslo.context>=0.1.0 # Apache-2.0
oslo.db>=1.1.0 # Apache-2.0
oslo.i18n>=1.0.0 # Apache-2.0
oslo.messaging>=1.4.0,!=1.5.0


@ -12,12 +12,12 @@
from oslo.middleware import request_id as oslo_request_id
from oslo.utils import importutils
from oslo_context import context
from senlin.common import exception
from senlin.common import policy
from senlin.common import wsgi
from senlin.db import api as db_api
from senlin.openstack.common import context
class RequestContext(context.RequestContext):


@ -211,7 +211,23 @@ class ClusterExists(SenlinException):
msg_fmt = _("The Cluster (%(cluster_name)s) already exists.")
class ClusterValidationFailed(SenlinException):
class ClusterNotSpecified(SenlinException):
msg_fmt = _("The cluster was not specified.")
class ProfileNotFound(SenlinException):
msg_fmt = _("The profile (%(profile)s) could not be found.")
class ProfileNotSpecified(SenlinException):
msg_fmt = _("Profile not specified.")
class ProfileValidationFailed(SenlinException):
msg_fmt = _("%(message)s")
class PolicyValidationFailed(SenlinException):
msg_fmt = _("%(message)s")
@ -266,6 +282,18 @@ class RequestLimitExceeded(SenlinException):
msg_fmt = _('Request limit exceeded: %(message)s')
class ActionMissingTarget(SenlinException):
msg_fmt = _('Action "%(action)s" must have target specified')
class ActionMissingPolicy(SenlinException):
msg_fmt = _('Action "%(action)s" must have policy specified')
class ActionNotSupported(SenlinException):
msg_fmt = _('Action "%(action)s" not supported by %(object)s')
class ActionInProgress(SenlinException):
msg_fmt = _("Cluster %(cluster_name)s already has an action (%(action)s) "
"in progress.")


@ -0,0 +1,33 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# ACTION PHRASES
CLUSTER_CREATE = 'CLUSTER_CREATE'
CLUSTER_DELETE = 'CLUSTER_DELETE'
CLUSTER_UPDATE = 'CLUSTER_UPDATE'
CLUSTER_ADD_NODES = 'CLUSTER_ADD_NODES'
CLUSTER_DEL_NODES = 'CLUSTER_DEL_NODES'
CLUSTER_SCALE_UP = 'CLUSTER_SCALE_UP'
CLUSTER_SCALE_DOWN = 'CLUSTER_SCALE_DOWN'
CLUSTER_ATTACH_POLICY = 'CLUSTER_ATTACH_POLICY'
CLUSTER_DETACH_POLICY = 'CLUSTER_DETACH_POLICY'
NODE_CREATE = 'NODE_CREATE'
NODE_DELETE = 'NODE_DELETE'
NODE_UPDATE = 'NODE_UPDATE'
NODE_JOIN_CLUSTER = 'NODE_JOIN_CLUSTER'
NODE_LEAVE_CLUSTER = 'NODE_LEAVE_CLUSTER'
POLICY_ENABLE = 'POLICY_ENABLE'
POLICY_DISABLE = 'POLICY_DISABLE'
POLICY_UPDATE = 'POLICY_UPDATE'
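Note that the naming convention above encodes the target type in the first token of each action phrase, which is what lets the engine dispatch an action name to a cluster, node, or policy handler (the helper below is a sketch of that dispatch, mirroring the `action.split('_')[0]` logic used elsewhere in this commit):

```python
def target_type(action_name):
    """Return 'CLUSTER', 'NODE' or 'POLICY' from an action phrase."""
    return action_name.split('_')[0]
```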


@ -113,6 +113,10 @@ def node_set_status(context, node_id, status):
return IMPL.node_set_status(context, node_id, status)
def node_migrate(context, node_id, from_cluster, to_cluster):
return IMPL.node_migrate(context, node_id, from_cluster, to_cluster)
# Locks
def cluster_lock_create(cluster_id, worker_id):
return IMPL.cluster_lock_create(cluster_id, worker_id)
@ -160,8 +164,8 @@ def policy_delete(context, policy_id, force=False):
# Cluster-Policy Associations
def cluster_attach_policy(context, values):
return IMPL.cluster_attach_policy(context, values)
def cluster_attach_policy(context, cluster_id, policy_id, values):
return IMPL.cluster_attach_policy(context, cluster_id, policy_id, values)
def cluster_get_policies(context, cluster_id):
@ -222,6 +226,7 @@ def event_get_all_by_cluster(context, cluster_id, limit=None, marker=None,
sort_dir=sort_dir,
filters=filters)
# Actions
def action_create(context, values):
return IMPL.action_create(context, values)
@ -271,10 +276,6 @@ def action_start_work_on(context, action_id, owner):
return IMPL.action_start_work_on(context, action_id, owner)
def action_update(context, action_id, values):
return IMPL.action_update(context, action_id, values)
def action_delete(context, action_id, force=False):
return IMPL.action_delete(context, action_id, force)


@ -37,8 +37,9 @@ CONF = cfg.CONF
CONF.import_opt('max_events_per_cluster', 'senlin.common.config')
# Action status definitions:
# ACTION_INIT: Not ready to be executed because fields are being modified,
# or dependency with other actions are being analyzed.
# ACTION_INIT: Not ready to be executed because fields are being
# modified, or dependency with other actions are being
# analyzed.
# ACTION_READY: Initialized and ready to be executed by a worker.
# ACTION_RUNNING: Being executed by a worker thread.
# ACTION_SUCCEEDED: Completed with success.
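The status comments above imply a simple action lifecycle. A hedged sketch of the transitions they suggest follows; the status names come from the comments (plus the FAILED/CANCELLED terminal states defined on the Action class), but the transition table itself is our reading, not code from this commit:

```python
# Allowed action status transitions, inferred from the comments above.
TRANSITIONS = {
    'INIT': {'READY'},                               # fields settled, deps resolved
    'READY': {'RUNNING'},                            # picked up by a worker
    'RUNNING': {'SUCCEEDED', 'FAILED', 'CANCELLED'}, # terminal outcomes
}

def can_transition(old, new):
    """Return True if moving an action from `old` to `new` status is allowed."""
    return new in TRANSITIONS.get(old, set())
```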
@ -140,7 +141,7 @@ def cluster_get_all_by_parent(context, parent):
def cluster_get_by_name_and_parent(context, cluster_name, parent):
query = soft_delete_aware_query(context, models.Cluster).\
filter_by(tenant == context.tenant_id).\
filter_by(tenant=context.tenant_id).\
filter_by(name=cluster_name).\
filter_by(parent=parent)
return query.first()
@ -232,8 +233,8 @@ def cluster_update(context, cluster_id, values):
if not cluster:
raise exception.NotFound(
_('Attempt to update a cluster with id "%s" that does not'
' exist') % cluster_id)
_('Attempt to update a cluster with id "%s" that does '
'not exist failed') % cluster_id)
cluster.update(values)
cluster.save(_session(context))
@ -243,8 +244,8 @@ def cluster_delete(context, cluster_id):
cluster = cluster_get(context, cluster_id)
if not cluster:
raise exception.NotFound(
_('Attempt to delete a cluster with id "%s" that does not'
' exist') % cluster_id)
_('Attempt to delete a cluster with id "%s" that does '
'not exist failed') % cluster_id)
session = orm_session.Session.object_session(cluster)
@ -302,6 +303,21 @@ def node_get_by_physical_id(context, phy_id):
return query.first()
def node_migrate(context, node_id, from_cluster, to_cluster):
query = model_query(context, models.Node)
node = query.get(node_id)
session = query.session
session.begin()
if from_cluster:
cluster1 = session.query(models.Cluster).get(from_cluster)
cluster1.size -= 1
if to_cluster:
cluster2 = session.query(models.Cluster).get(to_cluster)
cluster2.size += 1
node.cluster_id = to_cluster
session.commit()
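The semantics of `node_migrate` above are: decrement the source cluster's size, increment the destination's, and repoint the node, all in one transaction. A plain-Python illustration (dicts standing in for the SQLAlchemy session and models):

```python
def migrate_node(db, node_id, from_cluster, to_cluster):
    # Mirrors node_migrate(): adjust both cluster sizes and move the node.
    node = db['nodes'][node_id]
    if from_cluster:
        db['clusters'][from_cluster]['size'] -= 1
    if to_cluster:
        db['clusters'][to_cluster]['size'] += 1
    node['cluster_id'] = to_cluster

db = {
    'nodes': {'n1': {'cluster_id': 'c1'}},
    'clusters': {'c1': {'size': 3}, 'c2': {'size': 0}},
}
migrate_node(db, 'n1', 'c1', 'c2')
```

Either cluster argument may be None/empty, which covers joining an orphaned node to a cluster or turning a member into an orphan.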
# Locks
def cluster_lock_create(cluster_id, worker_id):
session = get_session()
@ -398,8 +414,8 @@ def policy_update(context, policy_id, values):
policy = policy_get(context, policy_id)
if not policy:
msg = _('Attempt to update a policy with id: %(id)s that does not'
' exist') % policy_id
msg = _('Attempt to update a policy with id: %(id)s that does not '
'exist failed') % policy_id
raise exception.NotFound(msg)
policy.update(values)
@ -411,8 +427,8 @@ def policy_delete(context, policy_id, force=False):
policy = policy_get(context, policy_id)
if not policy:
msg = _('Attempt to delete a policy with id "%s" that does not'
' exist') % policy_id
msg = _('Attempt to delete a policy with id "%s" that does not '
'exist failed') % policy_id
raise exception.NotFound(msg)
session = orm_session.Session.object_session(policy)
@ -424,8 +440,10 @@ def policy_delete(context, policy_id, force=False):
# Cluster-Policy Associations
def cluster_attach_policy(context, values):
def cluster_attach_policy(context, cluster_id, policy_id, values):
binding = models.ClusterPolicies()
binding.cluster_id = cluster_id
binding.policy_id = policy_id
binding.update(values)
binding.save(_session(context))
return binding
@ -442,9 +460,9 @@ def cluster_detach_policy(context, cluster_id, policy_id):
filter(cluster_id=cluster_id, policy_id=policy_id)
if not binding:
msg = _('Failed detach policy "%(policy)s" from cluster '
'"%(cluster)s"') % {'policy': policy_id,
'cluster': cluster_id}
msg = _('Failed detaching policy "%(policy)s" from cluster '
'"%(cluster)s"') % {'policy': policy_id,
'cluster': cluster_id}
raise exception.NotFound(msg)
session = orm_session.Session.object_session(binding)
@ -458,8 +476,8 @@ def cluster_enable_policy(context, cluster_id, policy_id):
if not binding:
msg = _('Failed enabling policy "%(policy)s" on cluster '
'"%(cluster)s"') % {'policy': policy_id,
'cluster': cluster_id}
'"%(cluster)s"') % {'policy': policy_id,
'cluster': cluster_id}
raise exception.NotFound(msg)
@ -474,8 +492,8 @@ def cluster_disable_policy(context, cluster_id, policy_id):
if not binding:
msg = _('Failed disabling policy "%(policy)s" on cluster '
'"%(cluster)s"') % {'policy': policy_id,
'cluster': cluster_id}
'"%(cluster)s"') % {'policy': policy_id,
'cluster': cluster_id}
raise exception.NotFound(msg)
binding.update(enabled=False)
@ -606,7 +624,7 @@ def _events_filter_and_page_query(context, query, limit=None, marker=None,
def event_count_by_cluster(context, cid):
count = model_query(context, models.Event).\
filter_by(obj_id=cid, obj_type='CLUSTER').count()
return count
def _events_by_cluster(context, cid):
@ -669,12 +687,13 @@ def purge_deleted(age, granularity='days'):
# user_creds_del = user_creds.delete().where(user_creds.c.id == s[2])
# engine.execute(user_creds_del)
# Actions
def action_create(context, values):
action = models.Action()
action.update(values)
action.save(_session(context))
return action
def action_get(context, action_id):
@ -688,7 +707,6 @@ def action_get(context, action_id):
def action_get_1st_ready(context):
query = model_query(context, models.Action).\
filter_by(status=ACTION_READY)
return query.first()
@ -820,7 +838,8 @@ def action_del_dependency(context, depended, dependent):
def action_mark_succeeded(context, action_id):
action = model_query(context, models.Action).get(action_id)
query = model_query(context, models.Action)
action = query.get(action_id)
if not action:
raise exception.NotFound(
_('Action with id "%s" not found') % action_id)
@ -854,9 +873,9 @@ def action_start_work_on(context, action_id, owner):
raise exception.NotFound(
_('Action with id "%s" not found') % action_id)
action.owner = owner
action.status = ACTION_RUNNING
action.status_reason = _('The action is being processing.')
action.status_reason = _('The action is being processed.')
action.save(_session(context))
return action
@ -872,6 +891,7 @@ def action_delete(context, action_id, force=False):
# TODO(liuh): Need check if and how an action can be safety deleted
action.delete()
# Utils
def db_sync(engine, version=None):
"""Migrate the database to `version` or the most recent version."""


@ -49,6 +49,7 @@ def upgrade(migrate_engine):
sqlalchemy.Column('created_time', sqlalchemy.DateTime),
sqlalchemy.Column('updated_time', sqlalchemy.DateTime),
sqlalchemy.Column('deleted_time', sqlalchemy.DateTime),
sqlalchemy.Column('size', sqlalchemy.Integer),
sqlalchemy.Column('next_index', sqlalchemy.Integer),
sqlalchemy.Column('timeout', sqlalchemy.Integer),
sqlalchemy.Column('status', sqlalchemy.String(255)),


@ -98,6 +98,7 @@ class Cluster(BASE, SenlinBase, SoftDelete):
created_time = sqlalchemy.Column(sqlalchemy.DateTime)
updated_time = sqlalchemy.Column(sqlalchemy.DateTime)
deleted_time = sqlalchemy.Column(sqlalchemy.DateTime)
size = sqlalchemy.Column(sqlalchemy.Integer)
next_index = sqlalchemy.Column(sqlalchemy.Integer)
timeout = sqlalchemy.Column(sqlalchemy.Integer)
status = sqlalchemy.Column(sqlalchemy.String(255))


@ -10,9 +10,16 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
import datetime
from oslo.config import cfg
from senlin.common import exception
from senlin.db import api as db_api
from senlin.engine import node as nodes
from senlin.engine import scheduler
from senlin.policies import base as policies
class Action(object):
@ -20,9 +27,9 @@ class Action(object):
An action can be performed on a cluster or a node of a cluster.
'''
RETURNS = (
OK, FAILED, RETRY,
RES_OK, RES_ERROR, RES_RETRY,
) = (
'OK', 'FAILED', 'RETRY',
'OK', 'ERROR', 'RETRY',
)
# Action status definitions:
@ -41,67 +48,177 @@ class Action(object):
'SUCCEEDED', 'FAILED', 'CANCELLED',
)
def __init__(self, context):
self.id = None
def __new__(cls, context, action, **kwargs):
if (cls != Action):
return super(Action, cls).__new__(cls)
target_type = action.split('_')[0]
if target_type == 'CLUSTER':
ActionClass = ClusterAction
elif target_type == 'NODE':
ActionClass = NodeAction
elif target_type == 'POLICY':
ActionClass = PolicyAction
else:
ActionClass = CustomAction
return super(Action, cls).__new__(ActionClass)
def __init__(self, context, action, **kwargs):
# context will be persisted into database so that any worker thread
# can pick the action up and execute it on behalf of the initiator
self.context = context
if action not in self.ACTIONS:
raise exception.ActionNotSupported(
action=action, object=_('target %s') % self.target)
self.description = ''
self.context = copy.deepcopy(context)
self.description = kwargs.get('description', '')
# Target is the ID of a cluster, a node, a profile
self.target = ''
self.target = kwargs.get('target', None)
if self.target is None:
raise exception.ActionMissingTarget(action=action)
# An action
self.action = ''
self.action = action
# Why this action is fired, it can be a UUID of another action
self.cause = ''
self.cause = kwargs.get('cause', '')
# Owner can be an UUID format ID for the worker that is currently
# working on the action. It also serves as a lock.
self.owner = ''
self.owner = kwargs.get('owner', None)
# An action may need to be executed repeatedly; interval is the
# time in seconds between two consecutive executions.
# A value of -1 indicates that this action is only to be executed once
self.interval = -1
self.interval = kwargs.get('interval', -1)
# Start time can be an absolute time or a time relative to another
# action. E.g.
# - '2014-12-18 08:41:39.908569'
# - 'AFTER: 57292917-af90-4c45-9457-34777d939d4d'
# - 'WHEN: 0265f93b-b1d7-421f-b5ad-cb83de2f559d'
self.start_time = ''
self.end_time = ''
# - 'WHEN: 0265f93b-b1d7-421f-b5ad-cb83de2f559d'
self.start_time = kwargs.get('start_time', None)
self.end_time = kwargs.get('end_time', None)
# Timeout is a placeholder in case some actions may linger too long
self.timeout = cfg.CONF.default_action_timeout
self.timeout = kwargs.get('timeout', cfg.CONF.default_action_timeout)
# Return code, useful when action is not automatically deleted
# after execution
self.status = ''
self.status_reason = ''
self.status = kwargs.get('status', self.INIT)
self.status_reason = kwargs.get('status_reason', '')
# All parameters are passed in using keyward arguments which is
# a list stored as JSON in DB
self.inputs = {}
self.outputs = {}
# All parameters are passed in using keyword arguments which is
# a dictionary stored as JSON in DB
self.inputs = kwargs.get('inputs', {})
self.outputs = kwargs.get('outputs', {})
# Dependency with other actions
self.depends_on = []
self.depended_by = []
self.depends_on = kwargs.get('depends_on', [])
self.depended_by = kwargs.get('depended_by', [])
def store(self):
'''
Store the action record into database table.
'''
values = {
'name': self.name,
'context': self.context,
'target': self.target,
'action': self.action,
'cause': self.cause,
'owner': self.owner,
'interval': self.interval,
'start_time': self.start_time,
'end_time': self.end_time,
'timeout': self.timeout,
'status': self.status,
'status_reason': self.status_reason,
'inputs': self.inputs,
'outputs': self.outputs,
'depends_on': self.depends_on,
'depended_by': self.depended_by,
'deleted_time': self.deleted_time,
}
action = db_api.action_create(self.context, values)
self.id = action.id
return self.id
@classmethod
def from_db_record(cls, context, record):
'''
Construct an action object from a database record.
:param context: the context used for DB operations;
:param record: a DB action object that contains all fields.
'''
kwargs = {
'id': record.id,
'name': record.name,
'context': record.context,
'target': record.target,
'cause': record.cause,
'owner': record.owner,
'interval': record.interval,
'start_time': record.start_time,
'end_time': record.end_time,
'timeout': record.timeout,
'status': record.status,
'status_reason': record.status_reason,
'inputs': record.inputs,
'outputs': record.outputs,
'depends_on': record.depends_on,
'depended_by': record.depended_by,
'deleted_time': record.deleted_time,
}
return cls(context, record.action, **kwargs)
@classmethod
def load(cls, context, action_id):
'''
Retrieve an action from database.
'''
action = db_api.action_get(context, action_id)
if action is None:
msg = _('No action with id "%s" exists') % action_id
raise exception.NotFound(msg)
return cls.from_db_record(context, action)
def execute(self, **kwargs):
'''
Execute the action.
In theory, the action encapsulates all information needed for
execution. 'kwargs' may specify additional parameters.
:param kwargs: additional parameters that may override the default
properties stored in the action record.
'''
return NotImplemented
def cancel(self):
return NotImplemented
def store(self):
#db_api.action_update(self.id)
return
def set_status(self, status):
'''
Set action status.
This is not merely about a db record update.
'''
if status == self.SUCCEEDED:
db_api.action_mark_succeeded(self.context, self.id)
elif status == self.FAILED:
db_api.action_mark_failed(self.context, self.id)
elif status == self.CANCELLED:
db_api.action_mark_cancelled(self.context, self.id)
self.status = status
def get_status(self):
action = db_api.action_get(self.context, self.id)
self.status = action.status
return action.status
class ClusterAction(Action):
@@ -109,34 +226,152 @@ class ClusterAction(Action):
An action performed on a cluster.
'''
ACTIONS = (
CREATE, DELETE, ADD_NODE, DEL_NODE, UPDATE,
ATTACH_POLICY, DETACH_POLICY,
CLUSTER_CREATE, CLUSTER_DELETE, CLUSTER_UPDATE,
CLUSTER_ADD_NODES, CLUSTER_DEL_NODES,
CLUSTER_SCALE_UP, CLUSTER_SCALE_DOWN,
CLUSTER_ATTACH_POLICY, CLUSTER_DETACH_POLICY,
) = (
'CREATE', 'DELETE', 'ADD_NODE', 'DEL_NODE', 'UPDATE',
'ATTACH_POLICY', 'DETACH_POLICY',
'CLUSTER_CREATE', 'CLUSTER_DELETE', 'CLUSTER_UPDATE',
'CLUSTER_ADD_NODES', 'CLUSTER_DEL_NODES',
'CLUSTER_SCALE_UP', 'CLUSTER_SCALE_DOWN',
'CLUSTER_ATTACH_POLICY', 'CLUSTER_DETACH_POLICY',
)
def __init__(self, context, cluster):
super(ClusterAction, self).__init__(context)
self.target = cluster
def __init__(self, context, action, **kwargs):
super(ClusterAction, self).__init__(context, action, **kwargs)
def execute(self, action, **kwargs):
if action not in self.ACTIONS:
return self.FAILED
def do_cluster_create(self, cluster):
# TODO(Yanyan): Check if cluster lock is needed
res = cluster.do_create()
if res is False:
return self.RES_ERROR
if action == self.CREATE:
# TODO:
# We should query the lock of cluster here and wrap
# cluster.do_create, and then let threadgroupmanager
# to start a thread for this progress.
cluster.do_create(kwargs)
else:
return self.FAILED
for m in range(cluster.size):
name = 'node-%003d' % m
node = nodes.Node(name, cluster.profile_id, cluster.id)
node.store()
kwargs = {
'name': 'node-create-%003d' % m,
'context': self.context,
'target': node.id,
'cause': 'Cluster creation',
}
return self.OK
action = Action(self.context, 'NODE_CREATE', **kwargs)
action.set_status(self.READY)
scheduler.notify()
return self.RES_OK
def do_update(self, cluster):
# TODO(Yanyan): Check if cluster lock is needed
cluster.set_status(self.UPDATING)
node_list = cluster.get_nodes()
for node_id in node_list:
kwargs = {
'name': 'node-update-%s' % node_id,
'context': self.context,
'target': node_id,
'cause': 'Cluster update',
}
action = Action(self.context, 'NODE_UPDATE', **kwargs)
action.set_status(self.READY)
scheduler.notify()
# TODO(Yanyan): release lock
cluster.set_status(self.ACTIVE)
return self.RES_OK
def do_delete(self, cluster):
# TODO(Yanyan): Check if cluster lock is needed
node_list = cluster.get_nodes()
for node_id in node_list:
kwargs = {
'name': 'node-delete-%s' % node_id,
'context': self.context,
'target': node_id,
'cause': 'Cluster deletion',
}
action = Action(self.context, 'NODE_DELETE', **kwargs)
action.set_status(self.READY)
scheduler.notify()
return self.RES_OK
def do_add_nodes(self, cluster):
return self.RES_OK
def do_del_nodes(self, cluster):
return self.RES_OK
def do_scale_up(self, cluster):
return self.RES_OK
def do_scale_down(self, cluster):
return self.RES_OK
def do_attach_policy(self, cluster):
policy_id = self.inputs.get('policy_id', None)
if policy_id is None:
raise exception.PolicyNotSpecified()
policy = policies.load(self.context, policy_id)
# Check if policy has already been attached
all = db_api.cluster_get_policies(self.context, cluster.id)
for existing in all:
# Policy already attached
if existing.id == policy_id:
return self.RES_OK
if existing.type == policy.type:
raise exception.PolicyExists(policy_type=policy.type)
values = {
'cooldown': self.inputs.get('cooldown', policy.cooldown),
'level': self.inputs.get('level', policy.level),
'enabled': self.inputs.get('enabled', True),
}
db_api.cluster_attach_policy(self.context, cluster.id, policy_id,
values)
cluster.rt.policies.append(policy)
return self.RES_OK
def do_detach_policy(self, cluster):
return self.RES_OK
def execute(self, **kwargs):
res = False
cluster = db_api.cluster_get(self.context, self.target)
if not cluster:
return self.RES_ERROR
if self.action == self.CLUSTER_CREATE:
res = self.do_create(cluster)
elif self.action == self.CLUSTER_UPDATE:
res = self.do_update(cluster)
elif self.action == self.CLUSTER_DELETE:
res = self.do_delete(cluster)
elif self.action == self.CLUSTER_ADD_NODES:
res = self.do_add_nodes(cluster)
elif self.action == self.CLUSTER_DEL_NODES:
res = self.do_del_nodes(cluster)
elif self.action == self.CLUSTER_SCALE_UP:
res = self.do_scale_up(cluster)
elif self.action == self.CLUSTER_SCALE_DOWN:
res = self.do_scale_down(cluster)
elif self.action == self.CLUSTER_ATTACH_POLICY:
res = self.do_attach_policy(cluster)
elif self.action == self.CLUSTER_DETACH_POLICY:
res = self.do_detach_policy(cluster)
return self.RES_OK if res else self.RES_ERROR
def cancel(self):
return self.FAILED
return self.RES_OK
class NodeAction(Action):
@@ -144,24 +379,43 @@ class NodeAction(Action):
An action performed on a cluster member.
'''
ACTIONS = (
CREATE, DELETE, UPDATE, JOIN, LEAVE,
NODE_CREATE, NODE_DELETE, NODE_UPDATE,
NODE_JOIN_CLUSTER, NODE_LEAVE_CLUSTER,
) = (
'CREATE', 'DELETE', 'UPDATE', 'JOIN', 'LEAVE',
'NODE_CREATE', 'NODE_DELETE', 'NODE_UPDATE',
'NODE_JOIN_CLUSTER', 'NODE_LEAVE_CLUSTER',
)
def __init__(self, context, node):
super(NodeAction, self).__init__(context)
def __init__(self, context, action, **kwargs):
super(NodeAction, self).__init__(context, action, **kwargs)
# get cluster of this node
# get policies associated with the cluster
def execute(self, **kwargs):
res = False
node = nodes.load(self.context, self.target)
if not node:
msg = _('Node with id (%s) is not found') % self.target
raise exception.NotFound(msg)
def execute(self, action, **kwargs):
if action not in self.ACTIONS:
return self.FAILED
return self.OK
# TODO(Qiming): Add node status changes
if self.action == self.NODE_CREATE:
res = node.do_create()
elif self.action == self.NODE_DELETE:
res = node.do_delete()
elif self.action == self.NODE_UPDATE:
new_profile_id = self.inputs.get('new_profile')
res = node.do_update(new_profile_id)
elif self.action == self.NODE_JOIN_CLUSTER:
new_cluster_id = self.inputs.get('cluster_id', None)
if not new_cluster_id:
raise exception.ClusterNotSpecified()
res = node.do_join(new_cluster_id)
elif self.action == self.NODE_LEAVE_CLUSTER:
res = node.do_leave()
return self.RES_OK if res else self.RES_ERROR
def cancel(self):
return self.OK
return self.RES_OK
class PolicyAction(Action):
@@ -173,18 +427,26 @@ class PolicyAction(Action):
'''
ACTIONS = (
ENABLE, DISABLE, UPDATE,
POLICY_ENABLE, POLICY_DISABLE, POLICY_UPDATE,
) = (
'ENABLE', 'DISABLE', 'UPDATE',
'POLICY_ENABLE', 'POLICY_DISABLE', 'POLICY_UPDATE',
)
def __init__(self, context, cluster_id, policy_id):
super(PolicyAction, self).__init__(context)
def __init__(self, context, action, **kwargs):
super(PolicyAction, self).__init__(context, action, **kwargs)
self.cluster_id = kwargs.get('cluster_id', None)
if self.cluster_id is None:
raise exception.ActionMissingTarget(action)
self.policy_id = kwargs.get('policy_id', None)
if self.policy_id is None:
raise exception.ActionMissingPolicy(action)
# get policy associaton using the cluster id and policy id
def execute(self, action, **kwargs):
if action not in self.ACTIONS:
return self.FAILED
def execute(self, **kwargs):
if self.action not in self.ACTIONS:
return self.RES_ERROR
self.store(start_time=datetime.datetime.utcnow(),
status=self.RUNNING)
@@ -193,23 +455,41 @@ class PolicyAction(Action):
policy_id = kwargs.get('policy_id')
# an ENABLE/DISABLE action only changes the database table
if action == self.ENABLE:
if self.action == self.POLICY_ENABLE:
db_api.cluster_enable_policy(cluster_id, policy_id)
elif action == self.DISABLE:
elif self.action == self.POLICY_DISABLE:
db_api.cluster_disable_policy(cluster_id, policy_id)
else: # action == self.UPDATE:
else: # self.action == self.UPDATE:
# There is not direct way to update a policy because the policy
# might be shared with another cluster, instead, we clone a new
# policy and replace the cluster-policy entry.
pass
# TODO(Qiming): Add DB API to complete this.
self.store(end_time=datetime.datetime.utcnow(),
status=self.SUCCEEDED)
return self.OK
return self.RES_OK
def cancel(self):
self.store(end_time=datetime.datetime.utcnow(),
status=self.CANCELLED)
return self.OK
return self.RES_OK
class CustomAction(Action):
ACTIONS = (
ACTION_EXECUTE,
) = (
'ACTION_EXECUTE',
)
def __init__(self, context, action, **kwargs):
super(CustomAction, self).__init__(context, action, **kwargs)
def execute(self, **kwargs):
return self.RES_OK
def cancel(self):
return self.RES_OK
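The `ClusterAction.execute` method above routes each `CLUSTER_*` action name to a matching `do_*` handler through a long if/elif chain. The same dispatch can be expressed with a lookup table; the sketch below is illustrative only (class and handler names are simplified stand-ins, not Senlin's real API):

```python
# Minimal sketch of the action-dispatch pattern used by ClusterAction.execute.
# Names and handler bodies are illustrative, not the real Senlin implementation.

RES_OK, RES_ERROR = 'OK', 'ERROR'

class Action(object):
    def __init__(self, action, inputs=None):
        self.action = action
        self.inputs = inputs or {}

class ClusterAction(Action):
    def do_create(self, cluster):
        # Stand-in for the real cluster-creation logic.
        cluster['status'] = 'ACTIVE'
        return True

    def do_delete(self, cluster):
        cluster['status'] = 'DELETED'
        return True

    def execute(self, cluster):
        # Map action names to handlers instead of a long if/elif chain.
        handlers = {
            'CLUSTER_CREATE': self.do_create,
            'CLUSTER_DELETE': self.do_delete,
        }
        handler = handlers.get(self.action)
        if handler is None:
            return RES_ERROR
        return RES_OK if handler(cluster) else RES_ERROR

cluster = {'status': 'INIT'}
result = ClusterAction('CLUSTER_CREATE').execute(cluster)
```

An unrecognized action name falls through to `RES_ERROR`, mirroring the `return self.RES_OK if res else self.RES_ERROR` shape of the code above.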


@@ -10,13 +10,15 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from datetime import datetime
import datetime
from senlin.common import exception
from senlin.common.i18n import _
from senlin.common.i18n import _LW
from senlin.db import api as db_api
from senlin.engine import action as actions
from senlin.engine import event as events
from senlin.engine import node as nodes
from senlin.engine import scheduler
from senlin.profiles import base as profiles
from senlin.rpc import api as rpc_api
@@ -35,91 +37,174 @@ class Cluster(object):
'INIT', 'ACTIVE', 'ERROR', 'DELETED', 'UPDATING',
)
def __init__(self, name, profile, size=0, **kwargs):
def __init__(self, context, name, profile_id, size=0, **kwargs):
'''
Initialize a cluster object.
The cluster defaults to have 0 nodes with no profile assigned.
'''
self.context = context
self.id = kwargs.get('id', None)
self.name = name
self.profile_id = profile_id
self.user = kwargs.get('user')
self.project = kwargs.get('project')
self.domain = kwargs.get('domain')
# Initialize the fields using kwargs passed in
self.user = kwargs.get('user', '')
self.project = kwargs.get('project', '')
self.domain = kwargs.get('domain', '')
self.parent = kwargs.get('parent', '')
self.parent = kwargs.get('parent')
self.created_time = kwargs.get('created_time', None)
self.updated_time = kwargs.get('updated_time', None)
self.deleted_time = kwargs.get('deleted_time', None)
self.created_time = None
self.updated_time = None
self.deleted_time = None
# size is only the 'desired capacity', which may not be the actual
# size of the cluster at a given moment.
self.size = size
self.next_index = kwargs.get('next_index', 0)
self.timeout = kwargs.get('timeout', 0)
self.next_index = 0
self.timeout = 0
self.status = ''
self.status_reason = ''
self.data = {}
self.tags = {}
# persist object into database very early because:
# 1. object creation may be a time consuming task
# 2. user may want to cancel the action when cluster creation
# is still in progress
db_api.create_cluster(self)
self.status = kwargs.get('status', self.INIT)
self.status_reason = kwargs.get('status_reason', 'Initializing')
self.data = kwargs.get('data', {})
self.tags = kwargs.get('tags', {})
# rt is a dict for runtime data
self.rt = dict(size=size,
nodes={},
policies={})
# TODO(Qiming): nodes have to be reloaded when membership changes
self.rt = {
'profile': profiles.load(context, self.profile_id),
'nodes': nodes.load_all(context, cluster_id=self.id),
'policies': {},
}
def _set_status(self, context, status):
pass
#event.info(context, self.id, status,
@classmethod
def from_db_record(cls, context, record):
'''
Construct a cluster object from a database record.
:param context: the context used for DB operations;
:param record: a DB cluster object that will receive all fields;
'''
kwargs = {
'id': record.id,
'user': record.user,
'project': record.project,
'domain': record.domain,
'parent': record.parent,
'created_time': record.created_time,
'updated_time': record.updated_time,
'deleted_time': record.deleted_time,
'next_index': record.next_index,
'timeout': record.timeout,
'status': record.status,
'status_reason': record.status_reason,
'data': record.data,
'tags': record.tags,
}
return cls(context, record.name, record.profile_id, record.size,
**kwargs)
@classmethod
def load(cls, context, cluster_id, show_deleted=False):
'''
Retrieve a cluster from the database.
'''
cluster = db_api.cluster_get(context, cluster_id,
show_deleted=show_deleted)
if cluster is None:
msg = _('No cluster with id "%s" exists') % cluster_id
raise exception.NotFound(msg)
return cls.from_db_record(context, cluster)
@classmethod
def load_all(cls, context, limit=None, sort_keys=None, marker=None,
sort_dir=None, filters=None, tenant_safe=True,
show_deleted=False, show_nested=False):
'''
Retrieve all clusters from the database.
'''
records = db_api.cluster_get_all(context, limit, sort_keys, marker,
sort_dir, filters, tenant_safe,
show_deleted, show_nested)
for record in records:
yield cls.from_db_record(context, record)
def store(self):
'''
Store the cluster in database and return its ID.
If the ID already exists, we do an update.
'''
values = {
'name': self.name,
'profile_id': self.profile_id,
'user': self.user,
'project': self.project,
'domain': self.domain,
'parent': self.parent,
'created_time': self.created_time,
'updated_time': self.updated_time,
'deleted_time': self.deleted_time,
'size': self.size,
'next_index': self.next_index,
'timeout': self.timeout,
'status': self.status,
'status_reason': self.status_reason,
'tags': self.tags,
'data': self.data,
}
if self.id:
db_api.cluster_update(self.context, self.id, values)
# TODO(Qiming): create event/log
else:
cluster = db_api.cluster_create(self.context, values)
# TODO(Qiming): create event/log
self.id = cluster.id
return self.id
def _set_status(self, status):
'''
Set status of the cluster.
'''
values = {}
now = datetime.datetime.utcnow()
if status == self.ACTIVE:
if self.status == self.INIT:
values['created_time'] = now
else:
values['updated_time'] = now
elif status == self.DELETED:
values['deleted_time'] = now
values['status'] = status
db_api.cluster_update(self.context, self.id, values)
# log status to log file
# generate event record
def do_create(self, context, **kwargs):
def do_create(self, **kwargs):
'''
A routine to be called from an action by a thread.
A routine to be called from an action.
'''
for m in range(self.size):
node = nodes.Node(None, profile_id, cluster_id)
action = actions.NodeAction(context, node, 'CREATE', **kwargs)
# start a thread asynchronously
handle = scheduler.runAction(action)
# add subthread to the waiting list of main thread
scheduler.wait(handle)
self._set_status(self.ACTIVE)
return True
def do_delete(self, **kwargs):
def do_delete(self, context, **kwargs):
self.status = self.DELETED
def do_update(self, **kwargs):
# Profile type checking is done here because the do_update logic can
def do_update(self, context, **kwargs):
# Profile type checking is done here because the do_update logic can
# be triggered from API or Webhook
# TODO: check if profile is of the same type
profile = kwargs.get('profile')
if self.profile == profile:
event.warning(_LW('Cluster refuses to update to the same profile'
'(%s)' % (profile)))
return self.FAILED
self._set_status(self.UPDATING)
node_list = self.get_nodes()
for n in node_list:
node = nodes.Node(None, profile_id, cluster_id)
action = actions.NodeAction(context, node, 'UPDATE', **kwargs)
# start a thread asynchronously
handle = scheduler.runAction(action)
scheduler.wait(handle)
self._set_status(self.ACTIVE)
# TODO(Qiming): check if profile is of the same type
profile_id = kwargs.get('profile_id')
if self.profile_id == profile_id:
events.warning(_LW('Cluster refuses to update to the same profile'
'(%s)' % (profile_id)))
return False
def get_next_index(self):
# TODO: Get next_index from db and increment it in db
# TODO(Qiming): Get next_index from db and increment it in db
curr = self._next_index
self._next_index = self._next_index + 1
return curr
@@ -127,102 +212,70 @@ class Cluster(object):
def get_nodes(self):
# This method will return each node with their associated profiles.
# Members may have different versions of the same profile type.
return {}
return self.rt.nodes
def get_policies(self):
# policies are stored in database when policy association is created
# this method retrieves the attached policies from database
return {}
return self.rt.policies
def add_nodes(self, node_ids):
pass
def del_nodes(self, node_ids):
for node in node_ids:
res = Node.destroy(node)
return True
'''
Remove nodes from current cluster.
'''
deleted = []
for node_id in node_ids:
node = db_api.node_get(node_id)
if node.leave(self):
deleted.append(node_id)
return deleted
def attach_policy(self, policy_id):
'''
Attach specified policy instance to this cluster.
'''
# TODO: check conflicts with existing policies
# TODO(Qiming): check conflicts with existing policies
self.policies.append(policy_id)
def detach_policy(self, policy_id):
# TODO: check if actions of specified policies are ongoing
# TODO(Qiming): check if actions of specified policies are ongoing
self.policies.remove(policy_id)
@classmethod
def create(cls, name, size=0, profile=None, **kwargs):
cluster = cls(name, size, profile, kwargs)
cluster.do_create()
# TODO: store this to database
# TODO: log events?
# TODO(Qiming): store this to database
# log events?
return cluster.id
@classmethod
def delete(cls, cluster_id):
cluster = db_api.get_cluster(cluster_id)
if not cluster:
message = _('No cluster exists with id "%s"') % str(cluster_id)
raise exception.NotFound(message)
# TODO: check if actions are working on and can be canceled
# TODO: destroy nodes
# TODO(Qiming): check if actions are working on and can be canceled
# destroy nodes
db_api.delete_cluster(cluster_id)
return True
@classmethod
def update(cls, cluster_id, profile):
cluster = db_api.get_cluster(cluster_id)
# TODO: Implement this
# cluster = db_api.get_cluster(cluster_id)
# TODO(Qiming): Implement this
return True
@classmethod
def load(cls, context, cluster_id=None, cluster=None, show_deleted=True):
'''Retrieve a Cluster from the database.'''
if cluster is None:
cluster = db_api.cluster_get(context, cluster_id,
show_deleted=show_deleted)
if cluster is None:
message = _('No cluster exists with id "%s"') % str(cluster_id)
raise exception.NotFound(message)
return cls._from_db(context, cluster)
@classmethod
def load_all(cls, context, limit=None, marker=None, sort_keys=None,
sort_dir=None, filters=None, tenant_safe=True,
show_deleted=False, show_nested=False):
clusters = db_api.cluster_get_all(context, limit, sort_keys, marker,
sort_dir, filters, tenant_safe,
show_deleted, show_nested) or []
for cluster in clusters:
yield cls._from_db(context, cluster)
@classmethod
def _from_db(cls, context, cluster):
# TODO: calculate current size based on nodes
size = self.size
return cls(context, cluster.name, cluster.profile, size,
id=cluster.id, status=cluster.status,
status_reason=cluster_status_reason,
parent=cluster.parent,
project=cluster.project,
created_time=cluster.created_time,
updated_time=cluster.updated_time,
deleted_time=cluster.deleted_time,
domain = cluster.domain,
timeout = cluster.timeout,
user=cluster.user)
def to_dict(self):
info = {
rpc_api.CLUSTER_NAME: self.name,
rpc_api.CLUSTER_PROFILE: self.profile,
rpc_api.CLUSTER_SIZE: self.size,
rpc_api.CLUSTER_SIZE: self.node_count,
rpc_api.CLUSTER_UUID: self.id,
rpc_api.CLUSTER_PARENT: self.parent,
rpc_api.CLUSTER_DOMAIN: self.domain,
@@ -235,5 +288,5 @@ class Cluster(object):
rpc_api.CLUSTER_STATUS_REASON: self.status_reason,
rpc_api.CLUSTER_TIMEOUT: self.timeout,
}
return info


@@ -0,0 +1,210 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import glob
import os.path
import six
from stevedore import extension
from oslo.config import cfg
from senlin.common import exception
from senlin.common.i18n import _
from senlin.common.i18n import _LE
from senlin.common.i18n import _LI
from senlin.engine import clients
from senlin.engine import parser
from senlin.engine import registry
from senlin.openstack.common import log
LOG = log.getLogger(__name__)
_environment = None
def global_env():
if _environment is None:
initialize()
return _environment
class Environment(object):
'''
An object that contains all profiles, policies and customizations.
'''
SECTIONS = (
PARAMETERS, CUSTOM_PROFILES, CUSTOM_POLICIES,
) = (
'parameters', 'custom_profiles', 'custom_policies',
)
def __init__(self, env=None, is_global=False):
'''
Create an Environment from a dict.
:param env: the json environment
:param is_global: boolean indicating if this is the global environment.
'''
self.params = {}
if is_global:
self.profile_registry = registry.Registry('profiles')
self.policy_registry = registry.Registry('policies')
else:
self.profile_registry = registry.Registry(
'profiles', global_env().profile_registry)
self.policy_registry = registry.Registry(
'policies', global_env().policy_registry)
if env is None:
env = {}
else:
# Merge user specified keys with current environment
self.params = env.get(self.PARAMETERS, {})
custom_profiles = env.get(self.CUSTOM_PROFILES, {})
custom_policies = env.get(self.CUSTOM_POLICIES, {})
self.profile_registry.load(custom_profiles)
self.policy_registry.load(custom_policies)
def parse(self, env_str):
'''
Parse a string format environment file into a dictionary.
'''
if env_str is None:
return {}
env = parser.simple_parse(env_str)
# Check unknown sections
for sect in env:
if sect not in self.SECTIONS:
msg = _('environment has unknown section "%s"') % sect
raise ValueError(msg)
# Fill in default values for missing sections
for sect in self.SECTIONS:
if sect not in env:
env[sect] = {}
return env
def load(self, env_dict):
'''
Load environment from the given dictionary.
'''
self.params.update(env_dict.get(self.PARAMETERS, {}))
self.profile_registry.load(env_dict.get(self.CUSTOM_PROFILES, {}))
self.policy_registry.load(env_dict.get(self.CUSTOM_POLICIES, {}))
def _check_profile_type_name(self, name):
if name == "" or name is None:
msg = _('Profile type name not specified')
raise exception.ProfileValidationFailed(message=msg)
elif not isinstance(name, six.string_types):
msg = _('Profile type name is not a string')
raise exception.ProfileValidationFailed(message=msg)
def register_profile(self, name, plugin):
self._check_profile_type_name(name)
self.profile_registry.register_plugin(name, plugin)
def get_profile(self, name):
self._check_profile_type_name(name)
plugin = self.profile_registry.get_plugin(name)
if plugin is None:
msg = _("Unknown profile type : %s") % name
raise exception.ProfileValidationFailed(message=msg)
return plugin
def get_profile_types(self):
return self.profile_registry.get_types()
def _check_policy_type_name(self, name):
if name == "" or name is None:
msg = _('Policy type name not specified')
raise exception.PolicyValidationFailed(message=msg)
elif not isinstance(name, six.string_types):
msg = _('Policy type name is not a string')
raise exception.PolicyValidationFailed(message=msg)
def register_policy(self, name, plugin):
self._check_policy_type_name(name)
self.policy_registry.register_plugin(name, plugin)
def get_policy(self, name):
self._check_policy_type_name(name)
plugin = self.policy_registry.get_plugin(name)
if plugin is None:
msg = _("Unknown policy type : %s") % name
raise exception.PolicyValidationFailed(message=msg)
return plugin
def get_policy_types(self):
return self.policy_registry.get_types()
def read_global_environment(self):
'''
Read and parse global environment files.
'''
cfg.CONF.import_opt('environment_dir', 'senlin.common.config')
env_dir = cfg.CONF.environment_dir
try:
files = glob.glob(os.path.join(env_dir, '*'))
except OSError as ex:
LOG.error(_LE('Failed to read %s'), env_dir)
LOG.exception(ex)
return
for fname in files:
try:
with open(fname) as f:
LOG.info(_LI('Loading environment from %s'), fname)
self.load(self.parse(f.read()))
except ValueError as vex:
LOG.error(_LE('Failed to parse %s'), fname)
LOG.exception(vex)
except IOError as ioex:
LOG.error(_LE('Failed to read %s'), fname)
LOG.exception(ioex)
def initialize():
global _environment
def _get_mapping(namespace):
mgr = extension.ExtensionManager(
namespace=namespace,
invoke_on_load=False,
verify_requirements=True)
return [[name, mgr[name].plugin] for name in mgr.names()]
if _environment is not None:
return
# TODO(Qiming): Check when to initialize clients if needed
clients.initialise()
env = Environment(is_global=True)
# Register global plugins when initialized
entries = _get_mapping('senlin.profiles')
for name, plugin in entries:
env.register_profile(name, plugin)
entries = _get_mapping('senlin.policies')
for name, plugin in entries:
env.register_policy(name, plugin)
env.read_global_environment()
_environment = env
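The `Environment` class above keeps per-user profile and policy registries that defer to the global registry for any plugin they do not override. A sketch of that fallback lookup is below; the `Registry` class here is illustrative, not the real `senlin.engine.registry` implementation:

```python
# Sketch of the registry-with-fallback lookup that Environment relies on:
# a user environment's registry defers to the global registry for plugins
# it does not override. Illustrative only.

class Registry(object):
    def __init__(self, name, global_registry=None):
        self.name = name
        self._plugins = {}
        self.global_registry = global_registry

    def register_plugin(self, name, plugin):
        self._plugins[name] = plugin

    def get_plugin(self, name):
        plugin = self._plugins.get(name)
        if plugin is None and self.global_registry is not None:
            # Fall back to the global registry for unknown names.
            plugin = self.global_registry.get_plugin(name)
        return plugin

global_profiles = Registry('profiles')
global_profiles.register_plugin('os.heat.stack', object)

user_profiles = Registry('profiles', global_profiles)
user_profiles.register_plugin('custom.profile', dict)
```

A lookup in `user_profiles` finds its own plugins first and only then consults the global registry, so user customizations shadow global defaults without mutating them.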


@@ -10,10 +10,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
import datetime
from senlin.common import exception
from senlin.db import api as db_api
from senlin.profiles import base as profiles
class Node(object):
@@ -26,61 +27,181 @@ class Node(object):
'''
statuses = (
ACTIVE, ERROR, DELETED, UPDATING,
INIT, ACTIVE, ERROR, DELETED, UPDATING,
) = (
'ACTIVE', 'ERROR', 'DELETED', 'UPDATING',
'INITIALIZING', 'ACTIVE', 'ERROR', 'DELETED', 'UPDATING',
)
def __init__(self, name, profile_id, cluster_id=None, **kwargs):
def __init__(self, context, name, profile_id, **kwargs):
self.context = context
self.id = kwargs.get('id', None)
if name:
self.name = name
else:
# TODO
# Using self.physical_resource_name() to generate a unique name
# TODO(Qiming): Use self.physical_resource_name() to
# generate a unique name
self.name = 'node-name-tmp'
self.physical_id = None
self.cluster_id = cluster_id
self.physical_id = kwargs.get('physical_id', '')
self.profile_id = profile_id
if cluster_id is None:
self.index = -1
self.cluster_id = kwargs.get('cluster_id', '')
self.index = kwargs.get('index', -1)
self.role = kwargs.get('role', '')
self.created_time = kwargs.get('created_time', None)
self.updated_time = kwargs.get('updated_time', None)
self.deleted_time = kwargs.get('deleted_time', None)
self.status = kwargs.get('status', self.INIT)
self.status_reason = kwargs.get('status_reason', 'Initializing')
self.data = kwargs.get('data', {})
self.tags = kwargs.get('tags', {})
self.rt = {
'profile': profiles.load(context, self.profile_id),
}
def store(self):
'''
Store the node record into database table.
The invocation of DB API could be a node_create or a node_update,
depending on whether the node has an ID assigned.
'''
values = {
'name': self.name,
'physical_id': self.physical_id,
'cluster_id': self.cluster_id,
'profile_id': self.profile_id,
'index': self.index,
'role': self.role,
'created_time': self.created_time,
'updated_time': self.updated_time,
'deleted_time': self.deleted_time,
'status': self.status,
'status_reason': self.status_reason,
'data': self.data,
'tags': self.tags,
}
if self.id:
db_api.node_update(self.context, self.id, values)
# TODO(Qiming): create event/log
else:
self.index = db_api.get_next_index(cluster_id)
self.role = ''
node = db_api.node_create(self.context, values)
# TODO(Qiming): create event/log
self.id = node.id
self.created_time = None
self.updated_time = None
self.deleted_time = None
return self.id
self.status = self.ACTIVE
self.status_reason = 'Initialized'
self.data = {}
self.tags = {}
# TODO: store this to database
@classmethod
def from_db_record(cls, context, record):
'''
Construct a node object from a database record.
:param context: the context used for DB operations;
:param record: a DB node object that contains all fields;
'''
kwargs = {
'id': record.id,
'physical_id': record.physical_id,
'index': record.index,
'role': record.role,
'created_time': record.created_time,
'updated_time': record.updated_time,
'deleted_time': record.deleted_time,
'status': record.status,
'status_reason': record.status_reason,
'data': record.data,
'tags': record.tags,
}
return cls(context, record.name, record.profile_id, **kwargs)
def create(self, name, profile_id, cluster_id=None, **kwargs):
# TODO: invoke profile to create new object and get the physical id
# TODO: log events?
@classmethod
def load(cls, context, node_id):
'''
Retrieve a node from the database.
'''
record = db_api.node_get(context, node_id)
if record is None:
msg = _('No node with id "%s" exists') % node_id
raise exception.NotFound(msg)
return cls.from_db_record(context, record)
@classmethod
def load_all(cls, context, cluster_id):
'''
Retrieve all nodes of a cluster from the database.
'''
records = db_api.node_get_all_by_cluster(context, cluster_id)
for record in records:
yield cls.from_db_record(context, record)
def do_create(self):
# TODO(Qiming): log events?
self.created_time = datetime.datetime.utcnow()
return node.id
res = profiles.create_object(self)
if res:
self.physical_id = res
return True
else:
return False
def delete(self):
node = db_api.get_node(self.id)
# TODO: invoke profile to delete this object
# TODO: check if actions are working on it and can be canceled
def do_delete(self):
if not self.physical_id:
return True
db_api.delete_node(self.id)
# TODO(Qiming): check if actions are working on it and can be canceled
# TODO(Qiming): log events
res = profiles.delete_object(self)
if res:
db_api.delete_node(self.id)
return True
else:
return False
def do_update(self, new_profile_id):
if not new_profile_id:
raise exception.ProfileNotSpecified()
if new_profile_id == self.profile_id:
return True
if not self.physical_id:
return False
res = profiles.update_object(self, new_profile_id)
if res:
self.rt['profile'] = profiles.load(self.context, new_profile_id)
self.profile_id = new_profile_id
self.updated_time = datetime.datetime.utcnow()
db_api.node_update(self.context, self.id,
{'profile_id': self.profile_id,
'updated_time': self.updated_time})
return res
def do_join(self, cluster_id):
if self.cluster_id == cluster_id:
return True
db_api.node_migrate(self.context, self.id, self.cluster_id,
cluster_id)
self.updated_time = datetime.datetime.utcnow()
db_api.node_update(self.context, self.id,
{'updated_time': self.updated_time})
return True
def update(self, new_profile_id):
old_profile = db_api.get_profile(self.profile_id)
new_profile = db_api.get_profile(new_profile_id)
def do_leave(self):
if self.cluster_id is None:
return True
# TODO: check if profile type matches
new_profile_type = new_profile.type_name
db_api.node_migrate(self.context, self.id, self.cluster_id, None)
self.updated_time = datetime.datetime.utcnow()
db_api.node_update(self.context, self.id,
{'updated_time': self.updated_time})
profile_cls = profile_registry.get_class(type_name)
profile_cls.update_object(self.id, new_profile)
self.profile_id = new_profile
self.updated_time = datetime.utcnow()
return True
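The new `do_join()`/`do_leave()` methods above both delegate to `db_api.node_migrate`, which moves a node record from one cluster to another, with a `None` target meaning the node becomes orphaned. The sketch below shows that semantic with a plain dict; the real `senlin.db.api` version also adjusts node indexes and timestamps inside a transaction:

```python
# Sketch of the node_migrate semantics that do_join()/do_leave() rely on.
# Illustrative only: a dict stands in for the node table, and the real DB
# API does this work transactionally.

nodes = {'n1': {'cluster_id': 'c1'}}

def node_migrate(node_id, from_cluster, to_cluster):
    node = nodes[node_id]
    # Sanity check: the caller must know the node's current cluster.
    assert node['cluster_id'] == from_cluster
    node['cluster_id'] = to_cluster   # None means the node leaves its cluster
    return node

node_migrate('n1', 'c1', 'c2')   # join a different cluster
node_migrate('n1', 'c2', None)   # leave; the node is now orphaned
```

This is why `do_join` short-circuits when `self.cluster_id == cluster_id` and `do_leave` when the node has no cluster: in both cases there is nothing to migrate.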


@@ -10,7 +10,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
import os
import requests
import six
from six.moves import urllib
import yaml
from senlin.common import i18n
@@ -19,56 +23,104 @@ from senlin.openstack.common import log as logging
_LE = i18n._LE
LOG = logging.getLogger(__name__)
# Try LibYAML if available
try:
Loader = yaml.CLoader
Dumper = yaml.CDumper
except ImportError as err:
Loader = yaml.Loader
Dumper = yaml.Dumper
if hasattr(yaml, 'CSafeLoader'):
Loader = yaml.CSafeLoader
else:
Loader = yaml.SafeLoader
if hasattr(yaml, 'CSafeDumper'):
Dumper = yaml.CSafeDumper
else:
Dumper = yaml.SafeDumper
def parse_profile(profile):
class YamlLoader(Loader):
def __init__(self, stream):
if isinstance(stream, file):
self._curdir = os.path.split(stream.name)[0]
else:
self._curdir = './'
super(YamlLoader, self).__init__(stream)
def include(self, node):
url = self.construct_scalar(node)
components = urllib.parse.urlparse(url)
if components.scheme == '':
try:
url = os.path.join(self._curdir, url)
with open(url, 'r') as f:
return yaml.load(f, Loader)
except Exception as ex:
raise Exception('Failed loading file %s: %s' % (url,
six.text_type(ex)))
try:
resp = requests.get(url, stream=True)
resp.raise_for_status()
reader = resp.iter_content(chunk_size=1024)
result = ''
for chunk in reader:
result += chunk
return yaml.load(result, Loader)
except Exception as ex:
raise Exception('Failed retrieving file %s: %s' % (url,
six.text_type(ex)))
def process_unicode(self, node):
# Override the default string handling function to always return
# unicode objects
return self.construct_scalar(node)
YamlLoader.add_constructor('!include', YamlLoader.include)
YamlLoader.add_constructor(u'tag:yaml.org,2002:str',
YamlLoader.process_unicode)
YamlLoader.add_constructor(u'tag:yaml.org,2002:timestamp',
YamlLoader.process_unicode)
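The `add_constructor` calls above hook custom tags and scalar handling into the loader. The mechanism itself can be demonstrated with a simpler, purely illustrative `!upper` tag (the class and tag names here are invented for the sketch):

```python
import yaml

class DemoLoader(yaml.SafeLoader):
    """A private loader subclass so the constructor is not registered globally."""
    pass

def construct_upper(loader, node):
    # Resolve the scalar first, then post-process it, mirroring how the
    # '!include' constructor resolves its URL argument before loading it.
    return loader.construct_scalar(node).upper()

DemoLoader.add_constructor('!upper', construct_upper)

doc = yaml.load('name: !upper web', Loader=DemoLoader)
```

Registering on a subclass rather than on `yaml.SafeLoader` keeps the custom tag from leaking into unrelated `yaml.load` calls.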
def simple_parse(in_str):
try:
out_dict = json.loads(in_str)
except ValueError:
try:
out_dict = yaml.load(in_str, Loader=YamlLoader)
except yaml.YAMLError as yea:
yea = six.text_type(yea)
msg = _('Error parsing input: %s') % yea
raise ValueError(msg)
else:
if out_dict is None:
out_dict = {}
if not isinstance(out_dict, dict):
msg = _('The input is not a JSON object or YAML mapping.')
raise ValueError(msg)
return out_dict
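`simple_parse` accepts either JSON or YAML input and insists on a mapping. A standalone sketch of the same logic (using `yaml.safe_load` instead of the custom loader, and a plain `ValueError` message instead of the translated one):

```python
import json

import yaml

def simple_parse(in_str):
    """Parse a string as JSON first, then as YAML; always return a dict."""
    try:
        out_dict = json.loads(in_str)
    except ValueError:
        try:
            out_dict = yaml.safe_load(in_str)
        except yaml.YAMLError as ex:
            raise ValueError('Error parsing input: %s' % ex)
    if out_dict is None:
        out_dict = {}   # empty input is treated as an empty mapping
    if not isinstance(out_dict, dict):
        raise ValueError('The input is not a JSON object or YAML mapping.')
    return out_dict

from_json = simple_parse('{"name": "web", "size": 2}')
from_yaml = simple_parse('name: web\nsize: 2\n')
```

Trying JSON first is cheap because `json.loads` fails fast on non-JSON input, after which the YAML branch handles both formats' mapping syntax.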
def parse_profile(profile_str):
'''
Parse and validate the specified string as a profile.
'''
if not isinstance(profile, six.string_types):
# TODO(Qiming): Throw exception
return None
data = simple_parse(profile_str)
data = {}
try:
data = yaml.load(profile, Loader=Loader)
except Exception as ex:
# TODO(Qiming): Throw exception
LOG.error(_LE('Failed parsing given data as YAML: %s'),
six.text_type(ex))
return None
# TODO(Qiming): Construct a profile object based on the type specified
# TODO(Qiming):
# Construct a profile object based on the type specified
return data
def parse_policy(policy):
def parse_policy(policy_str):
'''
Parse and validate the specified string as a policy.
'''
if not isinstance(policy, six.string_types):
# TODO(Qiming): Throw exception
return None
data = simple_parse(policy_str)
data = {}
try:
data = yaml.load(policy, Loader=Loader)
except Exception as ex:
# TODO(Qiming): Throw exception
LOG.error(_LE('Failed parsing given data as YAML: %s'),
six.text_type(ex))
return None
# TODO(Qiming): Construct a policy object based on the type specified
# TODO(Qiming):
# Construct a policy object based on the type specified
return data


@@ -1,30 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
class ProfileRegistry(object):
'''
A registry for profile types.
'''
def __init__(self, **extra_paths):
self._registry = {'profiles': {}}
class ProfileManager(object):
'''
A ProfileManager manages the profile intance database.
'''
def __init__(self, **extra_paths):
pass

senlin/engine/registry.py Normal file

@@ -0,0 +1,165 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
import six
from senlin.common.i18n import _LI
from senlin.common.i18n import _LW
from senlin.openstack.common import log
LOG = log.getLogger(__name__)
class PluginInfo(object):
'''
Base mapping of plugin type to implementation.
'''
def __new__(cls, registry, name, plugin, **kwargs):
'''
Create a new PluginInfo of the appropriate class.
Placeholder for class hierarchy extensibility
'''
return super(PluginInfo, cls).__new__(cls)
def __init__(self, registry, name, plugin):
self.registry = registry
self.name = name
self.plugin = plugin
self.user_provided = True
def __eq__(self, other):
if other is None:
return False
return (self.name == other.name and
self.plugin == other.plugin and
self.user_provided == other.user_provided)
def __ne__(self, other):
return not self.__eq__(other)
def __lt__(self, other):
if self.user_provided != other.user_provided:
# user provided ones must be sorted above system ones.
return self.user_provided > other.user_provided
if len(self.name) != len(other.name):
# more specific (longer) name must be sorted above system ones.
return len(self.name) > len(other.name)
return self.name < other.name
def __gt__(self, other):
return other.__lt__(self)
def __str__(self):
return '[Plugin](User:%s) %s -> %s' % (self.user_provided,
self.name, str(self.plugin))
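The comparison methods above make `sorted()` rank user-provided plugins before system ones, and more specific (longer) names before shorter ones. A trimmed-down sketch of that ordering (the registry argument is omitted for brevity; names and plugins here are invented):

```python
class PluginInfo(object):
    """Trimmed to the ordering logic only."""
    def __init__(self, name, plugin, user_provided=True):
        self.name = name
        self.plugin = plugin
        self.user_provided = user_provided

    def __lt__(self, other):
        if self.user_provided != other.user_provided:
            # User-provided entries sort before system ones.
            return self.user_provided > other.user_provided
        if len(self.name) != len(other.name):
            # More specific (longer) names sort before shorter ones.
            return len(self.name) > len(other.name)
        return self.name < other.name

plugins = [
    PluginInfo('os.heat.stack', 'SystemStack', user_provided=False),
    PluginInfo('my_stack', 'UserStack'),
    PluginInfo('my_special_stack', 'UserSpecialStack'),
]
best = sorted(plugins)[0]
```

Because only `__lt__` drives `sorted()`, the first element of the sorted list is the most preferred match.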
class Registry(object):
'''
A registry for managing profile or policy classes.
'''
def __init__(self, registry_name, global_registry=None):
self._registry = {registry_name: {}}
self.is_global = True if global_registry else False
self.global_registry = global_registry
def _register_info(self, path, info):
'''
Place the new info in the correct location in the registry.
:param path: a list of keys ['profiles', 'my_stack', 'os.heat.stack'],
or ['policies', 'my_policy', 'ScalingPolicy']
:param info: reference to a PluginInfo data structure, deregister a
PluginInfo if specified as None.
'''
descriptive_path = '/'.join(path)
name = path[-1]
# create the structure if needed
registry = self._registry
for key in path[:-1]:
if key not in registry:
registry[key] = {}
registry = registry[key]
if info is None:
# delete this entry.
LOG.warn(_LW('Removing %(item)s from %(path)s'), {
'item': name, 'path': descriptive_path})
registry.pop(name, None)
return
if name in registry and isinstance(registry[name], PluginInfo):
if registry[name] == info:
return
details = {
'path': descriptive_path,
'old': str(registry[name].plugin),
'new': str(info.plugin)
}
LOG.warn(_LW('Changing %(path)s from %(old)s to %(new)s'), details)
else:
LOG.info(_LI('Registering %(path)s -> %(value)s'), {
'path': descriptive_path, 'value': str(info.plugin)})
info.user_provided = self.is_global
registry[name] = info
def register_plugin(self, name, plugin):
pi = PluginInfo(self, [name], plugin)
self._register_info([name], pi)
def _load_registry(self, path, registry):
for k, v in iter(registry.items()):
path = path + [k]
if v is None:
self._register_info(path, None)
elif isinstance(v, dict):
self._load_registry(path, v)
else:
info = PluginInfo(self, path, v)
self._register_info(path, info)
def load(self, json_snippet):
self._load_registry([], json_snippet)
def iterable_by(self, name):
plugin = self._registry.get(name)
if plugin:
yield plugin
def get_plugin(self, name):
giter = []
if self.is_global:
giter = self.global_registry.iterable_by(name)
matches = itertools.chain(self.iterable_by(name), giter)
info = sorted(matches)
return info[0].plugin if info else None
def as_dict(self):
"""Return profiles in a dict format."""
def _as_dict(level):
tmp = {}
for k, v in iter(level.items()):
if isinstance(v, dict):
tmp[k] = _as_dict(v)
elif v.user_provided:
tmp[k] = v.plugin
return tmp
return _as_dict(self._registry)
def get_types(self):
'''Return a list of valid profile types.'''
return [name for name in six.iterkeys(self._registry)]
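`_register_info` stores entries in a nested dict keyed by the path components. The walk-and-create behaviour can be sketched on its own (plain strings stand in for `PluginInfo` objects):

```python
def register_info(registry, path, info):
    """Create intermediate dicts along path, then set (or pop) the last key."""
    node = registry
    for key in path[:-1]:
        node = node.setdefault(key, {})
    if info is None:
        node.pop(path[-1], None)   # deregistration
    else:
        node[path[-1]] = info

reg = {}
register_info(reg, ['profiles', 'my_stack', 'os.heat.stack'], 'StackProfile')
register_info(reg, ['policies', 'my_policy', 'ScalingPolicy'], 'ScalingPolicy')
```

`dict.setdefault` does the create-if-missing step in one call, which is why the registry never needs pre-declared categories.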


@@ -23,7 +23,6 @@ import six
from senlin.common.i18n import _
from senlin.common.i18n import _LI
from senlin.engine import action as actions
from senlin.openstack.common import log as logging
LOG = logging.getLogger(__name__)
@@ -527,20 +526,7 @@ class PollingTaskGroup(object):
r.cancel()
def runAction(action):
"""
Start a thread to run action until finished
"""
# TODO
# Query lock for this action
# call action.execute with args in subthread
pass
def wait(handle):
"""
Wait an action to finish
"""
# TODO
# Make the subthread join the main thread
def notify():
# TODO(Yanyan): Check if workers are available to pick actions to
# execute
pass


@@ -16,6 +16,7 @@ import functools
import eventlet
from oslo.config import cfg
from oslo import messaging
from oslo.utils import uuidutils
from osprofiler import profiler
from senlin.common import context
@@ -27,10 +28,9 @@ from senlin.db import api as db_api
from senlin.engine import action as actions
from senlin.engine import cluster as clusters
from senlin.engine import senlin_lock
from senlin.engine.thread_mgr import ThreadGroupManager
from senlin.engine import thread_mgr
from senlin.openstack.common import log as logging
from senlin.openstack.common import service
from senlin.openstack.common import uuidutils
LOG = logging.getLogger(__name__)
@@ -58,7 +58,7 @@ class EngineListener(service.Service):
def __init__(self, host, engine_id, thread_group_mgr):
super(EngineListener, self).__init__()
self.thread_group_mgr = thread_group_mgr
self.TG = thread_group_mgr
self.engine_id = engine_id
self.host = host
@@ -78,10 +78,10 @@ class EngineListener(service.Service):
def stop_cluster(self, ctxt, cluster_id):
'''Stop any active threads on a cluster.'''
self.thread_group_mgr.stop(cluster_id)
self.TG.stop(cluster_id)
def send(self, ctxt, cluster_id, message):
self.thread_group_mgr.send(cluster_id, message)
self.TG.send(cluster_id, message)
@profiler.trace_cls("rpc")
@@ -100,20 +100,21 @@ class EngineService(service.Service):
def __init__(self, host, topic, manager=None):
super(EngineService, self).__init__()
# TODO(Qiming): call environment.initialize() when environment
# is ready
self.host = host
self.topic = topic
# The following are initialized here, but assigned in start() which
# happens after the fork when spawning multiple worker processes
self.engine_id = None
self.thread_group_mgr = None
self.TG = None
self.target = None
def start(self):
self.engine_id = senlin_lock.BaseLock.generate_engine_id()
self.thread_group_mgr = ThreadGroupManager()
self.listener = EngineListener(self.host, self.engine_id,
self.thread_group_mgr)
self.TG = thread_mgr.ThreadGroupManager()
self.listener = EngineListener(self.host, self.engine_id, self.TG)
LOG.debug("Starting listener for engine %s" % self.engine_id)
self.listener.start()
@@ -135,14 +136,14 @@ class EngineService(service.Service):
pass
# Wait for all active threads to be finished
for cluster_id in self.thread_group_mgr.groups.keys():
for cluster_id in self.TG.groups.keys():
# Ignore dummy service task
if cluster_id == cfg.CONF.periodic_interval:
continue
LOG.info(_LI("Waiting cluster %s processing to be finished"),
cluster_id)
# Stop threads gracefully
self.thread_group_mgr.stop(cluster_id, True)
self.TG.stop(cluster_id, True)
LOG.info(_LI("cluster %s processing was finished"), cluster_id)
# Terminate the engine process
@@ -150,39 +151,39 @@
super(EngineService, self).stop()
@request_context
def identify_cluster(self, cnxt, cluster_name):
def identify_cluster(self, context, cluster_name):
"""
The identify_cluster method returns the cluster id for a
single, live cluster given the cluster name.
:param cnxt: RPC context.
:param context: RPC context.
:param cluster_name: Name or ID of the cluster to look up.
"""
if uuidutils.is_uuid_like(cluster_name):
db_cluster = db_api.cluster_get(cnxt, cluster_name,
db_cluster = db_api.cluster_get(context, cluster_name,
show_deleted=True)
# may be the name is in uuid format, so if get by id returns None,
# we should get the info by name again
if not db_cluster:
db_cluster = db_api.cluster_get_by_name(cnxt, cluster_name)
db_cluster = db_api.cluster_get_by_name(context, cluster_name)
else:
db_cluster = db_api.cluster_get_by_name(cnxt, cluster_name)
db_cluster = db_api.cluster_get_by_name(context, cluster_name)
if db_cluster:
cluster = clusters.Cluster.load(cnxt, cluster=db_cluster)
cluster = clusters.Cluster.load(context, cluster=db_cluster)
return cluster.id
else:
raise exception.ClusterNotFound(cluster_name=cluster_name)
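`identify_cluster` first tries the argument as a UUID and falls back to a name lookup, covering names that merely look like UUIDs. The resolution order can be sketched against an in-memory table (the two dicts stand in for the DB API calls, and `LookupError` for `ClusterNotFound`):

```python
import uuid

def is_uuid_like(val):
    """Return True if val is a canonical UUID string."""
    try:
        return str(uuid.UUID(val)) == val
    except (TypeError, ValueError, AttributeError):
        return False

clusters_by_id = {}
clusters_by_name = {}

def identify_cluster(name_or_id):
    """Resolve a cluster reference to its ID: UUID lookup first, then name."""
    cluster = None
    if is_uuid_like(name_or_id):
        cluster = clusters_by_id.get(name_or_id)
    if cluster is None:
        # The argument may only look like a UUID, so retry by name.
        cluster = clusters_by_name.get(name_or_id)
    if cluster is None:
        raise LookupError('cluster %s not found' % name_or_id)
    return cluster['id']

cid = str(uuid.uuid4())
cluster = {'id': cid, 'name': 'web-cluster'}
clusters_by_id[cid] = cluster
clusters_by_name['web-cluster'] = cluster
```

The fall-through to the name lookup is what keeps clusters named with UUID-shaped strings reachable.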
def _get_cluster(self, cnxt, cluster_identity, show_deleted=False):
def _get_cluster(self, context, cluster_identity, show_deleted=False):
"""
Get Cluster record in DB based on cluster id
"""
# Currently, cluster_identity is cluster id OR cluster name
# TODO: use full cluster identity as inpurt, e.g.
# TODO(Yanyan): use full cluster identity as input, e.g.
# *cluster_name/cluster_id*
cluster_id = self.identify_cluster(cnxt, cluster_identity)
cluster_id = self.identify_cluster(context, cluster_identity)
db_cluster = db_api.cluster_get(cnxt, cluster_id,
db_cluster = db_api.cluster_get(context, cluster_id,
show_deleted=show_deleted,
eager_load=True)
@@ -192,20 +193,21 @@ class EngineService(service.Service):
return db_cluster
@request_context
def show_cluster(self, cnxt, cluster_identity):
def show_cluster(self, context, cluster_identity):
"""
Return detailed information about one or all clusters.
:param cnxt: RPC context.
:param context: RPC context.
:param cluster_identity: Name of the cluster you want to show, or None
to show all
"""
if cluster_identity is not None:
db_cluster = self._get_cluster(cnxt, cluster_identity,
db_cluster = self._get_cluster(context, cluster_identity,
show_deleted=True)
cluster_list = clusters.Cluster.load(cnxt, cluster=db_cluster)
cluster_list = clusters.Cluster.load(context, cluster=db_cluster)
else:
cluster_list = clusters.Cluster.load_all(cnxt, show_deleted=True)
cluster_list = clusters.Cluster.load_all(context,
show_deleted=True)
# Format clusters info
clusters_info = []
@@ -215,13 +217,13 @@
return {'clusters': clusters_info}
@request_context
def list_clusters(self, cnxt, limit=None, marker=None, sort_keys=None,
def list_clusters(self, context, limit=None, marker=None, sort_keys=None,
sort_dir=None, filters=None, tenant_safe=True,
show_deleted=False, show_nested=False):
"""
The list_clusters method returns attributes of all clusters.
:param cnxt: RPC context
:param context: RPC context
:param limit: the number of clusters to list (integer or string)
:param marker: the ID of the last item in the previous page
:param sort_keys: an array of fields used to sort the list
@@ -232,7 +234,7 @@
:param show_nested: if true, show nested clusters
:returns: a list of formatted clusters
"""
cluster_list = clusters.Cluster.load_all(cnxt, limit, marker,
cluster_list = clusters.Cluster.load_all(context, limit, marker,
sort_keys, sort_dir,
filters, tenant_safe,
show_deleted, show_nested)
@@ -245,57 +247,48 @@
return {'clusters': clusters_info}
@request_context
def create_cluster(self, cnxt, cluster_name, size, profile,
owner_id=None, nested_depth=0, user_creds_id=None,
cluster_user_project_id=None):
"""
def create_cluster(self, context, name, profile_id, size, args):
'''
Handle request to perform a create action on a cluster
:param cnxt: RPC context.
:param cluster_name: Name of the cluster you want to create.
:param size: Size of cluster you want to create.
:param profile: Profile used to create cluster nodes.
:param owner_id: parent cluster ID for nested clusters, only
expected when called from another senlin-engine
(not a user option)
:param nested_depth: the nested depth for nested clusters, only
expected when called from another senlin-engine
:param user_creds_id: the parent user_creds record for nested clusters
:param cluster_user_project_id: the parent cluster_user_project_id for
nested clusters
"""
LOG.info(_LI('Creating cluster %s'), cluster_name)
:param context: RPC context.
:param name: Name of the cluster to be created.
:param profile_id: Profile used to create cluster nodes.
:param size: Desired size of cluster to be created.
:param args: A dictionary of other parameters
'''
LOG.info(_LI('Creating cluster %s'), name)
# TODO: construct real kwargs based on input for cluster creating
kwargs = {}
kwargs['owner_id'] = owner_id
kwargs['nested_depth'] = nested_depth
kwargs['user_creds_id'] = user_creds_id
kwargs['cluster_user_project_id'] = cluster_user_project_id
kwargs = {
'parent': args.get('parent', ''),
'user': context.get('username', ''),
'project': context.get('tenant_id', ''),
'timeout': args.get('timeout', 0),
'tags': args.get('tags', {}),
}
cluster = clusters.Cluster(cluster_name, size, profile, **kwargs)
action = actions.ClusterAction(cnxt, cluster, 'CREATE', **kwargs)
self.thread_group_mgr.start_with_lock(cnxt, cluster, 'cluster',
self.engine_id, action.execute)
cluster = clusters.Cluster(name, profile_id, size, **kwargs)
cluster.store()
action = actions.Action(context, cluster, 'CLUSTER_CREATE', **kwargs)
self.TG.start_action_worker(action, self.engine_id)
return cluster.id
@request_context
def update_cluster(self, cnxt, cluster_identity, profile):
def update_cluster(self, context, cluster_identity, profile_id):
"""
Handle request to perform a update action on a cluster
:param cnxt: RPC context.
:param context: RPC context.
:param cluster_identity: Name or ID of the cluster you want to update.
:param profile_id: Profile used to update the cluster nodes.
"""
# Get the database representation of the existing cluster
db_cluster = self._get_cluster(cnxt, cluster_identity)
db_cluster = self._get_cluster(context, cluster_identity)
LOG.info(_LI('Updating cluster %s'), db_cluster.name)
cluster = clusters.Cluster.load(cnxt, cluster=db_cluster)
cluster = clusters.Cluster.load(context, cluster=db_cluster)
if cluster.status == cluster.ERROR:
msg = _('Updating a cluster when it is errored')
raise exception.NotSupported(feature=msg)
@@ -304,38 +297,37 @@
msg = _('Updating a cluster which has been deleted')
raise exception.NotSupported(feature=msg)
kwargs = {}
kwargs['profile'] = profile
action = actions.ClusterAction(cnxt, cluster, 'UPDATE', **kwargs)
kwargs = {
'profile_id': profile_id
}
self.thread_group_mgr.start_with_lock(cnxt, cluster, 'cluster',
self.engine_id, action.execute)
action = actions.Action(context, cluster, 'CLUSTER_UPDATE', **kwargs)
self.TG.start_action_worker(action, self.engine_id)
return cluster.id
@request_context
def delete_cluster(self, cnxt, cluster_identity):
def delete_cluster(self, context, cluster_identity):
"""
Handle request to perform a delete action on a cluster
:param cnxt: RPC context.
:param context: RPC context.
:param cluster_identity: Name or ID of the cluster you want to delete.
"""
db_cluster = self._get_cluster(cnxt, cluster_identity)
db_cluster = self._get_cluster(context, cluster_identity)
LOG.info(_LI('Deleting cluster %s'), db_cluster.name)
# This is an operation on a cluster, so we try to acquire ClusterLock
cluster = clusters.Cluster.load(cnxt, cluster=db_cluster)
lock = senlin_lock.ClusterLock(cnxt, cluster, self.engine_id)
cluster = clusters.Cluster.load(context, cluster=db_cluster)
lock = senlin_lock.ClusterLock(context, cluster, self.engine_id)
with lock.try_thread_lock(cluster.id) as acquire_result:
# Successfully acquired lock
if acquire_result is None:
self.thread_group_mgr.stop_timers(cluster.id)
action = actions.ClusterAction(cnxt, cluster, 'DELETE')
self.thread_group_mgr.start_with_acquired_lock(cluster, lock,
action.execute)
self.TG.stop_timers(cluster.id)
action = actions.Action(context, cluster, 'CLUSTER_DELETE')
self.TG.start_action_worker(action, self.engine_id)
return
# Current engine has the lock
@@ -343,11 +335,11 @@
# give threads which are almost complete an opportunity to
# finish naturally before force stopping them
eventlet.sleep(0.2)
self.thread_group_mgr.stop(cluster.id)
self.TG.stop(cluster.id)
# Another active engine has the lock
elif senlin_lock.ClusterLock.engine_alive(cnxt, acquire_result):
elif senlin_lock.ClusterLock.engine_alive(context, acquire_result):
stop_result = self._remote_call(
cnxt, acquire_result, self.listener.STOP_CLUSTER,
context, acquire_result, self.listener.STOP_CLUSTER,
cluster_id=cluster.id)
if stop_result is None:
LOG.debug("Successfully stopped remote task on engine %s"
@@ -359,49 +351,19 @@
# There may be additional nodes that we don't know about
# if an update was in-progress when the cluster was stopped, so
# reload the cluster from the database.
db_cluster = self._get_cluster(cnxt, cluster_identity)
cluster = clusters.Cluster.load(cnxt, cluster=db_cluster)
action = actions.ClusterAction(cnxt, cluster, 'DELETE')
db_cluster = self._get_cluster(context, cluster_identity)
cluster = clusters.Cluster.load(context, cluster=db_cluster)
action = actions.Action(context, cluster, 'CLUSTER_DELETE')
self.thread_group_mgr.start_with_lock(cnxt, cluster, 'cluster',
self.engine_id, action.execute)
self.TG.start_action_worker(action, self.engine_id)
return None
@request_context
def cluster_suspend(self, cnxt, cluster_identity):
'''
Handle request to perform suspend action on a cluster
'''
db_cluster = self._get_cluster(cnxt, cluster_identity)
LOG.debug("suspending cluster %s" % db_cluster.name)
cluster = clusters.Cluster.load(cnxt, cluster=db_cluster)
action = actions.ClusterAction(cnxt, cluster, 'SUSPEND')
self.thread_group_mgr.start_with_lock(cnxt, cluster, 'cluster',
self.engine_id, action.execute)
@request_context
def cluster_resume(self, cnxt, cluster_identity):
'''
Handle request to perform a resume action on a cluster
'''
db_cluster = self._get_cluster(cnxt, cluster_identity)
LOG.debug("resuming cluster %s" % db_cluster.name)
cluster = clusters.Cluster.load(cnxt, cluster=db_cluster)
action = actions.ClusterAction(cnxt, cluster, 'RESUME')
self.thread_group_mgr.start_with_lock(cnxt, cluster, 'cluster',
self.engine_id, action.execute)
def _remote_call(self, cnxt, lock_engine_id, call, *args, **kwargs):
def _remote_call(self, context, lock_engine_id, call, *args, **kwargs):
self.cctxt = self._client.prepare(
version='1.0',
topic=lock_engine_id)
try:
self.cctxt.call(cnxt, call, *args, **kwargs)
self.cctxt.call(context, call, *args, **kwargs)
except messaging.MessagingTimeout:
return False


@@ -1,15 +1,14 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import os
@@ -40,42 +39,26 @@ class ThreadGroupManager(object):
# Create dummy service task, because when there is nothing queued
# on self.tg the process exits
self.add_timer(cfg.CONF.periodic_interval, self._service_task)
def _service_task(self):
"""
'''
This is a dummy task which gets queued on the service.Service
threadgroup. Without this service.Service sees nothing running
i.e. has nothing to wait() on, so the process exits.
This could also be used to trigger periodic non-cluster-specific
housekeeping tasks
"""
'''
# TODO(Yanyan): have this task call dbapi purge events
pass
def _serialize_profile_info(self):
prof = profiler.get()
trace_info = None
if prof:
trace_info = {
"hmac_key": prof.hmac_key,
"base_id": prof.get_base_id(),
"parent_id": prof.get_id()
}
return trace_info
def _start_with_trace(self, trace, func, *args, **kwargs):
if trace:
profiler.init(**trace)
return func(*args, **kwargs)
def start(self, target_id, func, *args, **kwargs):
"""
Run the given method in a sub-thread.
"""
if target_id not in self.groups:
self.groups[target_id] = threadgroup.ThreadGroup()
return self.groups[target_id].add_thread(self._start_with_trace,
self._serialize_profile_info(),
func, *args, **kwargs)
return self.groups[target_id].add_thread(func, *args, **kwargs)
def start_with_lock(self, cnxt, target, target_type, engine_id,
func, *args, **kwargs):
@@ -99,6 +82,7 @@ class ThreadGroupManager(object):
lock = senlin_lock.ClusterLock(cnxt, target, engine_id)
elif target_type == 'node':
lock = senlin_lock.NodeLock(cnxt, target, engine_id)
with lock.thread_lock(target.id):
th = self.start_with_acquired_lock(target, lock,
func, *args, **kwargs)
@@ -168,9 +152,45 @@
for th in threads:
th.link(mark_done, th)
while not all(links_done.values()):
eventlet.sleep()
def send(self, target_id, message):
for event in self.events.pop(target_id, []):
event.send(message)
def action_proc(self, action, worker_id):
'''
Thread procedure.
'''
status = action.get_status()
while status in (action.INIT, action.WAITING):
# TODO(Qiming): Handle 'start_time' field of an action
yield
status = action.get_status()
# Exit quickly if action has been taken care of or marked
# completed or cancelled by other activities
if status != action.READY:
return
done = False
while not done:
# Take over the action
action.set_status(action.RUNNING)
result = action.execute()
if result == action.OK:
action.set_status(action.SUCCEEDED)
done = True
elif result == action.ERROR:
action.set_status(action.FAILED)
done = True
elif result == action.RETRY:
continue
def start_action_worker(self, action, engine_id):
self.start(engine_id, self.action_proc, action, engine_id)
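`action_proc` above is a small state machine: wait until the action is READY, mark it RUNNING, and re-run `execute()` until it reports OK or ERROR. A standalone sketch of the completion loop (dropping the cooperative wait on INIT/WAITING; the status and result constants mirror those assumed on the Action object):

```python
class Action(object):
    # Status values and execute() results assumed from the engine code.
    INIT, WAITING, READY, RUNNING = 'INIT', 'WAITING', 'READY', 'RUNNING'
    SUCCEEDED, FAILED = 'SUCCEEDED', 'FAILED'
    OK, ERROR, RETRY = 'OK', 'ERROR', 'RETRY'

    def __init__(self, results):
        self.status = self.READY
        self._results = list(results)   # scripted execute() outcomes

    def execute(self):
        return self._results.pop(0)

def action_proc(action):
    """Drive one READY action to completion, looping on RETRY results."""
    if action.status != Action.READY:
        return
    done = False
    while not done:
        action.status = Action.RUNNING
        result = action.execute()
        if result == Action.OK:
            action.status = Action.SUCCEEDED
            done = True
        elif result == Action.ERROR:
            action.status = Action.FAILED
            done = True
        # Action.RETRY falls through and executes again.

flaky = Action([Action.RETRY, Action.OK])
action_proc(flaky)
```

Keeping retries inside the worker loop means a transient failure never leaves the action stuck in RUNNING.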


@@ -1,126 +0,0 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Simple class that stores security context information in the web request.
Projects should subclass this class if they wish to enhance the request
context or provide additional information in their specific WSGI pipeline.
"""
import itertools
import uuid
def generate_request_id():
return b'req-' + str(uuid.uuid4()).encode('ascii')
class RequestContext(object):
"""Helper class to represent useful information about a request context.
Stores information about the security context under which the user
accesses the system, as well as additional request information.
"""
user_idt_format = '{user} {tenant} {domain} {user_domain} {p_domain}'
def __init__(self, auth_token=None, user=None, tenant=None, domain=None,
user_domain=None, project_domain=None, is_admin=False,
read_only=False, show_deleted=False, request_id=None,
instance_uuid=None):
self.auth_token = auth_token
self.user = user
self.tenant = tenant
self.domain = domain
self.user_domain = user_domain
self.project_domain = project_domain
self.is_admin = is_admin
self.read_only = read_only
self.show_deleted = show_deleted
self.instance_uuid = instance_uuid
if not request_id:
request_id = generate_request_id()
self.request_id = request_id
def to_dict(self):
user_idt = (
self.user_idt_format.format(user=self.user or '-',
tenant=self.tenant or '-',
domain=self.domain or '-',
user_domain=self.user_domain or '-',
p_domain=self.project_domain or '-'))
return {'user': self.user,
'tenant': self.tenant,
'domain': self.domain,
'user_domain': self.user_domain,
'project_domain': self.project_domain,
'is_admin': self.is_admin,
'read_only': self.read_only,
'show_deleted': self.show_deleted,
'auth_token': self.auth_token,
'request_id': self.request_id,
'instance_uuid': self.instance_uuid,
'user_identity': user_idt}
@classmethod
def from_dict(cls, ctx):
return cls(
auth_token=ctx.get("auth_token"),
user=ctx.get("user"),
tenant=ctx.get("tenant"),
domain=ctx.get("domain"),
user_domain=ctx.get("user_domain"),
project_domain=ctx.get("project_domain"),
is_admin=ctx.get("is_admin", False),
read_only=ctx.get("read_only", False),
show_deleted=ctx.get("show_deleted", False),
request_id=ctx.get("request_id"),
instance_uuid=ctx.get("instance_uuid"))
def get_admin_context(show_deleted=False):
context = RequestContext(None,
tenant=None,
is_admin=True,
show_deleted=show_deleted)
return context
def get_context_from_function_and_args(function, args, kwargs):
"""Find an arg of type RequestContext and return it.
This is useful in a couple of decorators where we don't
know much about the function we're wrapping.
"""
for arg in itertools.chain(kwargs.values(), args):
if isinstance(arg, RequestContext):
return arg
return None
def is_user_context(context):
"""Indicates if the request context is a normal user."""
if not context:
return False
if context.is_admin:
return False
if not context.user_id or not context.project_id:
return False
return True


@@ -1,37 +0,0 @@
# Copyright (c) 2012 Intel Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
UUID related utilities and helper functions.
"""
import uuid
def generate_uuid():
return str(uuid.uuid4())
def is_uuid_like(val):
"""Returns validation of a value as a UUID.
For our purposes, a UUID is a canonical form string:
aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa
"""
try:
return str(uuid.UUID(val)) == val
except (TypeError, ValueError, AttributeError):
return False


@@ -10,34 +10,111 @@
# License for the specific language governing permissions and limitations
# under the License.
from senlin.common import exception
from senlin.common.i18n import _
from senlin.db import api as db_api
from senlin.engine import environment
class PolicyBase(object):
class Policy(object):
'''
Base class for policies.
'''
def __init__(self, name, type_name, **kwargs):
ENFORCEMENT_LEVELS = (
CRITICAL, ERROR, WARNING, INFO, DEBUG,
) = (
'CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG',
)
def __new__(cls, type_name, name, **kwargs):
'''
Create a new policy of the appropriate class.
'''
if cls != Policy:
PolicyClass = cls
else:
PolicyClass = environment.global_env().get_policy(type_name)
return super(Policy, cls).__new__(PolicyClass)
def __init__(self, type_name, name, **kwargs):
self.id = kwargs.get('id', None)
self.name = name
self.type = type_name
self.cooldown = 0
self.level = DEBUG
self.spec = {}
self.data = {}
def pre_op(self, cluster_id, action, **args):
self.context = kwargs.get('context', None)
self.cooldown = kwargs.get('cooldown', 0)
self.level = kwargs.get('level', self.DEBUG)
self.spec = kwargs.get('spec', {})
self.data = kwargs.get('data', {})
def store(self):
'''
Store the policy object into database table.
This could be a policy_create or a policy_update DB API invocation,
depends on whether self.id is set.
'''
values = {
'name': self.name,
'type': self.type,
'cooldown': self.cooldown,
'level': self.level,
'spec': self.spec,
'data': self.data,
}
if self.id:
db_api.policy_update(self.context, self.id, values)
else:
policy = db_api.policy_create(self.context, values)
self.id = policy.id
return self.id
@classmethod
def from_db_record(cls, context, record):
'''
Construct a policy object from a database record.
:param context: the context used for DB operations.
:param record: a DB policy record that contains all required fields.
'''
kwargs = {
'id': record.id,
'name': record.name,
'context': context,
'type': record.type,
'cooldown': record.cooldown,
'level': record.level,
'spec': record.spec,
'data': record.data,
}
return cls(record.type, record.name, **kwargs)
@classmethod
def load(cls, context, policy_id):
'''
Retrieve and reconstruct a policy object from DB.
'''
policy = db_api.policy_get(context, policy_id)
if policy is None:
msg = _('No policy with id "%s" exists') % policy_id
raise exception.NotFound(msg)
return cls.from_db_record(context, policy)
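Together, `store()`, `from_db_record()` and `load()` above form a persistence round trip: the first save creates a row and records its id, later saves update in place. A sketch of that create-or-update logic with the DB layer stubbed by an in-memory dict (`FakeDB` and the uuid-based id generation are assumptions, not senlin's db_api):

```python
import uuid

class FakeDB(object):
    """In-memory stand-in for senlin.db.api (assumption)."""
    def __init__(self):
        self.rows = {}

    def policy_create(self, values):
        pid = str(uuid.uuid4())
        self.rows[pid] = dict(values)
        return pid

    def policy_update(self, pid, values):
        self.rows[pid].update(values)

class Policy(object):
    def __init__(self, db, name):
        self.db = db
        self.id = None
        self.name = name

    def store(self):
        values = {'name': self.name}
        if self.id:                           # already persisted: update
            self.db.policy_update(self.id, values)
        else:                                 # first save: create, keep id
            self.id = self.db.policy_create(values)
        return self.id

db = FakeDB()
p = Policy(db, 'p1')
first = p.store()        # create path
p.name = 'p1-renamed'
second = p.store()       # update path, same id, same row
```

The same id coming back from both calls is what lets callers treat `store()` as idempotent with respect to row creation.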
def pre_op(self, cluster_id, action, **kwargs):
'''
Force all subclasses to implement an operation that will be invoked
before an action.
'''
return NotImplemented
def enforce(self, cluster_id, action, **args):
def enforce(self, cluster_id, action, **kwargs):
'''
Force all subclasses to implement an operation that can be called
during an action.
'''
return NotImplemented
def post_op(self, cluster_id, action, **args):
def post_op(self, cluster_id, action, **kwargs):
'''
Force all subclasses to implement an operation that will be performed
after an action.
@@ -46,16 +123,18 @@ class PolicyBase(object):
def to_dict(self):
pb_dict = {
'id': self.id,
'name': self.name,
'type': self.type_name,
'uuid': self.uuid,
'type': self.type,
'spec': self.spec,
'level': self.level,
'cooldown': self.cooldown,
'data': self.data,
}
return pb_dict
@classmethod
def from_dict(self, **kwargs):
pb = PolicyBase(kwargs)
return pb
def from_dict(cls, **kwargs):
type_name = kwargs.get('type', '')
name = kwargs.get('name', '')
return cls(type_name, name, **kwargs)


@@ -10,11 +10,14 @@
# License for the specific language governing permissions and limitations
# under the License.
import random
from senlin.common import senlin_consts as consts
from senlin.db import api as db_api
from senlin.policies import base
class DeletionPolicy(base.PolicyBase):
class DeletionPolicy(base.Policy):
'''
Policy for deleting member(s) from a cluster.
'''
@@ -28,46 +31,48 @@ class DeletionPolicy(base.PolicyBase):
)
TARGET = [
('BEFORE', 'CLUSTER', 'DELETE_MEMBER'),
('AFTER', 'CLUSTER', 'DELETE_MEMBER'),
('WHEN', consts.CLUSTER_SCALE_DOWN),
('AFTER', consts.CLUSTER_DEL_NODES),
('AFTER', consts.CLUSTER_SCALE_DOWN),
]
PROFILE_TYPE = [
'ANY'
]
def __init__(self, name, type_name, **kwargs):
super(DeletionPolicy, self).__init__(name, type_name, kwargs)
def __init__(self, type_name, name, **kwargs):
super(DeletionPolicy, self).__init__(type_name, name, **kwargs)
self.criteria = kwargs.get('criteria')
self.grace_period = kwargs.get('grace_period')
self.delete_desired_capacity = kwargs.get('reduce_desired_capacity')
def _sort_members_by_creation_time(members):
# TODO: do sorting
return members
self.criteria = kwargs.get('criteria', '')
self.grace_period = kwargs.get('grace_period', 0)
self.reduce_desired_capacity = kwargs.get('reduce_desired_capacity',
False)
random.seed()
def pre_op(self, cluster_id, action, **args):
# :cluster_id the cluster
# :action 'DEL_MEMBER'
# :args a list of candidate members
# TODO: choose victims from the given cluster
members = db_api.get_members(cluster_id)
sorted = self._sort_members_by_creation_time(members)
if self.criteria == self.OLDEST_FIRST:
victim = sorted[0]
elif self.criteria ==self.YOUNGEST_FIRST:
victim = sorted[-1]
else:
rand = random(len(sorted))
victim = sorted[rand]
# TODO: return True/False
return victim
'''
We don't block the deletion in any case.
'''
return True
def enforce(self, cluster_id, action, **args):
pass
'''
The enforcement of a deletion policy returns the chosen victim
that will be deleted.
'''
nodes = db_api.node_get_all_by_cluster_id(cluster_id)
if self.criteria == self.RANDOM:
rand = random.randrange(len(nodes))
return nodes[rand]
sorted_list = sorted(nodes, key=lambda r: (r.created_time, r.name))
if self.criteria == self.OLDEST_FIRST:
victim = sorted_list[0]
else: # self.criteria == self.YOUNGEST_FIRST:
victim = sorted_list[-1]
return victim
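The victim selection in `enforce()` above boils down to a sort on `(created_time, name)` plus an index pick per criterion. A self-contained sketch using plain tuples in place of DB node records (the criteria names match the diff; the tuple layout is an assumption):

```python
import random

OLDEST_FIRST = 'OLDEST_FIRST'
YOUNGEST_FIRST = 'YOUNGEST_FIRST'
RANDOM = 'RANDOM'

def choose_victim(nodes, criteria):
    # nodes: list of (created_time, name) pairs
    if criteria == RANDOM:
        return nodes[random.randrange(len(nodes))]
    # Sort on (created_time, name) so ties on time are deterministic.
    sorted_list = sorted(nodes, key=lambda r: (r[0], r[1]))
    if criteria == OLDEST_FIRST:
        return sorted_list[0]
    return sorted_list[-1]    # YOUNGEST_FIRST

nodes = [(3, 'node-c'), (1, 'node-a'), (2, 'node-b')]
print(choose_victim(nodes, OLDEST_FIRST))    # (1, 'node-a')
```

Including the name in the sort key is what keeps the choice stable when several nodes share a creation time.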
def post_op(self, cluster_id, action, **args):
# TODO(Qiming): process grace period here if needed
pass


@@ -10,14 +10,15 @@
# License for the specific language governing permissions and limitations
# under the License.
from senlin.common import senlin_consts as consts
from senlin.policies import base
class HealthPolicy(base.PolicyBase):
class HealthPolicy(base.Policy):
'''
Policy for health checking for members of a cluster.
'''
CHECK_TYPES = (
VM_LIFECYCLE_EVENTS,
VM_STATUS_POLLING,
@@ -29,7 +30,10 @@ class HealthPolicy(base.PolicyBase):
)
TARGET = [
('AFTER', 'CLUSTER', 'ADD_MEMBER')
('AFTER', consts.CLUSTER_SCALE_UP),
('AFTER', consts.CLUSTER_ADD_NODES),
('BEFORE', consts.CLUSTER_SCALE_DOWN),
('BEFORE', consts.CLUSTER_DEL_NODES),
]
PROFILE_TYPE = [
@@ -37,20 +41,31 @@ class HealthPolicy(base.PolicyBase):
'AWS.AutoScaling.LaunchConfiguration',
]
def __init__(self, name, type_name, **kwargs):
super(HealthPolicy, self).__init__(name, type_name, kwargs)
def __init__(self, type_name, name, **kwargs):
super(HealthPolicy, self).__init__(type_name, name, **kwargs)
self.interval = kwargs.get('interval')
self.grace_period = kwargs.get('grace_period')
self.check_type = kwargs.get('check_type')
self.interval = self.spec.get('interval')
self.grace_period = self.spec.get('grace_period')
self.check_type = self.spec.get('check_type')
def pre_op(self, cluster_id, action, **args):
pass
# Ignore actions that are not required to be processed at this stage
if action not in (consts.CLUSTER_SCALE_DOWN,
consts.CLUSTER_DEL_NODES):
return True
# TODO(anyone): Unsubscribe nodes from backend health monitoring
# infrastructure
return True
def enforce(self, cluster_id, action, **args):
pass
def post_op(self, cluster_id, action, **args):
# TODO: subscribe to vm-lifecycle-events for the specified VM
# or add vm to the list of VM status polling
pass
# Ignore irrelevant action here
if action not in (consts.CLUSTER_SCALE_UP, consts.CLUSTER_ADD_NODES):
return True
# TODO(anyone): subscribe to vm-lifecycle-events for the specified VM
# or add vm to the list of VM status polling
return True


@@ -1,16 +1 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from stevedore import extension


@@ -10,64 +10,99 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import uuid
from senlin.common import exception
from senlin.db import api as db_api
from senlin.engine import environment
from senlin.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class ProfileBase(object):
class Profile(object):
'''
Base class for profiles.
'''
def __new__(cls, profile, *args, **kwargs):
def __new__(cls, type_name, name, **kwargs):
'''
Create a new profile of the appropriate class.
'''
global _profile_classes
if _profile_classes is None:
mgr = extension.ExtensionManager(name_space='senlin.profiles',
invoke_on_load=False,
verify_requirements=True)
_profile_classes = dict((tuple(name.split('.')), mgr[name].plugin)
for name in mgr.names())
if cls != ProfileBase:
if cls != Profile:
ProfileClass = cls
else:
ProfileClass = get_profile_class(profile)
ProfileClass = environment.global_env().get_profile(type_name)
return super(Profile, cls).__new__(ProfileClass)
def __init__(self, name, type_name, **kwargs):
def __init__(self, type_name, name, **kwargs):
'''
Initialize the profile with given parameters and a JSON object.
'''
self.name = name
self.type_name = type_name
self.permission = ''
self.spec = kwargs.get('spec')
self.tags = {}
self.type = type_name
self.id = kwargs.get('id', None)
self.permission = kwargs.get('permission', '')
self.spec = kwargs.get('spec', {})
self.tags = kwargs.get('tags', {})
self.deleted_time = kwargs.get('deleted_time', None)
@classmethod
def create_object(cls, name, type_name, **kwargs):
obj = cls(name, type_name, kwargs)
physical_id = obj.do_create()
return physical_id
def from_db_record(cls, context, record):
'''
Construct a profile object from database record.
:param context: the context used for DB operations.
:param record: a DB Profile object that contains all required fields.
'''
kwargs = {
'id': record.id,
'spec': record.spec,
'permission': record.permission,
'tags': record.tags,
'deleted_time': record.deleted_time,
}
return cls(record.type, record.name, **kwargs)
@classmethod
def delete_object(cls, physical_id):
obj = db_api.load_member(physical_id=physical_id)
result = obj.do_delete()
return result
def load(cls, context, profile_id):
'''
Retrieve a profile object from database.
'''
profile = db_api.profile_get(context, profile_id)
if profile is None:
msg = _('No profile with id "%s" exists') % profile_id
raise exception.NotFound(msg)
return cls.from_db_record(context, profile)
def store(self):
'''
Store the profile into database and return its ID.
'''
values = {
'name': self.name,
'type': self.type,
'spec': self.spec,
'permission': self.permission,
'tags': self.tags,
}
profile = db_api.profile_create(self.context, values)
return profile.id
@classmethod
def update_object(cls, physical_id, new_profile):
obj = db_api.load_member(physical_id=physical_id)
result = obj.do_update()
return result
def create_object(cls, obj):
profile = cls.from_db(obj.context, obj.profile_id)
return profile.do_create(obj)
@classmethod
def delete_object(cls, obj):
profile = cls.from_db(obj.context, obj.profile_id)
return profile.do_delete(obj)
@classmethod
def update_object(cls, obj, new_profile_id):
profile = cls.from_db(obj.context, obj.profile_id)
return profile.do_update(obj, new_profile_id)
def do_create(self):
'''
@@ -99,5 +134,4 @@ class ProfileBase(object):
@classmethod
def from_dict(cls, **kwargs):
pb = cls(kwargs)
return pb
type_name = kwargs.get('type', '')
name = kwargs.get('name', '')
return cls(type_name, name, **kwargs)


@@ -90,6 +90,7 @@ def create_cluster(ctx, profile, **kwargs):
'project': ctx.tenant_id,
'domain': 'unknown',
'parent': None,
'node_count': 0,
'next_index': 0,
'timeout': '60',
'status': 'INIT',
@@ -137,3 +138,21 @@ def create_event(ctx, **kwargs):
}
values.update(kwargs)
return db_api.event_create(ctx, values)
def create_action(ctx, **kwargs):
values = {
'context': kwargs.get('context'),
'description': 'Action description',
'target': kwargs.get('target'),
'action': kwargs.get('action'),
'cause': 'Reason for action',
'owner': kwargs.get('owner'),
'interval': -1,
'inputs': {'key': 'value'},
'outputs': {'result': 'value'},
'depends_on': [],
'depended_on': []
}
values.update(kwargs)
return db_api.action_create(ctx, values)


@@ -36,6 +36,7 @@ class DBAPIClusterTest(base.SenlinTestCase):
self.assertEqual(self.ctx.tenant_id, cluster.project)
self.assertEqual('unknown', cluster.domain)
self.assertIsNone(cluster.parent)
self.assertEqual(0, cluster.node_count)
self.assertEqual(0, cluster.next_index)
self.assertEqual('60', cluster.timeout)
self.assertEqual('INIT', cluster.status)