#!/usr/bin/python
#
# Runs a tripleo-ci test-worker
#
# Copyright 2013 Red Hat, Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
#

import argparse
import json
import logging
import logging.handlers
import os
import subprocess
import sys
import tempfile
import threading
import time
import uuid

import gear
from novaclient import client as novaclient
from novaclient import exceptions

# 100Mb log files
maxBytes = 1024*1024*100

logging.basicConfig(
    filename="/var/www/html/tebroker/testenv-worker.log",
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')


class CallbackClient(gear.Client):
    """A gear client that signals an event when its job reaches an end state.

    ``event`` is set when a work-complete, work-exception or work-fail packet
    is handled, so that callers can block on it through :meth:`wait`.
    """

    def __init__(self):
        super(CallbackClient, self).__init__()
        self.event = threading.Event()

    def handleWorkComplete(self, packet):
        super(CallbackClient, self).handleWorkComplete(packet)
        self.event.set()

    def handleWorkException(self, packet):
        super(CallbackClient, self).handleWorkException(packet)
        self.event.set()

    def handleWorkFail(self, packet):
        super(CallbackClient, self).handleWorkFail(packet)
        self.event.set()

    def wait(self, timeout=None):
        """Wait for notification of completion, error or failure.

        :param timeout: a timeout for the operation in seconds
        :type  timeout: float
        :returns: True if a notification was received, False on timeout
        """
        self.event.wait(timeout)
        return self.event.is_set()


class TEWorkerThread(threading.Thread):
    """Thread that serves a single "lockenv" job for one test environment.

    :param geard: comma separated list of ``host[:port]`` gearman brokers
    :param num: identifier of this test environment, passed to the scripts
    :param timeout: default number of seconds to hold the environment
    :param scriptfiles: two script paths; the first is run to lock the
                        environment, the second to release it
    """

    def __init__(self, geard, num, timeout, scriptfiles):
        super(TEWorkerThread, self).__init__()
        self.geard = geard
        self.timeout = timeout
        self.scriptfiles = scriptfiles
        # Polled by main(); cleared once run() has finished.
        self.running = True
        self.num = num
        self.worker = None
        # Both stay None until a job has been received; main() hands them to
        # _check_instance_alive().
        self.ucinstance = None
        self.complete_event = None

    def run(self):
        """Serve one job, log any failure and then clear ``running``."""
        try:
            logger.info('running TE worker')
            self.runJob()
        except gear.InterruptedError:
            logger.info('getJob interrupted...')
        except Exception:
            logger.exception('Error while run_te_worker worker')
        self.running = False

    def runJob(self):
        """Register "lockenv", take one job and run the client callback.

        The job arguments are a JSON object that must contain
        ``callback_name`` and ``ucinstance`` and may contain ``timeout``,
        which overrides the timeout given to the constructor. The worker is
        shut down when this method returns or raises.
        """
        self.worker = gear.Worker('testenv-worker-%s' % self.num)
        try:
            self._add_servers(self.worker, self.geard)
            self.worker.waitForServer()

            self.worker.registerFunction('lockenv')

            logger.info('Getting new job...')
            job = self.worker.getJob()
            logger.info('Received job : %s', job.arguments)

            arguments = json.loads(job.arguments)
            call_back = arguments["callback_name"]
            self.ucinstance = arguments["ucinstance"]
            job_timeout = int(arguments.get("timeout", self.timeout))

            # Once this Job is called we call back to the client to run its
            # commands while this environment is locked
            self._run_callback(job_timeout, call_back, arguments)

            job.sendWorkComplete("")
        finally:
            self.worker.shutdown()

    def _add_servers(self, client, servers):
        """Add every broker in ``servers`` to ``client``.

        :param client: object exposing ``addServer(host, port)``
        :param servers: comma separated ``host[:port]`` entries; the port
                        defaults to 4730 when omitted
        """
        for server in servers.split(','):
            server = server.rsplit(':', 1)
            if len(server) == 1:
                server.append('4730')
            client.addServer(server[0], int(server[1]))

    def _lock_env(self, arguments):
        """Run the first script and return the data it wrote for the client.

        The script is told where to write through the ``TE_DATAFILE``
        environment variable, which names a temporary file that is read back
        after the script exits.

        :param arguments: decoded job arguments
        :returns: contents of the data file, or a fixed error string when the
                  script exits with a non-zero status
        """
        try:
            with tempfile.NamedTemporaryFile('r') as fp:
                os.environ["TE_DATAFILE"] = fp.name
                logger.info(
                    subprocess.check_output([
                        self.scriptfiles[0],
                        self.num,
                        arguments.get("envsize", "2"),
                        arguments.get("ucinstance", ""),
                        arguments.get("create_undercloud", ""),
                        arguments.get("ssh_key", ""),
                        arguments.get("net_iso", "multi-nic"),
                        arguments.get("compute_envsize", "0"),
                        arguments.get("extra_nodes", "0"),
                    ], stderr=subprocess.STDOUT))
                return fp.read()
        except subprocess.CalledProcessError as e:
            logger.error(e.output)
            return "Couldn't retrieve env"

    def _release_env(self):
        """Run the second script and log its output.

        :raises subprocess.CalledProcessError: when the script exits with a
            non-zero status; its output is logged before re-raising
        """
        try:
            output = subprocess.check_output(
                [self.scriptfiles[1], self.num], stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as e:
            logger.error(e.output)
            raise
        logger.info(output)

    def _run_callback(self, timeout, callback_name, arguments):
        """Lock the environment, run the client's callback job, release it.

        :param timeout: total number of seconds the environment may be held
        :param callback_name: name of the gearman function to submit
        :param arguments: decoded job arguments, forwarded to the lock script
        :raises subprocess.CalledProcessError: when the release script fails
        """
        client = CallbackClient()
        self.complete_event = client.event
        # The callback client is shut down on every exit path, including a
        # failing release script, so it is never left running.
        try:
            self._add_servers(client, self.geard)
            client.waitForServer()

            clientdata = self._lock_env(arguments)

            cb_job = gear.Job(callback_name, clientdata)
            client.submitJob(cb_job)

            # Wait for 30 seconds, then test the status of the job
            if not client.wait(30):
                # Request the job status from the broker
                cb_job.connection.sendPacket(
                    gear.Packet(gear.constants.REQ,
                                gear.constants.GET_STATUS,
                                cb_job.handle))

                # Let a little time pass for the STATUS_RES to return. If
                # we're in here we've already waited 30 seconds so another 10
                # won't make much difference
                time.sleep(10)
                if not cb_job.running:
                    logger.error("No sign of the Callback job starting,"
                                 "assuming its no longer present")
                    self._release_env()
                    return

            # We timeout after the configured timeout - the 40 second sleep
            # that we perform during initial handshaking. Note that after
            # this timeout we offer the environment for other test clients,
            # but the prior client's credentials are still valid, so very
            # confusing errors can occur if we were ever to timeout without
            # the client timing out first.
            client.wait(timeout - 40)
            if cb_job.failure:
                logger.error("The Job appears to have failed.")
            elif not cb_job.complete:
                logger.error("No sign of Job completing, Freeing environment.")
            else:
                logger.info('Returned from Job : %s', cb_job.data)

            self._release_env()
        finally:
            client.shutdown()


def _get_auth_values_from_rc():
    """Read auth details from /etc/nodepoolrc

    Only lines of the form ``NAME=value`` are considered; anything without
    an ``=`` is ignored. Trailing whitespace is removed from each value.

    :returns: A dict containing the following keys: user, tenant, auth_url
              and password. A key is absent when the file does not set it.
    """
    values = {}
    with open('/etc/nodepoolrc') as rc:
        for line in rc:
            name, separator, value = line.partition('=')
            if not separator:
                # No assignment on this line, so there is no value to read.
                continue
            value = value.rstrip()
            if 'OS_USERNAME' in name:
                values['user'] = value
            elif 'OS_TENANT' in name:
                values['tenant'] = value
            elif 'OS_AUTH_URL' in name:
                values['auth_url'] = value
            elif 'OS_PASSWORD' in name:
                values['password'] = value
    return values


def _get_nova_client():
    """Build a version 2 novaclient from the values in /etc/nodepoolrc.

    :raises KeyError: when one of the expected values is not in the rc file
    """
    auth_values = _get_auth_values_from_rc()
    nclient = novaclient.Client(2,
                                auth_values['user'],
                                auth_values['password'],
                                project_name=auth_values['tenant'],
                                auth_url=auth_values['auth_url']
                                )
    return nclient


def _check_instance_alive(nclient, instance, event):
    """Check that instance still exists in Nova

    Attempt to get the server specified by instance. If the server is not
    found, set the client event to indicate the job has gone away and we
    should clean up the testenv.

    instance will be None if the worker has not yet been assigned to a
    Jenkins slave, and we should do nothing in that case.

    :param nclient: A novaclient instance
    :param instance: The UUID of the instance to check
    :param event: The gear client event to set if the instance has gone away.
    """
    if instance:
        try:
            nclient.servers.get(instance)
        except exceptions.NotFound:
            # There is a very brief period of time where instance could be
            # set and event not. It's unlikely to happen, but let's be safe.
            if event:
                event.set()
            logger.info('Job instance "%s" went away.', instance)


def main(args=sys.argv[1:]):
    """Parse the command line, start the worker and watch its instance.

    The main thread polls once per second until the worker thread has
    finished, probing Nova on every 60th iteration.

    :param args: command line arguments, without the program name
    """
    parser = argparse.ArgumentParser(
        description='Registers a test environment with a gearman broker, the '
                    'registered job "lockenv" then holds the environment in a '
                    '"locked" state while it calls back to the client. The '
                    'clients job is provided with data (contents of datafile)'
    )
    parser.add_argument(
        'scriptfiles', nargs=2,
        help='Path to a script whos output is provided to the client')
    parser.add_argument('--timeout', '-t', type=int, default=10800,
                        help='The maximum number of seconds to hold the '
                             'testenv for, can be overridden by the client.')
    parser.add_argument('--tenum', '-n', default=uuid.uuid4().hex,
                        help='A unique identifier identifing this env on '
                             'this host.')
    parser.add_argument('--geard', '-b', default='127.0.0.1:4730',
                        help='A comma separated list of gearman brokers to '
                             'connect to.')
    parser.add_argument('--debug', '-d', action='store_true',
                        help='Set to debug mode.')
    opts = parser.parse_args(args)

    global logger
    logger = logging.getLogger('testenv-worker-' + opts.tenum)
    logger.addHandler(logging.handlers.RotatingFileHandler(
        "/var/www/html/tebroker/testenv-worker.log",
        maxBytes=maxBytes,
        backupCount=5))
    logger.setLevel(logging.INFO)
    # NOTE(review): nothing else in this file attaches a handler to this
    # logger, so handlers[0] appears to be the RotatingFileHandler added just
    # above. If so, rotation never takes effect and records reach the file
    # only through the root handler set up by basicConfig - confirm intent.
    logger.removeHandler(logger.handlers[0])
    if opts.debug:
        logger.setLevel(logging.DEBUG)

    logger.info('Starting test-env worker with data %r', opts.scriptfiles)
    te_worker = TEWorkerThread(
        opts.geard, opts.tenum, opts.timeout, opts.scriptfiles)

    te_worker.start()

    counter = 0
    nclient = _get_nova_client()
    while te_worker.running:
        counter += 1
        # Only check for instance existence once per minute to avoid DoS'ing
        # the controller
        if counter % 60 == 0:
            try:
                _check_instance_alive(nclient, te_worker.ucinstance,
                                      te_worker.complete_event)
            except Exception:
                # An error other than NotFound must not end the monitoring
                # loop while the worker thread is still running; log it and
                # try again on the next check.
                logger.exception('Unable to check instance "%s"',
                                 te_worker.ucinstance)
        time.sleep(1)


if __name__ == '__main__':
    main()