diff --git a/tripleo_common/actions/ansible.py b/tripleo_common/actions/ansible.py index bcb19cb64..19e98bc73 100644 --- a/tripleo_common/actions/ansible.py +++ b/tripleo_common/actions/ansible.py @@ -17,13 +17,16 @@ import os import shutil import six from six.moves import configparser +import subprocess import tempfile - +import time import yaml from mistral_lib import actions from oslo_concurrency import processutils +from tripleo_common.actions import base + def write_default_ansible_cfg(work_dir, base_ansible_cfg='/etc/ansible/ansible.cfg'): @@ -221,7 +224,7 @@ class AnsibleAction(actions.Action): shutil.rmtree(self.work_dir) -class AnsiblePlaybookAction(actions.Action): +class AnsiblePlaybookAction(base.TripleOAction): """Executes ansible playbook""" def __init__(self, **kwargs): @@ -255,6 +258,7 @@ class AnsiblePlaybookAction(actions.Action): self.skip_tags = self._kwargs_for_run.pop('skip_tags', None) self.extra_env_variables = self._kwargs_for_run.pop( 'extra_env_variables', None) + self.queue_name = self._kwargs_for_run.pop('queue_name', None) self._work_dir = self._kwargs_for_run.pop( 'work_dir', None) @@ -340,6 +344,14 @@ class AnsiblePlaybookAction(actions.Action): self._ssh_private_key = path return path + def format_message(self, lines): + return { + 'body': { + 'payload': { + 'message': ''.join(lines), + 'status': 'RUNNING', + 'execution': {'id': None}}}} + def run(self, context): if 0 < self.verbosity < 6: verbosity_option = '-' + ('v' * self.verbosity) @@ -415,6 +427,37 @@ class AnsiblePlaybookAction(actions.Action): 'OS_AUTH_TOKEN': context.auth_token, 'OS_PROJECT_NAME': context.project_name}) + if self.queue_name: + zaqar = self.get_messaging_client(context) + queue = zaqar.queue(self.queue_name) + # TODO(d0ugal): We don't have the log errors functionality + # that processutils has, do we need to replicate that somehow? + command = [str(c) for c in command] + process = subprocess.Popen(command, stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + shell=False, bufsize=1, + cwd=self.work_dir, + env=env_variables) + start = time.time() + stdout = [] + lines = [] + for line in iter(process.stdout.readline, b''): + lines.append(line) + stdout.append(line) + if time.time() - start > 30: + queue.post(self.format_message(lines)) + lines = [] + start = time.time() + queue.post(self.format_message(lines)) + process.stdout.close() + returncode = process.wait() + # TODO(d0ugal): This bit isn't ideal - as we redirect stderr to + # stdout we don't know the difference. To keep the return dict + # similar there is an empty stderr. We can use the return code + # to determine if there was an error. + return {"stdout": "".join(stdout), "returncode": returncode, + "stderr": ""} + stderr, stdout = processutils.execute( *command, cwd=self.work_dir, env_variables=env_variables, diff --git a/workbooks/package_update.yaml b/workbooks/package_update.yaml index 1a7bcd082..f93f080d2 100644 --- a/workbooks/package_update.yaml +++ b/workbooks/package_update.yaml @@ -71,6 +71,7 @@ workflows: - playbook - inventory_file - queue_name: tripleo + - ansible_queue_name: tripleo - module_path: /usr/share/ansible-modules - ansible_extra_env_variables: ANSIBLE_HOST_KEY_CHECKING: 'False' @@ -105,7 +106,10 @@ workflows: extra_env_variables: <% $.ansible_extra_env_variables %> limit_hosts: <% $.nodes %> module_path: <% $.module_path %> - on-success: node_update_passed + queue_name: <% $.ansible_queue_name %> + on-success: + - node_update_passed: <% task().result.returncode = 0 %> + - node_update_failed: <% task().result.returncode != 0 %> on-error: node_update_failed publish: output: <% task().result %> @@ -132,7 +136,6 @@ workflows: type: tripleo.package_update.v1.update_nodes payload: status: <% $.status %> - message: <% concat(task(node_update).result.stderr.substring(0, 262144), task(node_update).result.stdout.substring(0, 262144)) %> execution: <% execution() %> on-success: - fail: <% $.get('status') = "FAILED" %>