tripleo-ci/scripts/te-broker/testenv-worker
#!/usr/bin/python
#
# Runs a tripleo-ci test-worker
#
# Copyright 2013 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#

import argparse
import json
import logging
import logging.handlers
import os
import subprocess
import sys
import tempfile
import threading
import time
import uuid

import gear
from novaclient import client as novaclient
from novaclient import exceptions

# 100Mb log files
maxBytes = 1024*1024*100

logging.basicConfig(
    filename="/var/www/html/tebroker/testenv-worker.log",
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')


class CallbackClient(gear.Client):
    def __init__(self):
        super(CallbackClient, self).__init__()
        self.event = threading.Event()

    def handleWorkComplete(self, packet):
        super(CallbackClient, self).handleWorkComplete(packet)
        self.event.set()

    def handleWorkException(self, packet):
        super(CallbackClient, self).handleWorkException(packet)
        self.event.set()

    def handleWorkFail(self, packet):
        super(CallbackClient, self).handleWorkFail(packet)
        self.event.set()

    def wait(self, timeout=None):
        """Wait for notification of completion, error or failure.

        :param timeout: a timeout for the operation in seconds
        :type timeout: float

        :returns: True if a notification was received, False on timeout
        """
        self.event.wait(timeout)
        return self.event.is_set()
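

# Usage sketch: _run_callback below submits a gear.Job through a
# CallbackClient and then calls wait(); the overridden handlers above set the
# event as soon as the callback job completes, fails or raises an exception.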


class TEWorkerThread(threading.Thread):
    def __init__(self, geard, num, timeout, scriptfiles):
        super(TEWorkerThread, self).__init__()
        self.geard = geard
        self.timeout = timeout
        self.scriptfiles = scriptfiles
        self.running = True
        self.num = num
        self.worker = None
        self.ucinstance = None
        self.complete_event = None

    def run(self):
        try:
            logger.info('running TE worker')
            self.runJob()
        except gear.InterruptedError:
            logger.info('getJob interrupted...')
        except Exception:
            logger.exception('Error while running TE worker')
        self.running = False

    def runJob(self):
        self.worker = gear.Worker('testenv-worker-%s' % self.num)
        try:
            self._add_servers(self.worker, self.geard)
            self.worker.waitForServer()
            self.worker.registerFunction('lockenv')

            logger.info('Getting new job...')
            job = self.worker.getJob()
            logger.info('Received job : %s', job.arguments)
            arguments = json.loads(job.arguments)
            call_back = arguments["callback_name"]
            self.ucinstance = arguments["ucinstance"]
            job_timeout = int(arguments.get("timeout", self.timeout))
            # Once this Job is called we call back to the client to run its
            # commands while this environment is locked
            self._run_callback(job_timeout, call_back, arguments)
            job.sendWorkComplete("")
        finally:
            self.worker.shutdown()
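
    # A sketch of the "lockenv" job arguments runJob expects from the client
    # (illustrative values; only callback_name and ucinstance are required,
    # everything else falls back to a default):
    #   {"callback_name": "callback_<id>", "ucinstance": "<instance uuid>",
    #    "timeout": "10800", "envsize": "2", "create_undercloud": "",
    #    "ssh_key": "", "net_iso": "multi-nic", "compute_envsize": "0",
    #    "extra_nodes": "0"}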

    def _add_servers(self, client, servers):
        for server in servers.split(','):
            server = server.rsplit(':', 1)
            if len(server) == 1:
                server.append('4730')
            client.addServer(server[0], int(server[1]))
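
    # Contract assumed by _run_callback below: scriptfiles[0] sets up/locks
    # the environment and writes its description to the file named by
    # $TE_DATAFILE (that data is handed to the client), while scriptfiles[1]
    # frees the environment again.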

    def _run_callback(self, timeout, callback_name, arguments):
        client = CallbackClient()
        self.complete_event = client.event
        self._add_servers(client, self.geard)
        client.waitForServer()

        try:
            with tempfile.NamedTemporaryFile('r') as fp:
                os.environ["TE_DATAFILE"] = fp.name
                logger.info(
                    subprocess.check_output([
                        self.scriptfiles[0],
                        self.num,
                        arguments.get("envsize", "2"),
                        arguments.get("ucinstance", ""),
                        arguments.get("create_undercloud", ""),
                        arguments.get("ssh_key", ""),
                        arguments.get("net_iso", "multi-nic"),
                        arguments.get("compute_envsize", "0"),
                        arguments.get("extra_nodes", "0"),
                    ], stderr=subprocess.STDOUT))
                clientdata = fp.read()
        except subprocess.CalledProcessError as e:
            logger.error(e.output)
            clientdata = "Couldn't retrieve env"

        cb_job = gear.Job(callback_name, clientdata)
        client.submitJob(cb_job)

        # Wait for 30 seconds, then test the status of the job
        if not client.wait(30):
            # Request the job status from the broker
            cb_job.connection.sendPacket(gear.Packet(gear.constants.REQ,
                                                     gear.constants.GET_STATUS,
                                                     cb_job.handle))
            # Let a little time pass for the STATUS_RES to return; if we're in
            # here we've already waited 30 seconds, so another 10 won't make
            # much difference.
            time.sleep(10)

            if not cb_job.running:
                logger.error("No sign of the Callback job starting, "
                             "assuming it's no longer present")
                clientdata = subprocess.check_output(
                    [self.scriptfiles[1], self.num], stderr=subprocess.STDOUT)
                logger.info(clientdata)
                client.shutdown()
                return

        # We timeout after the configured timeout - the 40 second sleep that we
        # perform during initial handshaking. Note that after this timeout we
        # offer the environment to other test clients, but the prior client's
        # credentials are still valid, so very confusing errors can occur if we
        # were ever to timeout without the client timing out first.
        client.wait(timeout - 40)
        if cb_job.failure:
            logger.error("The Job appears to have failed.")
        elif not cb_job.complete:
            logger.error("No sign of Job completing, freeing environment.")
        else:
            logger.info('Returned from Job : %s', cb_job.data)

        try:
            clientdata = subprocess.check_output(
                [self.scriptfiles[1], self.num], stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as e:
            logger.error(e.output)
            raise
        logger.info(clientdata)
        client.shutdown()


def _get_auth_values_from_rc():
    """Read auth details from /etc/nodepoolrc

    :returns: A dict containing the following keys: user, tenant, auth_url
              and password.
    """
    values = {}
    with open('/etc/nodepoolrc') as rc:
        for line in rc.readlines():
            parts = line.split('=', 1)
            if 'OS_USERNAME' in parts[0]:
                values['user'] = parts[1]
            elif 'OS_TENANT' in parts[0]:
                values['tenant'] = parts[1]
            elif 'OS_AUTH_URL' in parts[0]:
                values['auth_url'] = parts[1]
            elif 'OS_PASSWORD' in parts[0]:
                values['password'] = parts[1]
    return {k: v.rstrip() for k, v in values.items()}


def _get_nova_client():
    auth_values = _get_auth_values_from_rc()
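    # The first positional argument to novaclient's Client is the API major
    # version; the remaining credentials come from /etc/nodepoolrc via
    # _get_auth_values_from_rc() above.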
    nclient = novaclient.Client(2,
                                auth_values['user'],
                                auth_values['password'],
                                project_name=auth_values['tenant'],
                                auth_url=auth_values['auth_url']
                                )
    return nclient


def _check_instance_alive(nclient, instance, event):
    """Check that instance still exists in Nova

    Attempt to get the server specified by instance. If the server is not
    found, set the client event to indicate the job has gone away and we
    should clean up the testenv.

    instance will be None if the worker has not yet been assigned to a
    Jenkins slave, and we should do nothing in that case.

    :param nclient: A novaclient instance
    :param instance: The UUID of the instance to check
    :param event: The gear client event to set if the instance has gone away.
    """
    if instance:
        try:
            nclient.servers.get(instance)
        except exceptions.NotFound:
            # There is a very brief period of time where instance could be set
            # and event not. It's unlikely to happen, but let's be safe.
            if event:
                event.set()
            logger.info('Job instance "%s" went away.', instance)
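

# Example invocation of this worker (a sketch; the script paths below are
# illustrative, not actual files shipped with this change):
#   testenv-worker --geard 127.0.0.1:4730 --timeout 10800 \
#       /path/to/create-env-script /path/to/destroy-env-script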


def main(args=sys.argv[1:]):
    parser = argparse.ArgumentParser(
        description='Registers a test environment with a gearman broker; the '
                    'registered job "lockenv" then holds the environment in a '
                    '"locked" state while it calls back to the client. The '
                    "client's job is provided with data (contents of datafile)"
    )
    parser.add_argument(
        'scriptfiles',
        nargs=2,
        help='Two scripts: the first provides the environment data handed to '
             'the client, the second frees the environment.')
    parser.add_argument('--timeout', '-t', type=int, default=10800,
                        help='The maximum number of seconds to hold the '
                             'testenv for, can be overridden by the client.')
    parser.add_argument('--tenum', '-n', default=uuid.uuid4().hex,
                        help='A unique identifier identifying this env on '
                             'this host.')
    parser.add_argument('--geard', '-b', default='127.0.0.1:4730',
                        help='A comma separated list of gearman brokers to '
                             'connect to.')
    parser.add_argument('--debug', '-d', action='store_true',
                        help='Set to debug mode.')
    opts = parser.parse_args(args)

    global logger
    logger = logging.getLogger('testenv-worker-' + opts.tenum)
    logger.addHandler(logging.handlers.RotatingFileHandler(
        "/var/www/html/tebroker/testenv-worker.log",
        maxBytes=maxBytes,
        backupCount=5))
    logger.setLevel(logging.INFO)
    logger.removeHandler(logger.handlers[0])
    if opts.debug:
        logger.setLevel(logging.DEBUG)

    logger.info('Starting test-env worker with data %r', opts.scriptfiles)
    te_worker = TEWorkerThread(
        opts.geard,
        opts.tenum,
        opts.timeout,
        opts.scriptfiles)
    te_worker.start()

    counter = 0
    nclient = _get_nova_client()
    while te_worker.running:
        counter += 1
        # Only check for instance existence once per minute to avoid DoS'ing
        # the controller
        if counter % 60 == 0:
            _check_instance_alive(nclient, te_worker.ucinstance,
                                  te_worker.complete_event)
        time.sleep(1)


if __name__ == '__main__':
    main()