Implements custom resource type managed by Mistral workflows

Adds a new resource named OS::Mistral::ExternalResource which
allow users to use custom resource types implementing their actions
as Mistral workflows.

Implements: blueprint mistral-new-resource-type-workflow-execution
Change-Id: If02799e7457ca017cc119317dfb2db7198a3559f
This commit is contained in:
Giulio Fidente 2017-01-16 12:39:03 +01:00
parent f2061641fd
commit 725b404468
3 changed files with 422 additions and 0 deletions

View File

@ -0,0 +1,282 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from oslo_serialization import jsonutils
import six
from heat.common import exception
from heat.common.i18n import _
from heat.engine import attributes
from heat.engine import constraints
from heat.engine import properties
from heat.engine import resource
from heat.engine import support
LOG = logging.getLogger(__name__)
class MistralExternalResource(resource.Resource):
"""A plugin for managing user-defined resources via Mistral workflows.
This resource allows users to manage resources that are not known to Heat.
The user may specify a Mistral workflow to handle each resource action,
such as CREATE, UPDATE, or DELETE.
The workflows may return an output named 'resource_id', which will be
treated as the physical ID of the resource by Heat.
Once the resource is created, subsequent workflow runs will receive the
output of the last workflow execution in the 'heat_extresource_data' key
in the workflow environment (accessible as ``env().heat_extresource_data``
in the workflow).
The template author may specify a subset of inputs as causing replacement
of the resource when they change, as an alternative to running the
UPDATE workflow.
"""
support_status = support.SupportStatus(version='9.0.0')
default_client_name = 'mistral'
entity = 'executions'
_ACTION_PROPERTIES = (
WORKFLOW, PARAMS
) = (
'workflow', 'params'
)
PROPERTIES = (
EX_ACTIONS,
INPUT,
DESCRIPTION,
REPLACE_ON_CHANGE,
ALWAYS_UPDATE
) = (
'actions',
'input',
'description',
'replace_on_change_inputs',
'always_update'
)
ATTRIBUTES = (
OUTPUT,
) = (
'output',
)
_action_properties_schema = properties.Schema(
properties.Schema.MAP,
_('Dictionary which defines the workflow to run and its params.'),
schema={
WORKFLOW: properties.Schema(
properties.Schema.STRING,
_('Workflow to execute.'),
required=True,
constraints=[
constraints.CustomConstraint('mistral.workflow')
],
),
PARAMS: properties.Schema(
properties.Schema.MAP,
_('Workflow additional parameters. If workflow is reverse '
'typed, params requires "task_name", which defines '
'initial task.'),
default={}
),
}
)
properties_schema = {
EX_ACTIONS: properties.Schema(
properties.Schema.MAP,
_('Resource action which triggers a workflow execution.'),
schema={
resource.Resource.CREATE: _action_properties_schema,
resource.Resource.UPDATE: _action_properties_schema,
resource.Resource.SUSPEND: _action_properties_schema,
resource.Resource.RESUME: _action_properties_schema,
resource.Resource.DELETE: _action_properties_schema,
},
required=True
),
INPUT: properties.Schema(
properties.Schema.MAP,
_('Dictionary which contains input for the workflows.'),
update_allowed=True,
default={}
),
DESCRIPTION: properties.Schema(
properties.Schema.STRING,
_('Workflow execution description.'),
default='Heat managed'
),
REPLACE_ON_CHANGE: properties.Schema(
properties.Schema.LIST,
_('A list of inputs that should cause the resource to be replaced '
'when their values change.'),
default=[]
),
ALWAYS_UPDATE: properties.Schema(
properties.Schema.BOOLEAN,
_('Triggers UPDATE action execution even if input is '
'unchanged.'),
default=False
),
}
attributes_schema = {
OUTPUT: attributes.Schema(
_('Output from the execution.'),
type=attributes.Schema.MAP
),
}
def _check_execution(self, action, execution_id):
"""Check execution status.
Returns False if in IDLE, RUNNING or PAUSED
returns True if in SUCCESS
raises ResourceFailure if in ERROR, CANCELLED
raises ResourceUnknownState otherwise.
"""
execution = self.client().executions.get(execution_id)
LOG.debug('Mistral execution %(id)s is in state '
'%(state)s' % {'id': execution_id,
'state': execution.state})
if execution.state in ('IDLE', 'RUNNING', 'PAUSED'):
return False, {}
if execution.state in ('SUCCESS',):
return True, jsonutils.loads(execution.output)
if execution.state in ('ERROR', 'CANCELLED'):
raise exception.ResourceFailure(
exception_or_error=execution.state,
resource=self,
action=action)
raise exception.ResourceUnknownStatus(
resource_status=execution.state,
result=_('Mistral execution is in unknown state.'))
def _handle_action(self, action, inputs=None):
action_data = self.properties[self.EX_ACTIONS].get(action)
if action_data:
# bring forward output from previous executions into env
if self.resource_id:
old_outputs = jsonutils.loads(self.data().get('outputs', '{}'))
action_env = action_data[self.PARAMS].get('env', {})
action_env['heat_extresource_data'] = old_outputs
action_data[self.PARAMS]['env'] = action_env
# inputs is not None when inputs changed on stack UPDATE
if not inputs:
inputs = self.properties[self.INPUT]
execution = self.client().executions.create(
action_data[self.WORKFLOW],
jsonutils.dumps(inputs),
self.properties[self.DESCRIPTION],
**action_data[self.PARAMS])
LOG.debug('Mistral execution %(id)s params set to '
'%(params)s' % {'id': execution.id,
'params': action_data[self.PARAMS]})
return execution.id
def _check_action(self, action, execution_id):
success = True
# execution_id is None when no data is available for a given action
if execution_id:
rsrc_id = execution_id
success, output = self._check_execution(action, execution_id)
# merge output with outputs of previous executions
outputs = jsonutils.loads(self.data().get('outputs', '{}'))
outputs.update(output)
self.data_set('outputs', jsonutils.dumps(outputs))
# set resource id using output, if found
if output.get('resource_id'):
rsrc_id = output.get('resource_id')
LOG.debug('ExternalResource id set to %(rid)s from Mistral '
'execution %(eid)s output' % {'eid': execution_id,
'rid': rsrc_id})
self.resource_id_set(six.text_type(rsrc_id)[:255])
return success
def _resolve_attribute(self, name):
if self.resource_id and name == self.OUTPUT:
return self.data().get('outputs')
def _needs_update(self, after, before, after_props, before_props,
prev_resource, check_init_complete=True):
# check if we need to force replace first
old_inputs = before_props[self.INPUT]
new_inputs = after_props[self.INPUT]
for i in after_props[self.REPLACE_ON_CHANGE]:
if old_inputs.get(i) != new_inputs.get(i):
LOG.debug('Replacing ExternalResource %(id)s instead of '
'updating due to change to input "%(i)s"' %
{"id": self.resource_id,
"i": i})
raise resource.UpdateReplace(self)
# honor always_update if found
if self.properties[self.ALWAYS_UPDATE]:
return True
# call super in all other scenarios
else:
return super(MistralExternalResource,
self)._needs_update(after,
before,
after_props,
before_props,
prev_resource,
check_init_complete)
def handle_create(self):
return self._handle_action(self.CREATE)
def check_create_complete(self, execution_id):
return self._check_action(self.CREATE, execution_id)
def handle_update(self, json_snippet, tmpl_diff, prop_diff):
new_inputs = prop_diff.get(self.INPUT)
return self._handle_action(self.UPDATE, new_inputs)
def check_update_complete(self, execution_id):
return self._check_action(self.UPDATE, execution_id)
def handle_suspend(self):
return self._handle_action(self.SUSPEND)
def check_suspend_complete(self, execution_id):
return self._check_action(self.SUSPEND, execution_id)
def handle_resume(self):
return self._handle_action(self.RESUME)
def check_resume_complete(self, execution_id):
return self._check_action(self.RESUME, execution_id)
def handle_delete(self):
return self._handle_action(self.DELETE)
def check_delete_complete(self, execution_id):
return self._check_action(self.DELETE, execution_id)
def resource_mapping():
return {
'OS::Mistral::ExternalResource': MistralExternalResource
}

