diff --git a/Changelog b/Changelog index f574fccd2..28f08c240 100644 --- a/Changelog +++ b/Changelog @@ -9,3 +9,12 @@ 2014-12-29 tengqm * TODO: Added some test cases jobs. + +2015-01-02 liuhang + * TODO: Remove DB action APIs task. + * db/api.py: + add 'action_add_dependency', 'action_del_dependency' + remove dependency api without transaction. + * db/sqlalchemy/api.py: + add 'action_add_dependency', 'action_del_dependency' + remove dependency api without transaction. diff --git a/TODO b/TODO index b21f12c65..8918e2850 100644 --- a/TODO +++ b/TODO @@ -4,8 +4,6 @@ High Priority DB -- - - Add action APIs - - Revise action add/remove dependencies APIs - Make sure cluster-policy association is deleted when a cluster is deleted - Add field size to cluster table - Modify node_set_status to check/update cluster status diff --git a/senlin/db/api.py b/senlin/db/api.py index 1468d043b..c07b7368c 100644 --- a/senlin/db/api.py +++ b/senlin/db/api.py @@ -252,20 +252,12 @@ def action_get_all(context): return IMPL.action_get_all(context) -def action_add_depends_on(context, action_id, *actions): - return IMPL.action_add_depends_on(context, action_id, *actions) +def action_add_dependency(context, depended, dependent): + return IMPL.action_add_dependency(context, depended, dependent) - -def action_del_depends_on(context, action_id, *actions): - return IMPL.action_del_depends_on(context, action_id, *actions) - - -def action_add_depended_by(context, action_id, *actions): - return IMPL.action_add_depended_by(context, action_id, *actions) - - -def action_del_depended_by(context, action_id, *actions): - return IMPL.action_del_depended_by(context, action_id, *actions) + +def action_del_dependency(context, depended, dependent): + return IMPL.action_del_dependency(context, depended, dependent) def action_mark_succeeded(context, action_id): @@ -284,6 +276,10 @@ def action_start_work_on(context, action_id, owner): return IMPL.action_start_work_on(context, action_id, owner) +def action_delete(context, action_id, force=False): + return IMPL.action_delete(context, action_id, force) + + def db_sync(engine, version=None): """Migrate the database to `version` or the most recent version.""" return IMPL.db_sync(engine, version=version) diff --git a/senlin/db/sqlalchemy/api.py b/senlin/db/sqlalchemy/api.py index 15c5db1d3..628a31aca 100644 --- a/senlin/db/sqlalchemy/api.py +++ b/senlin/db/sqlalchemy/api.py @@ -27,8 +27,12 @@ from senlin.common.i18n import _ from senlin.db.sqlalchemy import filters as db_filters from senlin.db.sqlalchemy import migration from senlin.db.sqlalchemy import models +from senlin.openstack.common import log as logging from senlin.rpc import api as rpc_api +LOG = logging.getLogger(__name__) + + CONF = cfg.CONF CONF.import_opt('max_events_per_cluster', 'senlin.common.config') @@ -707,12 +711,14 @@ def action_get_1st_ready(context): def action_get_all_ready(context): - query = model_query(context, models.Action) + query = model_query(context, models.Action).\ + filter_by(status=ACTION_READY) return query.all() def action_get_all_by_owner(context, owner_id): - query = model_query(context, models.Action).filter_by(owner=owner_id) + query = model_query(context, models.Action).\ + filter_by(owner=owner_id) return query.all() @@ -724,50 +730,111 @@ def action_get_all(context): return actions -def action_add_depends_on(context, action_id, *actions): +def _action_dependency_add(context, action_id, field, adds): + if not isinstance(adds, list): + add_list = [adds] + else: + add_list = adds + action = model_query(context, models.Action).get(action_id) if not action: - raise exception.NotFound( - _('Action with id "%s" not found') % action_id) + msg = _('Action with id "%s" not found') % action_id + raise exception.NotFound(msg) - action.depends_on = list(set(actions).union(set(action.depends_on))) - # TODO(liuh): Set status to WAITING if 'depends_on' is not empty - action.save(_session(context)) - return action + d = {} + if action[field] is None: + d['l'] = add_list; + else: + d = action[field] + d['l'] = list(set(d['l']).union(set(add_list))) + action[field] = d + if field == 'depends_on': + action.status = ACTION_WAITING + action.status_reason = ACTION_WAITING + action.status_reason = _('The action is waiting for its dependancy \ + being completed.') + + +def _action_dependency_del(context, action_id, field, dels): + if not isinstance(dels, list): + del_list = [dels] + else: + del_list = dels -def action_del_depends_on(context, action_id, *actions): action = model_query(context, models.Action).get(action_id) if not action: - raise exception.NotFound( - _('Action with id "%s" not found') % action_id) + msg = _('Action with id "%s" not found') % action_id + raise exception.NotFound(msg) - action.depends_on = list(set(action.depends_on).different(set(actions))) - # TODO(liuh): Set status to READY if 'depends_on' is empty - action.save(_session(context)) - return action + d = {} + if action[field] is not None: + d = action[field] + d['l'] = list(set(d['l']) - set(del_list)) + action[field] = d + + if field == 'depends_on' and len(d['l']) == 0: + action.status = ACTION_READY + action.status_reason = _('The action becomes ready due to all dependancies \ + have been satisfied.') -def action_add_depended_by(context, action_id, *actions): - action = model_query(context, models.Action).get(action_id) - if not action: - raise exception.NotFound( - _('Action with id "%s" not found') % action_id) +def action_add_dependency(context, depended, dependent): + if isinstance(depended, list) and isinstance(dependent, list): + raise exception.NotSupport( + _('Multiple dependencies between lists not support')) - action.depended_by = list(set(actions).union(set(action.depended_by))) - action.save(_session(context)) - return action + if isinstance(depended, list): # e.g. D depends on A,B,C + session = get_session() + with session.begin(): + for d in depended: + _action_dependency_add(context, d, "depended_by", dependent) + _action_dependency_add(context, dependent, "depends_on", depended) + return -def action_del_depended_by(context, action_id, *actions): - action = model_query(context, models.Action).get(action_id) - if not action: - raise exception.NotFound( - _('Action with id "%s" not found') % action_id) + # Only dependent can be a list now, convert it to a list if it is not a list + if not isinstance(dependent, list): # e.g. B,C,D depend on A + dependents = [dependent] + else: + dependents = dependent - action.depended_by = list(set(action.depended_by).different(set(actions))) - action.save(_session(context)) - return action + session = get_session() + with session.begin(): + _action_dependency_add(context, depended, "depended_by", dependent) + + for d in dependents: + _action_dependency_add(context, d, "depends_on", depended) + return + + +def action_del_dependency(context, depended, dependent): + if isinstance(depended, list) and isinstance(dependent, list): + raise exception.NotSupport( + _('Multiple dependencies between lists not support')) + + if isinstance(depended, list): # e.g. D depends on A,B,C + session = get_session() + with session.begin(): + for d in depended: + _action_dependency_del(context, d, "depended_by", dependent) + + _action_dependency_del(context, dependent, "depends_on", depended) + return + + # Only dependent can be a list now, convert it to a list if it is not a list + if not isinstance(dependent, list): # e.g. B,C,D depend on A + dependents = [dependent] + else: + dependents = dependent + + session = get_session() + with session.begin(): + _action_dependency_del(context, depended, "depended_by", dependent) + + for d in dependents: + _action_dependency_del(context, d, "depends_on", depended) + return def action_mark_succeeded(context, action_id): @@ -777,27 +844,26 @@ def action_mark_succeeded(context, action_id): raise exception.NotFound( _('Action with id "%s" not found') % action_id) - session = query.session - session.begin() + session = get_session() + with session.begin(): + action.status = ACTION_SUCCEEDED - action.status = ACTION_SUCCEEDED + for a in action.depended_by['l']: + _action_dependency_del(context, a, 'depends_on', action_id) + action.depended_by = {'l':[]} - for a in action.depended_by: - action_del_depends_on(context, a, action_id) - - action.depended_by = [] - - session.commit() return action def action_mark_failed(context, action_id): #TODO(liuh): Failed processing to be added + #TODO(liuh): Need mark all actions depending on it failed pass def action_mark_cancelled(context, action_id): #TODO(liuh): Cancel processing to be added + #TODO(liuh): Need mark all actions depending on it being cancelled pass @@ -814,6 +880,18 @@ def action_start_work_on(context, action_id, owner): return action +def action_delete(context, action_id, force=False): + action = action_get(context, action_id) + + if not action: + msg = _('Attempt to delete a action with id "%s" that does not' + ' exist') % action_id + raise exception.NotFound(msg) + + # TODO(liuh): Need check if and how an action can be safety deleted + action.delete() + + # Utils def db_sync(engine, version=None): """Migrate the database to `version` or the most recent version.""" diff --git a/senlin/engine/parser.py b/senlin/engine/parser.py index d847f85b8..5b8438493 100644 --- a/senlin/engine/parser.py +++ b/senlin/engine/parser.py @@ -123,3 +123,25 @@ def parse_policy(policy_str): # Construct a policy object based on the type specified return data + + +def parse_action(action): + ''' + Parse and validate the specified string as a action. + ''' + if not isinstance(action, six.string_types): + # TODO(Qiming): Throw exception + return None + + data = {} + try: + data = yaml.load(action, Loader=Loader) + except Exception as ex: + # TODO(Qiming): Throw exception + LOG.error(_LE('Failed parsing given data as YAML: %s'), + six.text_type(ex)) + return None + + # TODO(Qiming): Construct a action object based on the type specified + + return data diff --git a/senlin/engine/scheduler.py b/senlin/engine/scheduler.py index c66211dc8..4206422a3 100644 --- a/senlin/engine/scheduler.py +++ b/senlin/engine/scheduler.py @@ -525,6 +525,7 @@ class PollingTaskGroup(object): for r in runners: r.cancel() + def notify(): # TODO(Yanyan): Check if workers are available to pick actions to # execute diff --git a/senlin/tests/db/shared.py b/senlin/tests/db/shared.py index 0687d3357..af28f9f0a 100644 --- a/senlin/tests/db/shared.py +++ b/senlin/tests/db/shared.py @@ -42,6 +42,20 @@ sample_policy = ''' pause_time: PT10M ''' +sample_action = ''' + name: test_cluster_create_action + target: cluster_001 + action: create + cause: User Initiate + timeout: 60 + status: INIT + status_reason: Just Initialized + inputs: + min_size: 1 + max_size: 10 + pause_time: PT10M +''' + UUIDs = (UUID1, UUID2, UUID3) = sorted([str(uuid.uuid4()) for x in range(3)]) @@ -124,12 +138,12 @@ def create_action(ctx, **kwargs): 'target': kwargs.get('target'), 'action': kwargs.get('action'), 'cause': 'Reason for action', - 'owner': kwarge.get('owner'), + 'owner': kwargs.get('owner'), 'interval': -1, 'inputs': {'key': 'value'}, - 'outputs': {'result': 'value'} - 'depends_on': [], - 'depended_on': [] + 'outputs': {'result': 'value'}, + 'depends_on': {'l': []}, + 'depended_by': {'l': []} } values.update(kwargs) return db_api.action_create(ctx, values) diff --git a/senlin/tests/db/test_action_api.py b/senlin/tests/db/test_action_api.py new file mode 100644 index 000000000..a0bd76809 --- /dev/null +++ b/senlin/tests/db/test_action_api.py @@ -0,0 +1,304 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from senlin.common import exception +from senlin.db.sqlalchemy import api as db_api +from senlin.engine import parser +from senlin.tests.common import base +from senlin.tests.common import utils +from senlin.tests.db import shared +from senlin.openstack.common import log as logging + +def _create_action(context, action=shared.sample_action, **kwargs): + data = parser.parse_action(action) + data.update(kwargs) + return db_api.action_create(context, data) + + +class DBAPIActionTest(base.SenlinTestCase): + def setUp(self): + super(DBAPIActionTest, self).setUp() + self.ctx = utils.dummy_context() + + + def test_action_create(self): + data = parser.parse_action(shared.sample_action) + action = db_api.action_create(self.ctx, data) + + self.assertIsNotNone(action) + self.assertEqual(data['name'], action.name) + self.assertEqual(data['target'], action.target) + self.assertEqual(data['action'], action.action) + self.assertEqual(data['cause'], action.cause) + self.assertEqual(data['timeout'], action.timeout) + self.assertEqual(data['status'], action.status) + self.assertEqual(data['status_reason'], action.status_reason) + self.assertEqual(10, action.inputs['max_size']) + self.assertIsNone(action.outputs) + + + def test_action_get(self): + data = parser.parse_action(shared.sample_action) + action = _create_action(self.ctx) + retobj = db_api.action_get(self.ctx, action.id) + + self.assertIsNotNone(retobj) + self.assertEqual(data['name'], retobj.name) + self.assertEqual(data['target'], retobj.target) + self.assertEqual(data['action'], retobj.action) + self.assertEqual(data['cause'], retobj.cause) + self.assertEqual(data['timeout'], retobj.timeout) + self.assertEqual(data['status'], retobj.status) + self.assertEqual(data['status_reason'], retobj.status_reason) + self.assertEqual(10, retobj.inputs['max_size']) + self.assertIsNone(retobj.outputs) + + + def test_action_get_1st_ready(self): + specs = [ + {'name': 'action_001', 'status': 'INIT'}, + {'name': 'action_002', 'status': 'READY'}, + {'name': 'action_003', 'status': 'INIT'}, + {'name': 'action_004', 'status': 'READY'} + ] + + for spec in specs: + _create_action(self.ctx, + action=shared.sample_action, + **spec) + + action = db_api.action_get_1st_ready(self.ctx) + self.assertTrue(action.name in ['action_002', 'action_004']) + + + def test_action_get_all_ready(self): + specs = [ + {'name': 'action_001', 'status': 'INIT'}, + {'name': 'action_002', 'status': 'READY'}, + {'name': 'action_003', 'status': 'INIT'}, + {'name': 'action_004', 'status': 'READY'} + ] + + for spec in specs: + _create_action(self.ctx, + action=shared.sample_action, + **spec) + + actions = db_api.action_get_all_ready(self.ctx) + self.assertEqual(2, len(actions)) + names = [p.name for p in actions] + for spec in ['action_002', 'action_004']: + self.assertIn(spec, names) + + + def test_action_get_all_by_owner(self): + specs = [ + {'name': 'action_001', 'owner': 'work1'}, + {'name': 'action_002', 'owner': 'work2'}, + {'name': 'action_003', 'owner': 'work1'}, + {'name': 'action_004', 'owner': 'work3'} + ] + + for spec in specs: + _create_action(self.ctx, + action=shared.sample_action, + **spec) + + actions = db_api.action_get_all_by_owner(self.ctx, 'work1') + self.assertEqual(2, len(actions)) + names = [p.name for p in actions] + for spec in ['action_001', 'action_003']: + self.assertIn(spec, names) + + + def test_action_get_all(self): + specs = [ + {'name': 'action_001', 'target': 'cluster_001'}, + {'name': 'action_002', 'target': 'node_001'}, + ] + + for spec in specs: + _create_action(self.ctx, + action=shared.sample_action, + **spec) + + actions = db_api.action_get_all(self.ctx) + self.assertEqual(2, len(actions)) + names = [p.name for p in actions] + for spec in specs: + self.assertIn(spec['name'], names) + + + def _check_action_add_dependency_depended_list(self): + specs = [ + {'name': 'action_001', 'target': 'cluster_001'}, + {'name': 'action_002', 'target': 'node_001'}, + {'name': 'action_003', 'target': 'node_002'}, + {'name': 'action_004', 'target': 'node_003'}, + ] + + id_of = {} + for spec in specs: + action = _create_action(self.ctx, + action=shared.sample_action, + **spec) + id_of[spec['name']] = action.id + + db_api.action_add_dependency(self.ctx, + id_of['action_001'], + [id_of['action_002'], + id_of['action_003'], + id_of['action_004']]) + + action = db_api.action_get(self.ctx, id_of['action_001']) + l = action.depended_by['l'] + self.assertEqual(3, len(l)) + self.assertIn(id_of['action_002'], l) + self.assertIn(id_of['action_003'], l) + self.assertIn(id_of['action_004'], l) + self.assertIsNone(action.depends_on) + + for id in [id_of['action_002'], + id_of['action_003'], + id_of['action_004']]: + action = db_api.action_get(self.ctx, id) + l = action.depends_on['l'] + self.assertEqual(1, len(l)) + self.assertIn(id_of['action_001'], l) + self.assertIsNone(action.depended_by) + self.assertEqual(action.status, db_api.ACTION_WAITING) + return id_of + + + def _check_action_add_dependency_dependent_list(self): + specs = [ + {'name': 'action_001', 'target': 'cluster_001'}, + {'name': 'action_002', 'target': 'node_001'}, + {'name': 'action_003', 'target': 'node_002'}, + {'name': 'action_004', 'target': 'node_003'}, + ] + + id_of = {} + for spec in specs: + action = _create_action(self.ctx, + action=shared.sample_action, + **spec) + id_of[spec['name']] = action.id + + db_api.action_add_dependency(self.ctx, + [id_of['action_002'], + id_of['action_003'], + id_of['action_004']], + id_of['action_001']) + + action = db_api.action_get(self.ctx, id_of['action_001']) + l = action.depends_on['l'] + self.assertEqual(3, len(l)) + self.assertIn(id_of['action_002'], l) + self.assertIn(id_of['action_003'], l) + self.assertIn(id_of['action_004'], l) + self.assertIsNone(action.depended_by) + self.assertEqual(action.status, db_api.ACTION_WAITING) + + for id in [id_of['action_002'], + id_of['action_003'], + id_of['action_004']]: + action = db_api.action_get(self.ctx, id) + l = action.depended_by['l'] + self.assertEqual(1, len(l)) + self.assertIn(id_of['action_001'], l) + self.assertIsNone(action.depends_on) + return id_of + + + def test_action_add_dependency_depended_list(self): + self._check_action_add_dependency_depended_list() + + + def test_action_add_dependency_dependent_list(self): + self._check_action_add_dependency_dependent_list() + + + def test_action_del_dependency_depended_list(self): + id_of = self._check_action_add_dependency_depended_list() + + + def test_action_del_dependency_dependent_list(self): + id_of = self._check_action_add_dependency_dependent_list() + db_api.action_del_dependency(self.ctx, + [id_of['action_002'], + id_of['action_003'], + id_of['action_004']], + id_of['action_001']) + + action = db_api.action_get(self.ctx, id_of['action_001']) + self.assertEqual(0, len(action.depends_on['l'])) + self.assertEqual(action.status, db_api.ACTION_READY) + + for id in [id_of['action_002'], + id_of['action_003'], + id_of['action_004']]: + action = db_api.action_get(self.ctx, id) + self.assertEqual(0, len(action.depended_by['l'])) + + + def test_action_del_dependency_depended_list(self): + id_of = self._check_action_add_dependency_depended_list() + db_api.action_del_dependency(self.ctx, + id_of['action_001'], + [id_of['action_002'], + id_of['action_003'], + id_of['action_004']]) + + action = db_api.action_get(self.ctx, id_of['action_001']) + self.assertEqual(0, len(action.depended_by['l'])) + + for id in [id_of['action_002'], + id_of['action_003'], + id_of['action_004']]: + action = db_api.action_get(self.ctx, id) + self.assertEqual(0, len(action.depends_on['l'])) + self.assertEqual(action.status, db_api.ACTION_READY) + + + def test_action_mark_succeeded(self): + id_of = self._check_action_add_dependency_depended_list() + db_api.action_mark_succeeded(self.ctx, id_of['action_001']) + + action = db_api.action_get(self.ctx, id_of['action_001']) + self.assertEqual(0, len(action.depended_by['l'])) + self.assertEqual(action.status, db_api.ACTION_SUCCEEDED) + + for id in [id_of['action_002'], + id_of['action_003'], + id_of['action_004']]: + action = db_api.action_get(self.ctx, id) + self.assertEqual(0, len(action.depends_on['l'])) + + + def test_action_start_work_on(self): + action = _create_action(self.ctx) + + action = db_api.action_start_work_on(self.ctx, action.id, 'worker1') + + self.assertEqual(action.owner, 'worker1') + self.assertEqual(action.status, db_api.ACTION_RUNNING) + + + def test_action_delete(self): + action = _create_action(self.ctx) + self.assertIsNotNone(action) + action_id = action.id + db_api.action_delete(self.ctx, action.id) + + self.assertRaises(exception.NotFound, db_api.action_get, + self.ctx, action_id)