Stream the Ansible output to a Zaqar queue

In order to get a the 'real-time' log of the ansible execution through
mistral, we need to stream the output in a Zaqar queue and then
consume it in the CLI

Closes-Bug: 1732497 Co-Authored-By: mathieu bultel <mat.bultel@gmail.com>
Change-Id: I65f774ff5a63bb0e7bbaeef0f41f2ac8a34934b4
This commit is contained in:
Dougal Matthews 2017-11-17 10:34:13 +00:00 committed by mathieu bultel
parent 1f4de54b77
commit 30278e8a2f
2 changed files with 50 additions and 4 deletions

View File

@ -17,13 +17,16 @@ import os
import shutil import shutil
import six import six
from six.moves import configparser from six.moves import configparser
import subprocess
import tempfile import tempfile
import time
import yaml import yaml
from mistral_lib import actions from mistral_lib import actions
from oslo_concurrency import processutils from oslo_concurrency import processutils
from tripleo_common.actions import base
def write_default_ansible_cfg(work_dir, def write_default_ansible_cfg(work_dir,
base_ansible_cfg='/etc/ansible/ansible.cfg'): base_ansible_cfg='/etc/ansible/ansible.cfg'):
@ -221,7 +224,7 @@ class AnsibleAction(actions.Action):
shutil.rmtree(self.work_dir) shutil.rmtree(self.work_dir)
class AnsiblePlaybookAction(actions.Action): class AnsiblePlaybookAction(base.TripleOAction):
"""Executes ansible playbook""" """Executes ansible playbook"""
def __init__(self, **kwargs): def __init__(self, **kwargs):
@ -255,6 +258,7 @@ class AnsiblePlaybookAction(actions.Action):
self.skip_tags = self._kwargs_for_run.pop('skip_tags', None) self.skip_tags = self._kwargs_for_run.pop('skip_tags', None)
self.extra_env_variables = self._kwargs_for_run.pop( self.extra_env_variables = self._kwargs_for_run.pop(
'extra_env_variables', None) 'extra_env_variables', None)
self.queue_name = self._kwargs_for_run.pop('queue_name', None)
self._work_dir = self._kwargs_for_run.pop( self._work_dir = self._kwargs_for_run.pop(
'work_dir', None) 'work_dir', None)
@ -340,6 +344,14 @@ class AnsiblePlaybookAction(actions.Action):
self._ssh_private_key = path self._ssh_private_key = path
return path return path
def format_message(self, lines):
return {
'body': {
'payload': {
'message': ''.join(lines),
'status': 'RUNNING',
'execution': {'id': None}}}}
def run(self, context): def run(self, context):
if 0 < self.verbosity < 6: if 0 < self.verbosity < 6:
verbosity_option = '-' + ('v' * self.verbosity) verbosity_option = '-' + ('v' * self.verbosity)
@ -415,6 +427,37 @@ class AnsiblePlaybookAction(actions.Action):
'OS_AUTH_TOKEN': context.auth_token, 'OS_AUTH_TOKEN': context.auth_token,
'OS_PROJECT_NAME': context.project_name}) 'OS_PROJECT_NAME': context.project_name})
if self.queue_name:
zaqar = self.get_messaging_client(context)
queue = zaqar.queue(self.queue_name)
# TODO(d0ugal): We don't have the log errors functionality
# that processutils has, do we need to replicate that somehow?
command = [str(c) for c in command]
process = subprocess.Popen(command, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
shell=False, bufsize=1,
cwd=self.work_dir,
env=env_variables)
start = time.time()
stdout = []
lines = []
for line in iter(process.stdout.readline, b''):
lines.append(line)
stdout.append(line)
if time.time() - start > 30:
queue.post(self.format_message(lines))
lines = []
start = time.time()
queue.post(self.format_message(lines))
process.stdout.close()
returncode = process.wait()
# TODO(d0ugal): This bit isn't ideal - as we redirect stderr to
# stdout we don't know the difference. To keep the return dict
# similar there is an empty stderr. We can use the return code
# to determine if there was an error.
return {"stdout": "".join(stdout), "returncode": returncode,
"stderr": ""}
stderr, stdout = processutils.execute( stderr, stdout = processutils.execute(
*command, cwd=self.work_dir, *command, cwd=self.work_dir,
env_variables=env_variables, env_variables=env_variables,

View File

@ -71,6 +71,7 @@ workflows:
- playbook - playbook
- inventory_file - inventory_file
- queue_name: tripleo - queue_name: tripleo
- ansible_queue_name: tripleo
- module_path: /usr/share/ansible-modules - module_path: /usr/share/ansible-modules
- ansible_extra_env_variables: - ansible_extra_env_variables:
ANSIBLE_HOST_KEY_CHECKING: 'False' ANSIBLE_HOST_KEY_CHECKING: 'False'
@ -105,7 +106,10 @@ workflows:
extra_env_variables: <% $.ansible_extra_env_variables %> extra_env_variables: <% $.ansible_extra_env_variables %>
limit_hosts: <% $.nodes %> limit_hosts: <% $.nodes %>
module_path: <% $.module_path %> module_path: <% $.module_path %>
on-success: node_update_passed queue_name: <% $.ansible_queue_name %>
on-success:
- node_update_passed: <% task().result.returncode = 0 %>
- node_update_failed: <% task().result.returncode != 0 %>
on-error: node_update_failed on-error: node_update_failed
publish: publish:
output: <% task().result %> output: <% task().result %>
@ -132,7 +136,6 @@ workflows:
type: tripleo.package_update.v1.update_nodes type: tripleo.package_update.v1.update_nodes
payload: payload:
status: <% $.status %> status: <% $.status %>
message: <% concat(task(node_update).result.stderr.substring(0, 262144), task(node_update).result.stdout.substring(0, 262144)) %>
execution: <% execution() %> execution: <% execution() %>
on-success: on-success:
- fail: <% $.get('status') = "FAILED" %> - fail: <% $.get('status') = "FAILED" %>