Browse Source

Implement a tripleo free strategy

This strategy is to be used for the execution of deployment steps so
that tasks run on all hosts (up to # of workers) freely without locking
at each task.  This is closer to what we had when Heat drove the
deployments as each of the software deployments would execute
simultaneously on all nodes at the same time.

This strategy does two important things for us:

  1) Hosts execute their tasks freely with no dependencies on the other
     hosts that are running the same tasks
  2) Failures on any given host will not stop future tasks from
     executing on other hosts, but will cause the overall playbook
     execution to stop at the end of the "PLAY".

These specific things are how the previous deployment executed when Heat
was the driver of deployment actions. The nodes would fetch their list
of actions, execute and report back to status. Heat would fail the
deployment if any of the hosts failed to execute their work however a
failure on one host did not impact the other hosts.  The TripleO
deployment framework already has the step concept in itself to allow
for the individual steps (1 through 5) to execute in a parallael on all
hosts at the same time. At the end of the "PLAY", ansible waits for all
nodes to complete before proceeding on to the next "PLAY". So Step 1 is
a "PLAY" followed by Step 2. If a failure occurs during Step 1 on any
host, we would not proceed to the Step 2 play.

Change-Id: Id921ed48a948211b9a90def4a4667d015ec11d2f
Alex Schultz 2 months ago
1 changed files with 411 additions and 0 deletions
  1. +411

+ 411
- 0
tripleo_ansible/ansible_plugins/strategy/ View File

@@ -0,0 +1,411 @@
# Copyright 2020 Red Hat, Inc.
# All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import time

from ansible import constants as C
from ansible.errors import AnsibleError
from ansible.module_utils._text import to_text
from ansible.playbook.included_file import IncludedFile
from ansible.plugins.loader import action_loader
from ansible.plugins.strategy import StrategyBase
from ansible.template import Templar
from ansible.utils.display import Display

strategy: tripleo_free
short_description: TripleO specific free strategy
- Based on the 'free' strategy from Ansible
- Logic broken up to allow for future improvements/extending
- Will fail playbook if any hosts have a failure during the
execution and any_errors_fatal is true (free does not do this).
- Should be backwards compatible for Ansible 2.8
version_added: "2.9"
author: Alex Schultz <>

display = Display()

class TripleoFreeBreak(Exception):
"""Exception used to break loops"""

class TripleoFreeContinue(Exception):
"""Exception used to continue loops"""

class StrategyModule(StrategyBase):

# this strategy handles throttling

def __init__(self, *args, **kwargs):
super(StrategyModule, self).__init__(*args, **kwargs)
self._any_errors_fatal = False
self._callback_sent = False
self._has_work = False
self._host_pinned = False
self._hosts_left = []
self._iterator = None
self._last_host = 0
self._play_context = None
self._strat_results = []
self._workers_free = 0
# these were defined in 2.9
self._has_hosts_cache = False
self._has_hosts_cache_all = False

def _print(self, msg, host=None, level=1):
display.verbose(msg, host=host, caplevel=level)

def _debug(self, msg, host=None):
self._print(msg, host, 3)

def _filter_notified_hosts(self, notified_hosts):
"""Filter notified hosts"""
return [host for host in notified_hosts
if host in self._flushed_hosts and self._flushed_hosts[host]]

def _increment_last_host(self):
"""Increment last host pointer

If the last host pointer exceeds the number of hosts, we set it back to
zero so we can start checking again with the first host
self._last_host += 1
self._debug('last_host is {}'.format(self._last_host))
if self._last_host > len(self._hosts_left) - 1:
self._debug('resetting last host')
self._last_host = 0

def _check_throttle(self, throttle, task):
"""Check if we should throttle"""
if throttle > 0:
same_task = 0
for worker in self._workers:
if (worker and worker.is_alive()
and worker._task._uuid == task._uuid):
same_task += 1
if same_task >= throttle:
return True
return False

def _check_failures(self, results):
"""Check results for failures

If any errors are fatal, kill the playbook at the end of
execution. All non-failed hosts will continue to run the
playbook but it won't move on to the next playbook. This
function returns True if there were failures and False if
there are no failures.
if self._any_errors_fatal:
for res in results:
if ((res.is_failed() or res._task.action == 'meta')
and self._iterator.is_failed(res._host)):
return True
return False

def _get_action(self, task):
"""Get action based on task"""
action = action_loader.get(task.action, class_only=True)
except KeyError:
action = None
return action

