Make upload workers faster on processing layers
Make upload workers processing image layers only once (as the best effort). This also reworks and simplifies locks management for individual tasks now managed for the PythonImageUploader class namespace only. When fetching source layer, cross-link it for the target local image, whenever that source is already exists. When pushing a layer to a target registry, do not repeat transfering the same data, if already pushed earlier for another image. The 1st time a layer gets uploaded/fetched for an image, that image and its known path (local or remote) becomes a reference for future cross-referencing by other images. Store such information about already processed layers in global view shared for all workers to speed-up data transfering jobs they execute. Having that global view, uploading the 1st image in the tasks list as a separate (and non-concurrent) job becomes redundant and now will be executed concurently with other images. Based on the dynamically picked multi-workers mode, provide the global view as a graf with its MP/MT state synchronization as the following: * use globally shared locking info also containing global layers view for MP-workers. With the shared global view state we can no longer use local locking objects individual for each task. * if cannot use multi-process workers, like when executing it via Mistral by monkey patched eventlet greenthreads, choose threadinglock and multi-threads-safe standard dictionary in the shared class namespace to store the global view there * if it can do MP, pick processlock also containing a safe from data races Manager().dict() as the global view shared among cooperating OS processes. * use that global view in a transparent fashion, provided by a special classmethod proxying access to the internal state shared for workers. Ultimately, all that optimizes: * completion time * re-fetching of the already processed layers * local deduplication of layers * the amount of outbound HTTP requests to registries * if-layer-exists and other internal logic check executed against the in-memory cache firstly. As layers locking and unlocking becomes a popular action, reduce the noise of the debug messages it produces. Closes-bug: #1847225 Related-bug: #1844446 Change-Id: Ie5ef4045b7e22c06551e886f9f9b6f22c8d4bd21 Signed-off-by: Bogdan Dobrelya <bdobreli@redhat.com>
This commit is contained in:
parent
26bd0efd26
commit
46f8129894
@ -21,6 +21,7 @@ import requests
|
||||
import shutil
|
||||
|
||||
from oslo_log import log as logging
|
||||
from tripleo_common.utils import image as image_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -143,37 +144,66 @@ def export_stream(target_url, layer, layer_stream, verify_digest=True):
|
||||
)
|
||||
if blob_path != expected_blob_path:
|
||||
os.rename(blob_path, expected_blob_path)
|
||||
blob_path = expected_blob_path
|
||||
|
||||
layer['digest'] = layer_digest
|
||||
layer['size'] = length
|
||||
LOG.debug('[%s] Done exporting image layer %s' % (image, digest))
|
||||
return layer_digest
|
||||
return (layer_digest, blob_path)
|
||||
|
||||
|
||||
def cross_repo_mount(target_image_url, image_layers, source_layers):
|
||||
def layer_cross_link(layer, image, blob_path, target_image_url):
|
||||
target_image, _ = image_tag_from_url(target_image_url)
|
||||
target_dir_path = os.path.join(
|
||||
IMAGE_EXPORT_DIR, 'v2', target_image, 'blobs')
|
||||
make_dir(target_dir_path)
|
||||
target_blob_path = os.path.join(target_dir_path, '%s.gz' % layer)
|
||||
if not os.path.exists(target_blob_path):
|
||||
LOG.debug('[%s] Linking layers: %s -> %s' %
|
||||
(image, blob_path, target_blob_path))
|
||||
# make a hard link so the layers can have independent lifecycles
|
||||
os.link(blob_path, target_blob_path)
|
||||
|
||||
|
||||
def cross_repo_mount(target_image_url, image_layers, source_layers,
|
||||
uploaded_layers=None):
|
||||
linked_layers = {}
|
||||
target_image, _ = image_tag_from_url(target_image_url)
|
||||
for layer in source_layers:
|
||||
if layer not in image_layers:
|
||||
known_path, ref_image = image_utils.uploaded_layers_details(
|
||||
uploaded_layers, layer, scope='local')
|
||||
|
||||
if layer not in image_layers and not ref_image:
|
||||
continue
|
||||
|
||||
image_url = image_layers[layer]
|
||||
image, tag = image_tag_from_url(image_url)
|
||||
image_url = image_layers.get(layer, None)
|
||||
if image_url:
|
||||
image, _ = image_tag_from_url(image_url)
|
||||
else:
|
||||
image = ref_image
|
||||
if not image:
|
||||
continue
|
||||
|
||||
if known_path and ref_image:
|
||||
blob_path = known_path
|
||||
image = ref_image
|
||||
if ref_image != image:
|
||||
LOG.debug('[%s] Layer ref. by image %s already exists '
|
||||
'at %s' % (image, ref_image, known_path))
|
||||
else:
|
||||
LOG.debug('[%s] Layer already exists at %s'
|
||||
% (image, known_path))
|
||||
else:
|
||||
dir_path = os.path.join(IMAGE_EXPORT_DIR, 'v2', image, 'blobs')
|
||||
blob_path = os.path.join(dir_path, '%s.gz' % layer)
|
||||
if not os.path.exists(blob_path):
|
||||
LOG.debug('[%s] Layer not found: %s' % (image, blob_path))
|
||||
continue
|
||||
|
||||
target_image, tag = image_tag_from_url(target_image_url)
|
||||
target_dir_path = os.path.join(
|
||||
IMAGE_EXPORT_DIR, 'v2', target_image, 'blobs')
|
||||
make_dir(target_dir_path)
|
||||
target_blob_path = os.path.join(target_dir_path, '%s.gz' % layer)
|
||||
if os.path.exists(target_blob_path):
|
||||
continue
|
||||
LOG.debug('[%s] Linking layers: %s -> %s' %
|
||||
(image, blob_path, target_blob_path))
|
||||
# make a hard link so the layers can have independent lifecycles
|
||||
os.link(blob_path, target_blob_path)
|
||||
layer_cross_link(layer, image, blob_path, target_image_url)
|
||||
linked_layers.update({layer: {'known_path': blob_path,
|
||||
'ref_image': image}})
|
||||
return linked_layers
|
||||
|
||||
|
||||
def export_manifest_config(target_url,
|
||||
|
@ -40,6 +40,7 @@ from tripleo_common.image.exception import ImageNotFoundException
|
||||
from tripleo_common.image.exception import ImageUploaderException
|
||||
from tripleo_common.image.exception import ImageUploaderThreadException
|
||||
from tripleo_common.image import image_export
|
||||
from tripleo_common.utils import image as image_utils
|
||||
from tripleo_common.utils.locks import threadinglock
|
||||
|
||||
|
||||
@ -164,8 +165,9 @@ class ImageUploadManager(BaseImageManager):
|
||||
super(ImageUploadManager, self).__init__(config_files)
|
||||
self.uploaders = {
|
||||
'skopeo': SkopeoImageUploader(),
|
||||
'python': PythonImageUploader(lock)
|
||||
'python': PythonImageUploader()
|
||||
}
|
||||
self.uploaders['python'].init_global_state(lock)
|
||||
self.dry_run = dry_run
|
||||
self.cleanup = cleanup
|
||||
if mirrors:
|
||||
@ -177,7 +179,6 @@ class ImageUploadManager(BaseImageManager):
|
||||
for uploader in self.uploaders.values():
|
||||
uploader.registry_credentials = registry_credentials
|
||||
self.multi_arch = multi_arch
|
||||
self.lock = lock
|
||||
|
||||
@staticmethod
|
||||
def validate_registry_credentials(creds_data):
|
||||
@ -250,7 +251,7 @@ class ImageUploadManager(BaseImageManager):
|
||||
tasks.append(UploadTask(
|
||||
image_name, pull_source, push_destination,
|
||||
append_tag, modify_role, modify_vars, self.dry_run,
|
||||
self.cleanup, multi_arch, self.lock))
|
||||
self.cleanup, multi_arch))
|
||||
|
||||
# NOTE(mwhahaha): We want to randomize the upload process because of
|
||||
# the shared nature of container layers. Because we multiprocess the
|
||||
@ -282,13 +283,12 @@ class BaseImageUploader(object):
|
||||
export_registries = set()
|
||||
push_registries = set()
|
||||
|
||||
def __init__(self, lock=None):
|
||||
def __init__(self):
|
||||
self.upload_tasks = []
|
||||
# A mapping of layer hashs to the image which first copied that
|
||||
# layer to the target
|
||||
self.image_layers = {}
|
||||
self.registry_credentials = {}
|
||||
self.lock = lock
|
||||
|
||||
@classmethod
|
||||
def init_registries_cache(cls):
|
||||
@ -912,8 +912,14 @@ class BaseImageUploader(object):
|
||||
name = target_image_url.path.split(':')[0][1:]
|
||||
export = netloc in cls.export_registries
|
||||
if export:
|
||||
image_export.cross_repo_mount(
|
||||
target_image_url, image_layers, source_layers)
|
||||
linked_layers = image_export.cross_repo_mount(
|
||||
target_image_url, image_layers, source_layers,
|
||||
uploaded_layers=cls._global_view_proxy())
|
||||
# track linked layers globally for future references
|
||||
for layer, info in linked_layers.items():
|
||||
cls._track_uploaded_layers(
|
||||
layer, known_path=info['known_path'],
|
||||
image_ref=info['ref_image'], scope='local')
|
||||
return
|
||||
|
||||
if netloc in cls.insecure_registries:
|
||||
@ -923,8 +929,15 @@ class BaseImageUploader(object):
|
||||
url = '%s://%s/v2/%s/blobs/uploads/' % (scheme, netloc, name)
|
||||
|
||||
for layer in source_layers:
|
||||
if layer in image_layers:
|
||||
known_path, existing_name = image_utils.uploaded_layers_details(
|
||||
cls._global_view_proxy(), layer, scope='remote')
|
||||
if layer not in image_layers and not existing_name:
|
||||
continue
|
||||
if not existing_name:
|
||||
existing_name = image_layers[layer].path.split(':')[0][1:]
|
||||
if existing_name != name:
|
||||
LOG.debug('[%s] Layer %s ref. by image %s already exists '
|
||||
'at %s' % (name, layer, existing_name, known_path))
|
||||
LOG.info('[%s] Cross repository blob mount from %s' %
|
||||
(layer, existing_name))
|
||||
data = {
|
||||
@ -1124,43 +1137,87 @@ class SkopeoImageUploader(BaseImageUploader):
|
||||
class PythonImageUploader(BaseImageUploader):
|
||||
"""Upload images using a direct implementation of the registry API"""
|
||||
|
||||
uploaded_layers = {} # provides global view for multi-threading workers
|
||||
lock = None # provides global locking info plus global view, if MP is used
|
||||
|
||||
@classmethod
|
||||
def init_global_state(cls, lock):
|
||||
if not cls.lock:
|
||||
cls.lock = lock
|
||||
|
||||
@classmethod
|
||||
@tenacity.retry( # Retry until we no longer have collisions
|
||||
retry=tenacity.retry_if_exception_type(ImageUploaderThreadException),
|
||||
wait=tenacity.wait_random_exponential(multiplier=1, max=10)
|
||||
)
|
||||
def _layer_fetch_lock(cls, layer, lock=None):
|
||||
if not lock:
|
||||
def _layer_fetch_lock(cls, layer):
|
||||
if not cls.lock:
|
||||
LOG.warning('No lock information provided for layer %s' % layer)
|
||||
return
|
||||
if layer in lock.objects():
|
||||
if layer in cls.lock.objects():
|
||||
LOG.debug('[%s] Layer is being fetched by another thread' % layer)
|
||||
raise ImageUploaderThreadException('layer being fetched')
|
||||
LOG.debug('Locking layer %s' % layer)
|
||||
LOG.debug('Starting acquire for lock %s' % layer)
|
||||
with lock.get_lock():
|
||||
if layer in lock.objects():
|
||||
known_path, image = image_utils.uploaded_layers_details(
|
||||
cls._global_view_proxy(), layer, scope='local')
|
||||
if not known_path or not image:
|
||||
known_path, image = image_utils.uploaded_layers_details(
|
||||
cls._global_view_proxy(), layer, scope='remote')
|
||||
if image and known_path:
|
||||
# already processed layers needs no further locking
|
||||
return
|
||||
with cls.lock.get_lock():
|
||||
if layer in cls.lock.objects():
|
||||
LOG.debug('Collision for lock %s' % layer)
|
||||
raise ImageUploaderThreadException('layer conflict')
|
||||
LOG.debug('Acquired for lock %s' % layer)
|
||||
lock.objects().append(layer)
|
||||
LOG.debug('Updated lock info %s' % layer)
|
||||
cls.lock.objects().append(layer)
|
||||
LOG.debug('Got lock on layer %s' % layer)
|
||||
|
||||
@classmethod
|
||||
def _layer_fetch_unlock(cls, layer, lock=None):
|
||||
if not lock:
|
||||
def _layer_fetch_unlock(cls, layer):
|
||||
if not cls.lock:
|
||||
LOG.warning('No lock information provided for layer %s' % layer)
|
||||
return
|
||||
LOG.debug('Unlocking layer %s' % layer)
|
||||
LOG.debug('Starting acquire for lock %s' % layer)
|
||||
with lock.get_lock():
|
||||
LOG.debug('Acquired for unlock %s' % layer)
|
||||
while layer in lock.objects():
|
||||
lock.objects().remove(layer)
|
||||
LOG.debug('Updated lock info %s' % layer)
|
||||
with cls.lock.get_lock():
|
||||
while layer in cls.lock.objects():
|
||||
cls.lock.objects().remove(layer)
|
||||
LOG.debug('Released lock on layer %s' % layer)
|
||||
|
||||
@classmethod
|
||||
def _global_view_proxy(cls, value=None, forget=False):
|
||||
if not cls.lock:
|
||||
LOG.warning('No lock information provided for value %s' % value)
|
||||
return
|
||||
with cls.lock.get_lock():
|
||||
if value and forget:
|
||||
cls.uploaded_layers.pop(value, None)
|
||||
if hasattr(cls.lock, '_global_view'):
|
||||
cls.lock._global_view.pop(value, None)
|
||||
elif value:
|
||||
cls.uploaded_layers.update(value)
|
||||
if hasattr(cls.lock, '_global_view'):
|
||||
cls.lock._global_view.update(value)
|
||||
|
||||
if not value:
|
||||
# return global view consolidated among MP/MT workers state
|
||||
if hasattr(cls.lock, '_global_view'):
|
||||
consolidated_view = cls.uploaded_layers.copy()
|
||||
consolidated_view.update(cls.lock._global_view)
|
||||
return consolidated_view
|
||||
else:
|
||||
return cls.uploaded_layers
|
||||
|
||||
@classmethod
|
||||
def _track_uploaded_layers(cls, layer, known_path=None, image_ref=None,
|
||||
forget=False, scope='remote'):
|
||||
if forget:
|
||||
LOG.debug('Untracking processed layer %s for any scope' % layer)
|
||||
cls._global_view_proxy(value=layer, forget=True)
|
||||
else:
|
||||
LOG.debug('Tracking processed layer %s for %s scope'
|
||||
% (layer, scope))
|
||||
cls._global_view_proxy(
|
||||
value={layer: {scope: {'ref': image_ref, 'path': known_path}}})
|
||||
|
||||
def upload_image(self, task):
|
||||
"""Upload image from a task
|
||||
|
||||
@ -1185,8 +1242,6 @@ class PythonImageUploader(BaseImageUploader):
|
||||
if t.dry_run:
|
||||
return []
|
||||
|
||||
lock = t.lock
|
||||
|
||||
target_username, target_password = self.credentials_for_registry(
|
||||
t.target_image_url.netloc)
|
||||
target_session = self.authenticate(
|
||||
@ -1277,8 +1332,7 @@ class PythonImageUploader(BaseImageUploader):
|
||||
source_session=source_session,
|
||||
target_session=target_session,
|
||||
source_layers=source_layers,
|
||||
multi_arch=t.multi_arch,
|
||||
lock=lock
|
||||
multi_arch=t.multi_arch
|
||||
)
|
||||
except Exception:
|
||||
LOG.error('[%s] Failed uploading the target '
|
||||
@ -1494,38 +1548,58 @@ class PythonImageUploader(BaseImageUploader):
|
||||
def _copy_layer_registry_to_registry(cls, source_url, target_url,
|
||||
layer,
|
||||
source_session=None,
|
||||
target_session=None,
|
||||
lock=None):
|
||||
target_session=None):
|
||||
layer_entry = {'digest': layer}
|
||||
try:
|
||||
cls._layer_fetch_lock(layer, lock)
|
||||
cls._layer_fetch_lock(layer)
|
||||
if cls._target_layer_exists_registry(
|
||||
target_url, layer_entry, [layer_entry], target_session):
|
||||
cls._layer_fetch_unlock(layer, lock)
|
||||
cls._layer_fetch_unlock(layer)
|
||||
return
|
||||
known_path, ref_image = image_utils.uploaded_layers_details(
|
||||
cls._global_view_proxy(), layer, scope='local')
|
||||
if known_path and ref_image:
|
||||
# cross-link target from local source, skip fetching it again
|
||||
image_export.layer_cross_link(
|
||||
layer, ref_image, known_path, target_url)
|
||||
cls._layer_fetch_unlock(layer)
|
||||
return
|
||||
except ImageUploaderThreadException:
|
||||
# skip trying to unlock, because that's what threw the exception
|
||||
raise
|
||||
except Exception:
|
||||
cls._layer_fetch_unlock(layer, lock)
|
||||
cls._layer_fetch_unlock(layer)
|
||||
raise
|
||||
|
||||
digest = layer_entry['digest']
|
||||
LOG.debug('[%s] Uploading layer' % digest)
|
||||
|
||||
calc_digest = hashlib.sha256()
|
||||
known_path = None
|
||||
layer_val = None
|
||||
try:
|
||||
layer_stream = cls._layer_stream_registry(
|
||||
digest, source_url, calc_digest, source_session)
|
||||
layer_val = cls._copy_stream_to_registry(
|
||||
layer_val, known_path = cls._copy_stream_to_registry(
|
||||
target_url, layer_entry, calc_digest, layer_stream,
|
||||
target_session)
|
||||
except (IOError, requests.exceptions.HTTPError):
|
||||
cls._track_uploaded_layers(layer, forget=True, scope='remote')
|
||||
LOG.error('[%s] Failed processing layer for the target '
|
||||
'image %s' % (layer, target_url.geturl()))
|
||||
raise
|
||||
except Exception:
|
||||
raise
|
||||
else:
|
||||
if layer_val and known_path:
|
||||
image_ref = target_url.path.split(':')[0][1:]
|
||||
uploaded = parse.urlparse(known_path).scheme
|
||||
cls._track_uploaded_layers(
|
||||
layer_val, known_path=known_path, image_ref=image_ref,
|
||||
scope=('remote' if uploaded else 'local'))
|
||||
return layer_val
|
||||
finally:
|
||||
cls._layer_fetch_unlock(layer, lock)
|
||||
cls._layer_fetch_unlock(layer)
|
||||
|
||||
@classmethod
|
||||
def _assert_scheme(cls, url, scheme):
|
||||
@ -1547,8 +1621,7 @@ class PythonImageUploader(BaseImageUploader):
|
||||
source_session=None,
|
||||
target_session=None,
|
||||
source_layers=None,
|
||||
multi_arch=False,
|
||||
lock=None):
|
||||
multi_arch=False):
|
||||
cls._assert_scheme(source_url, 'docker')
|
||||
cls._assert_scheme(target_url, 'docker')
|
||||
|
||||
@ -1570,8 +1643,7 @@ class PythonImageUploader(BaseImageUploader):
|
||||
source_url, target_url,
|
||||
layer=layer,
|
||||
source_session=source_session,
|
||||
target_session=target_session,
|
||||
lock=lock
|
||||
target_session=target_session
|
||||
))
|
||||
|
||||
jobs_count = len(copy_jobs)
|
||||
@ -1742,26 +1814,39 @@ class PythonImageUploader(BaseImageUploader):
|
||||
def _target_layer_exists_registry(cls, target_url, layer, check_layers,
|
||||
session):
|
||||
image, tag = cls._image_tag_from_url(target_url)
|
||||
norm_image = (image[1:] if image.startswith('/') else image)
|
||||
parts = {
|
||||
'image': image,
|
||||
'tag': tag
|
||||
}
|
||||
# Do a HEAD call for the supplied digests
|
||||
# to see if the layer is already in the registry
|
||||
layer_found = None
|
||||
# Check in global view or do a HEAD call for the supplied
|
||||
# digests to see if the layer is already in the registry
|
||||
for l in check_layers:
|
||||
if not l:
|
||||
continue
|
||||
known_path, ref_image = image_utils.uploaded_layers_details(
|
||||
cls._global_view_proxy(), l['digest'], scope='remote')
|
||||
if ref_image == norm_image:
|
||||
LOG.debug('[%s] Layer %s already exists at %s' %
|
||||
(image, l['digest'], known_path))
|
||||
layer_found = l
|
||||
break
|
||||
else:
|
||||
parts['digest'] = l['digest']
|
||||
blob_url = cls._build_url(
|
||||
target_url, CALL_BLOB % parts)
|
||||
if session.head(blob_url, timeout=30).status_code == 200:
|
||||
LOG.debug('[%s] Layer already exists: %s' %
|
||||
(image, l['digest']))
|
||||
layer['digest'] = l['digest']
|
||||
if 'size' in l:
|
||||
layer['size'] = l['size']
|
||||
if 'mediaType' in l:
|
||||
layer['mediaType'] = l['mediaType']
|
||||
layer_found = l
|
||||
break
|
||||
if layer_found:
|
||||
layer['digest'] = layer_found['digest']
|
||||
if 'size' in layer_found:
|
||||
layer['size'] = layer_found['size']
|
||||
if 'mediaType' in layer_found:
|
||||
layer['mediaType'] = layer_found['mediaType']
|
||||
return True
|
||||
return False
|
||||
|
||||
@ -1809,8 +1894,8 @@ class PythonImageUploader(BaseImageUploader):
|
||||
def _copy_layer_local_to_registry(cls, target_url,
|
||||
session, layer, layer_entry):
|
||||
|
||||
# Do a HEAD call for the compressed-diff-digest and diff-digest
|
||||
# to see if the layer is already in the registry
|
||||
# Check in global view or do a HEAD call for the compressed-diff-digest
|
||||
# and diff-digest to see if the layer is already in the registry
|
||||
check_layers = []
|
||||
compressed_digest = layer_entry.get('compressed-diff-digest')
|
||||
if compressed_digest:
|
||||
@ -1835,10 +1920,29 @@ class PythonImageUploader(BaseImageUploader):
|
||||
LOG.debug('[%s] Uploading layer' % layer_id)
|
||||
|
||||
calc_digest = hashlib.sha256()
|
||||
known_path = None
|
||||
layer_val = None
|
||||
try:
|
||||
layer_stream = cls._layer_stream_local(layer_id, calc_digest)
|
||||
return cls._copy_stream_to_registry(target_url, layer, calc_digest,
|
||||
layer_stream, session,
|
||||
layer_val, known_path = cls._copy_stream_to_registry(
|
||||
target_url, layer, calc_digest, layer_stream, session,
|
||||
verify_digest=False)
|
||||
except (IOError, requests.exceptions.HTTPError):
|
||||
cls._track_uploaded_layers(
|
||||
layer['digest'], forget=True, scope='remote')
|
||||
LOG.error('[%s] Failed processing layer for the target '
|
||||
'image %s' % (layer['digest'], target_url.geturl()))
|
||||
raise
|
||||
except Exception:
|
||||
raise
|
||||
else:
|
||||
if layer_val and known_path:
|
||||
image_ref = target_url.path.split(':')[0][1:]
|
||||
uploaded = parse.urlparse(known_path).scheme
|
||||
cls._track_uploaded_layers(
|
||||
layer_val, known_path=known_path, image_ref=image_ref,
|
||||
scope=('remote' if uploaded else 'local'))
|
||||
return layer_val
|
||||
|
||||
@classmethod
|
||||
def _copy_stream_to_registry(cls, target_url, layer, calc_digest,
|
||||
@ -1887,7 +1991,7 @@ class PythonImageUploader(BaseImageUploader):
|
||||
cls.check_status(session=session, request=upload_resp)
|
||||
layer['digest'] = layer_digest
|
||||
layer['size'] = length
|
||||
return layer_digest
|
||||
return (layer_digest, cls._build_url(target_url, target_url.path))
|
||||
|
||||
@classmethod
|
||||
@tenacity.retry( # Retry up to 5 times with jittered exponential backoff
|
||||
@ -2089,10 +2193,6 @@ class PythonImageUploader(BaseImageUploader):
|
||||
return
|
||||
local_images = []
|
||||
|
||||
# Pull a single image first, to avoid duplicate pulls of the
|
||||
# same base layers
|
||||
local_images.extend(upload_task(args=self.upload_tasks.pop()))
|
||||
|
||||
with self._get_executor() as p:
|
||||
for result in p.map(upload_task, self.upload_tasks):
|
||||
local_images.extend(result)
|
||||
@ -2107,7 +2207,7 @@ class UploadTask(object):
|
||||
|
||||
def __init__(self, image_name, pull_source, push_destination,
|
||||
append_tag, modify_role, modify_vars, dry_run, cleanup,
|
||||
multi_arch, lock=None):
|
||||
multi_arch):
|
||||
self.image_name = image_name
|
||||
self.pull_source = pull_source
|
||||
self.push_destination = push_destination
|
||||
@ -2117,7 +2217,6 @@ class UploadTask(object):
|
||||
self.dry_run = dry_run
|
||||
self.cleanup = cleanup
|
||||
self.multi_arch = multi_arch
|
||||
self.lock = lock
|
||||
|
||||
if ':' in image_name:
|
||||
image = image_name.rpartition(':')[0]
|
||||
|
@ -378,7 +378,7 @@ def container_images_prepare(template_file=DEFAULT_TEMPLATE_FILE,
|
||||
del(entry['services'])
|
||||
|
||||
params.update(
|
||||
detect_insecure_registries(params))
|
||||
detect_insecure_registries(params, lock=lock))
|
||||
|
||||
return_data = {}
|
||||
if output_env_file:
|
||||
@ -388,7 +388,7 @@ def container_images_prepare(template_file=DEFAULT_TEMPLATE_FILE,
|
||||
return return_data
|
||||
|
||||
|
||||
def detect_insecure_registries(params):
|
||||
def detect_insecure_registries(params, lock=None):
|
||||
"""Detect insecure registries in image parameters
|
||||
|
||||
:param params: dict of container image parameters
|
||||
@ -396,7 +396,7 @@ def detect_insecure_registries(params):
|
||||
merged into other parameters
|
||||
"""
|
||||
insecure = set()
|
||||
uploader = image_uploader.ImageUploadManager().uploader('python')
|
||||
uploader = image_uploader.ImageUploadManager(lock=lock).uploader('python')
|
||||
for image in params.values():
|
||||
host = image.split('/')[0]
|
||||
if uploader.is_insecure_registry(host):
|
||||
|
@ -89,7 +89,7 @@ class TestImageExport(base.TestCase):
|
||||
}
|
||||
calc_digest = hashlib.sha256()
|
||||
layer_stream = io.BytesIO(blob_compressed)
|
||||
layer_digest = image_export.export_stream(
|
||||
layer_digest, _ = image_export.export_stream(
|
||||
target_url, layer, layer_stream, verify_digest=False
|
||||
)
|
||||
self.assertEqual(compressed_digest, layer_digest)
|
||||
@ -145,7 +145,8 @@ class TestImageExport(base.TestCase):
|
||||
target_blob_path = os.path.join(target_blob_dir, 'sha256:1234.gz')
|
||||
|
||||
# call with missing source, no change
|
||||
image_export.cross_repo_mount(target_url, image_layers, source_layers)
|
||||
image_export.cross_repo_mount(target_url, image_layers, source_layers,
|
||||
uploaded_layers={})
|
||||
self.assertFalse(os.path.exists(source_blob_path))
|
||||
self.assertFalse(os.path.exists(target_blob_path))
|
||||
|
||||
@ -155,7 +156,8 @@ class TestImageExport(base.TestCase):
|
||||
self.assertTrue(os.path.exists(source_blob_path))
|
||||
|
||||
# call with existing source
|
||||
image_export.cross_repo_mount(target_url, image_layers, source_layers)
|
||||
image_export.cross_repo_mount(target_url, image_layers, source_layers,
|
||||
uploaded_layers={})
|
||||
self.assertTrue(os.path.exists(target_blob_path))
|
||||
with open(target_blob_path, 'r') as f:
|
||||
self.assertEqual('blob', f.read())
|
||||
|
@ -1318,8 +1318,7 @@ class TestPythonImageUploader(base.TestCase):
|
||||
source_session=source_session,
|
||||
target_session=target_session,
|
||||
source_layers=['sha256:aaa', 'sha256:bbb', 'sha256:ccc'],
|
||||
multi_arch=False,
|
||||
lock=None
|
||||
multi_arch=False
|
||||
)
|
||||
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
@ -1553,8 +1552,7 @@ class TestPythonImageUploader(base.TestCase):
|
||||
source_session=source_session,
|
||||
target_session=target_session,
|
||||
source_layers=['sha256:aaa', 'sha256:bbb', 'sha256:ccc'],
|
||||
multi_arch=False,
|
||||
lock=None
|
||||
multi_arch=False
|
||||
)
|
||||
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
@ -1685,8 +1683,7 @@ class TestPythonImageUploader(base.TestCase):
|
||||
source_session=source_session,
|
||||
target_session=target_session,
|
||||
source_layers=['sha256:aaa', 'sha256:bbb', 'sha256:ccc'],
|
||||
multi_arch=False,
|
||||
lock=None
|
||||
multi_arch=False
|
||||
)
|
||||
_copy_registry_to_local.assert_called_once_with(unmodified_target_url)
|
||||
run_modify_playbook.assert_called_once_with(
|
||||
@ -1816,7 +1813,8 @@ class TestPythonImageUploader(base.TestCase):
|
||||
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
'PythonImageUploader._upload_url')
|
||||
def test_copy_layer_registry_to_registry(self, _upload_url):
|
||||
@mock.patch('tripleo_common.utils.image.uploaded_layers_details')
|
||||
def test_copy_layer_registry_to_registry(self, global_check, _upload_url):
|
||||
_upload_url.return_value = 'https://192.168.2.1:5000/v2/upload'
|
||||
source_url = urlparse('docker://docker.io/t/nova-api:latest')
|
||||
target_url = urlparse('docker://192.168.2.1:5000/t/nova-api:latest')
|
||||
@ -1835,6 +1833,7 @@ class TestPythonImageUploader(base.TestCase):
|
||||
layer = layer_entry['digest']
|
||||
|
||||
# layer already exists at destination
|
||||
global_check.return_value = (None, None)
|
||||
self.requests.head(
|
||||
'https://192.168.2.1:5000/v2/t/nova-api/blobs/%s' % blob_digest,
|
||||
status_code=200
|
||||
@ -2022,8 +2021,9 @@ class TestPythonImageUploader(base.TestCase):
|
||||
@mock.patch('subprocess.Popen')
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
'PythonImageUploader._upload_url')
|
||||
def test_copy_layer_local_to_registry(self, _upload_url, mock_popen,
|
||||
mock_exists):
|
||||
@mock.patch('tripleo_common.utils.image.uploaded_layers_details')
|
||||
def test_copy_layer_local_to_registry(self, global_check, _upload_url,
|
||||
mock_popen, mock_exists):
|
||||
mock_exists.return_value = True
|
||||
_upload_url.return_value = 'https://192.168.2.1:5000/v2/upload'
|
||||
target_url = urlparse('docker://192.168.2.1:5000/t/nova-api:latest')
|
||||
@ -2048,6 +2048,7 @@ class TestPythonImageUploader(base.TestCase):
|
||||
}
|
||||
|
||||
# layer already exists at destination
|
||||
global_check.return_value = (None, None)
|
||||
self.requests.head(
|
||||
'https://192.168.2.1:5000/v2/t/'
|
||||
'nova-api/blobs/%s' % compressed_digest,
|
||||
@ -2117,6 +2118,7 @@ class TestPythonImageUploader(base.TestCase):
|
||||
layer
|
||||
)
|
||||
|
||||
@mock.patch('tripleo_common.utils.image.uploaded_layers_details')
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
'PythonImageUploader._image_manifest_config')
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
@ -2127,11 +2129,12 @@ class TestPythonImageUploader(base.TestCase):
|
||||
'PythonImageUploader._upload_url')
|
||||
def test_copy_local_to_registry(self, _upload_url, _containers_json,
|
||||
_copy_layer_local_to_registry,
|
||||
_image_manifest_config):
|
||||
_image_manifest_config, _global_check):
|
||||
source_url = urlparse('containers-storage:/t/nova-api:latest')
|
||||
target_url = urlparse('docker://192.168.2.1:5000/t/nova-api:latest')
|
||||
target_session = requests.Session()
|
||||
_upload_url.return_value = 'https://192.168.2.1:5000/v2/upload'
|
||||
_global_check.return_value = (None, None)
|
||||
layers = [{
|
||||
"compressed-diff-digest": "sha256:aeb786",
|
||||
"compressed-size": 74703002,
|
||||
|
26
tripleo_common/utils/image.py
Normal file
26
tripleo_common/utils/image.py
Normal file
@ -0,0 +1,26 @@
|
||||
# Copyright 2019 Red Hat, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
def uploaded_layers_details(uploaded_layers, layer, scope):
|
||||
known_path = None
|
||||
known_layer = None
|
||||
image = None
|
||||
if layer:
|
||||
known_layer = uploaded_layers.get(layer, None)
|
||||
if known_layer and scope in known_layer:
|
||||
known_path = known_layer[scope].get('path', None)
|
||||
image = known_layer[scope].get('ref', None)
|
||||
return (known_path, image)
|
@ -23,6 +23,7 @@ from tripleo_common.utils.locks import base
|
||||
class ProcessLock(base.BaseLock):
|
||||
# the manager cannot live in __init__
|
||||
_mgr = multiprocessing.Manager()
|
||||
_global_view = _mgr.dict()
|
||||
|
||||
def __init__(self):
|
||||
self._lock = self._mgr.Lock()
|
||||
|
Loading…
Reference in New Issue
Block a user