Handle item loops in zuul_stream

Items in loops get their own callback events, so we need to handle them.

Additionally, the final item and the event finished event come out of
sequence - so we need to save a note in the event finished event and
then emit it after the following item.

Finally, we'd have a todo list item from ansiblefest to copy not
directly modify the result objects we get because they're shared. Turns
out this is immediately necessary with items, because popping
stdout_lines in one item totally borks the further items (whoops)

Change-Id: Ia70239f240276b10ad09d4bda57310ebcbdb1f4d
This commit is contained in:
Monty Taylor 2017-07-01 08:35:50 -05:00
parent 9fa148291a
commit 9de422f55b
No known key found for this signature in database
GPG Key ID: 7BAE94BC7141A594
1 changed files with 119 additions and 23 deletions

View File

@ -92,6 +92,8 @@ class CallbackModule(default.CallbackModule):
self._play = None
self._streamers = []
self.configure_logger()
self._items_done = False
self._deferred_result = None
def configure_logger(self):
# ansible appends timestamp, user and pid to the log lines emitted
@ -187,33 +189,50 @@ class CallbackModule(default.CallbackModule):
msg = "[Zuul] Log Stream did not terminate"
self._log(msg, job=True, executor=True)
def _process_result_for_localhost(self, result):
def _process_result_for_localhost(self, result, is_task=True):
result_dict = dict(result._result)
localhost_names = ('localhost', '127.0.0.1')
is_localhost = False
delegated_vars = result._result.get('_ansible_delegated_vars', None)
delegated_vars = result_dict.get('_ansible_delegated_vars', None)
if delegated_vars:
delegated_host = delegated_vars['ansible_host']
if delegated_host in ('localhost', '127.0.0.1'):
if delegated_host in localhost_names:
is_localhost = True
else:
task_host = result._host.get_name()
task_hostvars = result._task._variable_manager._hostvars[task_host]
if task_hostvars['ansible_host'] in localhost_names:
is_localhost = True
if not is_localhost:
if not is_localhost and is_task:
self._stop_streamers()
if result._task.action in ('command', 'shell'):
stdout_lines = zuul_filter_result(result._result)
stdout_lines = zuul_filter_result(result_dict)
if is_localhost:
for line in stdout_lines:
self._log("localhost | %s " % line.strip())
hostname = self._get_hostname(result)
self._log("%s | %s " % (hostname, line.strip()))
def v2_runner_on_failed(self, result, ignore_errors=False):
self._process_result_for_localhost(result)
self._handle_exception(result._result)
result_dict = dict(result._result)
if result._task.loop and 'results' in result._result:
self._process_items(result)
self._handle_exception(result_dict)
if result_dict.get('msg') == 'All items completed':
result_dict['status'] = 'ERROR'
self._deferred_result = result_dict
return
self._process_result_for_localhost(result)
if result._task.loop and 'results' in result_dict:
# items have their own events
pass
else:
self._log_message(
result=result,
msg="Results: => {results}".format(
results=self._dump_results(result._result)),
results=self._dump_results(result_dict)),
status='ERROR')
if ignore_errors:
self._log_message(result, "Ignoring Errors", status="ERROR")
@ -223,35 +242,107 @@ class CallbackModule(default.CallbackModule):
and self._last_task_banner != result._task._uuid):
self._print_task_banner(result._task)
self._clean_results(result._result, result._task.action)
self._process_result_for_localhost(result)
if result._task.action in ('include', 'include_role', 'setup'):
return
if result._result.get('changed', False):
result_dict = dict(result._result)
self._clean_results(result_dict, result._task.action)
if result_dict.get('changed', False):
status = 'changed'
else:
status = 'ok'
if result._task.loop and 'results' in result._result:
self._process_items(result)
if (result_dict.get('msg') == 'All items completed'
and not self._items_done):
result_dict['status'] = status
self._deferred_result = result_dict
return
self._handle_warnings(result._result)
if not result._task.loop:
self._process_result_for_localhost(result)
else:
self._items_done = False
self._handle_warnings(result_dict)
if result._task.loop and 'results' in result_dict:
# items have their own events
pass
if result._task.loop and 'results' in result._result:
self._process_items(result)
elif result._task.action not in ('command', 'shell'):
self._log_message(
result=result,
msg="Results: => {results}".format(
results=self._dump_results(result._result)),
results=self._dump_results(result_dict)),
status=status)
elif 'results' in result_dict:
for res in result_dict['results']:
self._log_message(
result,
"Runtime: {delta} Start: {start} End: {end}".format(**res))
elif result_dict.get('msg') == 'All items completed':
self._log_message(result, result_dict['msg'])
else:
self._log_message(
result,
"Runtime: {delta} Start: {start} End: {end}".format(
**result._result))
**result_dict))
def v2_runner_item_on_ok(self, result):
result_dict = dict(result._result)
self._process_result_for_localhost(result, is_task=False)
if result_dict.get('changed', False):
status = 'changed'
else:
status = 'ok'
if result._task.action not in ('command', 'shell'):
self._log_message(
result=result,
msg="Item: {item} => {results}".format(
item=result_dict['item'],
results=self._dump_results(result_dict)),
status=status)
else:
self._log_message(
result,
"Item: {item} Runtime: {delta}"
" Start: {start} End: {end}".format(**result_dict))
if self._deferred_result:
self._process_deferred(result)
def v2_runner_item_on_failed(self, result):
result_dict = dict(result._result)
self._process_result_for_localhost(result, is_task=False)
if result._task.action not in ('command', 'shell'):
self._log_message(
result=result,
msg="Item: {item} => {results}".format(
item=result_dict['item'],
results=self._dump_results(result_dict)),
status='ERROR')
else:
self._log_message(
result,
"Item: {item} Runtime: {delta}"
" Start: {start} End: {end}".format(**result_dict))
if self._deferred_result:
self._process_deferred(result)
def _process_deferred(self, result):
self._items_done = True
result_dict = self._deferred_result
self._deferred_result = None
self._log_message(
result, "All items complete",
status=result_dict['status'])
def _print_task_banner(self, task):
@ -259,6 +350,10 @@ class CallbackModule(default.CallbackModule):
args = ''
task_args = task.args.copy()
if task.loop:
task_type = 'LOOP'
else:
task_type = 'TASK'
is_shell = task_args.pop('_uses_shell', False)
if is_shell and task_name == 'command':
task_name = 'shell'
@ -272,7 +367,8 @@ class CallbackModule(default.CallbackModule):
args = u', '.join(u'%s=%s' % a for a in task_args.items())
args = u' %s' % args
msg = "TASK [{task}{args}]".format(
msg = "{task_type} [{task}{args}]".format(
task_type=task_type,
task=task_name,
args=args)
self._log(msg)