Implement threading locks around layers

When we fetch layers, we shouldn't fetch the same layers multiple times.
This change adds some locking basked on layer hashes to prevent multiple
threads from trying to fetch the same layer at the same time.

Change-Id: I477219b7dca1e6cfa02a278c55a0cc1a9833d007
Related-Bug: #1844446
This commit is contained in:
Alex Schultz 2019-09-26 14:30:13 -06:00
parent 3adfefa13a
commit e57116d9c4
2 changed files with 66 additions and 7 deletions

View File

@ -26,5 +26,10 @@ class ImageUploaderException(Exception):
pass pass
class ImageUploaderThreadException(Exception):
"""Conflict during thread processing"""
pass
class ImageNotFoundException(Exception): class ImageNotFoundException(Exception):
pass pass

View File

@ -30,6 +30,8 @@ from six.moves.urllib import parse
import subprocess import subprocess
import tempfile import tempfile
import tenacity import tenacity
import threading
import time
import yaml import yaml
from oslo_concurrency import processutils from oslo_concurrency import processutils
@ -38,6 +40,7 @@ from tripleo_common.actions import ansible
from tripleo_common.image.base import BaseImageManager from tripleo_common.image.base import BaseImageManager
from tripleo_common.image.exception import ImageNotFoundException from tripleo_common.image.exception import ImageNotFoundException
from tripleo_common.image.exception import ImageUploaderException from tripleo_common.image.exception import ImageUploaderException
from tripleo_common.image.exception import ImageUploaderThreadException
from tripleo_common.image import image_export from tripleo_common.image import image_export
from tripleo_common.utils import common as common_utils from tripleo_common.utils import common as common_utils
@ -1100,6 +1103,40 @@ class SkopeoImageUploader(BaseImageUploader):
class PythonImageUploader(BaseImageUploader): class PythonImageUploader(BaseImageUploader):
"""Upload images using a direct implementation of the registry API""" """Upload images using a direct implementation of the registry API"""
uploader_lock = threading.Lock()
uploader_lock_info = set()
@classmethod
@tenacity.retry( # Retry until we no longer have collisions
retry=tenacity.retry_if_exception_type(ImageUploaderThreadException),
wait=tenacity.wait_random_exponential(multiplier=1, max=10)
)
def _layer_fetch_lock(cls, layer):
LOG.debug('Locking layer %s' % layer)
while layer in cls.uploader_lock_info:
LOG.debug('%s is being fetched by another thread' % layer)
time.sleep(0.5)
LOG.debug('Starting acquire for lock %s' % layer)
with cls.uploader_lock:
if layer in cls.uploader_lock_info:
LOG.debug('Collision for lock %s' % layer)
raise ImageUploaderThreadException('layer conflict')
LOG.debug('Acquired for lock %s' % layer)
cls.uploader_lock_info.add(layer)
LOG.debug('Updated lock info %s' % layer)
LOG.debug('Got lock on layer %s' % layer)
@classmethod
def _layer_fetch_unlock(cls, layer):
LOG.debug('Unlocking layer %s' % layer)
LOG.debug('Starting acquire for lock %s' % layer)
with cls.uploader_lock:
LOG.debug('Acquired for unlock %s' % layer)
if layer in cls.uploader_lock_info:
cls.uploader_lock_info.remove(layer)
LOG.debug('Updated lock info %s' % layer)
LOG.debug('Released lock on layer %s' % layer)
def upload_image(self, task): def upload_image(self, task):
"""Upload image from a task """Upload image from a task
@ -1388,18 +1425,35 @@ class PythonImageUploader(BaseImageUploader):
source_session=None, source_session=None,
target_session=None): target_session=None):
layer_entry = {'digest': layer} layer_entry = {'digest': layer}
if cls._target_layer_exists_registry(target_url, layer_entry, cls._layer_fetch_lock(layer)
[layer_entry], target_session): try:
return if cls._target_layer_exists_registry(
target_url, layer_entry, [layer_entry], target_session):
cls._layer_fetch_unlock(layer)
return
except ImageUploaderThreadException:
# skip trying to unlock, because that's what threw the exception
raise
except Exception:
cls._layer_fetch_unlock(layer)
raise
digest = layer_entry['digest'] digest = layer_entry['digest']
LOG.debug('Uploading layer: %s' % digest) LOG.debug('Uploading layer: %s' % digest)
calc_digest = hashlib.sha256() calc_digest = hashlib.sha256()
layer_stream = cls._layer_stream_registry( try:
digest, source_url, calc_digest, source_session) layer_stream = cls._layer_stream_registry(
return cls._copy_stream_to_registry( digest, source_url, calc_digest, source_session)
target_url, layer_entry, calc_digest, layer_stream, target_session) layer_val = cls._copy_stream_to_registry(
target_url, layer_entry, calc_digest, layer_stream,
target_session)
except Exception:
raise
else:
return layer_val
finally:
cls._layer_fetch_unlock(layer)
@classmethod @classmethod
def _assert_scheme(cls, url, scheme): def _assert_scheme(cls, url, scheme):