Add atom intentions for tasks and retries
Add atom intentions: REVERT, EXECUTE, RETRY and IGNORE. Intentions will be used by action engine to schedule tasks correctly. Add intention to task detail and extend storage to work with atom intentions. Add alembic migration to add intentions column to database. Change-Id: I79c9bb5f11861658dbfedfd64ef93eb92b29cb2d
This commit is contained in:
@@ -456,6 +456,7 @@ def _convert_td_to_internal(td, parent_uuid):
|
||||
results = persistence_utils.encode_retry_results(results)
|
||||
return models.TaskDetail(name=td.name, uuid=td.uuid,
|
||||
atom_type=td.atom_type,
|
||||
intention=td.intention,
|
||||
state=td.state, results=results,
|
||||
failure=td.failure, meta=td.meta,
|
||||
version=td.version, parent_uuid=parent_uuid)
|
||||
@@ -471,6 +472,7 @@ def _convert_td_to_external(td):
|
||||
atom_cls = logbook.get_atom_detail_class(td.atom_type)
|
||||
td_c = atom_cls(td.name, uuid=td.uuid)
|
||||
td_c.state = td.state
|
||||
td_c.intention = td.intention
|
||||
td_c.results = results
|
||||
td_c.failure = td.failure
|
||||
td_c.meta = td.meta
|
||||
|
||||
@@ -0,0 +1,39 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '14b227d79a87'
|
||||
down_revision = '84d6e888850'
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from taskflow import states
|
||||
|
||||
|
||||
def upgrade():
|
||||
bind = op.get_bind()
|
||||
intention_type = sa.Enum(*states.INTENTIONS, name='intention_type')
|
||||
column = sa.Column('intention', intention_type,
|
||||
server_default=states.EXECUTE)
|
||||
impl = intention_type.dialect_impl(bind.dialect)
|
||||
impl.create(bind, checkfirst=True)
|
||||
op.add_column('taskdetails', column)
|
||||
|
||||
|
||||
def downgrade():
|
||||
op.drop_column('taskdetails', 'intention')
|
||||
@@ -27,6 +27,7 @@ from taskflow.openstack.common import timeutils
|
||||
from taskflow.openstack.common import uuidutils
|
||||
|
||||
from taskflow.persistence import logbook
|
||||
from taskflow import states
|
||||
from taskflow.utils import persistence_utils
|
||||
|
||||
BASE = declarative_base()
|
||||
@@ -110,6 +111,7 @@ class TaskDetail(BASE, ModelBase):
|
||||
atom_type = Column(Enum(*logbook.ATOM_TYPES, name='atom_types'))
|
||||
# Member variables
|
||||
state = Column(String)
|
||||
intention = Column(Enum(*states.INTENTIONS, name='intentions'))
|
||||
results = Column(Json)
|
||||
failure = Column(Failure)
|
||||
version = Column(String)
|
||||
|
||||
@@ -21,6 +21,7 @@ import abc
|
||||
import six
|
||||
|
||||
from taskflow.openstack.common import uuidutils
|
||||
from taskflow import states
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@@ -155,6 +156,8 @@ class AtomDetail(object):
|
||||
#
|
||||
# The state the atom was last in.
|
||||
self.state = None
|
||||
# The intention of action that would be applied to the atom.
|
||||
self.intention = states.EXECUTE
|
||||
# The results it may have produced (useful for reverting).
|
||||
self.results = None
|
||||
# An Failure object that holds exception the atom may have thrown
|
||||
@@ -174,6 +177,7 @@ class AtomDetail(object):
|
||||
if td is self:
|
||||
return
|
||||
self.state = td.state
|
||||
self.intention = td.intention
|
||||
self.meta = td.meta
|
||||
self.failure = td.failure
|
||||
self.results = td.results
|
||||
|
||||
@@ -39,6 +39,13 @@ REVERTED = REVERTED
|
||||
REVERTING = REVERTING
|
||||
SUCCESS = SUCCESS
|
||||
|
||||
# Atom intentions.
|
||||
EXECUTE = 'EXECUTE'
|
||||
IGNORE = 'IGNORE'
|
||||
REVERT = 'REVERT'
|
||||
RETRY = 'RETRY'
|
||||
INTENTIONS = [EXECUTE, IGNORE, REVERT, RETRY]
|
||||
|
||||
# TODO(harlowja): use when we can timeout tasks??
|
||||
TIMED_OUT = 'TIMED_OUT'
|
||||
|
||||
|
||||
@@ -202,6 +202,16 @@ class Storage(object):
|
||||
td = self._taskdetail_by_name(task_name)
|
||||
return td.state
|
||||
|
||||
def set_atom_intention(self, atom_name, intention):
|
||||
"""Set intention for atom with given name."""
|
||||
td = self._taskdetail_by_name(atom_name)
|
||||
td.intention = intention
|
||||
self._with_connection(self._save_task_detail, task_detail=td)
|
||||
|
||||
def get_atom_intention(self, atom_name):
|
||||
"""Get intention of atom with given name."""
|
||||
return self._taskdetail_by_name(atom_name).intention
|
||||
|
||||
def get_tasks_states(self, task_names):
|
||||
"""Gets all task states."""
|
||||
with self._lock.read_lock():
|
||||
|
||||
@@ -19,6 +19,7 @@ import contextlib
|
||||
from taskflow import exceptions as exc
|
||||
from taskflow.openstack.common import uuidutils
|
||||
from taskflow.persistence import logbook
|
||||
from taskflow import states
|
||||
from taskflow.utils import misc
|
||||
|
||||
|
||||
@@ -222,6 +223,7 @@ class PersistenceTestMixin(object):
|
||||
self.assertIsNot(td2, None)
|
||||
self.assertEqual(td2.name, 'detail-1')
|
||||
self.assertEqual(td2.version, '4.2')
|
||||
self.assertEqual(td2.intention, states.EXECUTE)
|
||||
|
||||
def test_logbook_delete(self):
|
||||
lb_id = uuidutils.generate_uuid()
|
||||
@@ -245,6 +247,7 @@ class PersistenceTestMixin(object):
|
||||
fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid())
|
||||
lb.add(fd)
|
||||
td = logbook.RetryDetail("detail-1", uuid=uuidutils.generate_uuid())
|
||||
td.intention = states.REVERT
|
||||
fd.add(td)
|
||||
|
||||
with contextlib.closing(self._get_connection()) as conn:
|
||||
@@ -257,6 +260,7 @@ class PersistenceTestMixin(object):
|
||||
fd2 = lb2.find(fd.uuid)
|
||||
td2 = fd2.find(td.uuid)
|
||||
self.assertEqual(td2.atom_type, logbook.RETRY_DETAIL)
|
||||
self.assertEqual(td2.intention, states.REVERT)
|
||||
|
||||
def test_retry_detail_save_with_task_failure(self):
|
||||
lb_id = uuidutils.generate_uuid()
|
||||
@@ -283,3 +287,30 @@ class PersistenceTestMixin(object):
|
||||
fail2 = td2.results[0][1].get('some-task')
|
||||
self.assertIsInstance(fail2, misc.Failure)
|
||||
self.assertTrue(fail.matches(fail2))
|
||||
|
||||
def test_retry_detail_save_intention(self):
|
||||
lb_id = uuidutils.generate_uuid()
|
||||
lb_name = 'lb-%s' % (lb_id)
|
||||
lb = logbook.LogBook(name=lb_name, uuid=lb_id)
|
||||
fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid())
|
||||
lb.add(fd)
|
||||
td = logbook.RetryDetail("retry-1", uuid=uuidutils.generate_uuid())
|
||||
fd.add(td)
|
||||
|
||||
# save it
|
||||
with contextlib.closing(self._get_connection()) as conn:
|
||||
conn.save_logbook(lb)
|
||||
conn.update_flow_details(fd)
|
||||
conn.update_task_details(td)
|
||||
|
||||
# change intention and save
|
||||
td.intention = states.REVERT
|
||||
with contextlib.closing(self._get_connection()) as conn:
|
||||
conn.update_task_details(td)
|
||||
|
||||
# now read it back
|
||||
with contextlib.closing(self._get_connection()) as conn:
|
||||
lb2 = conn.get_logbook(lb_id)
|
||||
fd2 = lb2.find(fd.uuid)
|
||||
td2 = fd2.find(td.uuid)
|
||||
self.assertEqual(td2.intention, states.REVERT)
|
||||
|
||||
@@ -550,3 +550,17 @@ class StorageTest(test.TestCase):
|
||||
self.assertRaisesRegexp(TypeError,
|
||||
'Unknown atom type',
|
||||
logbook.get_atom_detail_class, 'some_detail')
|
||||
|
||||
def test_save_task_intention(self):
|
||||
s = self._get_storage()
|
||||
s.ensure_task('my task')
|
||||
s.set_atom_intention('my task', states.REVERT)
|
||||
intention = s.get_atom_intention('my task')
|
||||
self.assertEqual(intention, states.REVERT)
|
||||
|
||||
def test_save_retry_intention(self):
|
||||
s = self._get_storage()
|
||||
s.ensure_retry('my retry')
|
||||
s.set_atom_intention('my retry', states.RETRY)
|
||||
intention = s.get_atom_intention('my retry')
|
||||
self.assertEqual(intention, states.RETRY)
|
||||
|
||||
@@ -111,9 +111,9 @@ def task_details_merge(td_e, td_new, deep_copy=False):
|
||||
return td_e
|
||||
|
||||
copy_fn = _copy_function(deep_copy)
|
||||
if td_e.state != td_new.state:
|
||||
# NOTE(imelnikov): states are just strings, no need to copy.
|
||||
td_e.state = td_new.state
|
||||
# NOTE(imelnikov): states and intentions are just strings, no need to copy.
|
||||
td_e.state = td_new.state
|
||||
td_e.intention = td_new.intention
|
||||
if td_e.results != td_new.results:
|
||||
td_e.results = copy_fn(td_new.results)
|
||||
if td_e.failure != td_new.failure:
|
||||
@@ -313,6 +313,7 @@ def format_task_detail(td):
|
||||
'state': td.state,
|
||||
'version': td.version,
|
||||
'atom_type': td.atom_type,
|
||||
'intention': td.intention,
|
||||
}
|
||||
|
||||
|
||||
@@ -324,8 +325,9 @@ def unformat_task_detail(uuid, td_data):
|
||||
atom_cls = logbook.get_atom_detail_class(td_data['atom_type'])
|
||||
td = atom_cls(name=td_data['name'], uuid=uuid)
|
||||
td.state = td_data.get('state')
|
||||
td.results = results
|
||||
td.failure = failure_from_dict(td_data.get('failure'))
|
||||
td.intention = td_data.get('intention')
|
||||
td.results = results
|
||||
td.meta = td_data.get('meta')
|
||||
td.version = td_data.get('version')
|
||||
return td
|
||||
|
||||
Reference in New Issue
Block a user