# Copyright 2020 Red Hat, Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

__metaclass__ = type

import os
import time

from ansible import constants as C
from ansible.errors import AnsibleError
from ansible.module_utils._text import to_text
from ansible.template import Templar
from ansible.utils.display import Display

# Load the sibling ``tripleo_base.py`` by file path. ``importlib.util`` is
# tried first; the ``imp`` module is the fallback when that import fails.
try:
    import importlib.util
    BASESPEC = importlib.util.spec_from_file_location(
        'tripleo_base',
        os.path.join(os.path.dirname(__file__), 'tripleo_base.py')
    )
    BASE = importlib.util.module_from_spec(BASESPEC)
    BASESPEC.loader.exec_module(BASE)
except ImportError:
    import imp
    BASE = imp.load_source(
        'tripleo_base',
        os.path.join(os.path.dirname(__file__), 'tripleo_base.py')
    )

DOCUMENTATION = '''
    strategy: tripleo_free
    short_description: TripleO specific free strategy
    description:
        - Based on the 'free' strategy from Ansible
        - Logic broken up to allow for future improvements/extending
        - Will fail playbook if any hosts have a failure during the
          execution and any_errors_fatal is true (free does not do this).
        - Should be backwards compatible for Ansible 2.8
        - Handles run_once to only invoke the task a single time for a given
          run. It should be noted that run_once does not ensure the task
          actually is successful but rather that it is only triggered once
          time for the host groups. If the task fails, the playbook will fail
          at the end of the play but there is no assurance that the task will
          actually complete successfully for a given playbook execution.
          Other tasks will continue on the other hosts.
    version_added: "2.9"
    author: Alex Schultz
'''

display = Display()


class TripleoFreeBreak(Exception):
    """Raised by _advance_host to make process_work break its host loop."""


class TripleoFreeContinue(Exception):
    """Raised by _advance_host to make process_work skip to the next host."""


