Merge branch 'master' of github.com:tengqm/senlin

This commit is contained in:
tengqm 2015-01-03 09:57:36 +08:00
commit b19df06215
8 changed files with 482 additions and 60 deletions

View File

@ -9,3 +9,12 @@
2014-12-29 tengqm <tengqim@cn.ibm.com>
* TODO: Added some test cases jobs.
2015-01-02 liuhang <hangliu@cn.ibm.com>
* TODO: Remove DB action APIs task.
* db/api.py:
add 'action_add_dependency', 'action_del_dependency'
remove dependency api without transaction.
* db/sqlalchemy/api.py:
add 'action_add_dependency', 'action_del_dependency'
remove dependency api without transaction.

2
TODO
View File

@ -4,8 +4,6 @@ High Priority
DB
--
- Add action APIs
- Revise action add/remove dependencies APIs
- Make sure cluster-policy association is deleted when a cluster is deleted
- Add field size to cluster table
- Modify node_set_status to check/update cluster status

View File

@ -252,20 +252,12 @@ def action_get_all(context):
return IMPL.action_get_all(context)
def action_add_depends_on(context, action_id, *actions):
return IMPL.action_add_depends_on(context, action_id, *actions)
def action_add_dependency(context, depended, dependent):
return IMPL.action_add_dependency(context, depended, dependent)
def action_del_depends_on(context, action_id, *actions):
return IMPL.action_del_depends_on(context, action_id, *actions)
def action_add_depended_by(context, action_id, *actions):
return IMPL.action_add_depended_by(context, action_id, *actions)
def action_del_depended_by(context, action_id, *actions):
return IMPL.action_del_depended_by(context, action_id, *actions)
def action_del_dependency(context, depended, dependent):
return IMPL.action_del_dependency(context, depended, dependent)
def action_mark_succeeded(context, action_id):
@ -284,6 +276,10 @@ def action_start_work_on(context, action_id, owner):
return IMPL.action_start_work_on(context, action_id, owner)
def action_delete(context, action_id, force=False):
return IMPL.action_delete(context, action_id, force)
def db_sync(engine, version=None):
"""Migrate the database to `version` or the most recent version."""
return IMPL.db_sync(engine, version=version)

View File

