Merge "Add type to tasks API"

This commit is contained in:
Jenkins 2016-11-09 10:55:22 +00:00 committed by Gerrit Code Review
commit 532db4bf63
7 changed files with 104 additions and 4 deletions

View File

@ -276,6 +276,7 @@ class Task(resource.Resource):
id = wtypes.text id = wtypes.text
name = wtypes.text name = wtypes.text
type = wtypes.text
workflow_name = wtypes.text workflow_name = wtypes.text
workflow_id = wtypes.text workflow_id = wtypes.text

View File

@ -0,0 +1,37 @@
# Copyright 2016 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""add type to task execution
Revision ID: 020
Revises: 019
Create Date: 2016-10-05 13:24:52.911011
"""
# revision identifiers, used by Alembic.
revision = '020'
down_revision = '019'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.add_column(
'task_executions_v2',
sa.Column('type', sa.String(length=10), nullable=True)
)

View File

@ -233,6 +233,7 @@ class TaskExecution(Execution):
# Main properties. # Main properties.
action_spec = sa.Column(st.JsonLongDictType()) action_spec = sa.Column(st.JsonLongDictType())
unique_key = sa.Column(sa.String(250), nullable=True) unique_key = sa.Column(sa.String(250), nullable=True)
type = sa.Column(sa.String(10))
# Whether the task is fully processed (publishing and calculating commands # Whether the task is fully processed (publishing and calculating commands
# after it). It allows to simplify workflow controller implementations # after it). It allows to simplify workflow controller implementations

View File

@ -207,6 +207,7 @@ class Task(object):
def _create_task_execution(self, state=states.RUNNING, state_info=None): def _create_task_execution(self, state=states.RUNNING, state_info=None):
task_id = utils.generate_unicode_uuid() task_id = utils.generate_unicode_uuid()
task_name = self.task_spec.get_name() task_name = self.task_spec.get_name()
task_type = self.task_spec.get_type()
data_flow.add_current_task_to_context(self.ctx, task_id, task_name) data_flow.add_current_task_to_context(self.ctx, task_id, task_name)
@ -223,7 +224,8 @@ class Task(object):
'in_context': self.ctx, 'in_context': self.ctx,
'published': {}, 'published': {},
'runtime_context': {}, 'runtime_context': {},
'project_id': self.wf_ex.project_id 'project_id': self.wf_ex.project_id,
'type': task_type
} }
self.task_ex = db_api.create_task_execution(values) self.task_ex = db_api.create_task_execution(values)

View File

@ -0,0 +1,18 @@
---
version: "2.0"
name: wb_with_nested_wf
workflows:
wrapping_wf:
type: direct
tasks:
call_inner_wf:
workflow: inner_wf
inner_wf:
type: direct
tasks:
hello:
action: std.echo output="Hello from inner workflow"

View File

@ -36,6 +36,9 @@ RESERVED_TASK_NAMES = [
'pause' 'pause'
] ]
ACTION_TASK_TYPE = 'ACTION'
WORKFLOW_TASK_TYPE = 'WORKFLOW'
class TaskSpec(base.BaseSpec): class TaskSpec(base.BaseSpec):
# See http://json-schema.org # See http://json-schema.org
@ -190,9 +193,6 @@ class TaskSpec(base.BaseSpec):
def get_description(self): def get_description(self):
return self._description return self._description
def get_type(self):
return self._type
def get_action_name(self): def get_action_name(self):
return self._action if self._action else None return self._action if self._action else None
@ -223,6 +223,11 @@ class TaskSpec(base.BaseSpec):
def get_safe_rerun(self): def get_safe_rerun(self):
return self._safe_rerun return self._safe_rerun
def get_type(self):
if self._workflow:
return WORKFLOW_TASK_TYPE
return ACTION_TASK_TYPE
class DirectWorkflowTaskSpec(TaskSpec): class DirectWorkflowTaskSpec(TaskSpec):
_polymorphic_value = 'direct' _polymorphic_value = 'direct'

View File

@ -55,3 +55,39 @@ class TasksTestsV2(base.TestCase):
self.assertEqual( self.assertEqual(
self.direct_wf_name, body['tasks'][-1]['workflow_name'] self.direct_wf_name, body['tasks'][-1]['workflow_name']
) )
class TaskTypesTestsV2(base.TestCase):
_service = 'workflowv2'
def setUp(self):
super(TaskTypesTestsV2, self).setUp()
self.useFixture(lockutils.LockFixture('mistral-workflow'))
_, wb_body = self.client.create_workbook('wb_with_nested_wf.yaml')
self.nested_wf_name = 'wb_with_nested_wf.wrapping_wf'
_, execution = self.client.create_execution(self.nested_wf_name)
@test.attr(type='sanity')
def test_task_type(self):
resp, body = self.client.get_list_obj('tasks')
self.assertEqual(200, resp.status)
bt = body['tasks']
ll = [[v for k, v in d.iteritems() if 'type' in k] for d in bt]
types_list = [item for sublist in ll for item in sublist]
self.assertIn(
'WORKFLOW', types_list
)
self.assertIn(
'ACTION', types_list
)
# there are 2 tasks in the workflow one of each type
self.assertEqual(
2, len(types_list)
)