WIP - make playbook runs async

- make playbook run return an AnsibleJob object
- rename AnsibleCommand object to AnsibleJob
- provide methods on the job: wait, get_status, get_error_message, get_command_output
- change action to use new async playbook
- fix newly introduced bug in safe_decode
- fix bug in shell where errors were printed twice
- provide new inventory function (remove_temp_inventory) to remove the temp inventory file and its directory
- fix removal of inventory dir/file in log collector
- change debug logging default to False in the callback module

Jira-Issue: OSTACKDEV-20
This commit is contained in:
Steve Noyes 2016-03-16 16:43:40 -04:00
parent dfee41685c
commit 67ef2345d7
8 changed files with 200 additions and 144 deletions

View File

@ -24,7 +24,7 @@ import traceback
from ansible.plugins.callback import CallbackBase
KOLLA_LOG_PATH = '/tmp/ansible'
DEBUG = True
DEBUG = False
PIPE_PREFIX = '.kolla_pipe_'
@ -187,9 +187,7 @@ class CallbackModule(CallbackBase):
def start(self):
if deploy_id:
play_ser = self.serialize()
if DEBUG:
log('(%s) play start [%s]'
% (deploy_id, play_ser))
log('(%s) play start [%s]' % (deploy_id, play_ser))
_send_msg(play_ser)
class Task(object):
@ -214,18 +212,14 @@ class CallbackModule(CallbackBase):
def start(self):
task_ser = self.serialize(ACTION_TASK_START)
if DEBUG:
msg = ('(%s) start task [%s]'
% (deploy_id, task_ser))
log(msg)
msg = ('(%s) start task [%s]' % (deploy_id, task_ser))
log(msg)
_send_msg(task_ser)
def end(self, result):
result_ser = result.serialize()
if DEBUG:
msg = ('(%s) end task [%s]'
% (deploy_id, result_ser))
log(msg)
msg = ('(%s) end task [%s]' % (deploy_id, result_ser))
log(msg)
_send_msg(result_ser)
def convert_to_dictionary(self, action=None):
@ -289,10 +283,8 @@ class CallbackModule(CallbackBase):
def start(self):
include_ser = self.serialize()
if DEBUG:
msg = ('(%s) included file: %s'
% (deploy_id, include_ser))
log(msg)
msg = ('(%s) included file: %s' % (deploy_id, include_ser))
log(msg)
_send_msg(include_ser)
def serialize(self):
@ -332,10 +324,8 @@ class CallbackModule(CallbackBase):
def start(self):
stats_ser = self.serialize()
if DEBUG:
msg = ('(%s) stats: %s'
% (deploy_id, stats_ser))
log(msg)
msg = ('(%s) stats: %s' % (deploy_id, stats_ser))
log(msg)
_send_msg(stats_ser)
def _fix_hosts(self, host, stats):

View File

