Browse Source

Merge "Add tripleo linear strategy" into stable/train

changes/88/739588/2
Zuul 1 month ago
committed by Gerrit Code Review
parent
commit
d10ef634d2
1 changed files with 466 additions and 0 deletions
  1. +466
    -0
      tripleo_ansible/ansible_plugins/strategy/tripleo_linear.py

+ 466
- 0
tripleo_ansible/ansible_plugins/strategy/tripleo_linear.py View File

@@ -0,0 +1,466 @@
# Copyright 2020 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import time

from ansible import constants as C
from ansible.errors import AnsibleAssertionError
from ansible.errors import AnsibleError
from ansible.executor.play_iterator import PlayIterator
from ansible.module_utils._text import to_text
from ansible.module_utils.six import iteritems
from ansible.playbook.block import Block
from ansible.playbook.included_file import IncludedFile
from ansible.playbook.task import Task
from ansible.plugins.loader import action_loader
from ansible.plugins.strategy import StrategyBase
from ansible.template import Templar
from ansible.utils.display import Display

DOCUMENTATION = '''
strategy: tripleo_linear
short_description: TripleO specific linear strategy
description:
- Based on the 'linear' strategy from Ansible
- Logic broken up to allow for future improvements
version_added: "2.9"
author: Alex Schultz <aschultz@redhat.com>
'''

display = Display()


class TripleoLinearTerminated(Exception):
"""Exception for terminated state"""
pass


class TripleoLinearNoHostTask(Exception):
"""Exception for no host task"""
pass


class TripleoLinearRunOnce(Exception):
"""Exception for run once"""
pass


class StrategyModule(StrategyBase):

def __init__(self, *args, **kwargs):
super(StrategyModule, self).__init__(*args, **kwargs)
self._any_errors_fatal = False
self._callback_sent = False
self._has_work = False
self._host_pinned = True
self._hosts_left = []
self._iterator = None
self._play_context = None
self._strat_results = []
self.noop_task = None
# these were defined in 2.9
self._has_hosts_cache = False
self._has_hosts_cache_all = False

def _print(self, msg, host=None, level=1):
display.verbose(msg, host=host, caplevel=level)

def _debug(self, msg, host=None):
self._print(msg, host, 3)

def _create_noop_task(self):
"""Create noop task"""
self._debug('_create_noop_task...')
noop_task = Task()
noop_task.action = 'meta'
noop_task.args['_raw_params'] = 'noop'
noop_task.set_loader(self._iterator._play._loader)
return noop_task

def _advance_hosts(self, hosts, host_tasks, cur_block, cur_state):
"""Move hosts to next task"""
self._debug('_advance_hosts...')
noop_task = self._create_noop_task()
returns = []
for host in hosts:
host_state_task = host_tasks.get(host.name)
if host_state_task is None:
continue
(s, t) = host_state_task
self._print('task: {}'.format(t))
s = self._iterator.get_active_state(s)
if t is None:
continue
self._print('task.action: {}'.format(t.action))
if s.run_state == cur_state and s.cur_block == cur_block:
_ = self._iterator.get_next_task_for_host(host)
returns.append((host, t))
else:
returns.append((host, noop_task))
return returns

def _get_next_tasks(self, hosts):
"""Get next set of tasks"""
self._debug('_get_next_tasks...')
host_tasks = {}
task_counts = {}

self._debug('populate next tasks for all hosts')
for host in hosts:
host_tasks[host.name] = self._iterator.get_next_task_for_host(
host, peek=True)

self._debug('organize tasks by state')
host_tasks_to_run = [(host, state_task)
for host, state_task in iteritems(host_tasks)
if state_task and state_task[1]]

# figure out our current block
if host_tasks_to_run:
try:
lowest_cur_block = min(
(self._iterator.get_active_state(s).cur_block
for h, (s, t) in host_tasks_to_run
if s.run_state != PlayIterator.ITERATING_COMPLETE))
except ValueError:
lowest_cur_block = None
else:
lowest_cur_block = None

# build counts for tasks by run state
for (k, v) in host_tasks_to_run:
(s, t) = v
s = self._iterator.get_active_state(s)
if s.cur_block > lowest_cur_block:
continue

# count up tasks based on state, we only care about:
# PlayIterator.ITERATING_SETUP
# PlayIterator.ITERATING_TASKS
# PlayIterator.ITERATING_RESCUE
# PlayIterator.ITERATING_ALWAYS
if not task_counts.get(s.run_state):
task_counts[s.run_state] = 1
else:
task_counts[s.run_state] += 1