@ -27,8 +27,12 @@ from senlin.common.i18n import _
from senlin.db.sqlalchemy import filters as db_filters
from senlin.db.sqlalchemy import migration
from senlin.db.sqlalchemy import models
from senlin.openstack.common import log as logging
from senlin.rpc import api as rpc_api
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
CONF.import_opt('max_events_per_cluster', 'senlin.common.config')
@ -707,12 +711,14 @@ def action_get_1st_ready(context):
def action_get_all_ready(context):
query = model_query(context, models.Action)
query = model_query(context, models.Action).\
filter_by(status=ACTION_READY)
return query.all()
def action_get_all_by_owner(context, owner_id):
query = model_query(context, models.Action).filter_by(owner=owner_id)
query = model_query(context, models.Action).\
filter_by(owner=owner_id)
return query.all()
@ -724,50 +730,111 @@ def action_get_all(context):
return actions
def action_add_depends_on(context, action_id, *actions):
def _action_dependency_add(context, action_id, field, adds):
if not isinstance(adds, list):
add_list = [adds]
else:
add_list = adds
action = model_query(context, models.Action).get(action_id)
if not action:
raise exception.NotFound(
_('Action with id "%s" not found') % action_id)
msg = _('Action with id "%s" not found') % action_id
raise exception.NotFound(msg)
action.depends_on = list(set(actions).union(set(action.depends_on)))
# TODO(liuh): Set status to WAITING if 'depends_on' is not empty
action.save(_session(context))
return action
d = {}
if action[field] is None:
d['l'] = add_list;
else:
d = action[field]
d['l'] = list(set(d['l']).union(set(add_list)))
action[field] = d
if field == 'depends_on':
action.status = ACTION_WAITING
action.status_reason = ACTION_WAITING
action.status_reason = _('The action is waiting for its dependancy \
being completed.')
def action_del_depends_on(context, action_id, *actions):
def _action_dependency_del(context, action_id, field, dels):
if not isinstance(dels, list):
del_list = [dels]
else:
del_list = dels
action = model_query(context, models.Action).get(action_id)
if not action:
raise exception.NotFound(
_('Action with id "%s" not found') % action_id)
msg = _('Action with id "%s" not found') % action_id
raise exception.NotFound(msg)
action.depends_on = list(set(action.depends_on).different(set(actions)))
# TODO(liuh): Set status to READY if 'depends_on' is empty
action.save(_session(context))
return action
d = {}
if action[field] is not None:
d = action[field]
d['l'] = list(set(d['l']) - set(del_list))
action[field] = d
if field == 'depends_on' and len(d['l']) == 0:
action.status = ACTION_READY
action.status_reason = _('The action becomes ready due to all dependancies \
have been satisfied.')
def action_add_depended_by(context, action_id, *actions):
action = model_query(context, models.Action).get(action_id)
if not action:
raise exception.NotFound(
_('Action with id "%s" not found') % action_id)
def action_add_dependency(context, depended, dependent):
if isinstance(depended, list) and isinstance(dependent, list):
raise exception.NotSupport(
_('Multiple dependencies between lists not support'))
action.depended_by = list(set(actions).union(set(action.depended_by)))
action.save(_session(context))
return action
if isinstance(depended, list): # e.g. D depends on A,B,C
session = get_session()
with session.begin():
for d in depended:
_action_dependency_add(context, d, "depended_by", dependent)
_action_dependency_add(context, dependent, "depends_on", depended)
return
# Only dependent can be a list now, convert it to a list if it is not a list
if not isinstance(dependent, list): # e.g. B,C,D depend on A
dependents = [dependent]
else:
dependents = dependent
session = get_session()
with session.begin():
_action_dependency_add(context, depended, "depended_by", dependent)
for d in dependents:
_action_dependency_add(context, d, "depends_on", depended)
return
def action_del_depended_by(context, action_id, *actions):
action = model_query(context, models.Action).get(action_id)
if not action:
raise exception.NotFound(
_('Action with id "%s" not found') % action_id)
def action_del_dependency(context, depended, dependent):
if isinstance(depended, list) and isinstance(dependent, list):
raise exception.NotSupport(
_('Multiple dependencies between lists not support'))
action.depended_by = list(set(action.depended_by).different(set(actions)))
action.save(_session(context))
return action
if isinstance(depended, list): # e.g. D depends on A,B,C
session = get_session()
with session.begin():
for d in depended:
_action_dependency_del(context, d, "depended_by", dependent)
_action_dependency_del(context, dependent, "depends_on", depended)
return
# Only dependent can be a list now, convert it to a list if it is not a list
if not isinstance(dependent, list): # e.g. B,C,D depend on A
dependents = [dependent]
else:
dependents = dependent
session = get_session()
with session.begin():
_action_dependency_del(context, depended, "depended_by", dependent)
for d in dependents:
_action_dependency_del(context, d, "depends_on", depended)
return
def action_mark_succeeded(context, action_id):
@ -777,27 +844,26 @@ def action_mark_succeeded(context, action_id):
raise exception.NotFound(
_('Action with id "%s" not found') % action_id)
session = query.session
session.begin()
session = get_session()
with session.begin():
action.status = ACTION_SUCCEEDED
for a in action.depended_by:
action_del_depends_on(context, a, action_id)
for a in action.depended_by['l']:
_action_dependency_del(context, a, 'depends_on', action_id)
action.depended_by = {'l':[]}
action.depended_by = []
session.commit()
return action
def action_mark_failed(context, action_id):
#TODO(liuh): Failed processing to be added
#TODO(liuh): Need mark all actions depending on it failed
pass
def action_mark_cancelled(context, action_id):
#TODO(liuh): Cancel processing to be added
#TODO(liuh): Need mark all actions depending on it being cancelled
pass
@ -814,6 +880,18 @@ def action_start_work_on(context, action_id, owner):
return action
def action_delete(context, action_id, force=False):
action = action_get(context, action_id)
if not action:
msg = _('Attempt to delete a action with id "%s" that does not'
' exist') % action_id
raise exception.NotFound(msg)
# TODO(liuh): Need check if and how an action can be safety deleted
action.delete()
# Utils
def db_sync(engine, version=None):
"""Migrate the database to `version` or the most recent version."""