class StrategyModule(BASE.TripleoBase):
    """Free-style strategy that can also fail the play on fatal errors.

    Hosts are visited round-robin; each unblocked host with a pending task
    gets that task queued without waiting for the other hosts.
    """

    # this strategy handles throttling
    ALLOW_BASE_THROTTLING = False

    def __init__(self, *args, **kwargs):
        """Initialise the round-robin pointer and bookkeeping state."""
        super(StrategyModule, self).__init__(*args, **kwargs)
        # index into self._hosts_left of the next host to look at
        self._last_host = 0
        # number of workers believed to be idle
        self._workers_free = 0
        # uuids of run_once tasks that have already been dispatched
        self._run_once_tasks = set()

    def _filter_notified_hosts(self, notified_hosts):
        """Return the notified hosts that have a truthy _flushed_hosts entry.

        :param notified_hosts: iterable of hosts to filter
        :returns: list of the hosts present in ``self._flushed_hosts`` with
                  a truthy value
        """
        return [host for host in notified_hosts
                if host in self._flushed_hosts and self._flushed_hosts[host]]

    def _increment_last_host(self):
        """Increment last host pointer

        If the last host pointer exceeds the number of hosts, we set it back
        to zero so we can start checking again with the first host
        """
        self._debug('_increment_last_host')
        self._last_host += 1
        self._debug('last_host is {}'.format(self._last_host))
        if self._last_host > len(self._hosts_left) - 1:
            self._debug('resetting last host')
            self._last_host = 0

    def _check_throttle(self, throttle, task):
        """Check if we should throttle

        :param throttle: maximum number of workers allowed to run ``task``
                         at once; values <= 0 disable the check
        :param task: the task about to be queued
        :returns: True when at least ``throttle`` live workers are already
                  running a task with the same uuid, otherwise False
        """
        if throttle > 0:
            same_task = 0
            for worker in self._workers:
                if (worker and worker.is_alive()
                        and worker._task._uuid == task._uuid):
                    same_task += 1
            if same_task >= throttle:
                return True
        return False

    def _check_failures(self, results):
        """Check results for failures

        If any errors are fatal, kill the playbook at the end of execution.
        All non-failed hosts will continue to run the playbook but it won't
        move on to the next playbook.

        This function returns True if there were failures and False if there
        are no failures.

        :param results: task results to inspect
        """
        if self._any_errors_fatal:
            for res in results:
                # a result counts when it failed (or came from a meta task)
                # and the iterator reports its host as failed
                if ((res.is_failed() or res._task.action == 'meta')
                        and self._iterator.is_failed(res._host)):
                    return True
        return False

    def _send_task_callback(self, task, templar):
        """Send task start callback

        The task name is templated only for the duration of the callback;
        the original (untemplated) name is put back afterwards, even if the
        callback raises.

        :param task: task that is about to start
        :param templar: Templar used to render the task name
        """
        self._debug('_send_task_callback...')
        name = task.name
        try:
            try:
                task.name = to_text(templar.template(task.name,
                                                     fail_on_undefined=False),
                                    nonstring='empty')
            except Exception:
                # best effort: fall back to the untemplated name
                self._debug('templating failed')
            self._tqm.send_callback('v2_playbook_on_task_start',
                                    task,
                                    is_conditional=False)
        finally:
            task.name = name

    def _advance_host(self, host, task):
        """Advance the host's task as necessary

        :param host: host whose next task should be handled
        :param task: the task previously peeked for ``host``
        :returns: True when the task was handled
        :raises TripleoFreeBreak: the task is throttled; the caller should
                                  stop iterating hosts
        :raises TripleoFreeContinue: the task was dropped (run_once already
                                     dispatched, or role already run); the
                                     caller should move to the next host
        :raises AnsibleError: the throttle value could not be converted to
                              an integer, or the action bypasses the host
                              loop
        """
        self._debug('_advance_host {}'.format(host), host)
        host_name = host.get_name()

        # build get_vars call params
        vars_params = {'play': self._iterator._play,
                       'host': host,
                       'task': task}
        # If we have >= 2.9 we can use the hosts cache
        if self._has_hosts_cache:
            vars_params['_hosts'] = self._hosts_cache
        if self._has_hosts_cache_all:
            vars_params['_hosts_all'] = self._hosts_cache_all

        task_vars = self._variable_manager.get_vars(**vars_params)
        templar = Templar(loader=self._loader, variables=task_vars)

        # if task has a throttle attribute, check throttle e.g. ansible > 2.9
        throttle = getattr(task, 'throttle', None)
        if throttle is not None:
            try:
                throttle = int(templar.template(throttle))
            except Exception as e:
                # NOTE(review): the original passed ``task._df``; Ansible's
                # own free strategy passes ``task._ds`` here, so ``_df``
                # looks like a typo. getattr keeps this error path from
                # failing on its own if the attribute is missing.
                raise AnsibleError("Failed to throttle: {}".format(e),
                                   obj=getattr(task, '_ds', None),
                                   orig_exc=e)
            if self._check_throttle(throttle, task):
                raise TripleoFreeBreak()

        # _blocked_hosts is used in the base strategy to keep track of hosts in
        # that have tasks in queue
        self._blocked_hosts[host_name] = True

        # we already have the task and state from our peek call so we don't
        # need these again. This call just pops it of the task list for the
        # host.
        (_, _) = self._iterator.get_next_task_for_host(host)

        action = self._get_action(task)

        try:
            task.name = to_text(templar.template(task.name,
                                                 fail_on_undefined=False),
                                nonstring='empty')
        except Exception:
            display.warning('templating of task name failed', host=host_name)

        # run once doesn't work with free because we run all of them
        run_once = (templar.template(task.run_once) or action
                    and getattr(action, 'BYPASS_HOST_LOOP', False))
        if run_once:
            display.warning('tripleo_free run_once does not ensure a task '
                            'is successful for a given playbook but rather '
                            'it is only attempted to execute once.')
            if action and getattr(action, 'BYPASS_HOST_LOOP', False):
                raise AnsibleError('Cannot bypass host loop with strategy')
            else:
                if task._uuid in self._run_once_tasks:
                    # If the task was previously executed, just drop it
                    # by clearing the blocked host and telling the parent
                    # function to continue.
                    self._debug("Skipping run_once task {} on {}".format(
                        task._uuid, host))
                    del self._blocked_hosts[host_name]
                    raise TripleoFreeContinue()
                else:
                    self._run_once_tasks.add(task._uuid)

        # handle role deduplication logic
        if task._role and task._role.has_run(host):
            if (task._role._metadata is None or task._role._metadata
                    and not task._role._metadata.allow_duplicates):
                del self._blocked_hosts[host_name]
                raise TripleoFreeContinue()

        if task.action == 'meta':
            self._execute_meta(task, self._play_context, self._iterator,
                               target_host=host)
            self._blocked_hosts[host_name] = False
        else:
            if not self._step or self._take_step(task, host_name):
                if task.any_errors_fatal:
                    display.warning('any_errors_fatal only stops any future '
                                    'tasks running on the host that fails '
                                    'with the tripleo_free strategy.')
                    self._any_errors_fatal = True
                self._send_task_callback(task, templar)
                self._queue_task(host, task, task_vars, self._play_context)
                self._workers_free -= 1
                del task_vars
        return True

    def process_work(self):
        """Run pending tasks

        Walks the hosts in ``self._hosts_left`` starting at the saved
        pointer, advancing every unblocked host that has a task, then
        collects whatever results are pending.

        :returns: ``self._tqm.RUN_OK``, OR-ed with
                  ``self._tqm.RUN_FAILED_BREAK_PLAY`` when
                  ``_check_failures`` reports a failure
        """
        self._debug('process_work....')
        result = self._tqm.RUN_OK

        # self._hosts_left is recomputed by run() on every pass while the
        # pointer persists, so it can be stale if the list got shorter.
        # Clamp it before it is used as an index.
        if self._last_host >= len(self._hosts_left):
            self._debug('last host out of range, resetting last host')
            self._last_host = 0

        start_host = self._last_host
        self._strat_results = []

        while True:
            self._debug('process_work loop')
            host = self._hosts_left[self._last_host]
            host_name = host.get_name()
            self._increment_last_host()

            # peek only; _advance_host pops the task when it takes it
            (_, task) = self._iterator.get_next_task_for_host(host,
                                                              peek=True)
            self._print("host: {}, task: {}".format(host, task))

            if host_name not in self._tqm._unreachable_hosts and task:
                self._debug('{} has work to do, has_work = True'.format(
                    host_name))
                self._has_work = True
                if not self._blocked_hosts.get(host_name, False):
                    try:
                        self._advance_host(host, task)
                    except TripleoFreeBreak:
                        break
                    except TripleoFreeContinue:
                        continue
                else:
                    self._print('{} still blocked'.format(host_name))
            else:
                self._debug('{} is unreachable or no task'.format(host_name))

            # handle host pinned by going back to the start and waiting
            # for the next free host
            if (self._host_pinned and self._workers_free == 0
                    and self._has_work):
                self._last_host = start_host

            if self._last_host == start_host:
                self._debug('We hit the start host, break our loop')
                break

        self._debug('pending results....')
        results = self._process_pending_results(self._iterator)
        self._debug('results: {}'.format(results))
        self._strat_results.extend(results)

        if self._check_failures(results):
            # NOTE(mwhahaha): this is the bit of code that the upstream free
            # does not do
            result |= self._tqm.RUN_FAILED_BREAK_PLAY

        # every collected result is counted as one worker becoming free
        self._workers_free += len(results)
        self._debug('update connections....')
        self.update_active_connections(results)
        return result

    def run(self, iterator, play_context):
        """Run our strategy

        :param iterator: play iterator supplying the per-host tasks
        :param play_context: play context passed on when queueing tasks
        :returns: the parent ``run`` result, or
                  ``self._tqm.RUN_UNKNOWN_ERROR`` when the task loop hits
                  an IOError/EOFError
        """
        self._iterator = iterator
        self._play_context = play_context
        self._has_work = True
        self._workers_free = len(self._workers)
        result = self._tqm.RUN_OK

        # check for < 2.9 and set vars so we know if we can use hosts cache
        if getattr(self, '_set_hosts_cache', False):
            self._set_hosts_cache(self._iterator._play)
            self._has_hosts_cache = True
        if getattr(self, '_set_hosts_cache_all', False):
            self._has_hosts_cache_all = True

        # while we still have tasks and ansible is still running
        while self._has_work and not self._tqm._terminated:
            self._has_work = False
            self._debug('play: {}'.format(self._iterator._play))

            try:
                # get the hosts with tasks
                self._hosts_left = self.get_hosts_left(self._iterator)
                if len(self._hosts_left) == 0:
                    self._tqm.send_callback(
                        'v2_playbook_on_no_hosts_remaining')
                    # check if we previously had an error...
                    if result == self._tqm.RUN_OK:
                        # by setting this to false, the parent run function
                        # will determine if the run was ok based on a check
                        # of the unreachable/failed hosts.
                        result = False
                    break

                # do work
                result |= self.process_work()

                # handle includes
                include_result = self.process_includes(self._strat_results)
                if self._any_errors_fatal and not include_result:
                    # NOTE(mwhahaha): This bit of code fails the playbook if
                    # an include fails. Upstream free does not have this today
                    display.error('An include failure occurred, we will not '
                                  'continue to process after this play '
                                  'completes.')
                    result |= self._tqm.RUN_FAILED_BREAK_PLAY
            except (IOError, EOFError) as e:
                display.error("Exception while running task loop: "
                              "{}".format(e))
                return self._tqm.RUN_UNKNOWN_ERROR

            self._debug('sleeping... {}'.format(
                C.DEFAULT_INTERNAL_POLL_INTERVAL)
            )
            time.sleep(C.DEFAULT_INTERNAL_POLL_INTERVAL)

        # wait for any pending results
        _ = self._wait_on_pending_results(iterator)

        # call parent run to handle status
        return super(StrategyModule, self).run(self._iterator,
                                               self._play_context,
                                               result)