# Iterate through the different task states we care about
# to execute them in a specific order. If there are tasks
# in that state, we run all those tasks and then noop the
# rest of the hosts with tasks not currently in that state
for state_type in [PlayIterator.ITERATING_SETUP,
PlayIterator.ITERATING_TASKS,
PlayIterator.ITERATING_RESCUE,
PlayIterator.ITERATING_ALWAYS]:
if state_type in task_counts:
return self._advance_hosts(hosts,
host_tasks,
lowest_cur_block,
state_type)

# all done so move on by returning None for the next task in
# the return value.
return [(host, None) for host in hosts]

def _replace_with_noop(self, target):
"""Replace task with a noop task"""
self._debug('_replace_with_noop...')
if self.noop_task is None:
raise AnsibleAssertionError('noop_task is None')

result = []
for t in target:
if isinstance(t, Task):
result.append(self.noop_task)
elif isinstance(t, Block):
result.append(self._create_noop_block_from(t, t._parent))
return result

def _create_noop_block_from(self, original_block, parent):
"""Create a noop block from a block"""
self._debug('_create_noop_block_from...')
noop_block = Block(parent_block=parent)
noop_block.block = self._replace_with_noop(original_block.block)
noop_block.always = self._replace_with_noop(original_block.always)
noop_block.rescue = self._replace_with_noop(original_block.rescue)
return noop_block

def _prepare_and_create_noop_block_from(self, original_block, parent):
"""Create noop block"""
self._debug('_prepare_and_create_noop_block_from...')
self.noop_task = self._create_noop_task()
return self._create_noop_block_from(original_block, parent)

def _get_action(self, task):
"""Get action from task"""
# TODO(mwhahaha): move to base task
self._debug('_get_action...')
try:
action = action_loader.get(task.action, class_only=True)
except KeyError:
action = None
return action

def _send_task_callback(self, task, templar):
"""Send a task callback for task start"""
# TODO(mwhahaha): do we need the bool? can we move to base class?
self._debug('_send_task_callback...')
if self._callback_sent:
return
name = task.name
try:
task.name = to_text(templar.template(task.name,
fail_on_undefined=False),
nonstring='empty')
except Exception:
display.debug('templating failed')
self._tqm.send_callback('v2_playbook_on_task_start',
task,
is_conditional=False)
task.name = name
self._callback_sent = True

def _process_host_tasks(self, host, task):
"""Process host task and execute"""
self._debug('process_host_tasks...')
results = []

if self._tqm._terminated:
raise TripleoLinearTerminated()
run_once = False

action = self._get_action(task)

# Skip already executed roles
if task._role and task._role.has_run(host):
if (task._role._metadata is None or task._role._metadata
and not task._role._metadata.allow_duplicates):
raise TripleoLinearNoHostTask()

if task.action == 'meta':
results.extend(self._execute_meta(task,
self._play_context,
self._iterator,
host))
if (task.args.get('_raw_params', None) not in ('noop',
'reset_connection',
'end_host')):
run_once = True
if (task.any_errors_fatal or run_once and not task.ignore_errors):
self._any_errors_fatal = True
else:
# todo handle steps like in linear
# build get_vars call params
vars_params = {'play': self._iterator._play,
'host': host,
'task': task}
# if we have >= 2.9 we can use the hosts cache
if self._has_hosts_cache:
vars_params['_hosts'] = self._hosts_cache
if self._has_hosts_cache_all:
vars_params['_hosts_all'] = self._hosts_cache_all

task_vars = self._variable_manager.get_vars(**vars_params)

self.add_tqm_variables(task_vars, play=self._iterator._play)
templar = Templar(loader=self._loader, variables=task_vars)

run_once = (templar.template(task.run_once) or action
and getattr(action, 'BYPASS_HOST_LOOP', False))
self._send_task_callback(task, templar)
self._blocked_hosts[host.get_name()] = True
self._queue_task(host, task, task_vars, self._play_context)
del task_vars

if run_once:
raise TripleoLinearRunOnce()

max_passes = max(1, int(len(self._tqm._workers) * 0.1))
results.extend(self._process_pending_results(
self._iterator, max_passes=max_passes))
return results

def process_includes(self, host_results, noop=False):
"""Process our includes"""
# TODO(mwhahaha): move to a base class
self._debug('process_includes...')
include_files = IncludedFile.process_include_results(
host_results,
iterator=self._iterator,
loader=self._loader,
variable_manager=self._variable_manager
)
# TODO(mwhahaha): fix include_failure uage
include_success = True
if len(include_files) == 0:
self._debug('No include files')
return include_success