@ -64,7 +64,8 @@ def destroy_hosts(hostname, destroy_type, verbose_level=1, include_data=False):
if verbose_level <= 1:
playbook.print_output = False
playbook.verbose_level = verbose_level
playbook.run()
job = playbook.run()
_process_job(job, verbose_level)
def deploy(hostnames=[], groupnames=[], servicenames=[],
@ -82,7 +83,8 @@ def deploy(hostnames=[], groupnames=[], servicenames=[],
_run_deploy_rules(playbook)
playbook.run()
job = playbook.run()
_process_job(job, verbose_level)
def precheck(hostname, verbose_level=1):
@ -99,7 +101,8 @@ def precheck(hostname, verbose_level=1):
playbook.extra_vars = 'hosts=' + hostname
playbook.print_output = True
playbook.verbose_level = verbose_level
playbook.run()
job = playbook.run()
_process_job(job, verbose_level)
def upgrade(verbose_level=1):
@ -110,7 +113,20 @@ def upgrade(verbose_level=1):
playbook.extra_vars = 'action=upgrade'
playbook.print_output = True
playbook.verbose_level = verbose_level
playbook.run()
job = playbook.run()
_process_job(job, verbose_level)
def _process_job(job, verbose_level):
    """Block until the ansible job finishes; raise if it did not succeed.

    :param job: job object returned by playbook.run(); it must provide
                wait(), get_status(), get_command_output() and
                get_error_message()
    :param verbose_level: when greater than 2, the command output of a
                          failed job is logged before raising
    :raises CommandError: when the job status is anything other than 0
    """
    job.wait()
    if job.get_status() == 0:
        # success, nothing to report
        return

    if verbose_level > 2:
        LOG.info('\n\n' + 80 * '=')
        LOG.info('DEBUG command output:\n%s'
                 % job.get_command_output())
    failure_text = u._('Ansible command failed:\n{msg}')
    raise CommandError(failure_text.format(msg=job.get_error_message()))
def _run_deploy_rules(playbook):

View File

@ -21,6 +21,7 @@ import tempfile
import time
from kollacli.common.utils import get_admin_uids
from kollacli.common.inventory import remove_temp_inventory
from kollacli.common.utils import safe_decode
LOG = logging.getLogger(__name__)
@ -37,92 +38,137 @@ ACTION_INCLUDE_FILE = 'includefile'
ACTION_STATS = 'stats'
class AnsibleCommand(object):
class AnsibleJob(object):
"""class for running ansible commands"""
def __init__(self, command, deploy_id, print_output=True):
self.command = command
self.print_output = print_output
self.deploy_id = deploy_id
self.fragment = ''
self.is_first_packet = True
self.fifo_path = os.path.join(tempfile.gettempdir(),
'%s_%s' % (PIPE_PREFIX, self.deploy_id))
def __init__(self, cmd, deploy_id, print_output, inventory_path):
    """Store the job settings and initialise the run-time state.

    :param cmd: command string that run() passes to subprocess.Popen
    :param deploy_id: id used to build the name of the named pipe
    :param print_output: when true, formatted callback lines are logged
    :param inventory_path: temp inventory path removed during cleanup
    """
    # settings supplied by the caller
    self._command = cmd
    self._deploy_id = deploy_id
    self._print_output = print_output
    self._temp_inv_path = inventory_path

    # named pipe (fifo) location in the system temp directory
    pipe_name = '%s_%s' % (PIPE_PREFIX, self._deploy_id)
    self._fifo_path = os.path.join(tempfile.gettempdir(), pipe_name)
    self._fifo_fd = None

    # state filled in while the job runs
    self._process = None
    self._fragment = ''
    self._is_first_packet = True
    self._errors = []
    self._cmd_output = ''
def run(self):
fifo_fd = None
try:
# create and open named pipe, must be owned by kolla group
os.mkfifo(self.fifo_path, 0o660)
os.mkfifo(self._fifo_path, 0o660)
_, grp_id = get_admin_uids()
os.chown(self.fifo_path, os.getuid(), grp_id)
fifo_fd = os.open(self.fifo_path, os.O_RDONLY | os.O_NONBLOCK)
os.chown(self._fifo_path, os.getuid(), grp_id)
self._fifo_fd = os.open(self._fifo_path,
os.O_RDONLY | os.O_NONBLOCK)
process = subprocess.Popen(self.command, # nosec
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
self._process = subprocess.Popen(self._command, # nosec
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
# setup stdout to be read without blocking
flags = fcntl.fcntl(process.stdout, fcntl.F_GETFL)
fcntl.fcntl(process.stdout, fcntl.F_SETFL, (flags | os.O_NONBLOCK))
flags = fcntl.fcntl(self._process.stdout, fcntl.F_GETFL)
fcntl.fcntl(self._process.stdout, fcntl.F_SETFL,
(flags | os.O_NONBLOCK))
except Exception as e:
self._cleanup()
raise e
out = ''
while process.poll() is None:
# process is still running
def wait(self):
"""wait for job to complete
# need to drain stdout so buffer doesn't fill up and hang
# process.
out = self._get_stdout(out, process)
return status of job (see get_status for status values)
"""
while True:
status = self.get_status()
if status is not None:
break
time.sleep(1)
return status
# log info from kolla callback
self._log_callback(fifo_fd)
time.sleep(1)
def get_status(self):
"""get process status
ret_code = process.returncode
# dump any remaining data in the named pipe
self._log_callback(fifo_fd)
finally:
# close the named pipe
if fifo_fd:
os.close(fifo_fd)
if self.fifo_path and os.path.exists(self.fifo_path):
os.remove(self.fifo_path)
return out, ret_code
def _get_stdout(self, out, process):
status:
- None: running
- 0: done, success
- 1: done, error
"""
status = self._process.poll()
self._read_from_callback()
if status is not None:
self._cleanup()
status = 0
if self._process.returncode != 0:
status = 1
try:
data = process.stdout.read()
if data:
out = ''.join([safe_decode(data)])
out = safe_decode(self._process.stdout.read())
if out:
self._cmd_output = ''.join([self._cmd_output, out])
except IOError: # nosec
# error can happen if stdout is empty
pass
return out
return status
def _log_callback(self, fifo_fd):
"""log info from callback in real-time to log"""
def get_error_message(self):
    """Get the saved error messages as a single string.

    :returns: every entry of self._errors, in order, each followed by a
              newline; an empty string when no errors were saved
    """
    # one join over all entries instead of rebuilding the string per error
    return ''.join(error + '\n' for error in self._errors)
def get_command_output(self):
    """Return the output text collected from the command execution.

    :returns: the current value of self._cmd_output
    """
    output_text = self._cmd_output
    return output_text
def _log_lines(self, lines):
    """Log each line at info level when output printing is enabled.

    :param lines: iterable of text lines; ignored when
                  self._print_output is false
    """
    if not self._print_output:
        return
    for text in lines:
        LOG.info(text)
def _cleanup(self):
    """Remove the temp inventory and release the named pipe (fifo).

    get_status() calls this on every poll once the process has exited,
    so it has to be safe to call more than once. The fifo descriptor is
    closed a single time only: closing the same descriptor number again
    could close an unrelated file that has since reused that number.
    """
    # delete temp inventory file
    remove_temp_inventory(self._temp_inv_path)

    # close the named pipe (fifo); compare against None because 0 is a
    # valid file descriptor
    already_closed = getattr(self, '_fifo_closed', False)
    if self._fifo_fd is not None and not already_closed:
        self._fifo_closed = True
        try:
            os.close(self._fifo_fd)
        except OSError:  # nosec
            # fifo already closed
            pass

    # delete the named pipe (fifo)
    if self._fifo_path and os.path.exists(self._fifo_path):
        os.remove(self._fifo_path)
def _read_from_callback(self):
"""read lines from callback in real-time"""
data = None
try:
data = os.read(fifo_fd, 1000000)
data = os.read(self._fifo_fd, 1000000)
data = safe_decode(data)
except OSError: # nosec
# error can happen if fifo is empty
pass
if data and self.print_output:
if data:
packets = self._deserialize_packets(data)
for packet in packets:
line = self._format_packet(packet, self.is_first_packet)
LOG.info(line)
return
formatted_data = self._format_packet(packet)
lines = formatted_data.split('\n')
self._log_lines(lines)
def _format_packet(self, packet, first_packet_flag):
def _format_packet(self, packet):
action = packet['action']
if action == ACTION_INCLUDE_FILE:
return self._format_include_file(packet)
elif action == ACTION_PLAY_START:
return self._format_play_start(packet, first_packet_flag)
return self._format_play_start(packet)
elif action == ACTION_STATS:
return self._format_stats(packet)
elif action == ACTION_TASK_END:
@ -135,11 +181,11 @@ class AnsibleCommand(object):
def _format_include_file(self, packet):
return 'included: %s' % packet['filename']
def _format_play_start(self, packet, first_packet_flag):
def _format_play_start(self, packet):
msg = '\n' + self._add_filler('PLAY ', LINE_LENGTH, '*')
if first_packet_flag:
if self._is_first_packet:
msg += '\nPlaybook: %s' % packet['playbook']
self.is_first_packet = False
self._is_first_packet = False
return msg
def _format_stats(self, packet):
@ -167,7 +213,14 @@ class AnsibleCommand(object):
status = packet['status']
msg = '%s: [%s]' % (status, host)
if status == 'failed' or status == 'unreachable':
results = json.dumps(packet['results'])
results_dict = packet['results']
taskname = packet['task']['name']
# update saved error messages
self._errors.append(self._format_error(taskname, host,
status, results_dict))
# format log message
results = json.dumps(results_dict)
msg = 'fatal: [%s]: %s! => %s' % (host, status.upper(), results)
return msg
@ -177,6 +230,14 @@ class AnsibleCommand(object):
msg = '\n' + self._add_filler(task_line, LINE_LENGTH, '*')
return msg
def _format_error(self, taskname, host, status, results):
err_msg = ''
if 'msg' in results and results['msg']:
err_msg = results['msg']
msg = ('Host: %s, Task: %s, Status: %s, Message: %s' %
(host, taskname, status, err_msg))
return msg
def _add_filler(self, msg, length, filler):
num_stars = max(length - len(msg), 0)
stars = num_stars * filler
@ -204,12 +265,12 @@ class AnsibleCommand(object):
i += 1
if i == 1:
# first line
line = self.fragment + line
self.fragment = ''
line = self._fragment + line
self._fragment = ''
elif i == num_lines - 1:
# last line
if has_fragment:
self.fragment = line
self._fragment = line
continue
try:
packets.append(json.loads(line))

View File

@ -18,7 +18,7 @@ import traceback
import kollacli.i18n as u
from kollacli.common.ansible.command import AnsibleCommand
from kollacli.common.ansible.job import AnsibleJob
from kollacli.common.utils import get_admin_user
from kollacli.common.utils import get_ansible_command
from kollacli.common.utils import get_kolla_etc
@ -45,33 +45,18 @@ class AnsiblePlaybook(object):
inventory = Inventory.load()
def run(self):
inventory_path = None
try:
inventory_path = self._make_temp_inventory()
cmd = self._get_playbook_cmd(inventory_path)
self._log_ansible_cmd(cmd, inventory_path)
# run the playbook
output, ret_code = AnsibleCommand(cmd,
self.deploy_id,
self.print_output).run()
if ret_code != 0:
if not self.print_output:
# since the user didn't see the output,
# print it now
LOG.error(output)
raise CommandError(u._('Ansible command failed'))
LOG.info(u._('Success'))
except CommandError as e:
raise e
# create and run the job
job = AnsibleJob(cmd, self.deploy_id,
self.print_output, inventory_path)
job.run()
return job
except Exception:
raise Exception(traceback.format_exc())
finally:
self.inventory.remove_json_gen_file(inventory_path)
def _get_playbook_cmd(self, inventory_path):
flag = ''
@ -160,11 +145,11 @@ class AnsiblePlaybook(object):
return ('-e @' + os.path.join(kolla_etc, 'passwords.yml '))
def _log_ansible_cmd(self, cmd, inventory_path):
if self.verbose_level > 1:
if self.verbose_level > 2:
# log the ansible command
LOG.debug('cmd:' + cmd)
if self.verbose_level > 2:
if self.verbose_level > 3:
# log the inventory
dbg_gen = inventory_path
(inv, _) = \

View File

@ -108,6 +108,16 @@ DEFAULT_OVERRIDES = {
PROTECTED_GROUPS = [COMPUTE_GRP_NAME]
def remove_temp_inventory(path):
    """Remove a temp inventory file and its parent directory.

    Does nothing when path is empty or None. A file or directory that
    no longer exists is ignored. Any other OSError (for example a
    parent directory that still contains other entries) is raised.

    :param path: path of the temp inventory file, or None
    """
    import errno

    if not path:
        return

    # Attempt each removal directly rather than testing os.path.exists()
    # first: the target may vanish between the check and the removal.
    for remove, target in ((os.remove, path),
                           (os.rmdir, os.path.dirname(path))):
        if not target:
            # a bare file name has no parent directory component
            continue
        try:
            remove(target)
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise
class Host(object):
class_version = 1
log = logging.getLogger(__name__)
@ -877,9 +887,4 @@ class Inventory(object):
return json_gen_path
def remove_json_gen_file(self, path):
if path:
if os.path.exists(path):
os.remove(path)
dirpath = os.path.dirname(path)
if os.path.exists(dirpath):
os.rmdir(dirpath)
remove_temp_inventory(path)

View File

@ -267,7 +267,7 @@ def sync_write_file(path, data, mode='w'):
def safe_decode(text):
"""Convert bytes or string to unicode string"""
if text:
if text is not None:
try:
text = text.decode('utf-8')
except AttributeError: # nosec

View File

@ -72,11 +72,6 @@ class KollaCli(App):
self.add_rotational_log()
def clean_up(self, cmd, result, err):
LOG.debug('clean_up %s', cmd.__class__.__name__)
if err:
LOG.debug('ERROR: %s', err)
def add_rotational_log(self):
root_logger = logging.getLogger('')
rotate_handler = logging.handlers.RotatingFileHandler(

View File

@ -23,6 +23,7 @@ from kollacli.common.inventory import Inventory
from kollacli.common import properties
from kollacli.common.utils import get_admin_user
from kollacli.common.utils import get_ansible_command
from kollacli.common.utils import remove_temp_inventory
from kollacli.common.utils import safe_decode
tar_file_descr = None
@ -30,33 +31,36 @@ tar_file_descr = None
def run_ansible_cmd(cmd, host):
# sudo -u kolla ansible ol7-c4 -i inv_path -a "cmd"
inv_path = None
out = None
user = get_admin_user()
inv = Inventory.load()
inv_path = inv.create_json_gen_file()
ansible_verb = get_ansible_command()
ansible_cmd = ('/usr/bin/sudo -u %s %s %s -i %s -a "%s"'
% (user, ansible_verb, host, inv_path, cmd))
try:
(out, err) = subprocess.Popen(ansible_cmd, shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE).communicate()
except Exception as e:
print('%s\nCannot communicate with host: %s, skipping' % (e, host))
finally:
os.remove(inv_path)
user = get_admin_user()
inv = Inventory.load()
inv_path = inv.create_json_gen_file()
ansible_verb = get_ansible_command()
ansible_cmd = ('/usr/bin/sudo -u %s %s %s -i %s -a "%s"'
% (user, ansible_verb, host, inv_path, cmd))
try:
(out, err) = subprocess.Popen(ansible_cmd, shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE).communicate()
except Exception as e:
print('%s\nCannot communicate with host: %s, skipping' % (e, host))
if not out:
print('Host %s is not accessible: %s, skipping' % (host, err))
else:
out = safe_decode(out)
if '>>' not in out:
print('Ansible command: %s' % ansible_cmd)
print('Host: %s. \nInvalid ansible return data: [%s]. skipping'
% (host, out))
out = None
finally:
remove_temp_inventory(inv_path)
if not out:
print('Host %s is not accessible: %s, skipping' % (host, err))
else:
out = safe_decode(out)
if '>>' not in out:
print('Ansible command: %s' % ansible_cmd)
print('Host: %s. \nInvalid ansible return data: [%s]. skipping'
% (host, out))
out = None
return out