Add janitor thread
Adds a thread to remove local DIB builds and images uploaded to providers. Change-Id: I7f2f5e9daeb160baf1bdabd672478dc892120dc5
This commit is contained in:
@@ -15,6 +15,7 @@
|
||||
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import socket
|
||||
import subprocess
|
||||
import threading
|
||||
@@ -95,6 +96,17 @@ class BaseWorker(threading.Thread):
|
||||
self._hostname = socket.gethostname()
|
||||
self._statsd = stats.get_client()
|
||||
|
||||
def _makeStateData(self, state):
|
||||
'''
|
||||
Create a state dict with minimal, common state information.
|
||||
|
||||
:param str state: The state you want.
|
||||
'''
|
||||
data = {}
|
||||
data['state'] = state
|
||||
data['state_time'] = int(time.time())
|
||||
return data
|
||||
|
||||
def _checkForZooKeeperChanges(self, new_config):
|
||||
'''
|
||||
Connect to ZooKeeper cluster.
|
||||
@@ -120,6 +132,204 @@ class BaseWorker(threading.Thread):
|
||||
self._running = False
|
||||
|
||||
|
||||
class CleanupWorker(BaseWorker):
|
||||
'''
|
||||
The janitor of nodepool-builder that will remove images from providers
|
||||
and any local DIB builds.
|
||||
'''
|
||||
log = logging.getLogger("nodepool.builder.CleanupWorker")
|
||||
|
||||
def __init__(self, config_path, cleanup_interval):
|
||||
super(CleanupWorker, self).__init__(config_path)
|
||||
self.cleanup_interval = cleanup_interval
|
||||
|
||||
def _buildUploadRecencyTable(self):
|
||||
'''
|
||||
Builds a table for each image of the most recent uploads to each
|
||||
provider.
|
||||
|
||||
Example)
|
||||
|
||||
image1:
|
||||
providerA: [ (build_id, upload_id, upload_time), ... ]
|
||||
providerB: [ (build_id, upload_id, upload_time), ... ]
|
||||
image2:
|
||||
providerC: [ (build_id, upload_id, upload_time), ... ]
|
||||
'''
|
||||
self._rtable = {}
|
||||
for image in self._zk.getImageNames():
|
||||
self._rtable[image] = {}
|
||||
for build_id, build_data in self._zk.getBuilds(image, 'ready'):
|
||||
for provider in self._zk.getBuildProviders(image, build_id):
|
||||
if provider not in self._rtable[image]:
|
||||
self._rtable[image][provider] = []
|
||||
uploads = self._zk.getMostRecentImageUploads(
|
||||
2, image, build_id, provider, 'ready')
|
||||
for u_id, u_data in uploads.iteritems():
|
||||
self._rtable[image][provider].append(
|
||||
(build_id, u_id, u_data['state_time'])
|
||||
)
|
||||
|
||||
# Sort uploads by state_time (upload time) and keep the 2 most recent
|
||||
for i in self._rtable.keys():
|
||||
for p in self._rtable[i].keys():
|
||||
self._rtable[i][p].sort(key=lambda x: x[2], reverse=True)
|
||||
self._rtable[i][p] = self._rtable[i][p][:2]
|
||||
|
||||
def _isRecentUpload(self, image, provider, build_id, upload_id):
|
||||
'''
|
||||
Search for an upload for a build within the recency table.
|
||||
'''
|
||||
for b_id, u_id, u_time in self._rtable[image][provider]:
|
||||
if build_id == b_id and upload_id == u_id:
|
||||
return True
|
||||
return False
|
||||
|
||||
def _inProgressUpload(self, upload_id, upload_data, image, provider):
|
||||
'''
|
||||
Determine if an upload is in progress.
|
||||
'''
|
||||
if upload_data['state'] != 'uploading':
|
||||
return False
|
||||
|
||||
try:
|
||||
with self._zk.imageUploadLock(image, upload_id,
|
||||
provider, blocking=False):
|
||||
pass
|
||||
except exceptions.ZKLockException:
|
||||
return True
|
||||
return False
|
||||
|
||||
def _deleteLocalBuild(self, image, build_id):
|
||||
'''
|
||||
Remove expired image build from local disk.
|
||||
|
||||
:param str image: Name of the image whose build we are deleting.
|
||||
:param str build_id: ID of the build we want to delete.
|
||||
|
||||
:returns: True if files were deleted, False if none were found.
|
||||
'''
|
||||
base = "-".join([image, build_id])
|
||||
files = DibImageFile.from_image_id(self._config.imagesdir, base)
|
||||
if not files:
|
||||
return False
|
||||
|
||||
self.log.info("Doing cleanup for %s:%s" % (image, build_id))
|
||||
|
||||
manifest_dir = None
|
||||
|
||||
for f in files:
|
||||
filename = f.to_path(self._config.imagesdir, True)
|
||||
if not manifest_dir:
|
||||
path, ext = filename.rsplit('.', 1)
|
||||
manifest_dir = path + ".d"
|
||||
|
||||
try:
|
||||
os.remove(filename)
|
||||
self.log.info("Removed DIB file %s" % filename)
|
||||
except OSError as e:
|
||||
if e.errno != 2: # No such file or directory
|
||||
raise e
|
||||
|
||||
try:
|
||||
shutil.rmtree(manifest_dir)
|
||||
self.log.info("Removed DIB manifest %s" % manifest_dir)
|
||||
except OSError as e:
|
||||
if e.errno != 2: # No such file or directory
|
||||
raise e
|
||||
|
||||
return True
|
||||
|
||||
def _cleanupProvider(self, provider, image, build_id):
|
||||
all_uploads = self._zk.getUploads(image, build_id, provider.name)
|
||||
|
||||
for upload_id, upload_data in all_uploads:
|
||||
if self._isRecentUpload(image, provider, build_id, upload_id):
|
||||
continue
|
||||
|
||||
deleted = False
|
||||
|
||||
if upload_data['state'] != 'deleted':
|
||||
if not self._inProgressUpload(upload_id, upload_data,
|
||||
image, provider.name):
|
||||
data = self._makeStateData('deleted')
|
||||
self._zk.storeImageUpload(image, build_id, provider,
|
||||
data, upload_id)
|
||||
deleted = True
|
||||
|
||||
if upload_data['state'] == 'deleted' or deleted:
|
||||
manager = self._config.provider_managers[provider.name]
|
||||
try:
|
||||
manager.deleteImage(upload_data['external_name'])
|
||||
except Exception as e:
|
||||
self.log.error(
|
||||
"Unable to delete image %s from %s: %s" %
|
||||
(upload_data['external_name'], provider.name, e)
|
||||
)
|
||||
else:
|
||||
self._zk.deleteUpload(image, build_id,
|
||||
provider.name, upload_id)
|
||||
|
||||
def _cleanup(self):
|
||||
'''
|
||||
Clean up builds on disk and in providers.
|
||||
'''
|
||||
known_providers = self._config.providers.values()
|
||||
image_names = self._zk.getImageNames()
|
||||
|
||||
self._buildUploadRecencyTable()
|
||||
|
||||
for image in image_names:
|
||||
builds_to_keep = self._zk.getMostRecentBuilds(2, image, 'ready')
|
||||
all_builds = self._zk.getBuilds(image)
|
||||
|
||||
for build_id, build_data in all_builds.iteritems():
|
||||
if build_data['state'] != 'deleted':
|
||||
if build_id in [b[0] for b in builds_to_keep]:
|
||||
continue
|
||||
|
||||
for provider in known_providers:
|
||||
self._cleanupProvider(provider, image, build_id)
|
||||
|
||||
uploads_exist = False
|
||||
for p in self._zk.getBuildProviders(image, build_id):
|
||||
if self._zk.getImageUploadNumbers(image, build_id, p):
|
||||
uploads_exist = True
|
||||
break
|
||||
|
||||
if not uploads_exist:
|
||||
data = self._makeStateData('deleted')
|
||||
self._zk.storeBuild(image, data, build_id)
|
||||
if self._deleteLocalBuild(image, build_id):
|
||||
self._zk.deleteBuild(image, build_id)
|
||||
|
||||
|
||||
def run(self):
|
||||
'''
|
||||
Start point for the CleanupWorker thread.
|
||||
'''
|
||||
self._running = True
|
||||
while self._running:
|
||||
# Don't do work if we've lost communication with the ZK cluster
|
||||
while self._zk and (self._zk.suspended or self._zk.lost):
|
||||
self.log.info("ZooKeeper suspended. Waiting")
|
||||
time.sleep(SUSPEND_WAIT_TIME)
|
||||
|
||||
new_config = nodepool_config.loadConfig(self._config_path)
|
||||
self._checkForZooKeeperChanges(new_config)
|
||||
provider_manager.ProviderManager.reconfigure(self._config, new_config)
|
||||
self._config = new_config
|
||||
|
||||
self._cleanup()
|
||||
|
||||
time.sleep(self.cleanup_interval)
|
||||
|
||||
if self._zk:
|
||||
self._zk.disconnect()
|
||||
|
||||
provider_manager.ProviderManager.stopProviders(self._config)
|
||||
|
||||
|
||||
class BuildWorker(BaseWorker):
|
||||
log = logging.getLogger("nodepool.builder.BuildWorker")
|
||||
|
||||
@@ -134,10 +344,8 @@ class BuildWorker(BaseWorker):
|
||||
|
||||
:param str state: The build state you want.
|
||||
'''
|
||||
data = {}
|
||||
data = super(BuildWorker, self)._makeStateData(state)
|
||||
data['builder'] = self._hostname
|
||||
data['state'] = state
|
||||
data['state_time'] = int(time.time())
|
||||
return data
|
||||
|
||||
def _checkForScheduledImageUpdates(self):
|
||||
@@ -364,17 +572,6 @@ class UploadWorker(BaseWorker):
|
||||
def __init__(self, config_path):
|
||||
super(UploadWorker, self).__init__(config_path)
|
||||
|
||||
def _makeStateData(self, state):
|
||||
'''
|
||||
Create an upload state dict with minimal, common state information.
|
||||
|
||||
:param str state: The upload state you want.
|
||||
'''
|
||||
data = {}
|
||||
data['state'] = state
|
||||
data['state_time'] = int(time.time())
|
||||
return data
|
||||
|
||||
def _uploadImage(self, build_id, image_name, images, provider):
|
||||
'''
|
||||
Upload a local DIB image build to a provider.
|
||||
@@ -563,7 +760,9 @@ class NodePoolBuilder(object):
|
||||
self._build_workers = []
|
||||
self._num_uploaders = num_uploaders
|
||||
self._upload_workers = []
|
||||
self._janitor = None
|
||||
self._running = False
|
||||
self.cleanup_interval = 60
|
||||
|
||||
# This lock is needed because the run() method is started in a
|
||||
# separate thread of control, which can return before the scheduler
|
||||
@@ -615,11 +814,17 @@ class NodePoolBuilder(object):
|
||||
w.start()
|
||||
self._upload_workers.append(w)
|
||||
|
||||
self._janitor = CleanupWorker(self._config_path,
|
||||
self.cleanup_interval)
|
||||
self._janitor.start()
|
||||
|
||||
# Wait until all threads are running. Otherwise, we have a race
|
||||
# on the worker _running attribute if shutdown() is called before
|
||||
# run() actually begins.
|
||||
while not all([
|
||||
x.running for x in (self._build_workers + self._upload_workers)
|
||||
x.running for x in (self._build_workers
|
||||
+ self._upload_workers
|
||||
+ [self._janitor])
|
||||
]):
|
||||
time.sleep(0)
|
||||
|
||||
@@ -633,7 +838,10 @@ class NodePoolBuilder(object):
|
||||
'''
|
||||
with self._start_lock:
|
||||
self.log.debug("Stopping. NodePoolBuilder shutting down workers")
|
||||
for worker in (self._build_workers + self._upload_workers):
|
||||
for worker in (self._build_workers
|
||||
+ self._upload_workers
|
||||
+ [self._janitor]
|
||||
):
|
||||
worker.shutdown()
|
||||
|
||||
self._running = False
|
||||
@@ -641,7 +849,10 @@ class NodePoolBuilder(object):
|
||||
self.log.debug('Waiting for jobs to complete')
|
||||
|
||||
# Do not exit until all of our owned threads exit.
|
||||
for worker in (self._build_workers + self._upload_workers):
|
||||
for worker in (self._build_workers
|
||||
+ self._upload_workers
|
||||
+ [self._janitor]
|
||||
):
|
||||
worker.join()
|
||||
|
||||
self.log.debug('Stopping providers')
|
||||
|
||||
@@ -87,6 +87,7 @@ class TestNodePoolBuilder(tests.DBTestCase):
|
||||
def test_start_stop(self):
|
||||
config = self.setup_config('node_dib.yaml')
|
||||
nb = builder.NodePoolBuilder(config)
|
||||
nb.cleanup_interval = .5
|
||||
nb.start()
|
||||
nb.stop()
|
||||
|
||||
|
||||
@@ -464,3 +464,15 @@ class TestZooKeeper(tests.ZKTestCase):
|
||||
|
||||
expected = {'1': v1, '2': v2, '3': v3, '4': v4, '5': v5, '6': v6}
|
||||
self.assertEqual(expected, matches)
|
||||
|
||||
def test_deleteBuild(self):
|
||||
path = self.zk._imageBuildsPath("trusty") + "/000001"
|
||||
self.zk.client.create(path, makepath=True)
|
||||
self.zk.deleteBuild("trusty", "000001")
|
||||
self.assertIsNone(self.zk.client.exists(path))
|
||||
|
||||
def test_deleteUpload(self):
|
||||
path = self.zk._imageUploadPath("trusty", "000", "rax") + "/000001"
|
||||
self.zk.client.create(path, makepath=True)
|
||||
self.zk.deleteUpload("trusty", "000", "rax", "000001")
|
||||
self.assertIsNone(self.zk.client.exists(path))
|
||||
|
||||
@@ -780,3 +780,32 @@ class ZooKeeper(object):
|
||||
self._data_watches[path] = (image, func)
|
||||
self.client.exists(path, watch=self._watch_wrapper)
|
||||
|
||||
def deleteBuild(self, image, build_number):
|
||||
'''
|
||||
Delete an image build from ZooKeeper.
|
||||
|
||||
:param str image: The image name.
|
||||
:param str build_number: The image build number to delete.
|
||||
'''
|
||||
path = self._imageBuildsPath(image)
|
||||
path = path + "/%s" % build_number
|
||||
try:
|
||||
self.client.delete(path)
|
||||
except kze.NoNodeError:
|
||||
pass
|
||||
|
||||
def deleteUpload(self, image, build_number, provider, upload_number):
|
||||
'''
|
||||
Delete an image upload from ZooKeeper.
|
||||
|
||||
:param str image: The image name.
|
||||
:param str build_number: The image build number.
|
||||
:param str provider: The provider name owning the image.
|
||||
:param str upload_number: The image upload number to delete.
|
||||
'''
|
||||
path = self._imageUploadPath(image, build_number, provider)
|
||||
path = path + "/%s" % upload_number
|
||||
try:
|
||||
self.client.delete(path)
|
||||
except kze.NoNodeError:
|
||||
pass
|
||||
|
||||
Reference in New Issue
Block a user