Merge "Refactor mistral workflow handle_signal"
This commit is contained in:
commit
75e945a851
|
@ -11,6 +11,8 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
|
||||
from oslo_serialization import jsonutils
|
||||
import six
|
||||
import yaml
|
||||
|
@ -366,25 +368,24 @@ class Workflow(signal_responder.SignalResponder,
|
|||
params = data.get(self.SIGNAL_DATA_PARAMS)
|
||||
return inputs, params
|
||||
|
||||
def _validate_signal_data(self, data):
|
||||
input_value, params_value = self._get_inputs_and_params(data)
|
||||
if input_value is not None:
|
||||
if not isinstance(input_value, dict):
|
||||
def _validate_signal_data(self, inputs, params):
|
||||
if inputs is not None:
|
||||
if not isinstance(inputs, dict):
|
||||
message = (_('Input in signal data must be a map, '
|
||||
'find a %s') % type(input_value))
|
||||
'find a %s') % type(inputs))
|
||||
raise exception.StackValidationFailed(
|
||||
error=_('Signal data error'),
|
||||
message=message)
|
||||
for key in six.iterkeys(input_value):
|
||||
if (self.properties.get(self.INPUT) is None
|
||||
or key not in self.properties.get(self.INPUT)):
|
||||
for key in six.iterkeys(inputs):
|
||||
if (self.properties.get(self.INPUT) is None or
|
||||
key not in self.properties.get(self.INPUT)):
|
||||
message = _('Unknown input %s') % key
|
||||
raise exception.StackValidationFailed(
|
||||
error=_('Signal data error'),
|
||||
message=message)
|
||||
if params_value is not None and not isinstance(params_value, dict):
|
||||
if params is not None and not isinstance(params, dict):
|
||||
message = (_('Params must be a map, find a '
|
||||
'%s') % type(params_value))
|
||||
'%s') % type(params))
|
||||
raise exception.StackValidationFailed(
|
||||
error=_('Signal data error'),
|
||||
message=message)
|
||||
|
@ -513,34 +514,24 @@ class Workflow(signal_responder.SignalResponder,
|
|||
self.resource_id_set(workflow[0].name)
|
||||
|
||||
def handle_signal(self, details=None):
|
||||
self._validate_signal_data(details)
|
||||
|
||||
result_input = {}
|
||||
result_params = {}
|
||||
inputs, params = self._get_inputs_and_params(details)
|
||||
if inputs is not None:
|
||||
# NOTE(prazumovsky): Signal can contains some data, interesting
|
||||
# for workflow, e.g. inputs. So, if signal data contains input
|
||||
# we update override inputs, other leaved defined in template.
|
||||
for key, value in six.iteritems(
|
||||
self.properties.get(self.INPUT)):
|
||||
result_input.update(
|
||||
{key: inputs.get(key) or value})
|
||||
if params is not None:
|
||||
if self.properties.get(self.PARAMS) is not None:
|
||||
result_params.update(self.properties.get(self.PARAMS))
|
||||
result_params.update(params)
|
||||
self._validate_signal_data(inputs, params)
|
||||
|
||||
if not result_input and self.properties.get(self.INPUT):
|
||||
result_input.update(self.properties.get(self.INPUT))
|
||||
if not result_params and self.properties.get(self.PARAMS):
|
||||
result_params.update(self.properties.get(self.PARAMS))
|
||||
inputs_result = copy.deepcopy(self.properties[self.INPUT])
|
||||
params_result = copy.deepcopy(self.properties[self.PARAMS]) or {}
|
||||
# NOTE(prazumovsky): Signal can contains some data, interesting
|
||||
# for workflow, e.g. inputs. So, if signal data contains input
|
||||
# we update override inputs, other leaved defined in template.
|
||||
if inputs:
|
||||
inputs_result.update(inputs)
|
||||
if params:
|
||||
params_result.update(params)
|
||||
|
||||
try:
|
||||
execution = self.client().executions.create(
|
||||
self._workflow_name(),
|
||||
jsonutils.dumps(result_input),
|
||||
**result_params)
|
||||
jsonutils.dumps(inputs_result),
|
||||
**params_result)
|
||||
except Exception as ex:
|
||||
raise exception.ResourceFailure(ex, self)
|
||||
executions = [execution.id]
|
||||
|
|
Loading…
Reference in New Issue