Check for images to upload single threaded
Currently we have multiple upload workers where each of them is constantly checking if there is an image to upload. This check is involves quite a lot zk operations increasing load on the nodepool-builder as well as on zk. Instead we can run the check loop only once and dispatch the actual upload to a limited ThreadPoolExecutor. Change-Id: I95a4a57c4acb856de5c25d570249c43f735503d5
This commit is contained in:
parent
17c57eea85
commit
b4312bcd1f
|
@ -24,6 +24,7 @@ import threading
|
|||
import time
|
||||
import shlex
|
||||
import uuid
|
||||
from concurrent.futures.thread import ThreadPoolExecutor
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
|
@ -440,12 +441,12 @@ class CleanupWorker(BaseWorker):
|
|||
f'{image_name}.build.state')
|
||||
pipeline.gauge(key, build_status)
|
||||
upload_status_by_provider = {}
|
||||
for upload_worker in self.builder._upload_workers:
|
||||
for provider_name, provider_status in \
|
||||
upload_worker._image_status.get(image_name, {}).items():
|
||||
upload_status_by_provider[provider_name] = max(
|
||||
provider_status, upload_status_by_provider.get(
|
||||
provider_name, STATUS_IDLE))
|
||||
upload_worker = self.builder._upload_worker
|
||||
for provider_name, provider_status in \
|
||||
upload_worker._image_status.get(image_name, {}).items():
|
||||
upload_status_by_provider[provider_name] = max(
|
||||
provider_status, upload_status_by_provider.get(
|
||||
provider_name, STATUS_IDLE))
|
||||
for provider_name, upload_status in \
|
||||
upload_status_by_provider.items():
|
||||
key = (f'nodepool.builder.{self._hostname}.image.{image_name}.'
|
||||
|
@ -1102,11 +1103,12 @@ class BuildWorker(BaseWorker):
|
|||
|
||||
class UploadWorker(BaseWorker):
|
||||
def __init__(self, name, builder_id, config_path, secure_path,
|
||||
interval, zk):
|
||||
interval, zk, upload_workers):
|
||||
super(UploadWorker, self).__init__(builder_id, config_path,
|
||||
secure_path, interval, zk)
|
||||
self.log = logging.getLogger("nodepool.builder.UploadWorker.%s" % name)
|
||||
self.name = 'UploadWorker.%s' % name
|
||||
self.upload_pool = ThreadPoolExecutor(max_workers=upload_workers)
|
||||
|
||||
def _reloadConfig(self):
|
||||
'''
|
||||
|
@ -1259,30 +1261,23 @@ class UploadWorker(BaseWorker):
|
|||
If we find any builds in the 'ready' state that haven't been uploaded
|
||||
to providers, do the upload if they are available on the local disk.
|
||||
'''
|
||||
self.log.info("Check for provider uploads")
|
||||
|
||||
for provider in self._config.providers.values():
|
||||
if not provider.manage_images:
|
||||
continue
|
||||
for image in provider.diskimages.values():
|
||||
uploaded = False
|
||||
|
||||
# Check if we've been told to shutdown
|
||||
# or if ZK connection is suspended
|
||||
if not self._running or self._zk.suspended or self._zk.lost:
|
||||
return
|
||||
try:
|
||||
uploaded = self._checkProviderImageUpload(provider, image)
|
||||
self._checkProviderImageUpload(provider, image)
|
||||
except Exception:
|
||||
self.log.exception("Error uploading image %s "
|
||||
"to provider %s:",
|
||||
image.name, provider.name)
|
||||
|
||||
# NOTE: Due to the configuration file disagreement issue
|
||||
# (the copy we have may not be current), if we took the time
|
||||
# to attempt to upload an image, let's short-circuit this loop
|
||||
# to give us a chance to reload the configuration file.
|
||||
if uploaded:
|
||||
return
|
||||
|
||||
def _checkProviderImageUpload(self, provider, image):
|
||||
'''
|
||||
The main body of _checkForProviderUploads. This encapsulates
|
||||
|
@ -1299,14 +1294,14 @@ class UploadWorker(BaseWorker):
|
|||
# Check if image uploads are paused.
|
||||
if provider.diskimages.get(image.name).pause:
|
||||
self._image_status[image.name][provider.name] = STATUS_PAUSED
|
||||
return False
|
||||
return
|
||||
|
||||
self._image_status[image.name][provider.name] = STATUS_IDLE
|
||||
# Search for the most recent 'ready' image build
|
||||
builds = self._zk.getMostRecentBuilds(1, image.name,
|
||||
zk.READY)
|
||||
if not builds:
|
||||
return False
|
||||
return
|
||||
|
||||
build = builds[0]
|
||||
|
||||
|
@ -1315,29 +1310,34 @@ class UploadWorker(BaseWorker):
|
|||
local_images = DibImageFile.from_image_id(
|
||||
self._config.images_dir, "-".join([image.name, build.id]))
|
||||
if not local_images:
|
||||
return False
|
||||
return
|
||||
|
||||
# See if this image has already been uploaded
|
||||
upload = self._zk.getMostRecentBuildImageUploads(
|
||||
1, image.name, build.id, provider.name, zk.READY)
|
||||
if upload:
|
||||
return False
|
||||
return
|
||||
|
||||
# See if this provider supports the available image formats
|
||||
if provider.image_type not in build.formats:
|
||||
return False
|
||||
return
|
||||
|
||||
# Offload the actual upload to the thread pool
|
||||
self.upload_pool.submit(
|
||||
self._doImageUpload, build, image, local_images, provider)
|
||||
|
||||
def _doImageUpload(self, build, image, local_images, provider):
|
||||
try:
|
||||
with self._zk.imageUploadLock(
|
||||
image.name, build.id, provider.name,
|
||||
blocking=False
|
||||
image.name, build.id, provider.name,
|
||||
blocking=False
|
||||
):
|
||||
# Verify once more that it hasn't been uploaded since the
|
||||
# last check.
|
||||
upload = self._zk.getMostRecentBuildImageUploads(
|
||||
1, image.name, build.id, provider.name, zk.READY)
|
||||
if upload:
|
||||
return False
|
||||
return
|
||||
|
||||
# NOTE: Due to the configuration file disagreement issue
|
||||
# (the copy we have may not be current), we try to verify
|
||||
|
@ -1345,7 +1345,7 @@ class UploadWorker(BaseWorker):
|
|||
# before we upload.
|
||||
b = self._zk.getBuild(image.name, build.id)
|
||||
if not b or b.state == zk.DELETING:
|
||||
return False
|
||||
return
|
||||
|
||||
# New upload number with initial state 'uploading'
|
||||
data = zk.ImageUpload()
|
||||
|
@ -1367,10 +1367,13 @@ class UploadWorker(BaseWorker):
|
|||
# Set final state
|
||||
self._zk.storeImageUpload(image.name, build.id,
|
||||
provider.name, data, upnum)
|
||||
return True
|
||||
except exceptions.ZKLockException:
|
||||
# Lock is already held. Skip it.
|
||||
return False
|
||||
return
|
||||
except Exception:
|
||||
self.log.exception("Error uploading image %s "
|
||||
"to provider %s:",
|
||||
image.name, provider.name)
|
||||
finally:
|
||||
self._image_status[image.name][provider.name] = STATUS_IDLE
|
||||
|
||||
|
@ -1401,6 +1404,10 @@ class UploadWorker(BaseWorker):
|
|||
|
||||
provider_manager.ProviderManager.stopProviders(self._config)
|
||||
|
||||
def shutdown(self):
|
||||
super().shutdown()
|
||||
self.upload_pool.shutdown(wait=False)
|
||||
|
||||
|
||||
class NodePoolBuilder(object):
|
||||
'''
|
||||
|
@ -1428,7 +1435,7 @@ class NodePoolBuilder(object):
|
|||
self._num_builders = num_builders
|
||||
self._build_workers = []
|
||||
self._num_uploaders = num_uploaders
|
||||
self._upload_workers = []
|
||||
self._upload_worker = None
|
||||
self._janitor = None
|
||||
self._running = False
|
||||
self.cleanup_interval = 60
|
||||
|
@ -1517,12 +1524,12 @@ class NodePoolBuilder(object):
|
|||
w.start()
|
||||
self._build_workers.append(w)
|
||||
|
||||
for i in range(self._num_uploaders):
|
||||
w = UploadWorker(i, builder_id,
|
||||
self._config_path, self._secure_path,
|
||||
self.upload_interval, self.zk)
|
||||
w.start()
|
||||
self._upload_workers.append(w)
|
||||
self._upload_worker = UploadWorker(
|
||||
i, builder_id,
|
||||
self._config_path, self._secure_path,
|
||||
self.upload_interval, self.zk,
|
||||
self._num_uploaders)
|
||||
self._upload_worker.start()
|
||||
|
||||
if self.cleanup_interval > 0:
|
||||
self._janitor = CleanupWorker(
|
||||
|
@ -1534,7 +1541,7 @@ class NodePoolBuilder(object):
|
|||
# Wait until all threads are running. Otherwise, we have a race
|
||||
# on the worker _running attribute if shutdown() is called before
|
||||
# run() actually begins.
|
||||
workers = self._build_workers + self._upload_workers
|
||||
workers = self._build_workers + [self._upload_worker]
|
||||
if self._janitor:
|
||||
workers += [self._janitor]
|
||||
while not all([
|
||||
|
|
|
@ -394,10 +394,9 @@ class BuilderFixture(fixtures.Fixture):
|
|||
# The NodePoolBuilder.stop() method does not intentionally stop the
|
||||
# upload workers for reasons documented in that method. But we can
|
||||
# safely do so in tests.
|
||||
for worker in self.builder._upload_workers:
|
||||
worker.shutdown()
|
||||
worker.join()
|
||||
self.log.debug("Stopped worker %s", worker.name)
|
||||
self.builder._upload_worker.shutdown()
|
||||
self.builder._upload_worker.join()
|
||||
self.log.debug("Stopped worker %s", self.builder._upload_worker.name)
|
||||
self.builder.stop()
|
||||
|
||||
|
||||
|
|
|
@ -248,9 +248,8 @@ class TestNodePoolBuilder(tests.DBTestCase):
|
|||
builder1 = self.useBuilder(configfile1)
|
||||
self.waitForImage('fake-provider1', 'fake-image1')
|
||||
self.waitForBuild('fake-image1')
|
||||
for worker in builder1._upload_workers:
|
||||
worker.shutdown()
|
||||
worker.join()
|
||||
builder1._upload_worker.shutdown()
|
||||
builder1._upload_worker.join()
|
||||
builder1.stop()
|
||||
|
||||
self.zk.deleteUpload('fake-image1', '0000000001',
|
||||
|
|
|
@ -301,9 +301,8 @@ class TestNodepoolCMD(tests.DBTestCase):
|
|||
builds = self.zk.getMostRecentBuilds(1, 'fake-image', zk.READY)
|
||||
|
||||
# 2. Stop builder1; start builder2
|
||||
for worker in builder1._upload_workers:
|
||||
worker.shutdown()
|
||||
worker.join()
|
||||
builder1._upload_worker.shutdown()
|
||||
builder1._upload_worker.join()
|
||||
builder1.stop()
|
||||
# setup_config() makes a new images_dir each time, so this
|
||||
# acts as a different builder.
|
||||
|
@ -662,9 +661,8 @@ class TestNodepoolCMD(tests.DBTestCase):
|
|||
build2 = self.waitForBuild('fake-image', ignore_list=[build])
|
||||
|
||||
pool.stop()
|
||||
for worker in builder._upload_workers:
|
||||
worker.shutdown()
|
||||
worker.join()
|
||||
builder._upload_worker.shutdown()
|
||||
builder._upload_worker.join()
|
||||
builder.stop()
|
||||
# Save a copy of the data in ZK
|
||||
old_data = self.getZKTree('/nodepool/images')
|
||||
|
|
|
@ -726,7 +726,7 @@ class TestLauncher(tests.DBTestCase):
|
|||
|
||||
pool = self.useNodepool(configfile, watermark_sleep=1)
|
||||
self.startPool(pool)
|
||||
provider = (builder._upload_workers[0]._config.
|
||||
provider = (builder._upload_worker._config.
|
||||
provider_managers['fake-provider'])
|
||||
cloud_image = provider.adapter._findImage(image.external_id)
|
||||
self.assertEqual(
|
||||
|
|
Loading…
Reference in New Issue