View File

@ -0,0 +1,134 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from heat.common import exception
from heat.common import template_format
from heat.engine.clients.os import mistral as client
from heat.engine import resource
from heat.engine.resources.openstack.mistral import external_resource
from heat.engine import scheduler
from heat.engine import template
from heat.tests import common
from heat.tests import utils
external_resource_template = """
heat_template_version: ocata
resources:
custom:
type: OS::Mistral::ExternalResource
properties:
actions:
CREATE:
workflow: some_workflow
params:
target: create_my_custom_thing
UPDATE:
workflow: another_workflow
DELETE:
workflow: yet_another_workflow
input:
foo1: 123
foo2: 456
replace_on_change_inputs:
- foo2
"""
class FakeExecution(object):
def __init__(self, id='1234', output='{}', state='IDLE'):
self.id = id
self.output = output
self.state = state
class TestMistralExternalResource(common.HeatTestCase):
def setUp(self):
super(TestMistralExternalResource, self).setUp()
self.ctx = utils.dummy_context()
tmpl = template_format.parse(external_resource_template)
self.stack = utils.parse_stack(tmpl, stack_name='test_stack')
resource_defns = self.stack.t.resource_definitions(self.stack)
self.rsrc_defn = resource_defns['custom']
self.mistral = mock.Mock()
self.patchobject(external_resource.MistralExternalResource,
'client',
return_value=self.mistral)
self.patchobject(client, 'mistral_base')
self.patchobject(client.MistralClientPlugin, '_create')
self.client = client.MistralClientPlugin(self.ctx)
def _create_resource(self, name, snippet, stack,
output='{}',
get_state='SUCCESS'):
execution = external_resource.MistralExternalResource(name,
snippet,
stack)
self.mistral.executions.get.return_value = (
FakeExecution('test_stack-execution-b5fiekfci3yc',
output,
get_state))
self.mistral.executions.create.return_value = (
FakeExecution('test_stack-execution-b5fiekfci3yc'))
return execution
def test_create(self):
execution = self._create_resource('execution',
self.rsrc_defn,
self.stack)
scheduler.TaskRunner(execution.create)()
expected_state = (execution.CREATE, execution.COMPLETE)
self.assertEqual(expected_state, execution.state)
self.assertEqual('test_stack-execution-b5fiekfci3yc',
execution.resource_id)
def test_create_with_resource_id_output(self):
output = '{"resource_id": "my-fake-resource-id"}'
execution = self._create_resource('execution',
self.rsrc_defn,
self.stack,
output)
scheduler.TaskRunner(execution.create)()
expected_state = (execution.CREATE, execution.COMPLETE)
self.assertEqual(expected_state, execution.state)
self.assertEqual('my-fake-resource-id',
execution.resource_id)
def test_replace_on_change(self):
execution = self._create_resource('execution',
self.rsrc_defn,
self.stack)
scheduler.TaskRunner(execution.create)()
expected_state = (execution.CREATE, execution.COMPLETE)
self.assertEqual(expected_state, execution.state)
tmpl = template_format.parse(external_resource_template)
tmpl['resources']['custom']['properties']['input']['foo2'] = '4567'
res_defns = template.Template(tmpl).resource_definitions(self.stack)
new_custom_defn = res_defns['custom']
self.assertRaises(resource.UpdateReplace,
scheduler.TaskRunner(execution.update,
new_custom_defn))
def test_create_failed(self):
execution = self._create_resource('execution',
self.rsrc_defn,
self.stack,
get_state='ERROR')
self.assertRaises(exception.ResourceFailure,
scheduler.TaskRunner(execution.create))
expected_state = (execution.CREATE, execution.FAILED)
self.assertEqual(expected_state, execution.state)

View File

@ -0,0 +1,6 @@
---
features:
- |
A new OS::Mistral::ExternalResource is added that allows users to manage
resources that are not known to Heat by specifying in the template Mistral
workflows to handle actions such as create, update and delete.