View File

@ -123,3 +123,25 @@ def parse_policy(policy_str):
# Construct a policy object based on the type specified
return data
def parse_action(action):
'''
Parse and validate the specified string as a action.
'''
if not isinstance(action, six.string_types):
# TODO(Qiming): Throw exception
return None
data = {}
try:
data = yaml.load(action, Loader=Loader)
except Exception as ex:
# TODO(Qiming): Throw exception
LOG.error(_LE('Failed parsing given data as YAML: %s'),
six.text_type(ex))
return None
# TODO(Qiming): Construct a action object based on the type specified
return data

View File

@ -525,6 +525,7 @@ class PollingTaskGroup(object):
for r in runners:
r.cancel()
def notify():
# TODO(Yanyan): Check if workers are available to pick actions to
# execute

View File

@ -42,6 +42,20 @@ sample_policy = '''
pause_time: PT10M
'''
sample_action = '''
name: test_cluster_create_action
target: cluster_001
action: create
cause: User Initiate
timeout: 60
status: INIT
status_reason: Just Initialized
inputs:
min_size: 1
max_size: 10
pause_time: PT10M
'''
UUIDs = (UUID1, UUID2, UUID3) = sorted([str(uuid.uuid4())
for x in range(3)])
@ -124,12 +138,12 @@ def create_action(ctx, **kwargs):
'target': kwargs.get('target'),
'action': kwargs.get('action'),
'cause': 'Reason for action',
'owner': kwarge.get('owner'),
'owner': kwargs.get('owner'),
'interval': -1,
'inputs': {'key': 'value'},
'outputs': {'result': 'value'}
'depends_on': [],
'depended_on': []
'outputs': {'result': 'value'},
'depends_on': {'l': []},
'depended_by': {'l': []}
}
values.update(kwargs)
return db_api.action_create(ctx, values)

View File

