heat/heat/tests/openstack/mistral/test_workflow.py

936 lines
35 KiB
Python

#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import six
import yaml
from mistralclient.api import base as mistral_base
from mistralclient.api.v2 import executions
from oslo_serialization import jsonutils
from heat.common import exception
from heat.common import template_format
from heat.engine.clients.os import mistral as client
from heat.engine import node_data
from heat.engine import resource
from heat.engine.resources.openstack.mistral import workflow
from heat.engine.resources import signal_responder
from heat.engine.resources import stack_user
from heat.engine import scheduler
from heat.engine import template
from heat.tests import common
from heat.tests import utils
workflow_template = """
heat_template_version: 2013-05-23
resources:
workflow:
type: OS::Mistral::Workflow
properties:
type: direct
tasks:
- name: hello
action: std.echo output='Good morning!'
publish:
result: <% $.hello %>
"""
workflow_template_with_tags = """
heat_template_version: queens
resources:
workflow:
type: OS::Mistral::Workflow
properties:
type: direct
tags:
- tagged
tasks:
- name: hello
action: std.echo output='Good morning!'
publish:
result: <% $.hello %>
"""
workflow_template_with_params = """
heat_template_version: 2013-05-23
resources:
workflow:
type: OS::Mistral::Workflow
properties:
params: {'test':'param_value'}
type: direct
tasks:
- name: hello
action: std.echo output='Good morning!'
publish:
result: <% $.hello %>
"""
workflow_template_with_params_override = """
heat_template_version: 2013-05-23
resources:
workflow:
type: OS::Mistral::Workflow
properties:
params: {'test':'param_value_override','test1':'param_value_override_1'}
type: direct
tasks:
- name: hello
action: std.echo output='Good morning!'
publish:
result: <% $.hello %>
"""
workflow_template_full = """
heat_template_version: 2013-05-23
parameters:
use_request_body_as_input:
type : boolean
default : false
resources:
create_vm:
type: OS::Mistral::Workflow
properties:
name: create_vm
use_request_body_as_input: { get_param: use_request_body_as_input }
type: direct
input:
name: create_test_server
image: 31d8eeaf-686e-4e95-bb27-765014b9f20b
flavor: 2
output:
vm_id: <% $.vm_id %>
task_defaults:
on_error:
- on_error
tasks:
- name: create_server
action: |
nova.servers_create name=<% $.name %> image=<% $.image %>
flavor=<% $.flavor %>
publish:
vm_id: <% $.create_server.id %>
on_success:
- check_server_exists
- name: check_server_exists
action: nova.servers_get server=<% $.vm_id %>
publish:
server_exists: True
on_success:
- list_machines
- name: wait_instance
action: nova.servers_find id=<% $.vm_id_new %> status='ACTIVE'
retry:
delay: 5
count: 15
wait_before: 7
wait_after: 8
pause_before: true
timeout: 11
keep_result: false
target: test
with_items: vm_id_new in <% $.list_servers %>
- name: list_machines
action: nova.servers_list
publish:
-list_servers: <% $.list_machines %>
on_success:
- wait_instance
- name: on_error
action: std.echo output="output"
- name: external_workflow
workflow: external_workflow_name
"""
workflow_updating_request_body_property = """
heat_template_version: 2013-05-23
resources:
create_vm:
type: OS::Mistral::Workflow
properties:
name: create_vm
use_request_body_as_input: false
type: direct
input:
name: create_test_server
image: 31d8eeaf-686e-4e95-bb27-765014b9f20b
flavor: 2
output:
vm_id: <% $.vm_id %>
task_defaults:
on_error:
- on_error
tasks:
- name: create_server
action: |
nova.servers_create name=<% $.name %> image=<% $.image %>
flavor=<% $.flavor %>
publish:
vm_id: <% $.create_server.id %>
on_success:
- check_server_exists
- name: check_server_exists
action: nova.servers_get server=<% $.vm_id %>
publish:
server_exists: True
on_success:
- list_machines
- name: wait_instance
action: nova.servers_find id=<% $.vm_id_new %> status='ACTIVE'
retry:
delay: 5
count: 15
wait_before: 7
wait_after: 8
pause_before: true
timeout: 11
keep_result: false
target: test
with_items: vm_id_new in <% $.list_servers %>
join: all
- name: list_machines
action: nova.servers_list
publish:
-list_servers: <% $.list_machines %>
on_success:
- wait_instance
- name: on_error
action: std.echo output="output"
"""
workflow_template_backward_support = """
heat_template_version: 2013-05-23
resources:
create_vm:
type: OS::Mistral::Workflow
properties:
name: create_vm
type: direct
input:
name: create_test_server
image: 31d8eeaf-686e-4e95-bb27-765014b9f20b
flavor: 2
output:
vm_id: <% $.vm_id %>
tasks:
- name: create_server
action: |
nova.servers_create name=<% $.name %> image=<% $.image %>
flavor=<% $.flavor %>
publish:
vm_id: <% $.create_server.id %>
on_success:
- check_server_exists
- name: check_server_exists
action: nova.servers_get server=<% $.vm_id %>
publish:
server_exists: True
on_success:
- wait_instance
- name: wait_instance
action: nova.servers_find id=<% $.vm_id %> status='ACTIVE'
policies:
retry:
delay: 5
count: 15
"""
workflow_template_bad = """
heat_template_version: 2013-05-23
resources:
workflow:
type: OS::Mistral::Workflow
properties:
type: direct
tasks:
- name: second_task
action: std.noop
requires: [first_task]
- name: first_task
action: std.noop
"""
workflow_template_bad_reverse = """
heat_template_version: 2013-05-23
resources:
workflow:
type: OS::Mistral::Workflow
properties:
type: reverse
tasks:
- name: second_task
action: std.noop
requires: [first_task]
- name: first_task
action: std.noop
"""
workflow_template_concurrency_no_with_items = """
heat_template_version: 2013-05-23
resources:
workflow:
type: OS::Mistral::Workflow
properties:
params: {'test':'param_value'}
type: direct
tasks:
- name: hello
action: std.echo output='Good morning!'
concurrency: 9001
"""
workflow_template_update_replace = """
heat_template_version: 2013-05-23
resources:
workflow:
type: OS::Mistral::Workflow
properties:
name: hello_action
type: direct
tasks:
- name: hello
action: std.echo output='Good evening!'
publish:
result: <% $.hello %>
"""
workflow_template_update_replace_failed = """
heat_template_version: 2013-05-23
resources:
workflow:
type: OS::Mistral::Workflow
properties:
name: hello_action
type: direct
tasks:
- name: hello
action: std.echo output='Good Morning!'
publish:
result: <% $.hello %>
"""
workflow_template_update = """
heat_template_version: 2013-05-23
resources:
workflow:
type: OS::Mistral::Workflow
properties:
type: direct
description: just testing workflow resource
tasks:
- name: hello
action: std.echo output='Good evening!'
publish:
result: <% $.hello %>
"""
workflow_template_duplicate_polices = """
heat_template_version: 2013-05-23
resources:
workflow:
type: OS::Mistral::Workflow
properties:
name: list
type: direct
tasks:
- name: list
action: nova.servers_list
policies:
retry:
delay: 5
count: 15
retry:
delay: 6
count: 16
"""
workflow_template_policies_translation = """
heat_template_version: 2016-10-14
resources:
workflow:
type: OS::Mistral::Workflow
properties:
name: translation_done
type: direct
tasks:
- name: check_dat_thing
action: nova.servers_list
policies:
retry:
delay: 5
count: 15
wait_before: 5
wait_after: 5
pause_before: true
timeout: 42
concurrency: 5
"""
class FakeWorkflow(object):
def __init__(self, name):
self.name = name
self._data = {'workflow': 'info'}
def to_dict(self):
return self._data
class TestMistralWorkflow(common.HeatTestCase):
def setUp(self):
super(TestMistralWorkflow, self).setUp()
self.ctx = utils.dummy_context()
self.mistral = mock.Mock()
self.patchobject(workflow.Workflow, 'client',
return_value=self.mistral)
self.patches = []
self.patches.append(mock.patch.object(stack_user.StackUser,
'_create_user'))
self.patches.append(mock.patch.object(signal_responder.SignalResponder,
'_create_keypair'))
self.patches.append(mock.patch.object(client.MistralClientPlugin,
'_create'))
for patch in self.patches:
patch.start()
self.client = client.MistralClientPlugin(self.ctx)
def tearDown(self):
super(TestMistralWorkflow, self).tearDown()
for patch in self.patches:
patch.stop()
def _create_resource(self, name, template=workflow_template):
tmpl = template_format.parse(template)
self.stack = utils.parse_stack(tmpl, stack_name='test_stack')
resource_defns = self.stack.t.resource_definitions(self.stack)
rsrc_defn = resource_defns['workflow']
wf = workflow.Workflow(name, rsrc_defn, self.stack)
self.mistral.workflows.create.return_value = [
FakeWorkflow('test_stack-workflow-b5fiekfci3yc')]
scheduler.TaskRunner(wf.create)()
return wf
def test_create(self):
wf = self._create_resource('workflow')
expected_state = (wf.CREATE, wf.COMPLETE)
self.assertEqual(expected_state, wf.state)
self.assertEqual('test_stack-workflow-b5fiekfci3yc', wf.resource_id)
def test_create_with_name(self):
tmpl = template_format.parse(workflow_template_full)
stack = utils.parse_stack(tmpl)
rsrc_defns = stack.t.resource_definitions(stack)['create_vm']
wf = workflow.Workflow('create_vm', rsrc_defns, stack)
self.mistral.workflows.create.return_value = [
FakeWorkflow('create_vm')]
scheduler.TaskRunner(wf.create)()
expected_state = (wf.CREATE, wf.COMPLETE)
self.assertEqual(expected_state, wf.state)
self.assertEqual('create_vm', wf.resource_id)
def test_create_with_task_parms(self):
tmpl = template_format.parse(workflow_template_full)
stack = utils.parse_stack(tmpl)
rsrc_defns = stack.t.resource_definitions(stack)['create_vm']
wf = workflow.Workflow('create_vm', rsrc_defns, stack)
self.mistral.workflows.create.side_effect = (lambda args:
self.verify_create_params(
args))
scheduler.TaskRunner(wf.create)()
def test_backward_support(self):
tmpl = template_format.parse(workflow_template_backward_support)
stack = utils.parse_stack(tmpl)
rsrc_defns = stack.t.resource_definitions(stack)['create_vm']
wf = workflow.Workflow('create_vm', rsrc_defns, stack)
self.mistral.workflows.create.return_value = [
FakeWorkflow('create_vm')]
scheduler.TaskRunner(wf.create)()
expected_state = (wf.CREATE, wf.COMPLETE)
self.assertEqual(expected_state, wf.state)
self.assertEqual('create_vm', wf.resource_id)
for task in wf.properties['tasks']:
if task['name'] is 'wait_instance':
self.assertEqual(5, task['retry']['delay'])
self.assertEqual(15, task['retry']['count'])
break
def test_attributes(self):
wf = self._create_resource('workflow')
self.mistral.workflows.get.return_value = (
FakeWorkflow('test_stack-workflow-b5fiekfci3yc'))
self.assertEqual({'name': 'test_stack-workflow-b5fiekfci3yc',
'input': None}, wf.FnGetAtt('data'))
self.assertEqual([], wf.FnGetAtt('executions'))
self.assertEqual({'workflow': 'info'}, wf.FnGetAtt('show'))
def test_direct_workflow_validation_error(self):
error_msg = ("Mistral resource validation error: "
"workflow.properties.tasks.second_task.requires: "
"task second_task contains property 'requires' "
"in case of direct workflow. Only reverse workflows "
"can contain property 'requires'.")
self._test_validation_failed(workflow_template_bad, error_msg)
def test_wrong_params_using(self):
error_msg = ("Mistral resource validation error: "
"workflow.properties.params: 'task_name' is not assigned "
"in 'params' in case of reverse type workflow.")
self._test_validation_failed(workflow_template_bad_reverse, error_msg)
def test_with_items_concurrency_failed_validate(self):
error_msg = "concurrency cannot be specified without with_items."
self._test_validation_failed(
workflow_template_concurrency_no_with_items,
error_msg,
error_cls=exception.ResourcePropertyDependency)
def _test_validation_failed(self, templatem, error_msg, error_cls=None):
tmpl = template_format.parse(templatem)
stack = utils.parse_stack(tmpl)
rsrc_defns = stack.t.resource_definitions(stack)['workflow']
wf = workflow.Workflow('workflow', rsrc_defns, stack)
if error_cls is None:
error_cls = exception.StackValidationFailed
exc = self.assertRaises(error_cls, wf.validate)
self.assertEqual(error_msg, six.text_type(exc))
def test_create_wrong_definition(self):
tmpl = template_format.parse(workflow_template)
stack = utils.parse_stack(tmpl)
rsrc_defns = stack.t.resource_definitions(stack)['workflow']
wf = workflow.Workflow('workflow', rsrc_defns, stack)
self.mistral.workflows.create.side_effect = Exception('boom!')
exc = self.assertRaises(exception.ResourceFailure,
scheduler.TaskRunner(wf.create))
expected_state = (wf.CREATE, wf.FAILED)
self.assertEqual(expected_state, wf.state)
self.assertIn('Exception: resources.workflow: boom!',
six.text_type(exc))
def test_update_replace(self):
wf = self._create_resource('workflow')
t = template_format.parse(workflow_template_update_replace)
rsrc_defns = template.Template(t).resource_definitions(self.stack)
new_workflow = rsrc_defns['workflow']
new_workflows = [FakeWorkflow('hello_action')]
self.mistral.workflows.update.return_value = new_workflows
self.mistral.workflows.delete.return_value = None
err = self.assertRaises(resource.UpdateReplace,
scheduler.TaskRunner(wf.update,
new_workflow))
msg = 'The Resource workflow requires replacement.'
self.assertEqual(msg, six.text_type(err))
def test_update(self):
wf = self._create_resource('workflow')
t = template_format.parse(workflow_template_update)
rsrc_defns = template.Template(t).resource_definitions(self.stack)
new_wf = rsrc_defns['workflow']
self.mistral.workflows.update.return_value = [
FakeWorkflow('test_stack-workflow-b5fiekfci3yc')]
scheduler.TaskRunner(wf.update, new_wf)()
self.assertTrue(self.mistral.workflows.update.called)
self.assertEqual((wf.UPDATE, wf.COMPLETE), wf.state)
def test_update_input(self):
wf = self._create_resource('workflow')
t = template_format.parse(workflow_template)
t['resources']['workflow']['properties']['input'] = {'foo': 'bar'}
rsrc_defns = template.Template(t).resource_definitions(self.stack)
new_wf = rsrc_defns['workflow']
self.mistral.workflows.update.return_value = [
FakeWorkflow('test_stack-workflow-b5fiekfci3yc')]
scheduler.TaskRunner(wf.update, new_wf)()
self.assertTrue(self.mistral.workflows.update.called)
self.assertEqual((wf.UPDATE, wf.COMPLETE), wf.state)
def test_update_failed(self):
wf = self._create_resource('workflow')
t = template_format.parse(workflow_template_update)
rsrc_defns = template.Template(t).resource_definitions(self.stack)
new_wf = rsrc_defns['workflow']
self.mistral.workflows.update.side_effect = Exception('boom!')
self.assertRaises(exception.ResourceFailure,
scheduler.TaskRunner(wf.update, new_wf))
self.assertEqual((wf.UPDATE, wf.FAILED), wf.state)
def test_update_failed_no_replace(self):
wf = self._create_resource('workflow',
workflow_template_update_replace)
t = template_format.parse(workflow_template_update_replace_failed)
rsrc_defns = template.Template(t).resource_definitions(self.stack)
new_wf = rsrc_defns['workflow']
self.mistral.workflows.get.return_value = (
FakeWorkflow('test_stack-workflow-b5fiekfci3yc'))
self.mistral.workflows.update.side_effect = [
Exception('boom!'),
[FakeWorkflow('test_stack-workflow-b5fiekfci3yc')]]
self.assertRaises(exception.ResourceFailure,
scheduler.TaskRunner(wf.update, new_wf))
self.assertEqual((wf.UPDATE, wf.FAILED), wf.state)
scheduler.TaskRunner(wf.update, new_wf)()
self.assertTrue(self.mistral.workflows.update.called)
self.assertEqual((wf.UPDATE, wf.COMPLETE), wf.state)
def test_update_failed_replace_not_found(self):
wf = self._create_resource('workflow',
workflow_template_update_replace)
t = template_format.parse(workflow_template_update_replace_failed)
rsrc_defns = template.Template(t).resource_definitions(self.stack)
new_wf = rsrc_defns['workflow']
self.mistral.workflows.update.side_effect = Exception('boom!')
self.assertRaises(exception.ResourceFailure,
scheduler.TaskRunner(wf.update, new_wf))
self.assertEqual((wf.UPDATE, wf.FAILED), wf.state)
self.mistral.workflows.get.side_effect = [
mistral_base.APIException(error_code=404)]
self.assertRaises(resource.UpdateReplace,
scheduler.TaskRunner(wf.update,
new_wf))
def test_delete_super_call_successful(self):
wf = self._create_resource('workflow')
scheduler.TaskRunner(wf.delete)()
self.assertEqual((wf.DELETE, wf.COMPLETE), wf.state)
self.assertEqual(1, self.mistral.workflows.delete.call_count)
def test_delete_executions_successful(self):
wf = self._create_resource('workflow')
self.mistral.executuions.delete.return_value = None
wf._data = {'executions': '1234,5678'}
data_delete = self.patchobject(resource.Resource, 'data_delete')
wf._delete_executions()
self.assertEqual(2, self.mistral.executions.delete.call_count)
data_delete.assert_called_once_with('executions')
def test_delete_executions_not_found(self):
wf = self._create_resource('workflow')
self.mistral.executuions.delete.side_effect = [
self.mistral.mistral_base.APIException(error_code=404),
None
]
wf._data = {'executions': '1234,5678'}
data_delete = self.patchobject(resource.Resource, 'data_delete')
wf._delete_executions()
self.assertEqual(2, self.mistral.executions.delete.call_count)
data_delete.assert_called_once_with('executions')
def test_signal_failed(self):
tmpl = template_format.parse(workflow_template_full)
stack = utils.parse_stack(tmpl)
rsrc_defns = stack.t.resource_definitions(stack)['create_vm']
wf = workflow.Workflow('create_vm', rsrc_defns, stack)
self.mistral.workflows.create.return_value = [
FakeWorkflow('create_vm')]
scheduler.TaskRunner(wf.create)()
details = {'input': {'flavor': '3'}}
self.mistral.executions.create.side_effect = Exception('boom!')
err = self.assertRaises(exception.ResourceFailure,
scheduler.TaskRunner(wf.signal, details))
self.assertEqual('Exception: resources.create_vm: boom!',
six.text_type(err))
def test_signal_wrong_input_and_params_type(self):
tmpl = template_format.parse(workflow_template_full)
stack = utils.parse_stack(tmpl)
rsrc_defns = stack.t.resource_definitions(stack)['create_vm']
wf = workflow.Workflow('create_vm', rsrc_defns, stack)
self.mistral.workflows.create.return_value = [
FakeWorkflow('create_vm')]
scheduler.TaskRunner(wf.create)()
details = {'input': '3'}
err = self.assertRaises(exception.ResourceFailure,
scheduler.TaskRunner(wf.signal, details))
if six.PY3:
entity = 'class'
else:
entity = 'type'
error_message = ("StackValidationFailed: resources.create_vm: "
"Signal data error: Input in"
" signal data must be a map, find a <%s 'str'>" %
entity)
self.assertEqual(error_message, six.text_type(err))
details = {'params': '3'}
err = self.assertRaises(exception.ResourceFailure,
scheduler.TaskRunner(wf.signal, details))
error_message = ("StackValidationFailed: resources.create_vm: "
"Signal data error: Params "
"must be a map, find a <%s 'str'>" % entity)
self.assertEqual(error_message, six.text_type(err))
def test_signal_wrong_input_key(self):
tmpl = template_format.parse(workflow_template_full)
stack = utils.parse_stack(tmpl)
rsrc_defns = stack.t.resource_definitions(stack)['create_vm']
wf = workflow.Workflow('create_vm', rsrc_defns, stack)
self.mistral.workflows.create.return_value = [
FakeWorkflow('create_vm')]
scheduler.TaskRunner(wf.create)()
details = {'input': {'1': '3'}}
err = self.assertRaises(exception.ResourceFailure,
scheduler.TaskRunner(wf.signal, details))
error_message = ("StackValidationFailed: resources.create_vm: "
"Signal data error: Unknown input 1")
self.assertEqual(error_message, six.text_type(err))
def test_signal_with_body_as_input_and_delete_with_executions(self):
tmpl = template_format.parse(workflow_template_full)
stack = utils.parse_stack(tmpl, params={
'parameters': {'use_request_body_as_input': 'true'}
})
rsrc_defns = stack.t.resource_definitions(stack)['create_vm']
wf = workflow.Workflow('create_vm', rsrc_defns, stack)
self.mistral.workflows.create.return_value = [
FakeWorkflow('create_vm')]
scheduler.TaskRunner(wf.create)()
details = {'flavor': '3'}
execution = mock.Mock()
execution.id = '12345'
exec_manager = executions.ExecutionManager(wf.client('mistral'))
self.mistral.executions.create.side_effect = (
lambda *args, **kw: exec_manager.create(*args, **kw))
self.patchobject(exec_manager, '_create', return_value=execution)
scheduler.TaskRunner(wf.signal, details)()
call_args = self.mistral.executions.create.call_args
args, kwargs = call_args
expected_args = (
'{"image": "31d8eeaf-686e-4e95-bb27-765014b9f20b", '
'"name": "create_test_server", "flavor": "3"}')
self.validate_json_inputs(kwargs['workflow_input'], expected_args)
self.assertEqual({'executions': '12345'}, wf.data())
# Updating the workflow changing "use_request_body_as_input" to
# false and signaling again with the expected request body format.
t = template_format.parse(workflow_updating_request_body_property)
new_stack = utils.parse_stack(t)
rsrc_defns = new_stack.t.resource_definitions(new_stack)
self.mistral.workflows.update.return_value = [
FakeWorkflow('test_stack-workflow-b5fiekdsa355')]
scheduler.TaskRunner(wf.update, rsrc_defns['create_vm'])()
self.assertTrue(self.mistral.workflows.update.called)
self.assertEqual((wf.UPDATE, wf.COMPLETE), wf.state)
details = {'input': {'flavor': '4'}}
execution = mock.Mock()
execution.id = '54321'
exec_manager = executions.ExecutionManager(wf.client('mistral'))
self.mistral.executions.create.side_effect = (
lambda *args, **kw: exec_manager.create(*args, **kw))
self.patchobject(exec_manager, '_create', return_value=execution)
scheduler.TaskRunner(wf.signal, details)()
call_args = self.mistral.executions.create.call_args
args, kwargs = call_args
expected_args = (
'{"image": "31d8eeaf-686e-4e95-bb27-765014b9f20b", '
'"name": "create_test_server", "flavor": "4"}')
self.validate_json_inputs(kwargs['workflow_input'], expected_args)
self.assertEqual({'executions': '54321,12345', 'name':
'test_stack-workflow-b5fiekdsa355'}, wf.data())
scheduler.TaskRunner(wf.delete)()
self.assertEqual(2, self.mistral.executions.delete.call_count)
self.assertEqual((wf.DELETE, wf.COMPLETE), wf.state)
def test_signal_and_delete_with_executions(self):
tmpl = template_format.parse(workflow_template_full)
stack = utils.parse_stack(tmpl)
rsrc_defns = stack.t.resource_definitions(stack)['create_vm']
wf = workflow.Workflow('create_vm', rsrc_defns, stack)
self.mistral.workflows.create.return_value = [
FakeWorkflow('create_vm')]
scheduler.TaskRunner(wf.create)()
details = {'input': {'flavor': '3'}}
execution = mock.Mock()
execution.id = '12345'
# Invoke the real create method (bug 1453539)
exec_manager = executions.ExecutionManager(wf.client('mistral'))
self.mistral.executions.create.side_effect = (
lambda *args, **kw: exec_manager.create(*args, **kw))
self.patchobject(exec_manager, '_create', return_value=execution)
scheduler.TaskRunner(wf.signal, details)()
self.assertEqual({'executions': '12345'}, wf.data())
scheduler.TaskRunner(wf.delete)()
self.assertEqual(1, self.mistral.executions.delete.call_count)
self.assertEqual((wf.DELETE, wf.COMPLETE), wf.state)
def test_workflow_params(self):
tmpl = template_format.parse(workflow_template_full)
stack = utils.parse_stack(tmpl)
rsrc_defns = stack.t.resource_definitions(stack)['create_vm']
wf = workflow.Workflow('create_vm', rsrc_defns, stack)
self.mistral.workflows.create.return_value = [
FakeWorkflow('create_vm')]
scheduler.TaskRunner(wf.create)()
details = {'input': {'flavor': '3'},
'params': {'test': 'param_value', 'test1': 'param_value_1'}}
execution = mock.Mock()
execution.id = '12345'
self.mistral.executions.create.side_effect = (
lambda *args, **kw: self.verify_params(*args, **kw))
scheduler.TaskRunner(wf.signal, details)()
def test_workflow_tags(self):
tmpl = template_format.parse(workflow_template_with_tags)
stack = utils.parse_stack(tmpl)
rsrc_defns = stack.t.resource_definitions(stack)['workflow']
wf = workflow.Workflow('workflow', rsrc_defns, stack)
self.mistral.workflows.create.return_value = [
FakeWorkflow('workflow')]
scheduler.TaskRunner(wf.create)()
details = {'tags': ['mytag'],
'params': {'test': 'param_value', 'test1': 'param_value_1'}}
execution = mock.Mock()
execution.id = '12345'
self.mistral.executions.create.side_effect = (
lambda *args, **kw: self.verify_params(*args, **kw))
scheduler.TaskRunner(wf.signal, details)()
def test_workflow_params_merge(self):
tmpl = template_format.parse(workflow_template_with_params)
stack = utils.parse_stack(tmpl)
rsrc_defns = stack.t.resource_definitions(stack)['workflow']
wf = workflow.Workflow('workflow', rsrc_defns, stack)
self.mistral.workflows.create.return_value = [
FakeWorkflow('workflow')]
scheduler.TaskRunner(wf.create)()
details = {'params': {'test1': 'param_value_1'}}
execution = mock.Mock()
execution.id = '12345'
self.mistral.executions.create.side_effect = (
lambda *args, **kw: self.verify_params(*args, **kw))
scheduler.TaskRunner(wf.signal, details)()
def test_workflow_params_override(self):
tmpl = template_format.parse(workflow_template_with_params_override)
stack = utils.parse_stack(tmpl)
rsrc_defns = stack.t.resource_definitions(stack)['workflow']
wf = workflow.Workflow('workflow', rsrc_defns, stack)
self.mistral.workflows.create.return_value = [
FakeWorkflow('workflow')]
scheduler.TaskRunner(wf.create)()
details = {'params': {'test': 'param_value', 'test1': 'param_value_1'}}
execution = mock.Mock()
execution.id = '12345'
self.mistral.executions.create.side_effect = (
lambda *args, **kw: self.verify_params(*args, **kw))
scheduler.TaskRunner(wf.signal, details)()
def test_duplicate_attribute_translation_error(self):
tmpl = template_format.parse(workflow_template_duplicate_polices)
stack = utils.parse_stack(tmpl)
rsrc_defns = stack.t.resource_definitions(stack)['workflow']
workflow_rsrc = workflow.Workflow('workflow', rsrc_defns, stack)
ex = self.assertRaises(exception.StackValidationFailed,
workflow_rsrc.validate)
error_msg = ("Cannot define the following properties at "
"the same time: tasks.retry, tasks.policies.retry")
self.assertIn(error_msg, six.text_type(ex))
def validate_json_inputs(self, actual_input, expected_input):
actual_json_input = jsonutils.loads(actual_input)
expected_json_input = jsonutils.loads(expected_input)
self.assertEqual(expected_json_input, actual_json_input)
def verify_params(self, workflow_name, workflow_input=None, **params):
self.assertEqual({'test': 'param_value', 'test1': 'param_value_1'},
params)
execution = mock.Mock()
execution.id = '12345'
return execution
def verify_create_params(self, wf_yaml):
wf = yaml.safe_load(wf_yaml)["create_vm"]
self.assertEqual(['on_error'], wf["task-defaults"]["on-error"])
tasks = wf['tasks']
task = tasks['wait_instance']
self.assertEqual('vm_id_new in <% $.list_servers %>',
task['with-items'])
self.assertEqual(5, task['retry']['delay'])
self.assertEqual(15, task['retry']['count'])
self.assertEqual(8, task['wait-after'])
self.assertTrue(task['pause-before'])
self.assertEqual(11, task['timeout'])
self.assertEqual('test', task['target'])
self.assertEqual(7, task['wait-before'])
self.assertFalse(task['keep-result'])
return [FakeWorkflow('create_vm')]
def test_mistral_workflow_refid(self):
tmpl = template_format.parse(workflow_template)
stack = utils.parse_stack(tmpl, stack_name='test')
rsrc = stack['workflow']
rsrc.uuid = '4c885bde-957e-4758-907b-c188a487e908'
rsrc.id = 'mockid'
rsrc.action = 'CREATE'
self.assertEqual('test-workflow-owevpzgiqw66', rsrc.FnGetRefId())
def test_mistral_workflow_refid_convergence_cache_data(self):
tmpl = template_format.parse(workflow_template)
cache_data = {'workflow': node_data.NodeData.from_dict({
'uuid': mock.ANY,
'id': mock.ANY,
'action': 'CREATE',
'status': 'COMPLETE',
'reference_id': 'convg_xyz'
})}
stack = utils.parse_stack(tmpl, stack_name='test',
cache_data=cache_data)
rsrc = stack.defn['workflow']
self.assertEqual('convg_xyz', rsrc.FnGetRefId())
def test_policies_translation_successful(self):
tmpl = template_format.parse(workflow_template_policies_translation)
stack = utils.parse_stack(tmpl)
rsrc_defns = stack.t.resource_definitions(stack)['workflow']
wf = workflow.Workflow('workflow', rsrc_defns, stack)
result = {k: v for k, v in wf.properties['tasks'][0].items() if v}
self.assertEqual({'name': 'check_dat_thing',
'action': 'nova.servers_list',
'retry': {'delay': 5, 'count': 15},
'wait_before': 5,
'wait_after': 5,
'pause_before': True,
'timeout': 42,
'concurrency': 5}, result)