Remove unnecessary NodePoolBuilder thread

Originally, the thread started with the NodePoolBuilder.runForever()
method did all the work of the builder. This functionality is moving
to inside the BuildScheduler. As a result, the thread used for
NodePoolBuilder is no longer necessary. The intention is to (eventually)
totally decouple these classes so that NodePoolBuilder is used only to
start and stop the builder thread of execution. It's run() method is
renamed to start() to better reflect its purpose.

Change-Id: Ief00e4a06bd919e99dbff07f7238ac51035b06ef
This commit is contained in:
David Shrewsbury
2016-08-17 15:46:13 -04:00
parent d2e2babf3e
commit 04c81431aa
5 changed files with 94 additions and 82 deletions

View File

@@ -183,50 +183,74 @@ class BuilderScheduler(object):
log = logging.getLogger("nodepool.builder.BuilderScheduler") log = logging.getLogger("nodepool.builder.BuilderScheduler")
def __init__(self, config, num_builders, num_uploaders): def __init__(self, config, num_builders, num_uploaders):
self.config = config self._config = config
self.num_builders = num_builders self._num_builders = num_builders
self.build_workers = [] self._build_workers = []
self.num_uploaders = num_uploaders self._num_uploaders = num_uploaders
self.upload_workers = [] self._upload_workers = []
self.threads = [] self._threads = []
self._running = False
# This lock is needed because the run() method is started in a
# separate thread of control, which can return before the scheduler
# has completed startup. We need to avoid shutting down before the
# startup process has completed.
self._start_lock = threading.Lock()
@property
def running(self):
return self._running
def run(self): def run(self):
# Create build and upload worker objects with self._start_lock:
for i in range(self.num_builders): if self._running:
w = BuildWorker('Nodepool Builder Build Worker %s' % (i+1,), raise exceptions.BuilderError('Cannot start, already running.')
builder=self)
self.build_workers.append(w)
for i in range(self.num_uploaders): # Create build and upload worker objects
w = UploadWorker('Nodepool Builder Upload Worker %s' % (i+1,), for i in range(self._num_builders):
builder=self) w = BuildWorker('Nodepool Builder Build Worker %s' % (i+1,),
self.upload_workers.append(w) builder=self)
self._build_workers.append(w)
self.log.debug('Starting listener for build jobs') for i in range(self._num_uploaders):
w = UploadWorker('Nodepool Builder Upload Worker %s' % (i+1,),
builder=self)
self._upload_workers.append(w)
for thd in (self.build_workers + self.upload_workers): self.log.debug('Starting listener for build jobs')
t = threading.Thread(target=thd.run)
t.daemon = True
t.start()
self.threads.append(t)
self._running = True for thd in (self._build_workers + self._upload_workers):
t = threading.Thread(target=thd.run)
t.daemon = True
t.start()
self._threads.append(t)
# Start our watch thread to handle ZK watch notifications # Start our watch thread to handle ZK watch notifications
watch_thread = threading.Thread(target=self.registerWatches) watch_thread = threading.Thread(target=self.registerWatches)
watch_thread.daemon = True watch_thread.daemon = True
watch_thread.start() watch_thread.start()
self.threads.append(watch_thread) self._threads.append(watch_thread)
self._running = True
# Do not exit until all of our owned threads exit, which will only # Do not exit until all of our owned threads exit, which will only
# happen when BuildScheduler.stop() is called. # happen when BuildScheduler.stop() is called.
for thd in self.threads: for thd in self._threads:
thd.join() thd.join()
def stop(self): def stop(self):
self.log.debug("BuilderScheduler shutting down workers") '''
for worker in (self.build_workers + self.upload_workers): Stop the BuilderScheduler threads.
worker.shutdown()
NOTE: This method will block if called soon after startup and the
startup process has not yet completed.
'''
with self._start_lock:
self.log.debug("Stopping. BuilderScheduler shutting down workers")
for worker in (self._build_workers + self._upload_workers):
worker.shutdown()
# Setting _running to False will trigger the watch thread to stop.
self._running = False self._running = False
def registerWatches(self): def registerWatches(self):
@@ -236,47 +260,55 @@ class BuilderScheduler(object):
class NodePoolBuilder(object): class NodePoolBuilder(object):
''' '''
Class used to control the builder functionality. Class used to control the builder start and stop actions.
An instance of this class is used to start the builder threads
and also to terminate all threads of execution.
''' '''
log = logging.getLogger("nodepool.builder.NodePoolBuilder") log = logging.getLogger("nodepool.builder.NodePoolBuilder")
def __init__(self, config_path, build_workers=1, upload_workers=4): def __init__(self, config_path, build_workers=1, upload_workers=4):
self._config_path = config_path self._config_path = config_path
self._running = False
self._built_image_ids = set() self._built_image_ids = set()
self._start_lock = threading.Lock()
self._config = None self._config = None
self._build_workers = build_workers self._build_workers = build_workers
self._upload_workers = upload_workers self._upload_workers = upload_workers
self._scheduler = None
self.statsd = stats.get_client() self.statsd = stats.get_client()
@property @property
def running(self): def running(self):
return self._running '''
Return whether or not the builder is running.
def run(self): Since the scheduler is implementing the builder functionality, we
need to query it to see if it is running and use that value.
'''
if self._scheduler is not None:
return self._scheduler.running
return False
def start(self):
''' '''
Start the builder. Start the builder.
The builder functionality is encapsulated within the BuilderScheduler The builder functionality is encapsulated within the BuilderScheduler
code. This starts the main scheduler thread, which will run forever code. This starts the main scheduler thread, which will run forever
until we tell it to stop. until we tell it to stop.
NOTE: This method returns immediately, even though the BuilderScheduler
may not have completed its startup process.
''' '''
with self._start_lock: self.load_config(self._config_path)
if self._running: self._validate_config()
raise exceptions.BuilderError('Cannot start, already running.')
self.load_config(self._config_path) self._scheduler = BuilderScheduler(self._config,
self._validate_config() self._build_workers,
self._upload_workers)
self.scheduler = BuilderScheduler(self._config, self._scheduler_thread = threading.Thread(target=self._scheduler.run)
self._build_workers, self._scheduler_thread.daemon = True
self._upload_workers) self._scheduler_thread.start()
self.scheduler_thread = threading.Thread(target=self.scheduler.run)
self.scheduler_thread.daemon = True
self.scheduler_thread.start()
self._running = True
def stop(self): def stop(self):
''' '''
@@ -287,25 +319,17 @@ class NodePoolBuilder(object):
stopped all of its own threads. Since we haven't yet joined to that stopped all of its own threads. Since we haven't yet joined to that
thread, do it here. thread, do it here.
''' '''
with self._start_lock: self._scheduler.stop()
self.log.debug('Stopping.')
if not self._running:
self.log.warning("Stop called when already stopped")
return
self.scheduler.stop() # Wait for the builder to complete any currently running jobs
# by joining with the main scheduler thread which should return
# when all of its worker threads are done.
self.log.debug('Waiting for jobs to complete')
self._scheduler_thread.join()
# Wait for the builder to complete any currently running jobs self.log.debug('Stopping providers')
# by joining with the main scheduler thread which should return provider_manager.ProviderManager.stopProviders(self._config)
# when all of its worker threads are done. self.log.debug('Finished stopping')
self.log.debug('Waiting for jobs to complete')
self.scheduler_thread.join()
self.log.debug('Stopping providers')
provider_manager.ProviderManager.stopProviders(self._config)
self.log.debug('Finished stopping')
self._running = False
def load_config(self, config_path): def load_config(self, config_path):
config = nodepool_config.loadConfig(config_path) config = nodepool_config.loadConfig(config_path)