all_blocks = dict((host, []) for host in self._hosts_left)
for include in include_files:
self._debug('Adding include...{}'.format(include))
try:
if include._is_role:
ir = self._copy_included_file(include)
new_blocks, handler_blocks = ir.get_block_list(
play=self._iterator._play,
variable_manager=self._variable_manager,
loader=self._loader)
else:
new_blocks = self._load_included_file(
include, iterator=self._iterator)
for block in new_blocks:
vars_params = {'play': self._iterator._play,
'task': block._parent}
# ansible <2.9 compatibility
if self._has_hosts_cache:
vars_params['_hosts'] = self._hosts_cache
if self._has_hosts_cache_all:
vars_params['_hosts_all'] = self._hosts_cache_all

task_vars = self._variable_manager.get_vars(**vars_params)
final_block = block.filter_tagged_tasks(task_vars)

# TODO(mwhahaha): noop is used for linear not free
if noop:
noop_block = self._prepare_and_create_noop_block_from(
final_block, block._parent)

for host in self._hosts_left:
if host in include._hosts:
all_blocks[host].append(final_block)
elif noop:
all_blocks[host].append(noop_block)
except AnsibleError as e:
for host in include._hosts:
self._tqm._failed_hosts[host.get_name()] = True
self._iterator.mark_host_failed(host)
display.error(to_text(e), wrap_text=False)
include_success = False
continue

for host in self._hosts_left:
self._iterator.add_tasks(host, all_blocks[host])

return include_success

def _process_failures(self):
"""Handle failures"""
self._debug('_process_failures...')
non_fail_states = frozenset([self._iterator.ITERATING_RESCUE,
self._iterator.ITERATING_ALWAYS])
result = self._tqm.RUN_OK
for host in self._hosts_left:
(s, _) = self._iterator.get_next_task_for_host(host, peek=True)
s = self._iterator.get_active_state(s)
if ((s.run_state not in non_fail_states)
or (s.run_state == self._iterator.ITERATING_RESCUE
and s.fail_state & self._iterator.FAILED_RESCUE != 0)):
self._tqm._failed_hosts[host.name] = True
result |= self._tqm.RUN_FAILED_BREAK_PLAY
return result

def process_work(self):
"""Run pending tasks"""
self._debug('process_work...')
self._callback_sent = False
result = self._tqm.RUN_OK

host_tasks = self._get_next_tasks(self._hosts_left)
self._strat_results = []
results = []
for (host, task) in host_tasks:
if not task:
continue
try:
self._has_work = True
results.extend(self._process_host_tasks(host, task))
except TripleoLinearNoHostTask:
continue
except (TripleoLinearTerminated, TripleoLinearRunOnce):
break
if self._pending_results > 0:
results.extend(self._wait_on_pending_results(
self._iterator))

self._strat_results.extend(results)
self.update_active_connections(results)

return result

def run(self, iterator, play_context):
"""Run our straregy"""
self._debug('run...')
self._iterator = iterator
self._play_context = play_context
self._has_work = True

result = self._tqm.RUN_OK

# check for < 2.9 and set vars so we know if we can use hosts cache
if getattr(self, '_set_hosts_cache', False):
self._set_hosts_cache(self._iterator._play)
self._has_hosts_cache = True
if getattr(self, '_set_hosts_cache_all', False):
self._has_hosts_cache_all = True

while self._has_work and not self._tqm._terminated:
self._has_work = False
self._print('play: {}'.format(iterator._play))
try:
self._hosts_left = self.get_hosts_left(self._iterator)
result = self.process_work()

# NOTE(mwhahaha): process_includes returns a status however
# we will pick up on these failures further down because
# failed_hosts will be set. We don't need the status
# in this strategy so we just ignore it.
self.process_includes(self._strat_results, noop=True)

failed_hosts = []
unreachable_hosts = []
for res in self._strat_results:
if ((res.is_failed() or res._task.action == 'meta')
and self._iterator.is_failed(res._host)):
failed_hosts.append(res._host.name)
elif res.is_unreachable():
unreachable_hosts.append(res._host.name)

# TODO(mwhahaha): handle max_fail_percentage by tripleo role
if (self._any_errors_fatal
and (len(failed_hosts) > 0
or len(unreachable_hosts) > 0)):
result = self._process_failures()

failed_hosts = len(self._tqm._failed_hosts)
hosts_left = len(self._hosts_left)
if (result != self._tqm.RUN_OK
and (failed_hosts >= hosts_left)):
self._tqm.send_callback(
'v2_playbook_on_no_hosts_remaining')
return result
except (IOError, EOFError) as e:
display.warning("Exception while in task loop: {}".format(e))
return self._tqm.RUN_UNKNOWN_ERROR

self._debug('sleeping... {}'.format(
C.DEFAULT_INTERNAL_POLL_INTERVAL)
)
time.sleep(C.DEFAULT_INTERNAL_POLL_INTERVAL)

return super(StrategyModule, self).run(iterator, play_context, result)

Loading…
Cancel
Save