tripleo-ci/testenv-client
Sorin Sbarnea f78da2885a lint: enable black
Ensures consistent formatting of our Python codebase without having
to rely on humans to do it, or to debate it during reviews.

Change-Id: I1e62cc755fa60e453dea865f436241ecae330771
2020-03-24 13:17:31 +00:00

266 lines
8.6 KiB
Python
Executable File

#!/usr/bin/python
#
# Runs a tripleo-ci test-client
#
# Copyright 2013 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import argparse
import json
import logging
import os
import subprocess
import sys
import tempfile
import textwrap
import threading
import time
import uuid
import gear
# Install the default logging handler with a timestamped format, then let
# this script's own logger emit messages from INFO upwards.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT)

logger = logging.getLogger('testenv-client')
logger.setLevel(logging.INFO)
class TestCallback(object):
    """Callable that serves one gearman job by running a local command.

    When called, the instance registers a gearman function named ``name``,
    waits for a single job, writes the job arguments to a temporary file
    exposed through the ``TE_DATAFILE`` environment variable and runs
    ``command``.

    :param servers: comma separated gearman servers, passed to ``add_servers``
    :param name: gearman function name to register
    :param command: argument list handed to ``subprocess.call``

    After the call, ``rv`` holds the exit status of the command, 2 if the
    job arguments reported a failure or the command could not be run, or
    ``None`` if no job was ever processed.
    """

    def __init__(self, servers, name, command):
        self.servers = servers
        self.name = name
        self.command = command
        # Creation time, used to report how long we waited for a job.
        self.created = time.time()
        # Default the return value to None, this may end up being
        # used if the gearman worker goes down before the job finishes
        self.rv = None

    def __call__(self):
        """Wait for one job, run the command and report work completion."""
        self.worker = gear.Worker('testenv-client-%s' % self.name)
        add_servers(self.worker, self.servers)
        self.worker.waitForServer()

        self.worker.registerFunction(self.name)

        try:
            job = self.worker.getJob()
        except gear.InterruptedError:
            # Waiting was interrupted before a job arrived; rv stays None.
            return

        logger.info('Received job : %s', job.arguments.strip())
        time_waiting = time.time() - self.created
        if time_waiting > 90:
            # Logger.warn is a deprecated alias; pass the value as a lazy
            # argument so it is only formatted when the record is emitted.
            logger.warning('%.1f seconds waiting for a worker.', time_waiting)

        # These markers in the job arguments mean no environment was
        # provided, so there is nothing to run the command against.
        if (
            "Couldn't retrieve env" in job.arguments
            or "Failed creating OVB stack" in job.arguments
        ):
            logger.error(job.arguments)
            self.rv = 2
            job.sendWorkComplete("")
            return

        logger.info('Running command "%s"', ' '.join(self.command))

        # The temporary file only exists while the command runs; the child
        # process finds it through the TE_DATAFILE environment variable.
        with tempfile.NamedTemporaryFile('w') as fp:
            fp.write(job.arguments)
            fp.flush()
            os.environ["TE_DATAFILE"] = fp.name

            try:
                self.rv = subprocess.call(self.command)
            except Exception:
                logger.exception("Error calling command")
                self.rv = 2

        job.sendWorkComplete("")
class TestEnvClient(gear.Client):
    """Gearman client that flags an event once a job reaches a final state.

    Work-complete, work-exception and work-fail packets are each passed to
    the parent handler and then set ``event``, which ``wait`` blocks on.
    """

    def __init__(self):
        super(TestEnvClient, self).__init__()
        # Set by the three packet handlers below.
        self.event = threading.Event()

    def handleWorkComplete(self, packet):
        """Process a work-complete packet, then signal completion."""
        parent = super(TestEnvClient, self)
        parent.handleWorkComplete(packet)
        self.event.set()

    def handleWorkException(self, packet):
        """Process a work-exception packet, then signal completion."""
        parent = super(TestEnvClient, self)
        parent.handleWorkException(packet)
        self.event.set()

    def handleWorkFail(self, packet):
        """Process a work-fail packet, then signal completion."""
        parent = super(TestEnvClient, self)
        parent.handleWorkFail(packet)
        self.event.set()

    def wait(self, timeout=None):
        """Block until completion, error or failure has been notified.

        :param timeout: a timeout for the operation in seconds
        :type timeout: float
        :returns: True if a notification was received, False on timeout
        """
        notification = self.event
        notification.wait(timeout)
        return notification.is_set()
def add_servers(client, servers, default_port=4730):
    """Register each gearman server listed in ``servers`` with ``client``.

    :param client: object exposing ``addServer(host, port)``
    :param servers: comma separated ``host`` or ``host:port`` entries;
        surrounding whitespace is ignored and empty entries are skipped
    :param default_port: port used for entries that do not name one
    :raises ValueError: if an entry's port is not an integer
    """
    for entry in servers.split(','):
        entry = entry.strip()
        if not entry:
            # Tolerate stray commas such as a trailing one.
            continue
        # Split on the last colon only, so the port is always the final field.
        host, separator, port = entry.rpartition(':')
        if not separator:
            host, port = entry, default_port
        client.addServer(host.strip(), int(port))
def main(args=None):
    """Lock a test environment over gearman and run a command while it is held.

    Starts a ``TestCallback`` worker thread, submits a ``lockenv`` job
    describing the requested environment and blocks until that job
    completes, fails or raises an exception.

    :param args: command line arguments; ``None`` (the default) lets
        argparse read ``sys.argv[1:]`` at call time
    :returns: the status recorded by the callback, or 1 if the command
        never completed and this process survived the kill attempt
    :raises KeyError: if ``TOCI_JOBTYPE`` is not set in the environment
    """
    parser = argparse.ArgumentParser(
        description=(
            textwrap.dedent(
                """
                Starts up a gearman worker and then calls the job "lockenv" over
                gearman, then waits for the worker to be called, once the worker
                is called it will place the provided data in a datafile (indicated
                by the TE_DATAFILE environment variable) and run the "command"
                provided, the exit code will be the exit code of the command that
                was run. Essentially this allows a command to be run while the
                worker is holding a test environment in a locked state e.g. to
                simply output the data provided one could run the command:
                $ echo 'cat $TE_DATAFILE' | %s -- bash
                """
                % sys.argv[0]
            )
        ),
        formatter_class=argparse.RawTextHelpFormatter,
    )

    parser.add_argument(
        'command', nargs="+", help='A command to run once the test env is locked'
    )
    parser.add_argument(
        '--geard',
        '-b',
        default='127.0.0.1:4730',
        help='A comma separated list of gearman brokers to connect to.',
    )
    parser.add_argument(
        '--jobnum',
        '-n',
        default=uuid.uuid4().hex,
        help='A unique identifier identifing this job.',
    )
    parser.add_argument(
        '--timeout',
        '-t',
        default='10800',
        help='Set a timeout, after which the command will be killed.',
    )
    parser.add_argument(
        '--envsize', default="2", help='Number of baremetal nodes to request'
    )
    parser.add_argument(
        '--compute-envsize',
        default='0',
        help='Number of compute baremetal nodes to request. '
        'When this is set to a value > 0, the primary '
        'nodes will be tagged with the controller '
        'profile and the extra nodes with compute. The '
        'compute nodes will be a smaller flavor in order '
        'to use less resources.',
    )
    parser.add_argument(
        '--ucinstance',
        help='uuid for the undercloud instance (where an '
        'interface on the provisioning net is attached',
    )
    parser.add_argument(
        '--create-undercloud', action='store_true', help='deploy the undercloud node.'
    )
    parser.add_argument(
        '--ssh-key', default='', help='ssh key for the ovb nodes to be deployed.'
    )
    parser.add_argument(
        '--net-iso',
        default="multi-nic",
        choices=['none', 'multi-nic', 'public-bond'],
        help='"none" requests an environment without network '
        'isolation, "multi-nic" requests one with a '
        'basic multiple nic configuration, and '
        '"public-bond" requests one like "multi-nic" '
        'but with two public nics for use with bonded '
        'nic-configs.',
    )
    parser.add_argument(
        '--extra-nodes',
        default='0',
        help='Number of extra undercloud-like nodes to request',
    )
    parser.add_argument('--debug', '-d', action='store_true', help='Set to debug mode.')

    opts = parser.parse_args(args)
    if opts.debug:
        logger.setLevel(logging.DEBUG)

    # Read the environment before the worker thread is started, so a missing
    # TOCI_JOBTYPE fails immediately instead of leaving that thread waiting
    # for a job that is never submitted.
    job_identifier = '%s: %s' % (
        os.environ.get('ZUUL_CHANGE', 'No change'),
        os.environ['TOCI_JOBTYPE'],
    )

    callback_name = "callback_" + opts.jobnum
    cb = TestCallback(opts.geard, callback_name, opts.command)
    threading.Thread(target=cb).start()

    client = TestEnvClient()
    add_servers(client, opts.geard)
    client.waitForServer()

    job_params = {
        "callback_name": callback_name,
        "timeout": opts.timeout,
        "envsize": opts.envsize,
        "compute_envsize": opts.compute_envsize,
        "ucinstance": opts.ucinstance,
        "create_undercloud": "true" if opts.create_undercloud else "",
        "ssh_key": opts.ssh_key,
        "net_iso": opts.net_iso,
        "extra_nodes": opts.extra_nodes,
        "job_identifier": job_identifier,
    }
    job = gear.Job('lockenv', json.dumps(job_params))
    client.submitJob(job)

    # No timeout here as there will be a timeout on the jenkins jobs, which is
    # also passed to the testenv-worker, lets not second guess them.
    client.wait()
    if job.failure:
        # This signals an error with the gearman connection to the worker
        # we log it, but still return cb.rv the command may have succeeded
        logger.error("The gearman Job has failed")

    cb.worker.stopWaitingForJobs()

    # If the testenv worker releases the environment before our command
    # completes we kill this process and all its children, to immediately
    # stop the running job
    if cb.rv is None:
        logger.error(
            "The command hasn't completed but the testenv worker has "
            "released the environment. Killing all processes."
        )
        subprocess.call(["sudo", "kill", "-9", "-%d" % os.getpgrp()])
        # Only reached when the kill did not take effect. Returning None
        # here would be turned into exit status 0, i.e. reported as success.
        return 1

    logger.debug("Exiting with status : %d", cb.rv)
    return cb.rv
if __name__ == '__main__':
    # Use sys.exit rather than the bare exit() helper, which is only added
    # to builtins by the site module and is not guaranteed to exist.
    sys.exit(main())