Merge "Handle lists of streamers" into feature/zuulv3
This commit is contained in:
commit
c3a56cc37e
|
@ -90,7 +90,7 @@ class CallbackModule(default.CallbackModule):
|
|||
self._task = None
|
||||
self._daemon_running = False
|
||||
self._play = None
|
||||
self._streamer = None
|
||||
self._streamers = []
|
||||
self.configure_logger()
|
||||
|
||||
def configure_logger(self):
|
||||
|
@ -170,23 +170,24 @@ class CallbackModule(default.CallbackModule):
|
|||
ip = play_vars[host].get(
|
||||
'ansible_host', play_vars[host].get(
|
||||
'ansible_inventory_host'))
|
||||
# TODO(mordred) this is clearly stupid and won't work for
|
||||
# multi-node (we're a for loop, but we're setting a single
|
||||
# value.
|
||||
self._streamer = threading.Thread(
|
||||
streamer = threading.Thread(
|
||||
target=self._read_log, args=(
|
||||
host, ip, log_id, task_name, hosts))
|
||||
self._streamer.daemon = True
|
||||
self._streamer.start()
|
||||
streamer.daemon = True
|
||||
streamer.start()
|
||||
self._streamers.append(streamer)
|
||||
|
||||
def _stop_streamer(self):
|
||||
if self._streamer:
|
||||
self._streamer.join(30)
|
||||
if self._streamer.is_alive():
|
||||
def _stop_streamers(self):
|
||||
while True:
|
||||
if not self._streamers:
|
||||
break
|
||||
streamer = self._streamers.pop()
|
||||
streamer.join(30)
|
||||
if streamer.is_alive():
|
||||
msg = "[Zuul] Log Stream did not terminate"
|
||||
self._log(msg, job=True, executor=True)
|
||||
|
||||
def v2_runner_on_failed(self, result, ignore_errors=False):
|
||||
def _process_result_for_localhost(self, result):
|
||||
is_localhost = False
|
||||
delegated_vars = result._result.get('_ansible_delegated_vars', None)
|
||||
if delegated_vars:
|
||||
|
@ -195,13 +196,16 @@ class CallbackModule(default.CallbackModule):
|
|||
is_localhost = True
|
||||
|
||||
if not is_localhost:
|
||||
self._stop_streamer()
|
||||
self._stop_streamers()
|
||||
if result._task.action in ('command', 'shell'):
|
||||
stdout_lines = zuul_filter_result(result._result)
|
||||
if is_localhost:
|
||||
for line in stdout_lines:
|
||||
ts, ln = (x.strip() for x in line.split(' | ', 1))
|
||||
self._log("localhost | %s " % ln, ts=ts)
|
||||
|
||||
def v2_runner_on_failed(self, result, ignore_errors=False):
|
||||
self._process_result_for_localhost(result)
|
||||
self._handle_exception(result._result)
|
||||
|
||||
if result._task.loop and 'results' in result._result:
|
||||
|
@ -221,20 +225,16 @@ class CallbackModule(default.CallbackModule):
|
|||
self._print_task_banner(result._task)
|
||||
|
||||
self._clean_results(result._result, result._task.action)
|
||||
self._process_result_for_localhost(result)
|
||||
|
||||
if result._task.action in ('include', 'include_role'):
|
||||
return
|
||||
|
||||
self._stop_streamer()
|
||||
|
||||
if result._result.get('changed', False):
|
||||
status = 'changed'
|
||||
else:
|
||||
status = 'ok'
|
||||
|
||||
if result._task.action in ('command', 'shell'):
|
||||
zuul_filter_result(result._result)
|
||||
|
||||
if result._task.loop and 'results' in result._result:
|
||||
self._process_items(result)
|
||||
|
||||
|
|
Loading…
Reference in New Issue