Add tripleo linear strategy
Change-Id: I8539a6ad747a3923256f8c14c0a140e7670cc135
(cherry picked from commit 09da864f8c
)
This commit is contained in:
parent
313108543a
commit
ab894ee213
|
@ -0,0 +1,466 @@
|
|||
# Copyright 2020 Red Hat, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from __future__ import (absolute_import, division, print_function)
|
||||
__metaclass__ = type
|
||||
|
||||
import time
|
||||
|
||||
from ansible import constants as C
|
||||
from ansible.errors import AnsibleAssertionError
|
||||
from ansible.errors import AnsibleError
|
||||
from ansible.executor.play_iterator import PlayIterator
|
||||
from ansible.module_utils._text import to_text
|
||||
from ansible.module_utils.six import iteritems
|
||||
from ansible.playbook.block import Block
|
||||
from ansible.playbook.included_file import IncludedFile
|
||||
from ansible.playbook.task import Task
|
||||
from ansible.plugins.loader import action_loader
|
||||
from ansible.plugins.strategy import StrategyBase
|
||||
from ansible.template import Templar
|
||||
from ansible.utils.display import Display
|
||||
|
||||
DOCUMENTATION = '''
|
||||
strategy: tripleo_linear
|
||||
short_description: TripleO specific linear strategy
|
||||
description:
|
||||
- Based on the 'linear' strategy from Ansible
|
||||
- Logic broken up to allow for future improvements
|
||||
version_added: "2.9"
|
||||
author: Alex Schultz <aschultz@redhat.com>
|
||||
'''
|
||||
|
||||
display = Display()
|
||||
|
||||
|
||||
class TripleoLinearTerminated(Exception):
|
||||
"""Exception for terminated state"""
|
||||
pass
|
||||
|
||||
|
||||
class TripleoLinearNoHostTask(Exception):
|
||||
"""Exception for no host task"""
|
||||
pass
|
||||
|
||||
|
||||
class TripleoLinearRunOnce(Exception):
|
||||
"""Exception for run once"""
|
||||
pass
|
||||
|
||||
|
||||
class StrategyModule(StrategyBase):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(StrategyModule, self).__init__(*args, **kwargs)
|
||||
self._any_errors_fatal = False
|
||||
self._callback_sent = False
|
||||
self._has_work = False
|
||||
self._host_pinned = True
|
||||
self._hosts_left = []
|
||||
self._iterator = None
|
||||
self._play_context = None
|
||||
self._strat_results = []
|
||||
self.noop_task = None
|
||||
# these were defined in 2.9
|
||||
self._has_hosts_cache = False
|
||||
self._has_hosts_cache_all = False
|
||||
|
||||
def _print(self, msg, host=None, level=1):
|
||||
display.verbose(msg, host=host, caplevel=level)
|
||||
|
||||
def _debug(self, msg, host=None):
|
||||
self._print(msg, host, 3)
|
||||
|
||||
def _create_noop_task(self):
|
||||
"""Create noop task"""
|
||||
self._debug('_create_noop_task...')
|
||||
noop_task = Task()
|
||||
noop_task.action = 'meta'
|
||||
noop_task.args['_raw_params'] = 'noop'
|
||||
noop_task.set_loader(self._iterator._play._loader)
|
||||
return noop_task
|
||||
|
||||
def _advance_hosts(self, hosts, host_tasks, cur_block, cur_state):
|
||||
"""Move hosts to next task"""
|
||||
self._debug('_advance_hosts...')
|
||||
noop_task = self._create_noop_task()
|
||||
returns = []
|
||||
for host in hosts:
|
||||
host_state_task = host_tasks.get(host.name)
|
||||
if host_state_task is None:
|
||||
continue
|
||||
(s, t) = host_state_task
|
||||
self._print('task: {}'.format(t))
|
||||
s = self._iterator.get_active_state(s)
|
||||
if t is None:
|
||||
continue
|
||||
self._print('task.action: {}'.format(t.action))
|
||||
if s.run_state == cur_state and s.cur_block == cur_block:
|
||||
_ = self._iterator.get_next_task_for_host(host)
|
||||
returns.append((host, t))
|
||||
else:
|
||||
returns.append((host, noop_task))
|
||||
return returns
|
||||
|
||||
def _get_next_tasks(self, hosts):
|
||||
"""Get next set of tasks"""
|
||||
self._debug('_get_next_tasks...')
|
||||
host_tasks = {}
|
||||
task_counts = {}
|
||||
|
||||
self._debug('populate next tasks for all hosts')
|
||||
for host in hosts:
|
||||
host_tasks[host.name] = self._iterator.get_next_task_for_host(
|
||||
host, peek=True)
|
||||
|
||||
self._debug('organize tasks by state')
|
||||
host_tasks_to_run = [(host, state_task)
|
||||
for host, state_task in iteritems(host_tasks)
|
||||
if state_task and state_task[1]]
|
||||
|
||||
# figure out our current block
|
||||
if host_tasks_to_run:
|
||||
try:
|
||||
lowest_cur_block = min(
|
||||
(self._iterator.get_active_state(s).cur_block
|
||||
for h, (s, t) in host_tasks_to_run
|
||||
if s.run_state != PlayIterator.ITERATING_COMPLETE))
|
||||
except ValueError:
|
||||
lowest_cur_block = None
|
||||
else:
|
||||
lowest_cur_block = None
|
||||
|
||||
# build counts for tasks by run state
|
||||
for (k, v) in host_tasks_to_run:
|
||||
(s, t) = v
|
||||
s = self._iterator.get_active_state(s)
|
||||
if s.cur_block > lowest_cur_block:
|
||||
continue
|
||||
|
||||
# count up tasks based on state, we only care about:
|
||||
# PlayIterator.ITERATING_SETUP
|
||||
# PlayIterator.ITERATING_TASKS
|
||||
# PlayIterator.ITERATING_RESCUE
|
||||
# PlayIterator.ITERATING_ALWAYS
|
||||
if not task_counts.get(s.run_state):
|
||||
task_counts[s.run_state] = 1
|
||||
else:
|
||||
task_counts[s.run_state] += 1
|
||||
|
||||
# Iterate through the different task states we care about
|
||||
# to execute them in a specific order. If there are tasks
|
||||
# in that state, we run all those tasks and then noop the
|
||||
# rest of the hosts with tasks not currently in that state
|
||||
for state_type in [PlayIterator.ITERATING_SETUP,
|
||||
PlayIterator.ITERATING_TASKS,
|
||||
PlayIterator.ITERATING_RESCUE,
|
||||
PlayIterator.ITERATING_ALWAYS]:
|
||||
if state_type in task_counts:
|
||||
return self._advance_hosts(hosts,
|
||||
host_tasks,
|
||||
lowest_cur_block,
|
||||
state_type)
|
||||
|
||||
# all done so move on by returning None for the next task in
|
||||
# the return value.
|
||||
return [(host, None) for host in hosts]
|
||||
|
||||
def _replace_with_noop(self, target):
|
||||
"""Replace task with a noop task"""
|
||||
self._debug('_replace_with_noop...')
|
||||
if self.noop_task is None:
|
||||
raise AnsibleAssertionError('noop_task is None')
|
||||
|
||||
result = []
|
||||
for t in target:
|
||||
if isinstance(t, Task):
|
||||
result.append(self.noop_task)
|
||||
elif isinstance(t, Block):
|
||||
result.append(self._create_noop_block_from(t, t._parent))
|
||||
return result
|
||||
|
||||
def _create_noop_block_from(self, original_block, parent):
|
||||
"""Create a noop block from a block"""
|
||||
self._debug('_create_noop_block_from...')
|
||||
noop_block = Block(parent_block=parent)
|
||||
noop_block.block = self._replace_with_noop(original_block.block)
|
||||
noop_block.always = self._replace_with_noop(original_block.always)
|
||||
noop_block.rescue = self._replace_with_noop(original_block.rescue)
|
||||
return noop_block
|
||||
|
||||
def _prepare_and_create_noop_block_from(self, original_block, parent):
|
||||
"""Create noop block"""
|
||||
self._debug('_prepare_and_create_noop_block_from...')
|
||||
self.noop_task = self._create_noop_task()
|
||||
return self._create_noop_block_from(original_block, parent)
|
||||
|
||||
def _get_action(self, task):
|
||||
"""Get action from task"""
|
||||
# TODO(mwhahaha): move to base task
|
||||
self._debug('_get_action...')
|
||||
try:
|
||||
action = action_loader.get(task.action, class_only=True)
|
||||
except KeyError:
|
||||
action = None
|
||||
return action
|
||||
|
||||
def _send_task_callback(self, task, templar):
|
||||
"""Send a task callback for task start"""
|
||||
# TODO(mwhahaha): do we need the bool? can we move to base class?
|
||||
self._debug('_send_task_callback...')
|
||||
if self._callback_sent:
|
||||
return
|
||||
name = task.name
|
||||
try:
|
||||
task.name = to_text(templar.template(task.name,
|
||||
fail_on_undefined=False),
|
||||
nonstring='empty')
|
||||
except Exception:
|
||||
display.debug('templating failed')
|
||||
self._tqm.send_callback('v2_playbook_on_task_start',
|
||||
task,
|
||||
is_conditional=False)
|
||||
task.name = name
|
||||
self._callback_sent = True
|
||||
|
||||
def _process_host_tasks(self, host, task):
|
||||
"""Process host task and execute"""
|
||||
self._debug('process_host_tasks...')
|
||||
results = []
|
||||
|
||||
if self._tqm._terminated:
|
||||
raise TripleoLinearTerminated()
|
||||
run_once = False
|
||||
|
||||
action = self._get_action(task)
|
||||
|
||||
# Skip already executed roles
|
||||
if task._role and task._role.has_run(host):
|
||||
if (task._role._metadata is None or task._role._metadata
|
||||
and not task._role._metadata.allow_duplicates):
|
||||
raise TripleoLinearNoHostTask()
|
||||
|
||||
if task.action == 'meta':
|
||||
results.extend(self._execute_meta(task,
|
||||
self._play_context,
|
||||
self._iterator,
|
||||
host))
|
||||
if (task.args.get('_raw_params', None) not in ('noop',
|
||||
'reset_connection',
|
||||
'end_host')):
|
||||
run_once = True
|
||||
if (task.any_errors_fatal or run_once and not task.ignore_errors):
|
||||
self._any_errors_fatal = True
|
||||
else:
|
||||
# todo handle steps like in linear
|
||||
# build get_vars call params
|
||||
vars_params = {'play': self._iterator._play,
|
||||
'host': host,
|
||||
'task': task}
|
||||
# if we have >= 2.9 we can use the hosts cache
|
||||
if self._has_hosts_cache:
|
||||
vars_params['_hosts'] = self._hosts_cache
|
||||
if self._has_hosts_cache_all:
|
||||
vars_params['_hosts_all'] = self._hosts_cache_all
|
||||
|
||||
task_vars = self._variable_manager.get_vars(**vars_params)
|
||||
|
||||
self.add_tqm_variables(task_vars, play=self._iterator._play)
|
||||
templar = Templar(loader=self._loader, variables=task_vars)
|
||||
|
||||
run_once = (templar.template(task.run_once) or action
|
||||
and getattr(action, 'BYPASS_HOST_LOOP', False))
|
||||
self._send_task_callback(task, templar)
|
||||
self._blocked_hosts[host.get_name()] = True
|
||||
self._queue_task(host, task, task_vars, self._play_context)
|
||||
del task_vars
|
||||
|
||||
if run_once:
|
||||
raise TripleoLinearRunOnce()
|
||||
|
||||
max_passes = max(1, int(len(self._tqm._workers) * 0.1))
|
||||
results.extend(self._process_pending_results(
|
||||
self._iterator, max_passes=max_passes))
|
||||
return results
|
||||
|
||||
def process_includes(self, host_results, noop=False):
|
||||
"""Process our includes"""
|
||||
# TODO(mwhahaha): move to a base class
|
||||
self._debug('process_includes...')
|
||||
include_files = IncludedFile.process_include_results(
|
||||
host_results,
|
||||
iterator=self._iterator,
|
||||
loader=self._loader,
|
||||
variable_manager=self._variable_manager
|
||||
)
|
||||
# TODO(mwhahaha): fix include_failure uage
|
||||
include_success = True
|
||||
if len(include_files) == 0:
|
||||
self._debug('No include files')
|
||||
return include_success
|
||||
|
||||
all_blocks = dict((host, []) for host in self._hosts_left)
|
||||
for include in include_files:
|
||||
self._debug('Adding include...{}'.format(include))
|
||||
try:
|
||||
if include._is_role:
|
||||
ir = self._copy_included_file(include)
|
||||
new_blocks, handler_blocks = ir.get_block_list(
|
||||
play=self._iterator._play,
|
||||
variable_manager=self._variable_manager,
|
||||
loader=self._loader)
|
||||
else:
|
||||
new_blocks = self._load_included_file(
|
||||
include, iterator=self._iterator)
|
||||
for block in new_blocks:
|
||||
vars_params = {'play': self._iterator._play,
|
||||
'task': block._parent}
|
||||
# ansible <2.9 compatibility
|
||||
if self._has_hosts_cache:
|
||||
vars_params['_hosts'] = self._hosts_cache
|
||||
if self._has_hosts_cache_all:
|
||||
vars_params['_hosts_all'] = self._hosts_cache_all
|
||||
|
||||
task_vars = self._variable_manager.get_vars(**vars_params)
|
||||
final_block = block.filter_tagged_tasks(task_vars)
|
||||
|
||||
# TODO(mwhahaha): noop is used for linear not free
|
||||
if noop:
|
||||
noop_block = self._prepare_and_create_noop_block_from(
|
||||
final_block, block._parent)
|
||||
|
||||
for host in self._hosts_left:
|
||||
if host in include._hosts:
|
||||
all_blocks[host].append(final_block)
|
||||
elif noop:
|
||||
all_blocks[host].append(noop_block)
|
||||
except AnsibleError as e:
|
||||
for host in include._hosts:
|
||||
self._tqm._failed_hosts[host.get_name()] = True
|
||||
self._iterator.mark_host_failed(host)
|
||||
display.error(to_text(e), wrap_text=False)
|
||||
include_success = False
|
||||
continue
|
||||
|
||||
for host in self._hosts_left:
|
||||
self._iterator.add_tasks(host, all_blocks[host])
|
||||
|
||||
return include_success
|
||||
|
||||
def _process_failures(self):
|
||||
"""Handle failures"""
|
||||
self._debug('_process_failures...')
|
||||
non_fail_states = frozenset([self._iterator.ITERATING_RESCUE,
|
||||
self._iterator.ITERATING_ALWAYS])
|
||||
result = self._tqm.RUN_OK
|
||||
for host in self._hosts_left:
|
||||
(s, _) = self._iterator.get_next_task_for_host(host, peek=True)
|
||||
s = self._iterator.get_active_state(s)
|
||||
if ((s.run_state not in non_fail_states)
|
||||
or (s.run_state == self._iterator.ITERATING_RESCUE
|
||||
and s.fail_state & self._iterator.FAILED_RESCUE != 0)):
|
||||
self._tqm._failed_hosts[host.name] = True
|
||||
result |= self._tqm.RUN_FAILED_BREAK_PLAY
|
||||
return result
|
||||
|
||||
def process_work(self):
|
||||
"""Run pending tasks"""
|
||||
self._debug('process_work...')
|
||||
self._callback_sent = False
|
||||
result = self._tqm.RUN_OK
|
||||
|
||||
host_tasks = self._get_next_tasks(self._hosts_left)
|
||||
self._strat_results = []
|
||||
results = []
|
||||
for (host, task) in host_tasks:
|
||||
if not task:
|
||||
continue
|
||||
try:
|
||||
self._has_work = True
|
||||
results.extend(self._process_host_tasks(host, task))
|
||||
except TripleoLinearNoHostTask:
|
||||
continue
|
||||
except (TripleoLinearTerminated, TripleoLinearRunOnce):
|
||||
break
|
||||
if self._pending_results > 0:
|
||||
results.extend(self._wait_on_pending_results(
|
||||
self._iterator))
|
||||
|
||||
self._strat_results.extend(results)
|
||||
self.update_active_connections(results)
|
||||
|
||||
return result
|
||||
|
||||
def run(self, iterator, play_context):
|
||||
"""Run our straregy"""
|
||||
self._debug('run...')
|
||||
self._iterator = iterator
|
||||
self._play_context = play_context
|
||||
self._has_work = True
|
||||
|
||||
result = self._tqm.RUN_OK
|
||||
|
||||
# check for < 2.9 and set vars so we know if we can use hosts cache
|
||||
if getattr(self, '_set_hosts_cache', False):
|
||||
self._set_hosts_cache(self._iterator._play)
|
||||
self._has_hosts_cache = True
|
||||
if getattr(self, '_set_hosts_cache_all', False):
|
||||
self._has_hosts_cache_all = True
|
||||
|
||||
while self._has_work and not self._tqm._terminated:
|
||||
self._has_work = False
|
||||
self._print('play: {}'.format(iterator._play))
|
||||
try:
|
||||
self._hosts_left = self.get_hosts_left(self._iterator)
|
||||
result = self.process_work()
|
||||
|
||||
# NOTE(mwhahaha): process_includes returns a status however
|
||||
# we will pick up on these failures further down because
|
||||
# failed_hosts will be set. We don't need the status
|
||||
# in this strategy so we just ignore it.
|
||||
self.process_includes(self._strat_results, noop=True)
|
||||
|
||||
failed_hosts = []
|
||||
unreachable_hosts = []
|
||||
for res in self._strat_results:
|
||||
if ((res.is_failed() or res._task.action == 'meta')
|
||||
and self._iterator.is_failed(res._host)):
|
||||
failed_hosts.append(res._host.name)
|
||||
elif res.is_unreachable():
|
||||
unreachable_hosts.append(res._host.name)
|
||||
|
||||
# TODO(mwhahaha): handle max_fail_percentage by tripleo role
|
||||
if (self._any_errors_fatal
|
||||
and (len(failed_hosts) > 0
|
||||
or len(unreachable_hosts) > 0)):
|
||||
result = self._process_failures()
|
||||
|
||||
failed_hosts = len(self._tqm._failed_hosts)
|
||||
hosts_left = len(self._hosts_left)
|
||||
if (result != self._tqm.RUN_OK
|
||||
and (failed_hosts >= hosts_left)):
|
||||
self._tqm.send_callback(
|
||||
'v2_playbook_on_no_hosts_remaining')
|
||||
return result
|
||||
except (IOError, EOFError) as e:
|
||||
display.warning("Exception while in task loop: {}".format(e))
|
||||
return self._tqm.RUN_UNKNOWN_ERROR
|
||||
|
||||
self._debug('sleeping... {}'.format(
|
||||
C.DEFAULT_INTERNAL_POLL_INTERVAL)
|
||||
)
|
||||
time.sleep(C.DEFAULT_INTERNAL_POLL_INTERVAL)
|
||||
|
||||
return super(StrategyModule, self).run(iterator, play_context, result)
|
Loading…
Reference in New Issue