deb-heat/heat/tests/test_mistral_workflow.py
Miguel Grinberg c31a9533fa Move mistral resources in-tree
This change relocates the mistral resources from the contrib area into the main
resource tree. It was originally added to contrib/ because of the project's
incubation status, and more specifically because the client is not in the
global-requirements.txt file. However, when this was discussed at the summit in
Vancouver, the decision was to move the resources to main tree but skip
registration if the client is not installed. This will save users from the
trouble of installing it as a plugin.

Change-Id: I6eeef5fa2b080df610e52620d2b935450d8d49e3
2015-06-09 10:19:52 -07:00

510 lines
19 KiB
Python

#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_utils import importutils
import six
import testtools
from heat.common import exception
from heat.common import template_format
from heat.engine.clients.os import mistral as client
from heat.engine import resource
from heat.engine import resources
from heat.engine.resources.openstack.mistral import workflow
from heat.engine.resources import signal_responder
from heat.engine.resources import stack_user
from heat.engine import scheduler
from heat.engine import stack as stack_parser
from heat.engine import template
from heat.tests import common
from heat.tests import utils
mistral_client = importutils.try_import('mistralclient.api.base')
executions = importutils.try_import('mistralclient.api.v2.executions')
workflow_template = """
heat_template_version: 2013-05-23
resources:
workflow:
type: OS::Mistral::Workflow
properties:
type: direct
tasks:
- name: hello
action: std.echo output='Good morning!'
publish:
result: <% $.hello %>
"""
workflow_template_with_params = """
heat_template_version: 2013-05-23
resources:
workflow:
type: OS::Mistral::Workflow
properties:
params: {'test':'param_value'}
type: direct
tasks:
- name: hello
action: std.echo output='Good morning!'
publish:
result: <% $.hello %>
"""
workflow_template_with_params_override = """
heat_template_version: 2013-05-23
resources:
workflow:
type: OS::Mistral::Workflow
properties:
params: {'test':'param_value_override','test1':'param_value_override_1'}
type: direct
tasks:
- name: hello
action: std.echo output='Good morning!'
publish:
result: <% $.hello %>
"""
workflow_template_full = """
heat_template_version: 2013-05-23
resources:
create_vm:
type: OS::Mistral::Workflow
properties:
name: create_vm
type: direct
input:
name: create_test_server
image: 31d8eeaf-686e-4e95-bb27-765014b9f20b
flavor: 2
output:
vm_id: <% $.vm_id %>
tasks:
- name: create_server
action: |
nova.servers_create name=<% $.name %> image=<% $.image %>
flavor=<% $.flavor %>
publish:
vm_id: <% $.create_server.id %>
on_success:
- check_server_exists
- name: check_server_exists
action: nova.servers_get server=<% $.vm_id %>
publish:
server_exists: True
on_success:
- wait_instance
- name: wait_instance
action: nova.servers_find id=<% $.vm_id %> status='ACTIVE'
policies:
retry:
delay: 5
count: 15
"""
workflow_template_bad = """
heat_template_version: 2013-05-23
resources:
workflow:
type: OS::Mistral::Workflow
properties:
type: direct
tasks:
- name: second_task
action: std.noop
requires: [first_task]
- name: first_task
action: std.noop
"""
workflow_template_bad_reverse = """
heat_template_version: 2013-05-23
resources:
workflow:
type: OS::Mistral::Workflow
properties:
type: reverse
tasks:
- name: second_task
action: std.noop
requires: [first_task]
- name: first_task
action: std.noop
"""
workflow_template_update_replace = """
heat_template_version: 2013-05-23
resources:
workflow:
type: OS::Mistral::Workflow
properties:
name: hello_action
type: direct
tasks:
- name: hello
action: std.echo output='Good evening!'
publish:
result: <% $.hello %>
"""
workflow_template_update = """
heat_template_version: 2013-05-23
resources:
workflow:
type: OS::Mistral::Workflow
properties:
type: direct
description: just testing workflow resource
tasks:
- name: hello
action: std.echo output='Good evening!'
publish:
result: <% $.hello %>
"""
class FakeWorkflow(object):
def __init__(self, name):
self.name = name
class TestMistralWorkflow(common.HeatTestCase):
def setUp(self):
super(TestMistralWorkflow, self).setUp()
resources.initialise()
utils.setup_dummy_db()
self.ctx = utils.dummy_context()
tmpl = template_format.parse(workflow_template)
self.stack = utils.parse_stack(tmpl, stack_name='test_stack')
resource_defns = self.stack.t.resource_definitions(self.stack)
self.rsrc_defn = resource_defns['workflow']
self.mistral = mock.Mock()
self.patchobject(workflow.Workflow, 'mistral',
return_value=self.mistral)
self.patches = []
self.patches.append(mock.patch.object(stack_user.StackUser,
'_create_user'))
self.patches.append(mock.patch.object(signal_responder.SignalResponder,
'_create_keypair'))
self.patches.append(mock.patch.object(client,
'mistral_base'))
self.patches.append(mock.patch.object(client.MistralClientPlugin,
'_create'))
for patch in self.patches:
patch.start()
self.client = client.MistralClientPlugin(self.ctx)
def tearDown(self):
super(TestMistralWorkflow, self).tearDown()
for patch in self.patches:
patch.stop()
def _create_resource(self, name, snippet, stack):
wf = workflow.Workflow(name, snippet, stack)
self.mistral.workflows.create.return_value = [
FakeWorkflow('test_stack-workflow-b5fiekfci3yc')]
scheduler.TaskRunner(wf.create)()
return wf
def test_create(self):
wf = self._create_resource('workflow', self.rsrc_defn, self.stack)
expected_state = (wf.CREATE, wf.COMPLETE)
self.assertEqual(expected_state, wf.state)
self.assertEqual('test_stack-workflow-b5fiekfci3yc', wf.resource_id)
def test_create_with_name(self):
tmpl = template_format.parse(workflow_template_full)
stack = utils.parse_stack(tmpl)
rsrc_defns = stack.t.resource_definitions(stack)['create_vm']
wf = workflow.Workflow('create_vm', rsrc_defns, stack)
self.mistral.workflows.create.return_value = [
FakeWorkflow('create_vm')]
scheduler.TaskRunner(wf.create)()
expected_state = (wf.CREATE, wf.COMPLETE)
self.assertEqual(expected_state, wf.state)
self.assertEqual('create_vm', wf.resource_id)
def test_attributes(self):
wf = self._create_resource('workflow', self.rsrc_defn, self.stack)
self.assertEqual({'name': 'test_stack-workflow-b5fiekfci3yc',
'input': None}, wf.FnGetAtt('data'))
self.assertEqual([], wf.FnGetAtt('executions'))
def test_direct_workflow_validation_error(self):
error_msg = ("Mistral resource validation error : "
"workflow.properties.tasks.second_task.requires: "
"task second_task contains property 'requires' "
"in case of direct workflow. Only reverse workflows "
"can contain property 'requires'.")
self._test_validation_failed(workflow_template_bad, error_msg)
def test_wrong_params_using(self):
error_msg = ("Mistral resource validation error : "
"workflow.properties.params: 'task_name' is not assigned "
"in 'params' in case of reverse type workflow.")
self._test_validation_failed(workflow_template_bad_reverse, error_msg)
def _test_validation_failed(self, templatem, error_msg):
tmpl = template_format.parse(templatem)
stack = utils.parse_stack(tmpl)
rsrc_defns = stack.t.resource_definitions(stack)['workflow']
wf = workflow.Workflow('workflow', rsrc_defns, stack)
exc = self.assertRaises(exception.StackValidationFailed,
wf.validate)
self.assertEqual(error_msg, six.text_type(exc))
def test_create_wrong_definition(self):
tmpl = template_format.parse(workflow_template)
stack = utils.parse_stack(tmpl)
rsrc_defns = stack.t.resource_definitions(stack)['workflow']
wf = workflow.Workflow('workflow', rsrc_defns, stack)
self.mistral.workflows.create.side_effect = Exception('boom!')
exc = self.assertRaises(exception.ResourceFailure,
scheduler.TaskRunner(wf.create))
expected_state = (wf.CREATE, wf.FAILED)
self.assertEqual(expected_state, wf.state)
self.assertIn('Exception: boom!', six.text_type(exc))
def test_update_replace(self):
wf = self._create_resource('workflow', self.rsrc_defn, self.stack)
t = template_format.parse(workflow_template_update_replace)
rsrc_defns = template.Template(t).resource_definitions(self.stack)
new_workflow = rsrc_defns['workflow']
new_workflows = [FakeWorkflow('hello_action')]
self.mistral.workflows.update.return_value = new_workflows
self.mistral.workflows.delete.return_value = None
err = self.assertRaises(resource.UpdateReplace,
scheduler.TaskRunner(wf.update,
new_workflow))
msg = 'The Resource workflow requires replacement.'
self.assertEqual(msg, six.text_type(err))
def test_update(self):
wf = self._create_resource('workflow', self.rsrc_defn,
self.stack)
t = template_format.parse(workflow_template_update)
rsrc_defns = template.Template(t).resource_definitions(self.stack)
new_wf = rsrc_defns['workflow']
self.mistral.workflows.update.return_value = [
FakeWorkflow('test_stack-workflow-b5fiekfci3yc')]
scheduler.TaskRunner(wf.update, new_wf)()
self.mistral.workflows.update.assert_called_once()
self.assertEqual((wf.UPDATE, wf.COMPLETE), wf.state)
def test_update_failed(self):
wf = self._create_resource('workflow', self.rsrc_defn,
self.stack)
t = template_format.parse(workflow_template_update)
rsrc_defns = template.Template(t).resource_definitions(self.stack)
new_wf = rsrc_defns['workflow']
self.mistral.workflows.update.side_effect = Exception('boom!')
self.assertRaises(exception.ResourceFailure,
scheduler.TaskRunner(wf.update, new_wf))
self.assertEqual((wf.UPDATE, wf.FAILED), wf.state)
def test_delete(self):
wf = self._create_resource('workflow', self.rsrc_defn, self.stack)
scheduler.TaskRunner(wf.delete)()
self.assertEqual((wf.DELETE, wf.COMPLETE), wf.state)
def test_delete_no_data(self):
wf = self._create_resource('workflow', self.rsrc_defn, self.stack)
wf.data_delete('executions')
self.assertEqual([], wf.FnGetAtt('executions'))
scheduler.TaskRunner(wf.delete)()
self.assertEqual((wf.DELETE, wf.COMPLETE), wf.state)
def test_delete_not_found(self):
wf = self._create_resource('workflow', self.rsrc_defn, self.stack)
self.mistral.workflows.delete.side_effect = (
self.mistral.mistral_base.APIException(error_code=404))
scheduler.TaskRunner(wf.delete)()
self.assertEqual((wf.DELETE, wf.COMPLETE), wf.state)
@mock.patch.object(resource.Resource, 'client_plugin')
def test_delete_other_errors(self, mock_plugin):
"""We mock client_plugin for returning correct mistral client."""
mock_plugin.return_value = self.client
client.mistral_base.APIException = exception.Error
wf = self._create_resource('workflow', self.rsrc_defn, self.stack)
self.mistral.workflows.delete.side_effect = (Exception('boom!'))
exc = self.assertRaises(exception.ResourceFailure,
scheduler.TaskRunner(wf.delete))
self.assertEqual((wf.DELETE, wf.FAILED), wf.state)
self.assertIn('boom!', six.text_type(exc))
def test_resource_mapping(self):
mapping = workflow.resource_mapping()
self.assertEqual(1, len(mapping))
self.assertEqual(workflow.Workflow,
mapping['OS::Mistral::Workflow'])
def test_signal_failed(self):
tmpl = template_format.parse(workflow_template_full)
stack = utils.parse_stack(tmpl)
rsrc_defns = stack.t.resource_definitions(stack)['create_vm']
wf = workflow.Workflow('create_vm', rsrc_defns, stack)
self.mistral.workflows.create.return_value = [
FakeWorkflow('create_vm')]
scheduler.TaskRunner(wf.create)()
details = {'input': {'flavor': '3'}}
self.mistral.executions.create.side_effect = Exception('boom!')
err = self.assertRaises(exception.ResourceFailure,
scheduler.TaskRunner(wf.signal, details))
self.assertEqual('Exception: boom!', six.text_type(err))
def test_signal_wrong_input_and_params_type(self):
tmpl = template_format.parse(workflow_template_full)
stack = utils.parse_stack(tmpl)
rsrc_defns = stack.t.resource_definitions(stack)['create_vm']
wf = workflow.Workflow('create_vm', rsrc_defns, stack)
self.mistral.workflows.create.return_value = [
FakeWorkflow('create_vm')]
scheduler.TaskRunner(wf.create)()
details = {'input': '3'}
err = self.assertRaises(exception.ResourceFailure,
scheduler.TaskRunner(wf.signal, details))
error_message = ("StackValidationFailed: Signal data error : Input in"
" signal data must be a map, find a <type 'str'>")
self.assertEqual(error_message, six.text_type(err))
details = {'params': '3'}
err = self.assertRaises(exception.ResourceFailure,
scheduler.TaskRunner(wf.signal, details))
error_message = ("StackValidationFailed: Signal data error : Params "
"must be a map, find a <type 'str'>")
self.assertEqual(error_message, six.text_type(err))
def test_signal_wrong_input_key(self):
tmpl = template_format.parse(workflow_template_full)
stack = utils.parse_stack(tmpl)
rsrc_defns = stack.t.resource_definitions(stack)['create_vm']
wf = workflow.Workflow('create_vm', rsrc_defns, stack)
self.mistral.workflows.create.return_value = [
FakeWorkflow('create_vm')]
scheduler.TaskRunner(wf.create)()
details = {'input': {'1': '3'}}
err = self.assertRaises(exception.ResourceFailure,
scheduler.TaskRunner(wf.signal, details))
error_message = ("StackValidationFailed: Signal data error :"
" Unknown input 1")
self.assertEqual(error_message, six.text_type(err))
@testtools.skipIf(executions is None,
'Uses the actual mistral client')
def test_signal_and_delete_with_executions(self):
tmpl = template_format.parse(workflow_template_full)
stack = utils.parse_stack(tmpl)
rsrc_defns = stack.t.resource_definitions(stack)['create_vm']
wf = workflow.Workflow('create_vm', rsrc_defns, stack)
self.mistral.workflows.create.return_value = [
FakeWorkflow('create_vm')]
scheduler.TaskRunner(wf.create)()
details = {'input': {'flavor': '3'}}
execution = mock.Mock()
execution.id = '12345'
# Invoke the real create method (bug 1453539)
exec_manager = executions.ExecutionManager(wf.client('mistral'))
self.mistral.executions.create.side_effect = (
lambda *args, **kw: exec_manager.create(*args, **kw))
self.patchobject(exec_manager, '_create', return_value=execution)
scheduler.TaskRunner(wf.signal, details)()
self.assertEqual({'executions': '12345'}, wf.data())
scheduler.TaskRunner(wf.delete)()
self.assertEqual(1, self.mistral.executions.delete.call_count)
self.assertEqual((wf.DELETE, wf.COMPLETE), wf.state)
def test_workflow_params(self):
tmpl = template_format.parse(workflow_template_full)
stack = utils.parse_stack(tmpl)
rsrc_defns = stack.t.resource_definitions(stack)['create_vm']
wf = workflow.Workflow('create_vm', rsrc_defns, stack)
self.mistral.workflows.create.return_value = [
FakeWorkflow('create_vm')]
scheduler.TaskRunner(wf.create)()
details = {'input': {'flavor': '3'},
'params': {'test': 'param_value', 'test1': 'param_value_1'}}
execution = mock.Mock()
execution.id = '12345'
self.mistral.executions.create.side_effect = (
lambda *args, **kw: self.verify_params(*args, **kw))
scheduler.TaskRunner(wf.signal, details)()
def test_workflow_params_merge(self):
tmpl = template_format.parse(workflow_template_with_params)
stack = utils.parse_stack(tmpl)
rsrc_defns = stack.t.resource_definitions(stack)['workflow']
wf = workflow.Workflow('workflow', rsrc_defns, stack)
self.mistral.workflows.create.return_value = [
FakeWorkflow('workflow')]
scheduler.TaskRunner(wf.create)()
details = {'params': {'test1': 'param_value_1'}}
execution = mock.Mock()
execution.id = '12345'
self.mistral.executions.create.side_effect = (
lambda *args, **kw: self.verify_params(*args, **kw))
scheduler.TaskRunner(wf.signal, details)()
def test_workflow_params_override(self):
tmpl = template_format.parse(workflow_template_with_params_override)
stack = utils.parse_stack(tmpl)
rsrc_defns = stack.t.resource_definitions(stack)['workflow']
wf = workflow.Workflow('workflow', rsrc_defns, stack)
self.mistral.workflows.create.return_value = [
FakeWorkflow('workflow')]
scheduler.TaskRunner(wf.create)()
details = {'params': {'test': 'param_value', 'test1': 'param_value_1'}}
execution = mock.Mock()
execution.id = '12345'
self.mistral.executions.create.side_effect = (
lambda *args, **kw: self.verify_params(*args, **kw))
scheduler.TaskRunner(wf.signal, details)()
def verify_params(self, workflow_name, workflow_input=None, **params):
self.assertEqual({'test': 'param_value', 'test1': 'param_value_1'},
params)
execution = mock.Mock()
execution.id = '12345'
return execution
@testtools.skipIf(mistral_client is not None,
'Tests mistral client not installed')
def test_no_client(self):
tmpl = template.Template((template_format.parse(workflow_template)))
stack = stack_parser.Stack(utils.dummy_context(), 'foo', tmpl)
self.assertRaises(exception.ResourceTypeNotFound, stack.validate)