@ -0,0 +1,304 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from senlin.common import exception
from senlin.db.sqlalchemy import api as db_api
from senlin.engine import parser
from senlin.tests.common import base
from senlin.tests.common import utils
from senlin.tests.db import shared
from senlin.openstack.common import log as logging
def _create_action(context, action=shared.sample_action, **kwargs):
data = parser.parse_action(action)
data.update(kwargs)
return db_api.action_create(context, data)
class DBAPIActionTest(base.SenlinTestCase):
def setUp(self):
super(DBAPIActionTest, self).setUp()
self.ctx = utils.dummy_context()
def test_action_create(self):
data = parser.parse_action(shared.sample_action)
action = db_api.action_create(self.ctx, data)
self.assertIsNotNone(action)
self.assertEqual(data['name'], action.name)
self.assertEqual(data['target'], action.target)
self.assertEqual(data['action'], action.action)
self.assertEqual(data['cause'], action.cause)
self.assertEqual(data['timeout'], action.timeout)
self.assertEqual(data['status'], action.status)
self.assertEqual(data['status_reason'], action.status_reason)
self.assertEqual(10, action.inputs['max_size'])
self.assertIsNone(action.outputs)
def test_action_get(self):
data = parser.parse_action(shared.sample_action)
action = _create_action(self.ctx)
retobj = db_api.action_get(self.ctx, action.id)
self.assertIsNotNone(retobj)
self.assertEqual(data['name'], retobj.name)
self.assertEqual(data['target'], retobj.target)
self.assertEqual(data['action'], retobj.action)
self.assertEqual(data['cause'], retobj.cause)
self.assertEqual(data['timeout'], retobj.timeout)
self.assertEqual(data['status'], retobj.status)
self.assertEqual(data['status_reason'], retobj.status_reason)
self.assertEqual(10, retobj.inputs['max_size'])
self.assertIsNone(retobj.outputs)
def test_action_get_1st_ready(self):
specs = [
{'name': 'action_001', 'status': 'INIT'},
{'name': 'action_002', 'status': 'READY'},
{'name': 'action_003', 'status': 'INIT'},
{'name': 'action_004', 'status': 'READY'}
]
for spec in specs:
_create_action(self.ctx,
action=shared.sample_action,
**spec)
action = db_api.action_get_1st_ready(self.ctx)
self.assertTrue(action.name in ['action_002', 'action_004'])
def test_action_get_all_ready(self):
specs = [
{'name': 'action_001', 'status': 'INIT'},
{'name': 'action_002', 'status': 'READY'},
{'name': 'action_003', 'status': 'INIT'},
{'name': 'action_004', 'status': 'READY'}
]
for spec in specs:
_create_action(self.ctx,
action=shared.sample_action,
**spec)
actions = db_api.action_get_all_ready(self.ctx)
self.assertEqual(2, len(actions))
names = [p.name for p in actions]
for spec in ['action_002', 'action_004']:
self.assertIn(spec, names)
def test_action_get_all_by_owner(self):
specs = [
{'name': 'action_001', 'owner': 'work1'},
{'name': 'action_002', 'owner': 'work2'},
{'name': 'action_003', 'owner': 'work1'},
{'name': 'action_004', 'owner': 'work3'}
]
for spec in specs:
_create_action(self.ctx,
action=shared.sample_action,
**spec)
actions = db_api.action_get_all_by_owner(self.ctx, 'work1')
self.assertEqual(2, len(actions))
names = [p.name for p in actions]
for spec in ['action_001', 'action_003']:
self.assertIn(spec, names)
def test_action_get_all(self):
specs = [
{'name': 'action_001', 'target': 'cluster_001'},
{'name': 'action_002', 'target': 'node_001'},
]
for spec in specs:
_create_action(self.ctx,
action=shared.sample_action,
**spec)
actions = db_api.action_get_all(self.ctx)
self.assertEqual(2, len(actions))
names = [p.name for p in actions]
for spec in specs:
self.assertIn(spec['name'], names)
def _check_action_add_dependency_depended_list(self):
specs = [
{'name': 'action_001', 'target': 'cluster_001'},
{'name': 'action_002', 'target': 'node_001'},
{'name': 'action_003', 'target': 'node_002'},
{'name': 'action_004', 'target': 'node_003'},
]
id_of = {}
for spec in specs:
action = _create_action(self.ctx,
action=shared.sample_action,
**spec)
id_of[spec['name']] = action.id
db_api.action_add_dependency(self.ctx,
id_of['action_001'],
[id_of['action_002'],
id_of['action_003'],
id_of['action_004']])
action = db_api.action_get(self.ctx, id_of['action_001'])
l = action.depended_by['l']
self.assertEqual(3, len(l))
self.assertIn(id_of['action_002'], l)
self.assertIn(id_of['action_003'], l)
self.assertIn(id_of['action_004'], l)
self.assertIsNone(action.depends_on)
for id in [id_of['action_002'],
id_of['action_003'],
id_of['action_004']]:
action = db_api.action_get(self.ctx, id)
l = action.depends_on['l']
self.assertEqual(1, len(l))
self.assertIn(id_of['action_001'], l)
self.assertIsNone(action.depended_by)
self.assertEqual(action.status, db_api.ACTION_WAITING)
return id_of
def _check_action_add_dependency_dependent_list(self):
specs = [
{'name': 'action_001', 'target': 'cluster_001'},
{'name': 'action_002', 'target': 'node_001'},
{'name': 'action_003', 'target': 'node_002'},
{'name': 'action_004', 'target': 'node_003'},
]
id_of = {}
for spec in specs:
action = _create_action(self.ctx,
action=shared.sample_action,
**spec)
id_of[spec['name']] = action.id
db_api.action_add_dependency(self.ctx,
[id_of['action_002'],
id_of['action_003'],
id_of['action_004']],
id_of['action_001'])
action = db_api.action_get(self.ctx, id_of['action_001'])
l = action.depends_on['l']
self.assertEqual(3, len(l))
self.assertIn(id_of['action_002'], l)
self.assertIn(id_of['action_003'], l)
self.assertIn(id_of['action_004'], l)
self.assertIsNone(action.depended_by)
self.assertEqual(action.status, db_api.ACTION_WAITING)
for id in [id_of['action_002'],
id_of['action_003'],
id_of['action_004']]:
action = db_api.action_get(self.ctx, id)
l = action.depended_by['l']
self.assertEqual(1, len(l))
self.assertIn(id_of['action_001'], l)
self.assertIsNone(action.depends_on)
return id_of
def test_action_add_dependency_depended_list(self):
self._check_action_add_dependency_depended_list()
def test_action_add_dependency_dependent_list(self):
self._check_action_add_dependency_dependent_list()
def test_action_del_dependency_depended_list(self):
id_of = self._check_action_add_dependency_depended_list()
def test_action_del_dependency_dependent_list(self):
id_of = self._check_action_add_dependency_dependent_list()
db_api.action_del_dependency(self.ctx,
[id_of['action_002'],
id_of['action_003'],
id_of['action_004']],
id_of['action_001'])
action = db_api.action_get(self.ctx, id_of['action_001'])
self.assertEqual(0, len(action.depends_on['l']))
self.assertEqual(action.status, db_api.ACTION_READY)
for id in [id_of['action_002'],
id_of['action_003'],
id_of['action_004']]:
action = db_api.action_get(self.ctx, id)
self.assertEqual(0, len(action.depended_by['l']))
def test_action_del_dependency_depended_list(self):
id_of = self._check_action_add_dependency_depended_list()
db_api.action_del_dependency(self.ctx,
id_of['action_001'],
[id_of['action_002'],
id_of['action_003'],
id_of['action_004']])
action = db_api.action_get(self.ctx, id_of['action_001'])
self.assertEqual(0, len(action.depended_by['l']))
for id in [id_of['action_002'],
id_of['action_003'],
id_of['action_004']]:
action = db_api.action_get(self.ctx, id)
self.assertEqual(0, len(action.depends_on['l']))
self.assertEqual(action.status, db_api.ACTION_READY)
def test_action_mark_succeeded(self):
id_of = self._check_action_add_dependency_depended_list()
db_api.action_mark_succeeded(self.ctx, id_of['action_001'])
action = db_api.action_get(self.ctx, id_of['action_001'])
self.assertEqual(0, len(action.depended_by['l']))
self.assertEqual(action.status, db_api.ACTION_SUCCEEDED)
for id in [id_of['action_002'],
id_of['action_003'],
id_of['action_004']]:
action = db_api.action_get(self.ctx, id)
self.assertEqual(0, len(action.depends_on['l']))
def test_action_start_work_on(self):
action = _create_action(self.ctx)
action = db_api.action_start_work_on(self.ctx, action.id, 'worker1')
self.assertEqual(action.owner, 'worker1')
self.assertEqual(action.status, db_api.ACTION_RUNNING)
def test_action_delete(self):
action = _create_action(self.ctx)
self.assertIsNotNone(action)
action_id = action.id
db_api.action_delete(self.ctx, action.id)
self.assertRaises(exception.NotFound, db_api.action_get,
self.ctx, action_id)