Mitigate config disagreement issues
Because the config file can change on us at any point, and each thread carries around its own copy of it, it's possible that a thread can be using an outdated version while in the middle of its work. This can cause unexpected behavior, like uploading an image to a provider when it is currently being deleted (leaving an undeleted upload). Until we can solve the deeper issue of config file syncing, we can reduce probability of this delete/upload problem by adding a check for the build status _just_ before an upload attempt, and by reloading our config after we spent time trying to upload to a provider (which can take quite a while). Also removes a TODO that was TODONE. Change-Id: Id066361792d356cf7f22b9157092e364a848c8e1
This commit is contained in:
parent
fd296c868b
commit
76f3617e24
|
@ -797,6 +797,19 @@ class UploadWorker(BaseWorker):
|
||||||
self.log = logging.getLogger("nodepool.builder.UploadWorker.%s" % name)
|
self.log = logging.getLogger("nodepool.builder.UploadWorker.%s" % name)
|
||||||
self.name = 'UploadWorker.%s' % name
|
self.name = 'UploadWorker.%s' % name
|
||||||
|
|
||||||
|
def _reloadConfig(self):
|
||||||
|
'''
|
||||||
|
Reload the nodepool configuration file.
|
||||||
|
'''
|
||||||
|
new_config = nodepool_config.loadConfig(self._config_path)
|
||||||
|
if not self._config:
|
||||||
|
self._config = new_config
|
||||||
|
|
||||||
|
self._checkForZooKeeperChanges(new_config)
|
||||||
|
provider_manager.ProviderManager.reconfigure(self._config, new_config,
|
||||||
|
use_taskmanager=False)
|
||||||
|
self._config = new_config
|
||||||
|
|
||||||
def _uploadImage(self, build_id, upload_id, image_name, images, provider):
|
def _uploadImage(self, build_id, upload_id, image_name, images, provider):
|
||||||
'''
|
'''
|
||||||
Upload a local DIB image build to a provider.
|
Upload a local DIB image build to a provider.
|
||||||
|
@ -891,17 +904,26 @@ class UploadWorker(BaseWorker):
|
||||||
'''
|
'''
|
||||||
for provider in self._config.providers.values():
|
for provider in self._config.providers.values():
|
||||||
for image in provider.images.values():
|
for image in provider.images.values():
|
||||||
|
uploaded = False
|
||||||
|
|
||||||
# Check if we've been told to shutdown
|
# Check if we've been told to shutdown
|
||||||
# or if ZK connection is suspended
|
# or if ZK connection is suspended
|
||||||
if not self.running or self._zk.suspended or self._zk.lost:
|
if not self.running or self._zk.suspended or self._zk.lost:
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
self._checkProviderImageUpload(provider, image)
|
uploaded = self._checkProviderImageUpload(provider, image)
|
||||||
except Exception:
|
except Exception:
|
||||||
self.log.exception("Error uploading image %s "
|
self.log.exception("Error uploading image %s "
|
||||||
"to provider %s:",
|
"to provider %s:",
|
||||||
image.name, provider.name)
|
image.name, provider.name)
|
||||||
|
|
||||||
|
# NOTE: Due to the configuration file disagreement issue
|
||||||
|
# (the copy we have may not be current), if we took the time
|
||||||
|
# to attempt to upload an image, let's short-circuit this loop
|
||||||
|
# to give us a chance to reload the configuration file.
|
||||||
|
if uploaded:
|
||||||
|
return
|
||||||
|
|
||||||
def _checkProviderImageUpload(self, provider, image):
|
def _checkProviderImageUpload(self, provider, image):
|
||||||
'''
|
'''
|
||||||
The main body of _checkForProviderUploads. This encapsulates
|
The main body of _checkForProviderUploads. This encapsulates
|
||||||
|
@ -909,18 +931,18 @@ class UploadWorker(BaseWorker):
|
||||||
and performing the upload. It is a separate function so that
|
and performing the upload. It is a separate function so that
|
||||||
exception handling can treat all provider-image uploads
|
exception handling can treat all provider-image uploads
|
||||||
indepedently.
|
indepedently.
|
||||||
'''
|
|
||||||
# TODO(jeblair): check for pause here
|
|
||||||
|
|
||||||
|
:returns: True if an upload was attempted, False otherwise.
|
||||||
|
'''
|
||||||
# Check if image uploads are paused.
|
# Check if image uploads are paused.
|
||||||
if provider.images.get(image.name).pause:
|
if provider.images.get(image.name).pause:
|
||||||
return
|
return False
|
||||||
|
|
||||||
# Search for the most recent 'ready' image build
|
# Search for the most recent 'ready' image build
|
||||||
builds = self._zk.getMostRecentBuilds(1, image.name,
|
builds = self._zk.getMostRecentBuilds(1, image.name,
|
||||||
zk.READY)
|
zk.READY)
|
||||||
if not builds:
|
if not builds:
|
||||||
return
|
return False
|
||||||
|
|
||||||
build = builds[0]
|
build = builds[0]
|
||||||
|
|
||||||
|
@ -929,17 +951,17 @@ class UploadWorker(BaseWorker):
|
||||||
local_images = DibImageFile.from_image_id(
|
local_images = DibImageFile.from_image_id(
|
||||||
self._config.imagesdir, "-".join([image.name, build.id]))
|
self._config.imagesdir, "-".join([image.name, build.id]))
|
||||||
if not local_images:
|
if not local_images:
|
||||||
return
|
return False
|
||||||
|
|
||||||
# See if this image has already been uploaded
|
# See if this image has already been uploaded
|
||||||
upload = self._zk.getMostRecentBuildImageUploads(
|
upload = self._zk.getMostRecentBuildImageUploads(
|
||||||
1, image.name, build.id, provider.name, zk.READY)
|
1, image.name, build.id, provider.name, zk.READY)
|
||||||
if upload:
|
if upload:
|
||||||
return
|
return False
|
||||||
|
|
||||||
# See if this provider supports the available image formats
|
# See if this provider supports the available image formats
|
||||||
if provider.image_type not in build.formats:
|
if provider.image_type not in build.formats:
|
||||||
return
|
return False
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with self._zk.imageUploadLock(
|
with self._zk.imageUploadLock(
|
||||||
|
@ -951,7 +973,15 @@ class UploadWorker(BaseWorker):
|
||||||
upload = self._zk.getMostRecentBuildImageUploads(
|
upload = self._zk.getMostRecentBuildImageUploads(
|
||||||
1, image.name, build.id, provider.name, zk.READY)
|
1, image.name, build.id, provider.name, zk.READY)
|
||||||
if upload:
|
if upload:
|
||||||
return
|
return False
|
||||||
|
|
||||||
|
# NOTE: Due to the configuration file disagreement issue
|
||||||
|
# (the copy we have may not be current), we try to verify
|
||||||
|
# that another thread isn't trying to delete this build just
|
||||||
|
# before we upload.
|
||||||
|
b = self._zk.getBuild(image.name, build.id)
|
||||||
|
if b.state == zk.DELETING:
|
||||||
|
return False
|
||||||
|
|
||||||
# New upload number with initial state 'uploading'
|
# New upload number with initial state 'uploading'
|
||||||
data = zk.ImageUpload()
|
data = zk.ImageUpload()
|
||||||
|
@ -965,9 +995,10 @@ class UploadWorker(BaseWorker):
|
||||||
# Set final state
|
# Set final state
|
||||||
self._zk.storeImageUpload(image.name, build.id,
|
self._zk.storeImageUpload(image.name, build.id,
|
||||||
provider.name, data, upnum)
|
provider.name, data, upnum)
|
||||||
|
return True
|
||||||
except exceptions.ZKLockException:
|
except exceptions.ZKLockException:
|
||||||
# Lock is already held. Skip it.
|
# Lock is already held. Skip it.
|
||||||
pass
|
return False
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
|
|
||||||
|
@ -982,7 +1013,8 @@ class UploadWorker(BaseWorker):
|
||||||
time.sleep(SUSPEND_WAIT_TIME)
|
time.sleep(SUSPEND_WAIT_TIME)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self._run()
|
self._reloadConfig()
|
||||||
|
self._checkForProviderUploads()
|
||||||
except Exception:
|
except Exception:
|
||||||
self.log.exception("Exception in UploadWorker:")
|
self.log.exception("Exception in UploadWorker:")
|
||||||
time.sleep(10)
|
time.sleep(10)
|
||||||
|
@ -991,21 +1023,6 @@ class UploadWorker(BaseWorker):
|
||||||
|
|
||||||
provider_manager.ProviderManager.stopProviders(self._config)
|
provider_manager.ProviderManager.stopProviders(self._config)
|
||||||
|
|
||||||
def _run(self):
|
|
||||||
'''
|
|
||||||
Body of run method for exception handling purposes.
|
|
||||||
'''
|
|
||||||
new_config = nodepool_config.loadConfig(self._config_path)
|
|
||||||
if not self._config:
|
|
||||||
self._config = new_config
|
|
||||||
|
|
||||||
self._checkForZooKeeperChanges(new_config)
|
|
||||||
provider_manager.ProviderManager.reconfigure(self._config, new_config,
|
|
||||||
use_taskmanager=False)
|
|
||||||
self._config = new_config
|
|
||||||
|
|
||||||
self._checkForProviderUploads()
|
|
||||||
|
|
||||||
|
|
||||||
class NodePoolBuilder(object):
|
class NodePoolBuilder(object):
|
||||||
'''
|
'''
|
||||||
|
|
Loading…
Reference in New Issue