tripleo-ci/testenv-client
Derek Higgins 53ca182d08 Fail fast if a testenv couldn't be created
Fail immediately if the testenv worker couldn't get an environment
for this job. Without this the ci job continues until after it
installs the undercloud and generates the overcloud images, then finally
fails when it attempts to register the nodes.

Change-Id: Icfb9f3a662ec817e9170a109a3e5ddeb3df720d5
2016-07-13 17:13:17 +01:00

200 lines
6.9 KiB
Python
Executable File

#!/usr/bin/python
#
# Runs a tripleo-ci test-client
#
# Copyright 2013 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import argparse
import json
import logging
import sys
import subprocess
import os
import tempfile
import textwrap
import threading
import time
import uuid
import gear
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('testenv-client')
logger.setLevel(logging.INFO)
class TestCallback(object):
def __init__(self, servers, name, command):
self.servers = servers
self.name = name
self.command = command
self.created = time.time()
# Default the return value to None, this may end up being
# used if the gearman worker goes down before the job finishes
self.rv = None
def __call__(self):
self.worker = gear.Worker('testenv-client-%s' % self.name)
add_servers(self.worker, self.servers)
self.worker.waitForServer()
self.worker.registerFunction(self.name)
try:
job = self.worker.getJob()
except gear.InterruptedError:
return
logger.info('Received job : %s', job.arguments.strip())
time_waiting = time.time() - self.created
if time_waiting > 90:
logger.warn('%.1f seconds waiting for a worker.' % (time_waiting))
logger.info('Running command "%s"', ' '.join(self.command))
if job.arguments == "Couldn't retrieve env":
logger.error(job.arguments)
self.rv = 2
job.sendWorkComplete("")
return
with tempfile.NamedTemporaryFile('w') as fp:
fp.write(job.arguments)
fp.flush()
os.environ["TE_DATAFILE"] = fp.name
try:
self.rv = subprocess.call(self.command)
except:
logger.exception("Error calling command")
self.rv = 2
job.sendWorkComplete("")
class TestEnvClient(gear.Client):
def __init__(self):
super(TestEnvClient, self).__init__()
self.event = threading.Event()
def handleWorkComplete(self, packet):
super(TestEnvClient, self).handleWorkComplete(packet)
self.event.set()
def handleWorkException(self, packet):
super(TestEnvClient, self).handleWorkException(packet)
self.event.set()
def handleWorkFail(self, packet):
super(TestEnvClient, self).handleWorkFail(packet)
self.event.set()
def wait(self, timeout=None):
"""Wait for notification of completion, error or failure.
:param timeout: a timeout for the operation in seconds
:type timeout: float
:returns: True if a notification was received, False on timeout
"""
self.event.wait(timeout)
return self.event.is_set()
def add_servers(client, servers):
for server in servers.split(','):
server = server.rsplit(':', 1)
if len(server) == 1:
server.append('4730')
client.addServer(server[0], int(server[1]))
def main(args=sys.argv[1:]):
parser = argparse.ArgumentParser(
description=(textwrap.dedent("""
Starts up a gearman worker and then calls the job "lockenv" over
gearman, then waits for the worker to be called, once the worker
is called it will place the provided data in a datafile (indicated
by the TE_DATAFILE environment variable) and run the "command"
provided, the exit code will be the exit code of the command that
was run. Essentially this allows a command to be run while the
worker is holding a test environment in a locked state e.g. to
simply output the data provided one could run the command:
$ echo 'cat $TE_DATAFILE' | %s -- bash
""" % sys.argv[0])),
formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument('command', nargs="+",
help='A command to run once the test env is locked')
parser.add_argument('--geard', '-b', default='127.0.0.1:4730',
help='A comma separated list of gearman brokers to '
'connect to.')
parser.add_argument('--jobnum', '-n', default=uuid.uuid4().hex,
help='A unique identifier identifing this job.')
parser.add_argument('--timeout', '-t', default='10800',
help='Set a timeout, after which the command will '
'be killed.')
parser.add_argument('--envsize', default="2",
help='Number of baremetal nodes to request')
parser.add_argument('--ucinstance',
help='uuid for the undercloud instance (where an '
'interface on the provisioning net is attached')
parser.add_argument('--debug', '-d', action='store_true',
help='Set to debug mode.')
opts = parser.parse_args(args)
if opts.debug:
logger.setLevel(logging.DEBUG)
callback_name = "callback_" + opts.jobnum
cb = TestCallback(opts.geard, callback_name, opts.command)
threading.Thread(target=cb).start()
client = TestEnvClient()
add_servers(client, opts.geard)
client.waitForServer()
job_params = {
"callback_name": callback_name,
"timeout": opts.timeout,
"envsize":opts.envsize,
"ucinstance":opts.ucinstance,
}
job = gear.Job('lockenv', json.dumps(job_params))
client.submitJob(job)
# No timeout here as there will be a timeout on the jenkins jobs, which is
# also passed to the testenv-worker, lets not second guess them.
client.wait()
if job.failure:
# This signals an error with the gearman connection to the worker
# we log it, but still return cb.rv the command may have succeeded
logger.error("The gearman Job has failed")
cb.worker.stopWaitingForJobs()
# If the testenv worker releases the environment before our command
# completes we kill this process and all its children, to immediately
# stop the running job
if cb.rv is None:
logger.error("The command hasn't completed but the testenv worker has "
"released the environment. Killing all processes.")
subprocess.call(["sudo", "kill", "-9", "-%d" % os.getpgrp()])
logger.debug("Exiting with status : %d", cb.rv)
return cb.rv
if __name__ == '__main__':
exit(main())