diff --git a/nodepool/builder.py b/nodepool/builder.py index 75905536a..7f3867d7e 100644 --- a/nodepool/builder.py +++ b/nodepool/builder.py @@ -24,6 +24,7 @@ import threading import time import shlex import uuid +from concurrent.futures.thread import ThreadPoolExecutor from pathlib import Path @@ -440,12 +441,12 @@ class CleanupWorker(BaseWorker): f'{image_name}.build.state') pipeline.gauge(key, build_status) upload_status_by_provider = {} - for upload_worker in self.builder._upload_workers: - for provider_name, provider_status in \ - upload_worker._image_status.get(image_name, {}).items(): - upload_status_by_provider[provider_name] = max( - provider_status, upload_status_by_provider.get( - provider_name, STATUS_IDLE)) + upload_worker = self.builder._upload_worker + for provider_name, provider_status in \ + upload_worker._image_status.get(image_name, {}).items(): + upload_status_by_provider[provider_name] = max( + provider_status, upload_status_by_provider.get( + provider_name, STATUS_IDLE)) for provider_name, upload_status in \ upload_status_by_provider.items(): key = (f'nodepool.builder.{self._hostname}.image.{image_name}.' @@ -1102,11 +1103,12 @@ class BuildWorker(BaseWorker): class UploadWorker(BaseWorker): def __init__(self, name, builder_id, config_path, secure_path, - interval, zk): + interval, zk, upload_workers): super(UploadWorker, self).__init__(builder_id, config_path, secure_path, interval, zk) self.log = logging.getLogger("nodepool.builder.UploadWorker.%s" % name) self.name = 'UploadWorker.%s' % name + self.upload_pool = ThreadPoolExecutor(max_workers=upload_workers) def _reloadConfig(self): ''' @@ -1259,30 +1261,23 @@ class UploadWorker(BaseWorker): If we find any builds in the 'ready' state that haven't been uploaded to providers, do the upload if they are available on the local disk. ''' + self.log.info("Check for provider uploads") + for provider in self._config.providers.values(): if not provider.manage_images: continue for image in provider.diskimages.values(): - uploaded = False - # Check if we've been told to shutdown # or if ZK connection is suspended if not self._running or self._zk.suspended or self._zk.lost: return try: - uploaded = self._checkProviderImageUpload(provider, image) + self._checkProviderImageUpload(provider, image) except Exception: self.log.exception("Error uploading image %s " "to provider %s:", image.name, provider.name) - # NOTE: Due to the configuration file disagreement issue - # (the copy we have may not be current), if we took the time - # to attempt to upload an image, let's short-circuit this loop - # to give us a chance to reload the configuration file. - if uploaded: - return - def _checkProviderImageUpload(self, provider, image): ''' The main body of _checkForProviderUploads. This encapsulates @@ -1299,14 +1294,14 @@ class UploadWorker(BaseWorker): # Check if image uploads are paused. if provider.diskimages.get(image.name).pause: self._image_status[image.name][provider.name] = STATUS_PAUSED - return False + return self._image_status[image.name][provider.name] = STATUS_IDLE # Search for the most recent 'ready' image build builds = self._zk.getMostRecentBuilds(1, image.name, zk.READY) if not builds: - return False + return build = builds[0] @@ -1315,29 +1310,34 @@ class UploadWorker(BaseWorker): local_images = DibImageFile.from_image_id( self._config.images_dir, "-".join([image.name, build.id])) if not local_images: - return False + return # See if this image has already been uploaded upload = self._zk.getMostRecentBuildImageUploads( 1, image.name, build.id, provider.name, zk.READY) if upload: - return False + return # See if this provider supports the available image formats if provider.image_type not in build.formats: - return False + return + # Offload the actual upload to the thread pool + self.upload_pool.submit( + self._doImageUpload, build, image, local_images, provider) + + def _doImageUpload(self, build, image, local_images, provider): try: with self._zk.imageUploadLock( - image.name, build.id, provider.name, - blocking=False + image.name, build.id, provider.name, + blocking=False ): # Verify once more that it hasn't been uploaded since the # last check. upload = self._zk.getMostRecentBuildImageUploads( 1, image.name, build.id, provider.name, zk.READY) if upload: - return False + return # NOTE: Due to the configuration file disagreement issue # (the copy we have may not be current), we try to verify @@ -1345,7 +1345,7 @@ class UploadWorker(BaseWorker): # before we upload. b = self._zk.getBuild(image.name, build.id) if not b or b.state == zk.DELETING: - return False + return # New upload number with initial state 'uploading' data = zk.ImageUpload() @@ -1367,10 +1367,13 @@ class UploadWorker(BaseWorker): # Set final state self._zk.storeImageUpload(image.name, build.id, provider.name, data, upnum) - return True except exceptions.ZKLockException: # Lock is already held. Skip it. - return False + return + except Exception: + self.log.exception("Error uploading image %s " + "to provider %s:", + image.name, provider.name) finally: self._image_status[image.name][provider.name] = STATUS_IDLE @@ -1401,6 +1404,10 @@ class UploadWorker(BaseWorker): provider_manager.ProviderManager.stopProviders(self._config) + def shutdown(self): + super().shutdown() + self.upload_pool.shutdown(wait=False) + class NodePoolBuilder(object): ''' @@ -1428,7 +1435,7 @@ class NodePoolBuilder(object): self._num_builders = num_builders self._build_workers = [] self._num_uploaders = num_uploaders - self._upload_workers = [] + self._upload_worker = None self._janitor = None self._running = False self.cleanup_interval = 60 @@ -1517,12 +1524,12 @@ class NodePoolBuilder(object): w.start() self._build_workers.append(w) - for i in range(self._num_uploaders): - w = UploadWorker(i, builder_id, - self._config_path, self._secure_path, - self.upload_interval, self.zk) - w.start() - self._upload_workers.append(w) + self._upload_worker = UploadWorker( + i, builder_id, + self._config_path, self._secure_path, + self.upload_interval, self.zk, + self._num_uploaders) + self._upload_worker.start() if self.cleanup_interval > 0: self._janitor = CleanupWorker( @@ -1534,7 +1541,7 @@ class NodePoolBuilder(object): # Wait until all threads are running. Otherwise, we have a race # on the worker _running attribute if shutdown() is called before # run() actually begins. - workers = self._build_workers + self._upload_workers + workers = self._build_workers + [self._upload_worker] if self._janitor: workers += [self._janitor] while not all([ diff --git a/nodepool/tests/__init__.py b/nodepool/tests/__init__.py index 713f5a7b2..bc5fe7fda 100644 --- a/nodepool/tests/__init__.py +++ b/nodepool/tests/__init__.py @@ -394,10 +394,9 @@ class BuilderFixture(fixtures.Fixture): # The NodePoolBuilder.stop() method does not intentionally stop the # upload workers for reasons documented in that method. But we can # safely do so in tests. - for worker in self.builder._upload_workers: - worker.shutdown() - worker.join() - self.log.debug("Stopped worker %s", worker.name) + self.builder._upload_worker.shutdown() + self.builder._upload_worker.join() + self.log.debug("Stopped worker %s", self.builder._upload_worker.name) self.builder.stop() diff --git a/nodepool/tests/unit/test_builder.py b/nodepool/tests/unit/test_builder.py index 48645cb83..f75a13431 100644 --- a/nodepool/tests/unit/test_builder.py +++ b/nodepool/tests/unit/test_builder.py @@ -248,9 +248,8 @@ class TestNodePoolBuilder(tests.DBTestCase): builder1 = self.useBuilder(configfile1) self.waitForImage('fake-provider1', 'fake-image1') self.waitForBuild('fake-image1') - for worker in builder1._upload_workers: - worker.shutdown() - worker.join() + builder1._upload_worker.shutdown() + builder1._upload_worker.join() builder1.stop() self.zk.deleteUpload('fake-image1', '0000000001', diff --git a/nodepool/tests/unit/test_commands.py b/nodepool/tests/unit/test_commands.py index 16d994a6a..39f20256a 100644 --- a/nodepool/tests/unit/test_commands.py +++ b/nodepool/tests/unit/test_commands.py @@ -301,9 +301,8 @@ class TestNodepoolCMD(tests.DBTestCase): builds = self.zk.getMostRecentBuilds(1, 'fake-image', zk.READY) # 2. Stop builder1; start builder2 - for worker in builder1._upload_workers: - worker.shutdown() - worker.join() + builder1._upload_worker.shutdown() + builder1._upload_worker.join() builder1.stop() # setup_config() makes a new images_dir each time, so this # acts as a different builder. @@ -662,9 +661,8 @@ class TestNodepoolCMD(tests.DBTestCase): build2 = self.waitForBuild('fake-image', ignore_list=[build]) pool.stop() - for worker in builder._upload_workers: - worker.shutdown() - worker.join() + builder._upload_worker.shutdown() + builder._upload_worker.join() builder.stop() # Save a copy of the data in ZK old_data = self.getZKTree('/nodepool/images') diff --git a/nodepool/tests/unit/test_launcher.py b/nodepool/tests/unit/test_launcher.py index fc11d247f..2e137d202 100644 --- a/nodepool/tests/unit/test_launcher.py +++ b/nodepool/tests/unit/test_launcher.py @@ -726,7 +726,7 @@ class TestLauncher(tests.DBTestCase): pool = self.useNodepool(configfile, watermark_sleep=1) self.startPool(pool) - provider = (builder._upload_workers[0]._config. + provider = (builder._upload_worker._config. provider_managers['fake-provider']) cloud_image = provider.adapter._findImage(image.external_id) self.assertEqual(