Use fuel-devops subprocess_runner to run scripts on the host
- use Subprocess.check_call() from fuel-devops to get logging of stdout and 'result' object with exit code and stdout/stderr - add a temporary workaround for Subprocess to get nonblocking readline() from stdout/stderr - log results from install_k8s and install_ccp Change-Id: I12219120adf040b027e4317dfb0fdd41f3477153
This commit is contained in:
parent
537236d0d7
commit
8d322cd78d
|
@ -0,0 +1,154 @@
|
|||
# Copyright 2016 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from __future__ import print_function
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import os
|
||||
import fcntl
|
||||
from subprocess import PIPE
|
||||
from subprocess import Popen
|
||||
from threading import Event
|
||||
from time import sleep
|
||||
|
||||
from devops.error import TimeoutError
|
||||
from devops.helpers.decorators import threaded
|
||||
from devops.helpers.exec_result import ExecResult
|
||||
from devops.helpers import subprocess_runner
|
||||
from devops import logger
|
||||
|
||||
|
||||
class Subprocess(subprocess_runner.Subprocess):
|
||||
"""Extension of fuel-devops Subprocess class
|
||||
|
||||
This is temporary solution that provides non-blocking read from
|
||||
stdout and stderr and log the data in real time.
|
||||
Should be removed when this code will be merged to fuel-devops.
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def __exec_command(cls, command, cwd=None, env=None, timeout=None,
|
||||
verbose=True):
|
||||
"""Command executor helper
|
||||
|
||||
:type command: str
|
||||
:type cwd: str
|
||||
:type env: dict
|
||||
:type timeout: int
|
||||
:rtype: ExecResult
|
||||
"""
|
||||
|
||||
def readlines(stream, verbose, lines_count=100):
|
||||
"""Nonblocking read and log lines from stream"""
|
||||
if lines_count < 1:
|
||||
lines_count = 1
|
||||
result = []
|
||||
try:
|
||||
for _ in range(1, lines_count):
|
||||
line = stream.readline()
|
||||
if line:
|
||||
result.append(line)
|
||||
if verbose:
|
||||
print(line.rstrip())
|
||||
except IOError:
|
||||
pass
|
||||
return result
|
||||
|
||||
@threaded(started=True)
|
||||
def poll_pipes(proc, result, stop):
|
||||
"""Polling task for FIFO buffers
|
||||
|
||||
:type proc: Popen
|
||||
:type result: ExecResult
|
||||
:type stop: Event
|
||||
"""
|
||||
# Get file descriptors for stdout and stderr streams
|
||||
fd_stdout = proc.stdout.fileno()
|
||||
fd_stderr = proc.stderr.fileno()
|
||||
# Get flags of stdout and stderr streams
|
||||
fl_stdout = fcntl.fcntl(fd_stdout, fcntl.F_GETFL)
|
||||
fl_stderr = fcntl.fcntl(fd_stderr, fcntl.F_GETFL)
|
||||
# Set nonblock mode for stdout and stderr streams
|
||||
fcntl.fcntl(fd_stdout, fcntl.F_SETFL, fl_stdout | os.O_NONBLOCK)
|
||||
fcntl.fcntl(fd_stderr, fcntl.F_SETFL, fl_stderr | os.O_NONBLOCK)
|
||||
|
||||
while not stop.isSet():
|
||||
sleep(0.1)
|
||||
|
||||
stdout_diff = readlines(proc.stdout, verbose)
|
||||
stderr_diff = readlines(proc.stderr, verbose)
|
||||
result.stdout += stdout_diff
|
||||
result.stderr += stderr_diff
|
||||
|
||||
proc.poll()
|
||||
|
||||
if proc.returncode is not None:
|
||||
result.exit_code = proc.returncode
|
||||
stdout_diff = readlines(proc.stdout, verbose)
|
||||
stderr_diff = readlines(proc.stderr, verbose)
|
||||
result.stdout += stdout_diff
|
||||
result.stderr += stderr_diff
|
||||
stop.set()
|
||||
|
||||
# 1 Command per run
|
||||
with cls.__lock:
|
||||
result = ExecResult(cmd=command)
|
||||
stop_event = Event()
|
||||
|
||||
logger.debug("Run command on the host: '{0}'".format(command))
|
||||
|
||||
# Run
|
||||
process = Popen(
|
||||
args=[command],
|
||||
stdin=PIPE, stdout=PIPE, stderr=PIPE,
|
||||
shell=True, cwd=cwd, env=env,
|
||||
universal_newlines=False)
|
||||
# Poll output
|
||||
poll_pipes(process, result, stop_event)
|
||||
# wait for process close
|
||||
stop_event.wait(timeout)
|
||||
|
||||
output_tmpl = (
|
||||
'\tSTDOUT:\n'
|
||||
'{0}\n'
|
||||
'\tSTDERR"\n'
|
||||
'{1}\n')
|
||||
logger.debug(output_tmpl.format(result.stdout, result.stderr))
|
||||
|
||||
# Process closed?
|
||||
if stop_event.isSet():
|
||||
stop_event.clear()
|
||||
return result
|
||||
|
||||
# Kill not ended process and wait for close
|
||||
try:
|
||||
process.kill() # kill -9
|
||||
stop_event.wait(5)
|
||||
|
||||
except OSError:
|
||||
# Nothing to kill
|
||||
logger.warning(
|
||||
"{} has been completed just after timeout: "
|
||||
"please validate timeout.".format(command))
|
||||
|
||||
no_ec_msg = (
|
||||
"No return code received while waiting for the command "
|
||||
"'{0}' during {1}s !\n".format(command, timeout))
|
||||
logger.debug(no_ec_msg)
|
||||
|
||||
raise TimeoutError(
|
||||
no_ec_msg + output_tmpl.format(
|
||||
result.stdout_brief,
|
||||
result.stderr_brief
|
||||
))
|
|
@ -39,8 +39,10 @@ class CCPManager(object):
|
|||
ccp_repo_url = settings.CCP_REPO
|
||||
cmd = ('pip install --upgrade git+{}'.format(ccp_repo_url))
|
||||
with remote.get_sudo(remote):
|
||||
# TODO(ddmitriev): log output
|
||||
remote.check_call(cmd, verbose=True)
|
||||
LOG.debug("*** Run cmd={0}".format(cmd))
|
||||
result = remote.check_call(cmd, verbose=True)
|
||||
LOG.debug("*** Result STDOUT:\n{0}".format(result.stdout_str))
|
||||
LOG.debug("*** Result STDERR:\n{0}".format(result.stderr_str))
|
||||
|
||||
@classmethod
|
||||
def build_command(cls, *args, **kwargs):
|
||||
|
|
|
@ -12,11 +12,12 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import copy
|
||||
import subprocess
|
||||
import os
|
||||
|
||||
import yaml
|
||||
|
||||
from fuel_ccp_tests.helpers import exceptions
|
||||
from fuel_ccp_tests.helpers import _subprocess_runner
|
||||
from fuel_ccp_tests import logger
|
||||
from fuel_ccp_tests import settings
|
||||
from fuel_ccp_tests.managers.k8s import cluster
|
||||
|
@ -36,7 +37,8 @@ class K8SManager(object):
|
|||
super(K8SManager, self).__init__()
|
||||
|
||||
def install_k8s(self, custom_yaml=None, env_var=None,
|
||||
k8s_admin_ip=None, k8s_slave_ips=None):
|
||||
k8s_admin_ip=None, k8s_slave_ips=None,
|
||||
expected_ec=None, verbose=True):
|
||||
"""Action to deploy k8s by fuel-ccp-installer script
|
||||
|
||||
Additional steps:
|
||||
|
@ -72,7 +74,29 @@ class K8SManager(object):
|
|||
if env_var:
|
||||
environment_variables.update(env_var)
|
||||
current_env.update(dict=environment_variables)
|
||||
self.deploy_k8s(environ=current_env)
|
||||
|
||||
# TODO(ddmitriev): replace with check_call(...,env=current_env)
|
||||
# when migrate to fuel-devops-3.0.2
|
||||
environ_str = ';'.join([
|
||||
"export {0}='{1}'".format(key, value)
|
||||
for key, value in current_env.items()])
|
||||
cmd = environ_str + ' ; ' + settings.DEPLOY_SCRIPT
|
||||
|
||||
LOG.info("Run k8s deployment")
|
||||
|
||||
# Use Subprocess.execute instead of Subprocess.check_call until
|
||||
# check_call is not fixed (fuel-devops3.0.2)
|
||||
result = _subprocess_runner.Subprocess.execute(cmd, verbose=verbose,
|
||||
timeout=2400)
|
||||
if expected_ec is None:
|
||||
expected_ec = [0]
|
||||
if result.exit_code not in expected_ec:
|
||||
raise exceptions.UnexpectedExitCode(
|
||||
cmd,
|
||||
result.exit_code,
|
||||
expected_ec,
|
||||
stdout=result.stdout_brief,
|
||||
stderr=result.stdout_brief)
|
||||
|
||||
for node_name in k8s_nodes:
|
||||
with self.__underlay.remote(node_name=node_name) as remote:
|
||||
|
@ -81,20 +105,7 @@ class K8SManager(object):
|
|||
|
||||
self.__config.k8s.kube_host = k8s_admin_ip
|
||||
|
||||
@classmethod
|
||||
def deploy_k8s(cls, environ=os.environ):
|
||||
"""Base action to deploy k8s by external deployment script"""
|
||||
LOG.info("Run k8s deployment")
|
||||
try:
|
||||
process = subprocess.Popen([settings.DEPLOY_SCRIPT],
|
||||
env=environ,
|
||||
shell=True,
|
||||
bufsize=0,
|
||||
)
|
||||
assert process.wait() == 0
|
||||
except (SystemExit, KeyboardInterrupt) as err:
|
||||
process.terminate()
|
||||
raise err
|
||||
return result
|
||||
|
||||
def get_k8sclient(self, default_namespace=None):
|
||||
k8sclient = cluster.K8sCluster(
|
||||
|
|
Loading…
Reference in New Issue