diff --git a/tripleo_ansible/ansible_plugins/strategy/tripleo_linear.py b/tripleo_ansible/ansible_plugins/strategy/tripleo_linear.py new file mode 100644 index 000000000..1aa613873 --- /dev/null +++ b/tripleo_ansible/ansible_plugins/strategy/tripleo_linear.py @@ -0,0 +1,466 @@ +# Copyright 2020 Red Hat, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import time + +from ansible import constants as C +from ansible.errors import AnsibleAssertionError +from ansible.errors import AnsibleError +from ansible.executor.play_iterator import PlayIterator +from ansible.module_utils._text import to_text +from ansible.module_utils.six import iteritems +from ansible.playbook.block import Block +from ansible.playbook.included_file import IncludedFile +from ansible.playbook.task import Task +from ansible.plugins.loader import action_loader +from ansible.plugins.strategy import StrategyBase +from ansible.template import Templar +from ansible.utils.display import Display + +DOCUMENTATION = ''' + strategy: tripleo_linear + short_description: TripleO specific linear strategy + description: + - Based on the 'linear' strategy from Ansible + - Logic broken up to allow for future improvements + version_added: "2.9" + author: Alex Schultz +''' + +display = Display() + + +class TripleoLinearTerminated(Exception): + """Exception for terminated state""" + pass + + +class TripleoLinearNoHostTask(Exception): + """Exception for no host task""" + pass + + +class TripleoLinearRunOnce(Exception): + """Exception for run once""" + pass + + +class StrategyModule(StrategyBase): + + def __init__(self, *args, **kwargs): + super(StrategyModule, self).__init__(*args, **kwargs) + self._any_errors_fatal = False + self._callback_sent = False + self._has_work = False + self._host_pinned = True + self._hosts_left = [] + self._iterator = None + self._play_context = None + self._strat_results = [] + self.noop_task = None + # these were defined in 2.9 + self._has_hosts_cache = False + self._has_hosts_cache_all = False + + def _print(self, msg, host=None, level=1): + display.verbose(msg, host=host, caplevel=level) + + def _debug(self, msg, host=None): + self._print(msg, host, 3) + + def _create_noop_task(self): + """Create noop task""" + self._debug('_create_noop_task...') + noop_task = Task() + noop_task.action = 'meta' + noop_task.args['_raw_params'] = 'noop' + noop_task.set_loader(self._iterator._play._loader) + return noop_task + + def _advance_hosts(self, hosts, host_tasks, cur_block, cur_state): + """Move hosts to next task""" + self._debug('_advance_hosts...') + noop_task = self._create_noop_task() + returns = [] + for host in hosts: + host_state_task = host_tasks.get(host.name) + if host_state_task is None: + continue + (s, t) = host_state_task + self._print('task: {}'.format(t)) + s = self._iterator.get_active_state(s) + if t is None: + continue + self._print('task.action: {}'.format(t.action)) + if s.run_state == cur_state and s.cur_block == cur_block: + _ = self._iterator.get_next_task_for_host(host) + returns.append((host, t)) + else: + returns.append((host, noop_task)) + return returns + + def _get_next_tasks(self, hosts): + """Get next set of tasks""" + self._debug('_get_next_tasks...') + host_tasks = {} + task_counts = {} + + self._debug('populate next tasks for all hosts') + for host in hosts: + host_tasks[host.name] = self._iterator.get_next_task_for_host( + host, peek=True) + + self._debug('organize tasks by state') + host_tasks_to_run = [(host, state_task) + for host, state_task in iteritems(host_tasks) + if state_task and state_task[1]] + + # figure out our current block + if host_tasks_to_run: + try: + lowest_cur_block = min( + (self._iterator.get_active_state(s).cur_block + for h, (s, t) in host_tasks_to_run + if s.run_state != PlayIterator.ITERATING_COMPLETE)) + except ValueError: + lowest_cur_block = None + else: + lowest_cur_block = None + + # build counts for tasks by run state + for (k, v) in host_tasks_to_run: + (s, t) = v + s = self._iterator.get_active_state(s) + if s.cur_block > lowest_cur_block: + continue + + # count up tasks based on state, we only care about: + # PlayIterator.ITERATING_SETUP + # PlayIterator.ITERATING_TASKS + # PlayIterator.ITERATING_RESCUE + # PlayIterator.ITERATING_ALWAYS + if not task_counts.get(s.run_state): + task_counts[s.run_state] = 1 + else: + task_counts[s.run_state] += 1 + + # Iterate through the different task states we care about + # to execute them in a specific order. If there are tasks + # in that state, we run all those tasks and then noop the + # rest of the hosts with tasks not currently in that state + for state_type in [PlayIterator.ITERATING_SETUP, + PlayIterator.ITERATING_TASKS, + PlayIterator.ITERATING_RESCUE, + PlayIterator.ITERATING_ALWAYS]: + if state_type in task_counts: + return self._advance_hosts(hosts, + host_tasks, + lowest_cur_block, + state_type) + + # all done so move on by returning None for the next task in + # the return value. + return [(host, None) for host in hosts] + + def _replace_with_noop(self, target): + """Replace task with a noop task""" + self._debug('_replace_with_noop...') + if self.noop_task is None: + raise AnsibleAssertionError('noop_task is None') + + result = [] + for t in target: + if isinstance(t, Task): + result.append(self.noop_task) + elif isinstance(t, Block): + result.append(self._create_noop_block_from(t, t._parent)) + return result + + def _create_noop_block_from(self, original_block, parent): + """Create a noop block from a block""" + self._debug('_create_noop_block_from...') + noop_block = Block(parent_block=parent) + noop_block.block = self._replace_with_noop(original_block.block) + noop_block.always = self._replace_with_noop(original_block.always) + noop_block.rescue = self._replace_with_noop(original_block.rescue) + return noop_block + + def _prepare_and_create_noop_block_from(self, original_block, parent): + """Create noop block""" + self._debug('_prepare_and_create_noop_block_from...') + self.noop_task = self._create_noop_task() + return self._create_noop_block_from(original_block, parent) + + def _get_action(self, task): + """Get action from task""" + # TODO(mwhahaha): move to base task + self._debug('_get_action...') + try: + action = action_loader.get(task.action, class_only=True) + except KeyError: + action = None + return action + + def _send_task_callback(self, task, templar): + """Send a task callback for task start""" + # TODO(mwhahaha): do we need the bool? can we move to base class? + self._debug('_send_task_callback...') + if self._callback_sent: + return + name = task.name + try: + task.name = to_text(templar.template(task.name, + fail_on_undefined=False), + nonstring='empty') + except Exception: + display.debug('templating failed') + self._tqm.send_callback('v2_playbook_on_task_start', + task, + is_conditional=False) + task.name = name + self._callback_sent = True + + def _process_host_tasks(self, host, task): + """Process host task and execute""" + self._debug('process_host_tasks...') + results = [] + + if self._tqm._terminated: + raise TripleoLinearTerminated() + run_once = False + + action = self._get_action(task) + + # Skip already executed roles + if task._role and task._role.has_run(host): + if (task._role._metadata is None or task._role._metadata + and not task._role._metadata.allow_duplicates): + raise TripleoLinearNoHostTask() + + if task.action == 'meta': + results.extend(self._execute_meta(task, + self._play_context, + self._iterator, + host)) + if (task.args.get('_raw_params', None) not in ('noop', + 'reset_connection', + 'end_host')): + run_once = True + if (task.any_errors_fatal or run_once and not task.ignore_errors): + self._any_errors_fatal = True + else: + # todo handle steps like in linear + # build get_vars call params + vars_params = {'play': self._iterator._play, + 'host': host, + 'task': task} + # if we have >= 2.9 we can use the hosts cache + if self._has_hosts_cache: + vars_params['_hosts'] = self._hosts_cache + if self._has_hosts_cache_all: + vars_params['_hosts_all'] = self._hosts_cache_all + + task_vars = self._variable_manager.get_vars(**vars_params) + + self.add_tqm_variables(task_vars, play=self._iterator._play) + templar = Templar(loader=self._loader, variables=task_vars) + + run_once = (templar.template(task.run_once) or action + and getattr(action, 'BYPASS_HOST_LOOP', False)) + self._send_task_callback(task, templar) + self._blocked_hosts[host.get_name()] = True + self._queue_task(host, task, task_vars, self._play_context) + del task_vars + + if run_once: + raise TripleoLinearRunOnce() + + max_passes = max(1, int(len(self._tqm._workers) * 0.1)) + results.extend(self._process_pending_results( + self._iterator, max_passes=max_passes)) + return results + + def process_includes(self, host_results, noop=False): + """Process our includes""" + # TODO(mwhahaha): move to a base class + self._debug('process_includes...') + include_files = IncludedFile.process_include_results( + host_results, + iterator=self._iterator, + loader=self._loader, + variable_manager=self._variable_manager + ) + # TODO(mwhahaha): fix include_failure uage + include_success = True + if len(include_files) == 0: + self._debug('No include files') + return include_success + + all_blocks = dict((host, []) for host in self._hosts_left) + for include in include_files: + self._debug('Adding include...{}'.format(include)) + try: + if include._is_role: + ir = self._copy_included_file(include) + new_blocks, handler_blocks = ir.get_block_list( + play=self._iterator._play, + variable_manager=self._variable_manager, + loader=self._loader) + else: + new_blocks = self._load_included_file( + include, iterator=self._iterator) + for block in new_blocks: + vars_params = {'play': self._iterator._play, + 'task': block._parent} + # ansible <2.9 compatibility + if self._has_hosts_cache: + vars_params['_hosts'] = self._hosts_cache + if self._has_hosts_cache_all: + vars_params['_hosts_all'] = self._hosts_cache_all + + task_vars = self._variable_manager.get_vars(**vars_params) + final_block = block.filter_tagged_tasks(task_vars) + + # TODO(mwhahaha): noop is used for linear not free + if noop: + noop_block = self._prepare_and_create_noop_block_from( + final_block, block._parent) + + for host in self._hosts_left: + if host in include._hosts: + all_blocks[host].append(final_block) + elif noop: + all_blocks[host].append(noop_block) + except AnsibleError as e: + for host in include._hosts: + self._tqm._failed_hosts[host.get_name()] = True + self._iterator.mark_host_failed(host) + display.error(to_text(e), wrap_text=False) + include_success = False + continue + + for host in self._hosts_left: + self._iterator.add_tasks(host, all_blocks[host]) + + return include_success + + def _process_failures(self): + """Handle failures""" + self._debug('_process_failures...') + non_fail_states = frozenset([self._iterator.ITERATING_RESCUE, + self._iterator.ITERATING_ALWAYS]) + result = self._tqm.RUN_OK + for host in self._hosts_left: + (s, _) = self._iterator.get_next_task_for_host(host, peek=True) + s = self._iterator.get_active_state(s) + if ((s.run_state not in non_fail_states) + or (s.run_state == self._iterator.ITERATING_RESCUE + and s.fail_state & self._iterator.FAILED_RESCUE != 0)): + self._tqm._failed_hosts[host.name] = True + result |= self._tqm.RUN_FAILED_BREAK_PLAY + return result + + def process_work(self): + """Run pending tasks""" + self._debug('process_work...') + self._callback_sent = False + result = self._tqm.RUN_OK + + host_tasks = self._get_next_tasks(self._hosts_left) + self._strat_results = [] + results = [] + for (host, task) in host_tasks: + if not task: + continue + try: + self._has_work = True + results.extend(self._process_host_tasks(host, task)) + except TripleoLinearNoHostTask: + continue + except (TripleoLinearTerminated, TripleoLinearRunOnce): + break + if self._pending_results > 0: + results.extend(self._wait_on_pending_results( + self._iterator)) + + self._strat_results.extend(results) + self.update_active_connections(results) + + return result + + def run(self, iterator, play_context): + """Run our straregy""" + self._debug('run...') + self._iterator = iterator + self._play_context = play_context + self._has_work = True + + result = self._tqm.RUN_OK + + # check for < 2.9 and set vars so we know if we can use hosts cache + if getattr(self, '_set_hosts_cache', False): + self._set_hosts_cache(self._iterator._play) + self._has_hosts_cache = True + if getattr(self, '_set_hosts_cache_all', False): + self._has_hosts_cache_all = True + + while self._has_work and not self._tqm._terminated: + self._has_work = False + self._print('play: {}'.format(iterator._play)) + try: + self._hosts_left = self.get_hosts_left(self._iterator) + result = self.process_work() + + # NOTE(mwhahaha): process_includes returns a status however + # we will pick up on these failures further down because + # failed_hosts will be set. We don't need the status + # in this strategy so we just ignore it. + self.process_includes(self._strat_results, noop=True) + + failed_hosts = [] + unreachable_hosts = [] + for res in self._strat_results: + if ((res.is_failed() or res._task.action == 'meta') + and self._iterator.is_failed(res._host)): + failed_hosts.append(res._host.name) + elif res.is_unreachable(): + unreachable_hosts.append(res._host.name) + + # TODO(mwhahaha): handle max_fail_percentage by tripleo role + if (self._any_errors_fatal + and (len(failed_hosts) > 0 + or len(unreachable_hosts) > 0)): + result = self._process_failures() + + failed_hosts = len(self._tqm._failed_hosts) + hosts_left = len(self._hosts_left) + if (result != self._tqm.RUN_OK + and (failed_hosts >= hosts_left)): + self._tqm.send_callback( + 'v2_playbook_on_no_hosts_remaining') + return result + except (IOError, EOFError) as e: + display.warning("Exception while in task loop: {}".format(e)) + return self._tqm.RUN_UNKNOWN_ERROR + + self._debug('sleeping... {}'.format( + C.DEFAULT_INTERNAL_POLL_INTERVAL) + ) + time.sleep(C.DEFAULT_INTERNAL_POLL_INTERVAL) + + return super(StrategyModule, self).run(iterator, play_context, result)