View File

@@ -16,7 +16,6 @@ import argparse
import extras import extras
import signal import signal
import sys import sys
import threading
import daemon import daemon
@@ -62,9 +61,7 @@ class NodePoolBuilderApp(nodepool.cmd.NodepoolApp):
self.args.upload_workers) self.args.upload_workers)
signal.signal(signal.SIGINT, self.sigint_handler) signal.signal(signal.SIGINT, self.sigint_handler)
self.nb.start()
nb_thread = threading.Thread(target=self.nb.run)
nb_thread.start()
while True: while True:
signal.pause() signal.pause()

View File

@@ -137,8 +137,7 @@ class NodePoolDaemon(nodepool.cmd.NodepoolApp):
self.pool.start() self.pool.start()
if self.args.builder: if self.args.builder:
nb_thread = threading.Thread(target=self.builder.run) self.builder.start()
nb_thread.start()
self.webapp.start() self.webapp.start()

View File

@@ -437,10 +437,8 @@ class BuilderFixture(fixtures.Fixture):
def setUp(self): def setUp(self):
super(BuilderFixture, self).setUp() super(BuilderFixture, self).setUp()
self.builder = builder.NodePoolBuilder(self.configfile) self.builder = builder.NodePoolBuilder(self.configfile)
nb_thread = threading.Thread(target=self.builder.run) self.builder.start()
nb_thread.daemon = True
self.addCleanup(self.cleanup) self.addCleanup(self.cleanup)
nb_thread.start()
def cleanup(self): def cleanup(self):
self.builder.stop() self.builder.stop()

View File

@@ -15,8 +15,6 @@
import os import os
import time import time
import threading
import fixtures import fixtures
from nodepool import builder, exceptions, fakeprovider, tests from nodepool import builder, exceptions, fakeprovider, tests
@@ -90,16 +88,12 @@ class TestNodepoolBuilder(tests.DBTestCase):
def test_start_stop(self): def test_start_stop(self):
config = self.setup_config('node_dib.yaml') config = self.setup_config('node_dib.yaml')
nb = builder.NodePoolBuilder(config) nb = builder.NodePoolBuilder(config)
nb_thread = threading.Thread(target=nb.run) nb.start()
nb_thread.daemon = True
nb_thread.start()
while not nb.running: while not nb.running:
time.sleep(.5) time.sleep(.5)
nb.stop() nb.stop()
while nb_thread.isAlive():
time.sleep(.5)
def test_image_upload_fail(self): def test_image_upload_fail(self):
"""Test that image upload fails are handled properly.""" """Test that image upload fails are handled properly."""