Files
python-tripleoclient/tripleoclient/workflows/validations.py
Carlos Camacho b3af96f74e Add new CLI option openstack tripleo validate run
This submission starts the integration of the currently supported
Mistral workflows for running the validations.

This submission integrates running all currently installed validations,
either by validation group or as single validations.

Change-Id: Ib86005596e046587a4608a755a15ce8620105f8b
Implements: blueprint validation-framework
Depends-On: I294ab0016fda9587b405ef08dba3212b8e46a816
2019-03-05 09:27:51 +01:00

72 lines
2.4 KiB
Python

# Copyright 2019 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import pprint
from tripleoclient.workflows import base
from tripleoclient import exceptions
def list_validations(clients, workflow_input):
    """Return the validations reported by the Mistral ``list`` workflow.

    Starts ``tripleo.validations.v1.list`` while the messaging websocket
    is open and inspects the messages produced for that execution. The
    first payload carrying a ``message`` key decides the outcome.

    :param clients: object exposing ``workflow_engine`` and
        ``tripleoclient`` attributes; the latter must provide
        ``messaging_websocket()`` usable as a context manager.
    :param workflow_input: input handed unchanged to the workflow.
    :returns: ``payload['validations']`` of the first payload that has a
        ``message`` key, or ``None`` if the message stream ends without
        one.
    :raises AssertionError: if that payload's ``status`` is not
        ``"SUCCESS"``; the exception message is the pretty-printed
        payload.
    """
    workflow_client = clients.workflow_engine
    tripleoclients = clients.tripleoclient

    with tripleoclients.messaging_websocket() as ws:
        execution = base.start_workflow(
            workflow_client,
            'tripleo.validations.v1.list',
            workflow_input=workflow_input
        )

        for payload in base.wait_for_messages(workflow_client, ws, execution):
            if 'message' not in payload:
                continue
            # Raise explicitly instead of using an ``assert`` statement:
            # asserts are removed under ``python -O``, which would let a
            # failed workflow slip through unnoticed. AssertionError is
            # kept so existing callers observe the same exception type.
            if payload['status'] != "SUCCESS":
                raise AssertionError(pprint.pformat(payload))
            return payload['validations']
def run_validations(clients, workflow_input):
    """Run validations through Mistral and return the resulting message.

    When ``workflow_input`` contains the key ``group_names`` the
    ``tripleo.validations.v1.run_groups`` workflow is started; otherwise
    ``tripleo.validations.v1.run_validations`` is used. A short notice
    saying which mode was chosen is printed before the workflow starts.

    :param clients: object exposing ``workflow_engine`` and
        ``tripleoclient`` attributes; the latter must provide
        ``messaging_websocket()`` usable as a context manager.
    :param workflow_input: input handed unchanged to the workflow; it
        must support the ``in`` membership test.
    :returns: ``payload['message']`` of the first payload that has a
        ``message`` key and status ``'SUCCESS'``, or ``None`` if the
        message stream ends without a SUCCESS or FAILED payload.
    :raises exceptions.RegisterOrUpdateError: on the first payload that
        has a ``message`` key and status ``'FAILED'``.
    """
    engine = clients.workflow_engine
    tripleo = clients.tripleoclient

    with tripleo.messaging_websocket() as ws:
        # Choose the announcement and the workflow in one place so the
        # workflow is started by a single call below.
        if 'group_names' in workflow_input:
            notice = 'Running group validations'
            workflow_name = 'tripleo.validations.v1.run_groups'
        else:
            notice = 'Running single validations'
            workflow_name = 'tripleo.validations.v1.run_validations'

        print(notice)
        execution = base.start_workflow(
            engine,
            workflow_name,
            workflow_input=workflow_input
        )

        for payload in base.wait_for_messages(engine, ws, execution):
            # Payloads without a message carry no verdict; keep waiting.
            if 'message' not in payload:
                continue

            status = payload['status']
            if status == 'SUCCESS':
                return payload['message']
            if status == 'FAILED':
                failure_text = 'Exception running validations: {}'.format(
                    payload['message'])
                raise exceptions.RegisterOrUpdateError(failure_text)
            # Any other status is ignored and the loop keeps listening.