Add scheduling thread to nodepool builder

This is the building block for the structure that the nodepool builder
will take going forward. It introduces a new scheduling thread and
moves all existing build and upload worker threads underneath it. A new
thread for receiving ZooKeeper watch events is also introduced.

This code will run, but isn't currently functional because the worker
and watch threads just sleep. Later reviews will begin to add the
functionality.

A supporting developer's guide is added that shows the main components
and communication channels.

Since this is a partial implementation, several tests have to be
temporarily disabled until the ZooKeeper functionality is in place.

Change-Id: I210bb6ba1580403ddc3e5d87eb9bf60ffc7433d8
David Shrewsbury 2016-08-09 12:24:41 -04:00
parent b2308edae1
commit d2e2babf3e
24 changed files with 225 additions and 101 deletions

doc/source/devguide.rst

@@ -0,0 +1,45 @@
.. _devguide:
Developer's Guide
=================
The following guide is intended for those interested in the inner workings
of nodepool and its various processes.
Nodepool Builder
----------------
The following is the overall diagram for the `nodepool-builder` process and
its most important pieces::
                    +-----------------+
                    |    ZooKeeper    |
                    +-----------------+
                      ^             |
                  bld |             | watch
                  req |             | trigger
   +------------+     |             |          +-----------------+
   |   client   +-----+             |          | NodepoolBuilder |
   +------------+                   |          +---+-------------+
                                    |              |
                                    |              | run/stop
                                    |              |
                            +-------v-------+      |
                            |               <------+
              +------------->     Build     <--------------+
              |         +---+   Scheduler   +---+          |
              |         |   |               |   |          |
              |         |   +---------------+   |          |
              |         |                       |          |
         done |         | start           start |          | done
              |         | bld              upld |          |
              |         |                       |          |
              |         |                       |          |
              +---------v---+               +---v----------+
              | BuildWorker |               | UploadWorker |
              +-+-------------+             +-+--------------+
                | BuildWorker |               | UploadWorker |
                +-+-------------+             +-+--------------+
                  | BuildWorker |               | UploadWorker |
                  +-------------+               +--------------+
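
To make the diagram concrete, the following condensed sketch (an editorial
illustration, not code from this change) shows how the pieces map onto
threads: NodePoolBuilder.run() starts a single scheduler thread, the
scheduler starts the build, upload, and watch threads and then blocks
joining them, and stop() clears the running state so every loop winds down::

    import threading
    import time


    class MiniScheduler(object):
        """Stands in for BuilderScheduler: owns the worker and watch threads."""

        def __init__(self, num_builders=1, num_uploaders=4):
            self._running = False
            # one watch thread in addition to the build/upload workers
            self.num_threads = num_builders + num_uploaders + 1
            self.threads = []

        def _loop(self):
            # Placeholder body; in this change the real BuildWorker,
            # UploadWorker and watch threads also do nothing but sleep.
            while self._running:
                time.sleep(1)

        def run(self):
            self._running = True
            for _ in range(self.num_threads):
                t = threading.Thread(target=self._loop)
                t.daemon = True
                t.start()
                self.threads.append(t)
            for t in self.threads:      # block here until stop() is called
                t.join()

        def stop(self):
            # The real scheduler calls shutdown() on each worker rather than
            # sharing a single flag, but the effect is the same.
            self._running = False


    scheduler = MiniScheduler()
    main = threading.Thread(target=scheduler.run)  # roughly what NodePoolBuilder.run() does
    main.daemon = True
    main.start()
    time.sleep(3)
    scheduler.stop()
    main.join()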


@@ -15,6 +15,7 @@ Contents:
configuration
scripts
operation
devguide
Indices and tables
==================


@@ -13,7 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import errno
import json
import logging
import os
@@ -22,7 +21,6 @@ import threading
import time
import traceback
import gear
import shlex
import config as nodepool_config
@@ -30,6 +28,7 @@ import exceptions
import provider_manager
import stats
MINS = 60
HOURS = 60 * MINS
IMAGE_TIMEOUT = 6 * HOURS # How long to wait for an image save
@@ -75,27 +74,31 @@ class DibImageFile(object):
return my_path
class BaseWorker(gear.Worker):
class BaseWorker(object):
nplog = logging.getLogger("nodepool.builder.BaseWorker")
def __init__(self, *args, **kw):
super(BaseWorker, self).__init__()
self.builder = kw.pop('builder')
super(BaseWorker, self).__init__(*args, **kw)
self.images = []
self._running = False
def run(self):
while self.running:
try:
job = self.getJob()
self._handleJob(job)
except gear.InterruptedError:
pass
except Exception:
self.nplog.exception('Exception while getting job')
self._running = True
while self._running:
self.work()
def shutdown(self):
self._running = False
class BuildWorker(BaseWorker):
nplog = logging.getLogger("nodepool.builder.BuildWorker")
def work(self):
#TODO: Actually do something useful
time.sleep(5)
def _handleJob(self, job):
try:
self.nplog.debug('Got job %s with data %s',
@@ -129,6 +132,10 @@ class BuildWorker(BaseWorker):
class UploadWorker(BaseWorker):
nplog = logging.getLogger("nodepool.builder.UploadWorker")
def work(self):
#TODO: Actually do something useful
time.sleep(5)
def _handleJob(self, job):
try:
self.nplog.debug('Got job %s with data %s',
@@ -161,8 +168,77 @@ class UploadWorker(BaseWorker):
job.sendWorkException(traceback.format_exc())
class BuilderScheduler(object):
'''
Class used for thread of execution for the builder scheduler.
The builder scheduler has the responsibility to:
* Handle the image updating process as scheduled via the image-update
cron job defined within the `cron` config section.
* Register to receive all watch events from ZooKeeper and assign
workers for those events.
* Start and maintain the working state of each worker thread.
'''
log = logging.getLogger("nodepool.builder.BuilderScheduler")
def __init__(self, config, num_builders, num_uploaders):
self.config = config
self.num_builders = num_builders
self.build_workers = []
self.num_uploaders = num_uploaders
self.upload_workers = []
self.threads = []
def run(self):
# Create build and upload worker objects
for i in range(self.num_builders):
w = BuildWorker('Nodepool Builder Build Worker %s' % (i+1,),
builder=self)
self.build_workers.append(w)
for i in range(self.num_uploaders):
w = UploadWorker('Nodepool Builder Upload Worker %s' % (i+1,),
builder=self)
self.upload_workers.append(w)
self.log.debug('Starting listener for build jobs')
for thd in (self.build_workers + self.upload_workers):
t = threading.Thread(target=thd.run)
t.daemon = True
t.start()
self.threads.append(t)
self._running = True
# Start our watch thread to handle ZK watch notifications
watch_thread = threading.Thread(target=self.registerWatches)
watch_thread.daemon = True
watch_thread.start()
self.threads.append(watch_thread)
# Do not exit until all of our owned threads exit, which will only
# happen when BuilderScheduler.stop() is called.
for thd in self.threads:
thd.join()
def stop(self):
self.log.debug("BuilderScheduler shutting down workers")
for worker in (self.build_workers + self.upload_workers):
worker.shutdown()
self._running = False
def registerWatches(self):
while self._running:
time.sleep(5)
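
The watch thread above is still a stub; the ZooKeeper wiring arrives in
later changes. Purely as an illustration of the kind of registration
registerWatches() might eventually perform — this assumes the kazoo client
library and an invented /nodepool/image-build path, neither of which is
part of this commit:

    import time

    from kazoo.client import KazooClient

    zk = KazooClient(hosts='localhost:2181')
    zk.start()
    zk.ensure_path('/nodepool/image-build')   # invented path, illustration only

    @zk.ChildrenWatch('/nodepool/image-build')
    def on_build_requests(children):
        # A future scheduler could hand each pending request to an idle
        # BuildWorker from here.
        print('pending build requests: %s' % (children,))

    time.sleep(30)   # keep the session alive long enough to observe events
    zk.stop()
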
class NodePoolBuilder(object):
log = logging.getLogger("nodepool.builder")
'''
Class used to control the builder functionality.
'''
log = logging.getLogger("nodepool.builder.NodePoolBuilder")
def __init__(self, config_path, build_workers=1, upload_workers=4):
self._config_path = config_path
@@ -178,7 +254,14 @@ class NodePoolBuilder(object):
def running(self):
return self._running
def runForever(self):
def run(self):
'''
Start the builder.
The builder functionality is encapsulated within the BuilderScheduler
code. This starts the main scheduler thread, which will run forever
until we tell it to stop.
'''
with self._start_lock:
if self._running:
raise exceptions.BuilderError('Cannot start, already running.')
@@ -186,67 +269,44 @@ class NodePoolBuilder(object):
self.load_config(self._config_path)
self._validate_config()
self.build_workers = []
self.upload_workers = []
for i in range(self._build_workers):
w = BuildWorker('Nodepool Builder Build Worker %s' % (i+1,),
builder=self)
self._initializeGearmanWorker(w,
self._config.gearman_servers.values())
self.build_workers.append(w)
for i in range(self._upload_workers):
w = UploadWorker('Nodepool Builder Upload Worker %s' % (i+1,),
builder=self)
self._initializeGearmanWorker(w,
self._config.gearman_servers.values())
self.upload_workers.append(w)
self._registerGearmanFunctions(self._config.diskimages.values())
self._registerExistingImageUploads()
self.log.debug('Starting listener for build jobs')
self.threads = []
for worker in self.build_workers + self.upload_workers:
t = threading.Thread(target=worker.run)
t.daemon = True
t.start()
self.threads.append(t)
self.scheduler = BuilderScheduler(self._config,
self._build_workers,
self._upload_workers)
self.scheduler_thread = threading.Thread(target=self.scheduler.run)
self.scheduler_thread.daemon = True
self.scheduler_thread.start()
self._running = True
for t in self.threads:
t.join()
self._running = False
def stop(self):
'''
Stop the builder.
Signal the scheduler thread to begin the shutdown process. We don't
want this method to return until the scheduler has successfully
stopped all of its own threads. Since we haven't yet joined to that
thread, do it here.
'''
with self._start_lock:
self.log.debug('Stopping.')
if not self._running:
self.log.warning("Stop called when already stopped")
return
for worker in self.build_workers + self.upload_workers:
try:
worker.shutdown()
except OSError as e:
if e.errno == errno.EBADF:
# The connection has been lost already
self.log.debug("Gearman connection lost when "
"attempting to shutdown; ignoring")
else:
raise
self.scheduler.stop()
self.log.debug('Waiting for jobs to complete')
# Wait for the builder to complete any currently running jobs
while self._running:
time.sleep(1)
# by joining with the main scheduler thread which should return
# when all of its worker threads are done.
self.log.debug('Waiting for jobs to complete')
self.scheduler_thread.join()
self.log.debug('Stopping providers')
provider_manager.ProviderManager.stopProviders(self._config)
self.log.debug('Finished stopping')
self._running = False
def load_config(self, config_path):
config = nodepool_config.loadConfig(config_path)
provider_manager.ProviderManager.reconfigure(
@@ -254,49 +314,12 @@ class NodePoolBuilder(object):
self._config = config
def _validate_config(self):
if not self._config.gearman_servers.values():
raise RuntimeError('No gearman servers specified in config.')
if not self._config.zookeeper_servers.values():
raise RuntimeError('No ZooKeeper servers specified in config.')
if not self._config.imagesdir:
raise RuntimeError('No images-dir specified in config.')
def _initializeGearmanWorker(self, worker, servers):
for server in servers:
worker.addServer(server.host, server.port)
self.log.debug('Waiting for gearman server')
worker.waitForServer()
def _registerGearmanFunctions(self, images):
self.log.debug('Registering gearman functions')
for worker in self.build_workers:
for image in images:
worker.registerFunction(
'image-build:%s' % image.name)
def _registerExistingImageUploads(self):
images = DibImageFile.from_images_dir(self._config.imagesdir)
for image in images:
self.registerImageId(image.image_id)
def registerImageId(self, image_id):
self.log.info('Registering image id: %s', image_id)
for worker in self.upload_workers:
worker.registerFunction('image-upload:%s' % image_id)
worker.registerFunction('image-delete:%s' % image_id)
self._built_image_ids.add(image_id)
def unregisterImageId(self, image_id):
if image_id in self._built_image_ids:
self.log.info('Unregistering image id: %s', image_id)
for worker in self.upload_workers:
worker.unRegisterFunction('image-upload:%s' % image_id)
worker.unRegisterFunction('image-delete:%s' % image_id)
self._built_image_ids.remove(image_id)
else:
self.log.warning('Attempting to remove image %d but image not '
'found', image_id)
def canHandleImageIdJob(self, job, image_op):
return (job.name.startswith(image_op + ':') and
job.name.split(':')[1]) in self._built_image_ids
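
The command-line and test changes below all drive the new entry points the
same way: construct the builder, run it on a thread, and call stop() to shut
it down. A minimal sketch of that usage (the config path here is a
placeholder; it must name a config containing the new zookeeper-servers
section and an images-dir):

    import threading

    from nodepool import builder

    nb = builder.NodePoolBuilder('/etc/nodepool/nodepool.yaml')  # placeholder path

    nb_thread = threading.Thread(target=nb.run)
    nb_thread.daemon = True
    nb_thread.start()

    # ... the builder runs until asked to stop ...

    nb.stop()          # joins the scheduler thread internally
    nb_thread.join()   # then wait for run() itself to return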


@@ -63,7 +63,7 @@ class NodePoolBuilderApp(nodepool.cmd.NodepoolApp):
signal.signal(signal.SIGINT, self.sigint_handler)
nb_thread = threading.Thread(target=self.nb.runForever)
nb_thread = threading.Thread(target=self.nb.run)
nb_thread.start()
while True:


@@ -137,7 +137,7 @@ class NodePoolDaemon(nodepool.cmd.NodepoolApp):
self.pool.start()
if self.args.builder:
nb_thread = threading.Thread(target=self.builder.runForever)
nb_thread = threading.Thread(target=self.builder.run)
nb_thread.start()
self.webapp.start()


@@ -437,7 +437,7 @@ class BuilderFixture(fixtures.Fixture):
def setUp(self):
super(BuilderFixture, self).setUp()
self.builder = builder.NodePoolBuilder(self.configfile)
nb_thread = threading.Thread(target=self.builder.runForever)
nb_thread = threading.Thread(target=self.builder.run)
nb_thread.daemon = True
self.addCleanup(self.cleanup)
nb_thread.start()


@@ -13,6 +13,9 @@ gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: localhost
labels:
- name: real-label
image: real-image


@@ -13,6 +13,9 @@ gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: localhost
labels:
- name: real-label
image: real-image


@@ -13,6 +13,9 @@ gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: localhost
labels:
- name: fake-label
image: fake-image


@@ -13,6 +13,9 @@ gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: localhost
labels:
- name: fake-label
image: fake-image


@@ -13,6 +13,9 @@ gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: localhost
labels:
- name: fake-label
image: fake-image


@@ -13,6 +13,9 @@ gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: localhost
labels:
- name: fake-label1
image: fake-image1


@@ -14,6 +14,9 @@ gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: localhost
labels:
- name: fake-dib-label
image: fake-dib-image


@@ -14,6 +14,9 @@ gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: localhost
labels:
- name: fake-label
image: fake-dib-image


@@ -14,6 +14,9 @@ gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: localhost
labels:
- name: fake-label
image: fake-dib-image


@@ -14,6 +14,9 @@ gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: localhost
labels:
- name: fake-label
image: fake-dib-image


@@ -14,6 +14,9 @@ gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: localhost
labels:
- name: fake-dib-label
image: fake-dib-image


@@ -14,6 +14,9 @@ gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: localhost
labels:
- name: fake-label
image: fake-dib-image


@@ -14,6 +14,9 @@ gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: localhost
labels:
- name: fake-label1
image: fake-image


@@ -13,6 +13,9 @@ gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: localhost
labels:
- name: fake-label
image: fake-image


@@ -13,6 +13,9 @@ gearman-servers:
- host: localhost
port: {gearman_port}
zookeeper-servers:
- host: localhost
labels:
- name: fake-label
image: fake-image
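
Each test configuration above gains the same zookeeper-servers stanza.
Parsed, it is simply a list of host mappings; a later change could turn it
into a ZooKeeper connection string along these lines (illustrative only:
the key names come from the configs above, while the optional port key and
the 2181 default are assumptions here):

    import yaml

    snippet = "zookeeper-servers:\n  - host: localhost\n"
    config = yaml.safe_load(snippet)

    hosts = ','.join('%s:%s' % (zk['host'], zk.get('port', 2181))
                     for zk in config['zookeeper-servers'])
    print(hosts)   # -> localhost:2181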


@@ -90,7 +90,7 @@ class TestNodepoolBuilder(tests.DBTestCase):
def test_start_stop(self):
config = self.setup_config('node_dib.yaml')
nb = builder.NodePoolBuilder(config)
nb_thread = threading.Thread(target=nb.runForever)
nb_thread = threading.Thread(target=nb.run)
nb_thread.daemon = True
nb_thread.start()
@@ -103,6 +103,8 @@ class TestNodepoolBuilder(tests.DBTestCase):
def test_image_upload_fail(self):
"""Test that image upload fails are handled properly."""
self.skip("Skipping until ZooKeeper is enabled")
# Enter a working state before we test that fails are handled.
configfile = self.setup_config('node_dib.yaml')
pool = self.useNodepool(configfile, watermark_sleep=1)


@@ -24,6 +24,10 @@ from nodepool import tests
class TestNodepoolCMD(tests.DBTestCase):
def setUp(self):
super(TestNodepoolCMD, self).setUp()
self.skip("Skipping until ZooKeeper is enabled")
def patch_argv(self, *args):
argv = ["nodepool", "-s", self.secure_conf]
argv.extend(args)


@@ -30,6 +30,10 @@ import nodepool.nodepool
class TestNodepool(tests.DBTestCase):
log = logging.getLogger("nodepool.TestNodepool")
def setUp(self):
super(TestNodepool, self).setUp()
self.skip("Skipping until ZooKeeper is enabled")
def test_db(self):
db = nodedb.NodeDatabase(self.dburi)
with db.getSession() as session: