diff --git a/taskflow/persistence/backends/impl_sqlalchemy.py b/taskflow/persistence/backends/impl_sqlalchemy.py index 8fdf5aec..35395adf 100644 --- a/taskflow/persistence/backends/impl_sqlalchemy.py +++ b/taskflow/persistence/backends/impl_sqlalchemy.py @@ -456,6 +456,7 @@ def _convert_td_to_internal(td, parent_uuid): results = persistence_utils.encode_retry_results(results) return models.TaskDetail(name=td.name, uuid=td.uuid, atom_type=td.atom_type, + intention=td.intention, state=td.state, results=results, failure=td.failure, meta=td.meta, version=td.version, parent_uuid=parent_uuid) @@ -471,6 +472,7 @@ def _convert_td_to_external(td): atom_cls = logbook.get_atom_detail_class(td.atom_type) td_c = atom_cls(td.name, uuid=td.uuid) td_c.state = td.state + td_c.intention = td.intention td_c.results = results td_c.failure = td.failure td_c.meta = td.meta diff --git a/taskflow/persistence/backends/sqlalchemy/alembic/versions/14b227d79a87_add_intention_column.py b/taskflow/persistence/backends/sqlalchemy/alembic/versions/14b227d79a87_add_intention_column.py new file mode 100644 index 00000000..d8311933 --- /dev/null +++ b/taskflow/persistence/backends/sqlalchemy/alembic/versions/14b227d79a87_add_intention_column.py @@ -0,0 +1,39 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# revision identifiers, used by Alembic. +revision = '14b227d79a87' +down_revision = '84d6e888850' + +from alembic import op +import sqlalchemy as sa +from taskflow import states + + +def upgrade(): + bind = op.get_bind() + intention_type = sa.Enum(*states.INTENTIONS, name='intention_type') + column = sa.Column('intention', intention_type, + server_default=states.EXECUTE) + impl = intention_type.dialect_impl(bind.dialect) + impl.create(bind, checkfirst=True) + op.add_column('taskdetails', column) + + +def downgrade(): + op.drop_column('taskdetails', 'intention') diff --git a/taskflow/persistence/backends/sqlalchemy/models.py b/taskflow/persistence/backends/sqlalchemy/models.py index 61c2d311..e06e8a39 100644 --- a/taskflow/persistence/backends/sqlalchemy/models.py +++ b/taskflow/persistence/backends/sqlalchemy/models.py @@ -27,6 +27,7 @@ from taskflow.openstack.common import timeutils from taskflow.openstack.common import uuidutils from taskflow.persistence import logbook +from taskflow import states from taskflow.utils import persistence_utils BASE = declarative_base() @@ -110,6 +111,7 @@ class TaskDetail(BASE, ModelBase): atom_type = Column(Enum(*logbook.ATOM_TYPES, name='atom_types')) # Member variables state = Column(String) + intention = Column(Enum(*states.INTENTIONS, name='intentions')) results = Column(Json) failure = Column(Failure) version = Column(String) diff --git a/taskflow/persistence/logbook.py b/taskflow/persistence/logbook.py index 917d9c10..35f38edc 100644 --- a/taskflow/persistence/logbook.py +++ b/taskflow/persistence/logbook.py @@ -21,6 +21,7 @@ import abc import six from taskflow.openstack.common import uuidutils +from taskflow import states LOG = logging.getLogger(__name__) @@ -155,6 +156,8 @@ class AtomDetail(object): # # The state the atom was last in. self.state = None + # The intention of action that would be applied to the atom. + self.intention = states.EXECUTE # The results it may have produced (useful for reverting). self.results = None # An Failure object that holds exception the atom may have thrown @@ -174,6 +177,7 @@ class AtomDetail(object): if td is self: return self.state = td.state + self.intention = td.intention self.meta = td.meta self.failure = td.failure self.results = td.results diff --git a/taskflow/states.py b/taskflow/states.py index 8e7c2270..540319cf 100644 --- a/taskflow/states.py +++ b/taskflow/states.py @@ -39,6 +39,13 @@ REVERTED = REVERTED REVERTING = REVERTING SUCCESS = SUCCESS +# Atom intentions. +EXECUTE = 'EXECUTE' +IGNORE = 'IGNORE' +REVERT = 'REVERT' +RETRY = 'RETRY' +INTENTIONS = [EXECUTE, IGNORE, REVERT, RETRY] + # TODO(harlowja): use when we can timeout tasks?? TIMED_OUT = 'TIMED_OUT' diff --git a/taskflow/storage.py b/taskflow/storage.py index 1b332cfb..a9e943c7 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -202,6 +202,16 @@ class Storage(object): td = self._taskdetail_by_name(task_name) return td.state + def set_atom_intention(self, atom_name, intention): + """Set intention for atom with given name.""" + td = self._taskdetail_by_name(atom_name) + td.intention = intention + self._with_connection(self._save_task_detail, task_detail=td) + + def get_atom_intention(self, atom_name): + """Get intention of atom with given name.""" + return self._taskdetail_by_name(atom_name).intention + def get_tasks_states(self, task_names): """Gets all task states.""" with self._lock.read_lock(): diff --git a/taskflow/tests/unit/persistence/base.py b/taskflow/tests/unit/persistence/base.py index 7946c7c0..ccc2b449 100644 --- a/taskflow/tests/unit/persistence/base.py +++ b/taskflow/tests/unit/persistence/base.py @@ -19,6 +19,7 @@ import contextlib from taskflow import exceptions as exc from taskflow.openstack.common import uuidutils from taskflow.persistence import logbook +from taskflow import states from taskflow.utils import misc @@ -222,6 +223,7 @@ class PersistenceTestMixin(object): self.assertIsNot(td2, None) self.assertEqual(td2.name, 'detail-1') self.assertEqual(td2.version, '4.2') + self.assertEqual(td2.intention, states.EXECUTE) def test_logbook_delete(self): lb_id = uuidutils.generate_uuid() @@ -245,6 +247,7 @@ class PersistenceTestMixin(object): fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid()) lb.add(fd) td = logbook.RetryDetail("detail-1", uuid=uuidutils.generate_uuid()) + td.intention = states.REVERT fd.add(td) with contextlib.closing(self._get_connection()) as conn: @@ -257,6 +260,7 @@ class PersistenceTestMixin(object): fd2 = lb2.find(fd.uuid) td2 = fd2.find(td.uuid) self.assertEqual(td2.atom_type, logbook.RETRY_DETAIL) + self.assertEqual(td2.intention, states.REVERT) def test_retry_detail_save_with_task_failure(self): lb_id = uuidutils.generate_uuid() @@ -283,3 +287,30 @@ class PersistenceTestMixin(object): fail2 = td2.results[0][1].get('some-task') self.assertIsInstance(fail2, misc.Failure) self.assertTrue(fail.matches(fail2)) + + def test_retry_detail_save_intention(self): + lb_id = uuidutils.generate_uuid() + lb_name = 'lb-%s' % (lb_id) + lb = logbook.LogBook(name=lb_name, uuid=lb_id) + fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid()) + lb.add(fd) + td = logbook.RetryDetail("retry-1", uuid=uuidutils.generate_uuid()) + fd.add(td) + + # save it + with contextlib.closing(self._get_connection()) as conn: + conn.save_logbook(lb) + conn.update_flow_details(fd) + conn.update_task_details(td) + + # change intention and save + td.intention = states.REVERT + with contextlib.closing(self._get_connection()) as conn: + conn.update_task_details(td) + + # now read it back + with contextlib.closing(self._get_connection()) as conn: + lb2 = conn.get_logbook(lb_id) + fd2 = lb2.find(fd.uuid) + td2 = fd2.find(td.uuid) + self.assertEqual(td2.intention, states.REVERT) diff --git a/taskflow/tests/unit/test_storage.py b/taskflow/tests/unit/test_storage.py index 74a166f1..3d6bc5be 100644 --- a/taskflow/tests/unit/test_storage.py +++ b/taskflow/tests/unit/test_storage.py @@ -550,3 +550,17 @@ class StorageTest(test.TestCase): self.assertRaisesRegexp(TypeError, 'Unknown atom type', logbook.get_atom_detail_class, 'some_detail') + + def test_save_task_intention(self): + s = self._get_storage() + s.ensure_task('my task') + s.set_atom_intention('my task', states.REVERT) + intention = s.get_atom_intention('my task') + self.assertEqual(intention, states.REVERT) + + def test_save_retry_intention(self): + s = self._get_storage() + s.ensure_retry('my retry') + s.set_atom_intention('my retry', states.RETRY) + intention = s.get_atom_intention('my retry') + self.assertEqual(intention, states.RETRY) diff --git a/taskflow/utils/persistence_utils.py b/taskflow/utils/persistence_utils.py index f17f9774..0f14f383 100644 --- a/taskflow/utils/persistence_utils.py +++ b/taskflow/utils/persistence_utils.py @@ -111,9 +111,9 @@ def task_details_merge(td_e, td_new, deep_copy=False): return td_e copy_fn = _copy_function(deep_copy) - if td_e.state != td_new.state: - # NOTE(imelnikov): states are just strings, no need to copy. - td_e.state = td_new.state + # NOTE(imelnikov): states and intentions are just strings, no need to copy. + td_e.state = td_new.state + td_e.intention = td_new.intention if td_e.results != td_new.results: td_e.results = copy_fn(td_new.results) if td_e.failure != td_new.failure: @@ -313,6 +313,7 @@ def format_task_detail(td): 'state': td.state, 'version': td.version, 'atom_type': td.atom_type, + 'intention': td.intention, } @@ -324,8 +325,9 @@ def unformat_task_detail(uuid, td_data): atom_cls = logbook.get_atom_detail_class(td_data['atom_type']) td = atom_cls(name=td_data['name'], uuid=uuid) td.state = td_data.get('state') - td.results = results td.failure = failure_from_dict(td_data.get('failure')) + td.intention = td_data.get('intention') + td.results = results td.meta = td_data.get('meta') td.version = td_data.get('version') return td