def _send_task_callback(self, task, templar):
"""Send task start callback"""
name =
try: = to_text(templar.template(,
except Exception:
self._debug('templating failed')
is_conditional=False) = name

def _advance_host(self, host, task):
"""Advance the host's task as necessary"""
self._debug('_advance_host {}'.format(host), host)
host_name = host.get_name()

# build get_vars call params
vars_params = {'play': self._iterator._play,
'host': host,
'task': task}
# If we have >= 2.9 we can use the hosts cache
if self._has_hosts_cache:
vars_params['_hosts'] = self._hosts_cache
if self._has_hosts_cache_all:
vars_params['_hosts_all'] = self._hosts_cache_all

task_vars = self._variable_manager.get_vars(**vars_params)
templar = Templar(loader=self._loader, variables=task_vars)

throttle = int(templar.template(task.throttle))
except Exception as e:
raise AnsibleError("Failed to throttle: {}".format(e),
if self._check_throttle(throttle, task):
raise TripleoFreeBreak()

# _blocked_hosts is used in the base strategy to keep track of hosts in
# that have tasks in queue
self._blocked_hosts[host_name] = True

# we already have the task and state from our peek call so we don't
# need these again. This call just pops it of the task list for the
# host.
(_, _) = self._iterator.get_next_task_for_host(host)
action = self._get_action(task)

try: = to_text(templar.template(,
except Exception:
display.warning('templating of task name failed', host=host_name)

# run once doesn't work with free because we run all of them
run_once = (templar.template(task.run_once) or action
and getattr(action, 'BYPASS_HOST_LOOP', False))

# TODO(mwhahaha): make it work by caching it across all hosts
if run_once:
if action and getattr(action, 'BYPASS_HOST_LOOP', False):
raise AnsibleError('Cannot bypass host loop with strategy')
display.warning("Using run_once does not work with the"
"tripleo_free strategy")

# handle role deduplication logic
if task._role and task._role.has_run(host):
if (task._role._metadata is None or task._role._metadata
and not task._role._metadata.allow_duplicates):
del self._blocked_hosts[host_name]
raise TripleoFreeContinue()

if task.action == 'meta':
self._execute_meta(task, self._play_context, self._iterator,
self._blocked_hosts[host_name] = False
if not self._step or self._take_step(task, host_name):
if task.any_errors_fatal:
display.warning('any_errors_fatal only stops any future '
'tasks running on the host that fails '
'with the tripleo_free strategy.')
self._any_errors_fatal = True
self._send_task_callback(task, templar)
self._queue_task(host, task, task_vars, self._play_context)
self._workers_free -= 1
del task_vars
return True

def process_work(self):
"""Run pending tasks"""
result = self._tqm.RUN_OK
start_host = self._last_host
self._strat_results = []
while True:
self._debug('process_work loop')
host = self._hosts_left[self._last_host]
host_name = host.get_name()


(s, t) = self._iterator.get_next_task_for_host(host, peek=True)
self._print("host: {}, task: {}".format(host, t))

if host_name not in self._tqm._unreachable_hosts and t:
self._debug('{} has work to do, has_work = True'.format(
self._has_work = True
if not self._blocked_hosts.get(host_name, False):
self._advance_host(host, t)
except TripleoFreeBreak:
except TripleoFreeContinue:
self._print('{} still blocked'.format(host_name))
self._debug('{} is unreachable or no task'.format(host_name))

# handle host pinned by going back to the start and waiting
# for the next free host
if (self._host_pinned and self._workers_free == 0
and self._has_work):
self._last_host = start_host

if self._last_host == start_host:
self._debug('We hit the start host, break our loop')

self._debug('pending results....')
results = self._process_pending_results(self._iterator)
self._debug('results: {}'.format(results))

if self._check_failures(results):
# NOTE(mwhahaha): this is the bit of code that the upstream free
# does not do
result |= self._tqm.RUN_FAILED_BREAK_PLAY

self._workers_free += len(results)
self._debug('update connections....')

return result

def process_includes(self, host_results, noop=False):
"""Handle includes

This function processes includes and adds them tasks to the hosts.
It will return False if there was a failure during the include
include_files = IncludedFile.process_include_results(

include_success = True
if len(include_files) == 0:
self._debug('No include files')
return include_success

all_blocks = dict((host, []) for host in self._hosts_left)
for include in include_files:
self._debug('Adding include...{}'.format(include))
if include._is_role:
ir = self._copy_included_file(include)
new_blocks, handler_blocks = ir.get_block_list(
new_blocks = self._load_included_file(
include, iterator=self._iterator)
for block in new_blocks:
vars_params = {'play': self._iterator._play,
'task': block._parent}
# ansible <2.9 compatibility
if self._has_hosts_cache:
vars_params['_hosts'] = self._hosts_cache
if self._has_hosts_cache_all:
vars_params['_hosts_all'] = self._hosts_cache_all

task_vars = self._variable_manager.get_vars(**vars_params)
final_block = block.filter_tagged_tasks(task_vars)

for host in self._hosts_left:
if host in include._hosts:
except AnsibleError as e:
for host in include._hosts:
self._tqm._failed_hosts[host.get_name()] = True
display.error(to_text(e), wrap_text=False)
include_success = False

for host in self._hosts_left:
self._iterator.add_tasks(host, all_blocks[host])

return include_success

def run(self, iterator, play_context):
"""Run out strategy"""
self._iterator = iterator
self._play_context = play_context
self._has_work = True
self._workers_free = len(self._workers)

result = self._tqm.RUN_OK

# check for < 2.9 and set vars so we know if we can use hosts cache
if getattr(self, '_set_hosts_cache', False):
self._has_hosts_cache = True
if getattr(self, '_set_hosts_cache_all', False):
self._has_hosts_cache_all = True

# while we still have tasks and ansible is still running
while self._has_work and not self._tqm._terminated:
self._has_work = False
self._debug('play: {}'.format(self._iterator._play))
# get the hosts with tasks
self._hosts_left = self.get_hosts_left(self._iterator)
if len(self._hosts_left) == 0:
# check if we previously had an error...
if result == self._tqm.RUN_OK:
# by setting this to false, the parent run function
# will determine if the run was ok based on a check
# of the unreachable/failed hosts.
result = False
# do work
result |= self.process_work()
# handle includes
include_result = self.process_includes(self._strat_results)
if self._any_errors_fatal and not include_result:
# NOTE(mwhahaha): This bit of code fails the playbook if
# an include fails. Upstream free does not have this today
display.error('An include failure occurred, we will not '
'continue to process after this play '
result |= self._tqm.RUN_FAILED_BREAK_PLAY
except (IOError, EOFError) as e:
display.error("Exception while running task loop: "
return self._tqm.RUN_UNKNOWN_ERROR

self._debug('sleeping... {}'.format(

# wait for any pending results
_ = self._wait_on_pending_results(iterator)

# call parent run to handle status
return super(StrategyModule, self).run(self._iterator,