Merge "Make upload workers faster on processing layers"
This commit is contained in:
commit
abbd5c46f1
|
@ -21,6 +21,7 @@ import requests
|
|||
import shutil
|
||||
|
||||
from oslo_log import log as logging
|
||||
from tripleo_common.utils import image as image_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
@ -151,39 +152,68 @@ def export_stream(target_url, layer, layer_stream, verify_digest=True):
|
|||
)
|
||||
if blob_path != expected_blob_path:
|
||||
os.rename(blob_path, expected_blob_path)
|
||||
blob_path = expected_blob_path
|
||||
|
||||
layer['digest'] = layer_digest
|
||||
layer['size'] = length
|
||||
LOG.debug('[%s] Done exporting image layer %s' % (image, digest))
|
||||
return layer_digest
|
||||
return (layer_digest, blob_path)
|
||||
|
||||
|
||||
def cross_repo_mount(target_image_url, image_layers, source_layers):
|
||||
for layer in source_layers:
|
||||
if layer not in image_layers:
|
||||
continue
|
||||
|
||||
image_url = image_layers[layer]
|
||||
image, tag = image_tag_from_url(image_url)
|
||||
dir_path = os.path.join(IMAGE_EXPORT_DIR, 'v2', image, 'blobs')
|
||||
blob_path = os.path.join(dir_path, '%s.gz' % layer)
|
||||
if not os.path.exists(blob_path):
|
||||
LOG.debug('[%s] Layer not found: %s' % (image, blob_path))
|
||||
continue
|
||||
|
||||
target_image, tag = image_tag_from_url(target_image_url)
|
||||
target_dir_path = os.path.join(
|
||||
IMAGE_EXPORT_DIR, 'v2', target_image, 'blobs')
|
||||
make_dir(target_dir_path)
|
||||
target_blob_path = os.path.join(target_dir_path, '%s.gz' % layer)
|
||||
if os.path.exists(target_blob_path):
|
||||
continue
|
||||
def layer_cross_link(layer, image, blob_path, target_image_url):
|
||||
target_image, _ = image_tag_from_url(target_image_url)
|
||||
target_dir_path = os.path.join(
|
||||
IMAGE_EXPORT_DIR, 'v2', target_image, 'blobs')
|
||||
make_dir(target_dir_path)
|
||||
target_blob_path = os.path.join(target_dir_path, '%s.gz' % layer)
|
||||
if not os.path.exists(target_blob_path):
|
||||
LOG.debug('[%s] Linking layers: %s -> %s' %
|
||||
(image, blob_path, target_blob_path))
|
||||
# make a hard link so the layers can have independent lifecycles
|
||||
os.link(blob_path, target_blob_path)
|
||||
|
||||
|
||||
def cross_repo_mount(target_image_url, image_layers, source_layers,
|
||||
uploaded_layers=None):
|
||||
linked_layers = {}
|
||||
target_image, _ = image_tag_from_url(target_image_url)
|
||||
for layer in source_layers:
|
||||
known_path, ref_image = image_utils.uploaded_layers_details(
|
||||
uploaded_layers, layer, scope='local')
|
||||
|
||||
if layer not in image_layers and not ref_image:
|
||||
continue
|
||||
|
||||
image_url = image_layers.get(layer, None)
|
||||
if image_url:
|
||||
image, _ = image_tag_from_url(image_url)
|
||||
else:
|
||||
image = ref_image
|
||||
if not image:
|
||||
continue
|
||||
|
||||
if known_path and ref_image:
|
||||
blob_path = known_path
|
||||
image = ref_image
|
||||
if ref_image != image:
|
||||
LOG.debug('[%s] Layer ref. by image %s already exists '
|
||||
'at %s' % (image, ref_image, known_path))
|
||||
else:
|
||||
LOG.debug('[%s] Layer already exists at %s'
|
||||
% (image, known_path))
|
||||
else:
|
||||
dir_path = os.path.join(IMAGE_EXPORT_DIR, 'v2', image, 'blobs')
|
||||
blob_path = os.path.join(dir_path, '%s.gz' % layer)
|
||||
if not os.path.exists(blob_path):
|
||||
LOG.debug('[%s] Layer not found: %s' % (image, blob_path))
|
||||
continue
|
||||
|
||||
layer_cross_link(layer, image, blob_path, target_image_url)
|
||||
linked_layers.update({layer: {'known_path': blob_path,
|
||||
'ref_image': image}})
|
||||
return linked_layers
|
||||
|
||||
|
||||
def export_manifest_config(target_url,
|
||||
manifest_str,
|
||||
manifest_type,
|
||||
|
|
|
@ -40,6 +40,7 @@ from tripleo_common.image.exception import ImageNotFoundException
|
|||
from tripleo_common.image.exception import ImageUploaderException
|
||||
from tripleo_common.image.exception import ImageUploaderThreadException
|
||||
from tripleo_common.image import image_export
|
||||
from tripleo_common.utils import image as image_utils
|
||||
from tripleo_common.utils.locks import threadinglock
|
||||
|
||||
|
||||
|
@ -164,8 +165,9 @@ class ImageUploadManager(BaseImageManager):
|
|||
super(ImageUploadManager, self).__init__(config_files)
|
||||
self.uploaders = {
|
||||
'skopeo': SkopeoImageUploader(),
|
||||
'python': PythonImageUploader(lock)
|
||||
'python': PythonImageUploader()
|
||||
}
|
||||
self.uploaders['python'].init_global_state(lock)
|
||||
self.dry_run = dry_run
|
||||
self.cleanup = cleanup
|
||||
if mirrors:
|
||||
|
@ -177,7 +179,6 @@ class ImageUploadManager(BaseImageManager):
|
|||
for uploader in self.uploaders.values():
|
||||
uploader.registry_credentials = registry_credentials
|
||||
self.multi_arch = multi_arch
|
||||
self.lock = lock
|
||||
|
||||
@staticmethod
|
||||
def validate_registry_credentials(creds_data):
|
||||
|
@ -250,7 +251,7 @@ class ImageUploadManager(BaseImageManager):
|
|||
tasks.append(UploadTask(
|
||||
image_name, pull_source, push_destination,
|
||||
append_tag, modify_role, modify_vars, self.dry_run,
|
||||
self.cleanup, multi_arch, self.lock))
|
||||
self.cleanup, multi_arch))
|
||||
|
||||
# NOTE(mwhahaha): We want to randomize the upload process because of
|
||||
# the shared nature of container layers. Because we multiprocess the
|
||||
|
@ -282,13 +283,12 @@ class BaseImageUploader(object):
|
|||
export_registries = set()
|
||||
push_registries = set()
|
||||
|
||||
def __init__(self, lock=None):
|
||||
def __init__(self):
|
||||
self.upload_tasks = []
|
||||
# A mapping of layer hashs to the image which first copied that
|
||||
# layer to the target
|
||||
self.image_layers = {}
|
||||
self.registry_credentials = {}
|
||||
self.lock = lock
|
||||
|
||||
@classmethod
|
||||
def init_registries_cache(cls):
|
||||
|
@ -912,8 +912,14 @@ class BaseImageUploader(object):
|
|||
name = target_image_url.path.split(':')[0][1:]
|
||||
export = netloc in cls.export_registries
|
||||
if export:
|
||||
image_export.cross_repo_mount(
|
||||
target_image_url, image_layers, source_layers)
|
||||
linked_layers = image_export.cross_repo_mount(
|
||||
target_image_url, image_layers, source_layers,
|
||||
uploaded_layers=cls._global_view_proxy())
|
||||
# track linked layers globally for future references
|
||||
for layer, info in linked_layers.items():
|
||||
cls._track_uploaded_layers(
|
||||
layer, known_path=info['known_path'],
|
||||
image_ref=info['ref_image'], scope='local')
|
||||
return
|
||||
|
||||
if netloc in cls.insecure_registries:
|
||||
|
@ -923,17 +929,24 @@ class BaseImageUploader(object):
|
|||
url = '%s://%s/v2/%s/blobs/uploads/' % (scheme, netloc, name)
|
||||
|
||||
for layer in source_layers:
|
||||
if layer in image_layers:
|
||||
known_path, existing_name = image_utils.uploaded_layers_details(
|
||||
cls._global_view_proxy(), layer, scope='remote')
|
||||
if layer not in image_layers and not existing_name:
|
||||
continue
|
||||
if not existing_name:
|
||||
existing_name = image_layers[layer].path.split(':')[0][1:]
|
||||
LOG.info('[%s] Cross repository blob mount from %s' %
|
||||
(layer, existing_name))
|
||||
data = {
|
||||
'mount': layer,
|
||||
'from': existing_name
|
||||
}
|
||||
r = session.post(url, data=data, timeout=30)
|
||||
cls.check_status(session=session, request=r)
|
||||
LOG.debug('%s %s' % (r.status_code, r.reason))
|
||||
if existing_name != name:
|
||||
LOG.debug('[%s] Layer %s ref. by image %s already exists '
|
||||
'at %s' % (name, layer, existing_name, known_path))
|
||||
LOG.info('[%s] Cross repository blob mount from %s' %
|
||||
(layer, existing_name))
|
||||
data = {
|
||||
'mount': layer,
|
||||
'from': existing_name
|
||||
}
|
||||
r = session.post(url, data=data, timeout=30)
|
||||
cls.check_status(session=session, request=r)
|
||||
LOG.debug('%s %s' % (r.status_code, r.reason))
|
||||
|
||||
|
||||
class SkopeoImageUploader(BaseImageUploader):
|
||||
|
@ -1124,43 +1137,87 @@ class SkopeoImageUploader(BaseImageUploader):
|
|||
class PythonImageUploader(BaseImageUploader):
|
||||
"""Upload images using a direct implementation of the registry API"""
|
||||
|
||||
uploaded_layers = {} # provides global view for multi-threading workers
|
||||
lock = None # provides global locking info plus global view, if MP is used
|
||||
|
||||
@classmethod
|
||||
def init_global_state(cls, lock):
|
||||
if not cls.lock:
|
||||
cls.lock = lock
|
||||
|
||||
@classmethod
|
||||
@tenacity.retry( # Retry until we no longer have collisions
|
||||
retry=tenacity.retry_if_exception_type(ImageUploaderThreadException),
|
||||
wait=tenacity.wait_random_exponential(multiplier=1, max=10)
|
||||
)
|
||||
def _layer_fetch_lock(cls, layer, lock=None):
|
||||
if not lock:
|
||||
def _layer_fetch_lock(cls, layer):
|
||||
if not cls.lock:
|
||||
LOG.warning('No lock information provided for layer %s' % layer)
|
||||
return
|
||||
if layer in lock.objects():
|
||||
if layer in cls.lock.objects():
|
||||
LOG.debug('[%s] Layer is being fetched by another thread' % layer)
|
||||
raise ImageUploaderThreadException('layer being fetched')
|
||||
LOG.debug('Locking layer %s' % layer)
|
||||
LOG.debug('Starting acquire for lock %s' % layer)
|
||||
with lock.get_lock():
|
||||
if layer in lock.objects():
|
||||
known_path, image = image_utils.uploaded_layers_details(
|
||||
cls._global_view_proxy(), layer, scope='local')
|
||||
if not known_path or not image:
|
||||
known_path, image = image_utils.uploaded_layers_details(
|
||||
cls._global_view_proxy(), layer, scope='remote')
|
||||
if image and known_path:
|
||||
# already processed layers needs no further locking
|
||||
return
|
||||
with cls.lock.get_lock():
|
||||
if layer in cls.lock.objects():
|
||||
LOG.debug('Collision for lock %s' % layer)
|
||||
raise ImageUploaderThreadException('layer conflict')
|
||||
LOG.debug('Acquired for lock %s' % layer)
|
||||
lock.objects().append(layer)
|
||||
LOG.debug('Updated lock info %s' % layer)
|
||||
cls.lock.objects().append(layer)
|
||||
LOG.debug('Got lock on layer %s' % layer)
|
||||
|
||||
@classmethod
|
||||
def _layer_fetch_unlock(cls, layer, lock=None):
|
||||
if not lock:
|
||||
def _layer_fetch_unlock(cls, layer):
|
||||
if not cls.lock:
|
||||
LOG.warning('No lock information provided for layer %s' % layer)
|
||||
return
|
||||
LOG.debug('Unlocking layer %s' % layer)
|
||||
LOG.debug('Starting acquire for lock %s' % layer)
|
||||
with lock.get_lock():
|
||||
LOG.debug('Acquired for unlock %s' % layer)
|
||||
while layer in lock.objects():
|
||||
lock.objects().remove(layer)
|
||||
LOG.debug('Updated lock info %s' % layer)
|
||||
with cls.lock.get_lock():
|
||||
while layer in cls.lock.objects():
|
||||
cls.lock.objects().remove(layer)
|
||||
LOG.debug('Released lock on layer %s' % layer)
|
||||
|
||||
@classmethod
|
||||
def _global_view_proxy(cls, value=None, forget=False):
|
||||
if not cls.lock:
|
||||
LOG.warning('No lock information provided for value %s' % value)
|
||||
return
|
||||
with cls.lock.get_lock():
|
||||
if value and forget:
|
||||
cls.uploaded_layers.pop(value, None)
|
||||
if hasattr(cls.lock, '_global_view'):
|
||||
cls.lock._global_view.pop(value, None)
|
||||
elif value:
|
||||
cls.uploaded_layers.update(value)
|
||||
if hasattr(cls.lock, '_global_view'):
|
||||
cls.lock._global_view.update(value)
|
||||
|
||||
if not value:
|
||||
# return global view consolidated among MP/MT workers state
|
||||
if hasattr(cls.lock, '_global_view'):
|
||||
consolidated_view = cls.uploaded_layers.copy()
|
||||
consolidated_view.update(cls.lock._global_view)
|
||||
return consolidated_view
|
||||
else:
|
||||
return cls.uploaded_layers
|
||||
|
||||
@classmethod
|
||||
def _track_uploaded_layers(cls, layer, known_path=None, image_ref=None,
|
||||
forget=False, scope='remote'):
|
||||
if forget:
|
||||
LOG.debug('Untracking processed layer %s for any scope' % layer)
|
||||
cls._global_view_proxy(value=layer, forget=True)
|
||||
else:
|
||||
LOG.debug('Tracking processed layer %s for %s scope'
|
||||
% (layer, scope))
|
||||
cls._global_view_proxy(
|
||||
value={layer: {scope: {'ref': image_ref, 'path': known_path}}})
|
||||
|
||||
def upload_image(self, task):
|
||||
"""Upload image from a task
|
||||
|
||||
|
@ -1185,8 +1242,6 @@ class PythonImageUploader(BaseImageUploader):
|
|||
if t.dry_run:
|
||||
return []
|
||||
|
||||
lock = t.lock
|
||||
|
||||
target_username, target_password = self.credentials_for_registry(
|
||||
t.target_image_url.netloc)
|
||||
target_session = self.authenticate(
|
||||
|
@ -1277,8 +1332,7 @@ class PythonImageUploader(BaseImageUploader):
|
|||
source_session=source_session,
|
||||
target_session=target_session,
|
||||
source_layers=source_layers,
|
||||
multi_arch=t.multi_arch,
|
||||
lock=lock
|
||||
multi_arch=t.multi_arch
|
||||
)
|
||||
except Exception:
|
||||
LOG.error('[%s] Failed uploading the target '
|
||||
|
@ -1494,38 +1548,58 @@ class PythonImageUploader(BaseImageUploader):
|
|||
def _copy_layer_registry_to_registry(cls, source_url, target_url,
|
||||
layer,
|
||||
source_session=None,
|
||||
target_session=None,
|
||||
lock=None):
|
||||
target_session=None):
|
||||
layer_entry = {'digest': layer}
|
||||
try:
|
||||
cls._layer_fetch_lock(layer, lock)
|
||||
cls._layer_fetch_lock(layer)
|
||||
if cls._target_layer_exists_registry(
|
||||
target_url, layer_entry, [layer_entry], target_session):
|
||||
cls._layer_fetch_unlock(layer, lock)
|
||||
cls._layer_fetch_unlock(layer)
|
||||
return
|
||||
known_path, ref_image = image_utils.uploaded_layers_details(
|
||||
cls._global_view_proxy(), layer, scope='local')
|
||||
if known_path and ref_image:
|
||||
# cross-link target from local source, skip fetching it again
|
||||
image_export.layer_cross_link(
|
||||
layer, ref_image, known_path, target_url)
|
||||
cls._layer_fetch_unlock(layer)
|
||||
return
|
||||
except ImageUploaderThreadException:
|
||||
# skip trying to unlock, because that's what threw the exception
|
||||
raise
|
||||
except Exception:
|
||||
cls._layer_fetch_unlock(layer, lock)
|
||||
cls._layer_fetch_unlock(layer)
|
||||
raise
|
||||
|
||||
digest = layer_entry['digest']
|
||||
LOG.debug('[%s] Uploading layer' % digest)
|
||||
|
||||
calc_digest = hashlib.sha256()
|
||||
known_path = None
|
||||
layer_val = None
|
||||
try:
|
||||
layer_stream = cls._layer_stream_registry(
|
||||
digest, source_url, calc_digest, source_session)
|
||||
layer_val = cls._copy_stream_to_registry(
|
||||
layer_val, known_path = cls._copy_stream_to_registry(
|
||||
target_url, layer_entry, calc_digest, layer_stream,
|
||||
target_session)
|
||||
except (IOError, requests.exceptions.HTTPError):
|
||||
cls._track_uploaded_layers(layer, forget=True, scope='remote')
|
||||
LOG.error('[%s] Failed processing layer for the target '
|
||||
'image %s' % (layer, target_url.geturl()))
|
||||
raise
|
||||
except Exception:
|
||||
raise
|
||||
else:
|
||||
if layer_val and known_path:
|
||||
image_ref = target_url.path.split(':')[0][1:]
|
||||
uploaded = parse.urlparse(known_path).scheme
|
||||
cls._track_uploaded_layers(
|
||||
layer_val, known_path=known_path, image_ref=image_ref,
|
||||
scope=('remote' if uploaded else 'local'))
|
||||
return layer_val
|
||||
finally:
|
||||
cls._layer_fetch_unlock(layer, lock)
|
||||
cls._layer_fetch_unlock(layer)
|
||||
|
||||
@classmethod
|
||||
def _assert_scheme(cls, url, scheme):
|
||||
|
@ -1547,8 +1621,7 @@ class PythonImageUploader(BaseImageUploader):
|
|||
source_session=None,
|
||||
target_session=None,
|
||||
source_layers=None,
|
||||
multi_arch=False,
|
||||
lock=None):
|
||||
multi_arch=False):
|
||||
cls._assert_scheme(source_url, 'docker')
|
||||
cls._assert_scheme(target_url, 'docker')
|
||||
|
||||
|
@ -1570,8 +1643,7 @@ class PythonImageUploader(BaseImageUploader):
|
|||
source_url, target_url,
|
||||
layer=layer,
|
||||
source_session=source_session,
|
||||
target_session=target_session,
|
||||
lock=lock
|
||||
target_session=target_session
|
||||
))
|
||||
|
||||
jobs_count = len(copy_jobs)
|
||||
|
@ -1740,27 +1812,40 @@ class PythonImageUploader(BaseImageUploader):
|
|||
def _target_layer_exists_registry(cls, target_url, layer, check_layers,
|
||||
session):
|
||||
image, tag = cls._image_tag_from_url(target_url)
|
||||
norm_image = (image[1:] if image.startswith('/') else image)
|
||||
parts = {
|
||||
'image': image,
|
||||
'tag': tag
|
||||
}
|
||||
# Do a HEAD call for the supplied digests
|
||||
# to see if the layer is already in the registry
|
||||
layer_found = None
|
||||
# Check in global view or do a HEAD call for the supplied
|
||||
# digests to see if the layer is already in the registry
|
||||
for l in check_layers:
|
||||
if not l:
|
||||
continue
|
||||
parts['digest'] = l['digest']
|
||||
blob_url = cls._build_url(
|
||||
target_url, CALL_BLOB % parts)
|
||||
if session.head(blob_url, timeout=30).status_code == 200:
|
||||
LOG.debug('[%s] Layer already exists: %s' %
|
||||
(image, l['digest']))
|
||||
layer['digest'] = l['digest']
|
||||
if 'size' in l:
|
||||
layer['size'] = l['size']
|
||||
if 'mediaType' in l:
|
||||
layer['mediaType'] = l['mediaType']
|
||||
return True
|
||||
known_path, ref_image = image_utils.uploaded_layers_details(
|
||||
cls._global_view_proxy(), l['digest'], scope='remote')
|
||||
if ref_image == norm_image:
|
||||
LOG.debug('[%s] Layer %s already exists at %s' %
|
||||
(image, l['digest'], known_path))
|
||||
layer_found = l
|
||||
break
|
||||
else:
|
||||
parts['digest'] = l['digest']
|
||||
blob_url = cls._build_url(
|
||||
target_url, CALL_BLOB % parts)
|
||||
if session.head(blob_url, timeout=30).status_code == 200:
|
||||
LOG.debug('[%s] Layer already exists: %s' %
|
||||
(image, l['digest']))
|
||||
layer_found = l
|
||||
break
|
||||
if layer_found:
|
||||
layer['digest'] = layer_found['digest']
|
||||
if 'size' in layer_found:
|
||||
layer['size'] = layer_found['size']
|
||||
if 'mediaType' in layer_found:
|
||||
layer['mediaType'] = layer_found['mediaType']
|
||||
return True
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
|
@ -1807,8 +1892,8 @@ class PythonImageUploader(BaseImageUploader):
|
|||
def _copy_layer_local_to_registry(cls, target_url,
|
||||
session, layer, layer_entry):
|
||||
|
||||
# Do a HEAD call for the compressed-diff-digest and diff-digest
|
||||
# to see if the layer is already in the registry
|
||||
# Check in global view or do a HEAD call for the compressed-diff-digest
|
||||
# and diff-digest to see if the layer is already in the registry
|
||||
check_layers = []
|
||||
compressed_digest = layer_entry.get('compressed-diff-digest')
|
||||
if compressed_digest:
|
||||
|
@ -1833,10 +1918,29 @@ class PythonImageUploader(BaseImageUploader):
|
|||
LOG.debug('[%s] Uploading layer' % layer_id)
|
||||
|
||||
calc_digest = hashlib.sha256()
|
||||
layer_stream = cls._layer_stream_local(layer_id, calc_digest)
|
||||
return cls._copy_stream_to_registry(target_url, layer, calc_digest,
|
||||
layer_stream, session,
|
||||
verify_digest=False)
|
||||
known_path = None
|
||||
layer_val = None
|
||||
try:
|
||||
layer_stream = cls._layer_stream_local(layer_id, calc_digest)
|
||||
layer_val, known_path = cls._copy_stream_to_registry(
|
||||
target_url, layer, calc_digest, layer_stream, session,
|
||||
verify_digest=False)
|
||||
except (IOError, requests.exceptions.HTTPError):
|
||||
cls._track_uploaded_layers(
|
||||
layer['digest'], forget=True, scope='remote')
|
||||
LOG.error('[%s] Failed processing layer for the target '
|
||||
'image %s' % (layer['digest'], target_url.geturl()))
|
||||
raise
|
||||
except Exception:
|
||||
raise
|
||||
else:
|
||||
if layer_val and known_path:
|
||||
image_ref = target_url.path.split(':')[0][1:]
|
||||
uploaded = parse.urlparse(known_path).scheme
|
||||
cls._track_uploaded_layers(
|
||||
layer_val, known_path=known_path, image_ref=image_ref,
|
||||
scope=('remote' if uploaded else 'local'))
|
||||
return layer_val
|
||||
|
||||
@classmethod
|
||||
def _copy_stream_to_registry(cls, target_url, layer, calc_digest,
|
||||
|
@ -1885,7 +1989,7 @@ class PythonImageUploader(BaseImageUploader):
|
|||
cls.check_status(session=session, request=upload_resp)
|
||||
layer['digest'] = layer_digest
|
||||
layer['size'] = length
|
||||
return layer_digest
|
||||
return (layer_digest, cls._build_url(target_url, target_url.path))
|
||||
|
||||
@classmethod
|
||||
@tenacity.retry( # Retry up to 5 times with jittered exponential backoff
|
||||
|
@ -2087,10 +2191,6 @@ class PythonImageUploader(BaseImageUploader):
|
|||
return
|
||||
local_images = []
|
||||
|
||||
# Pull a single image first, to avoid duplicate pulls of the
|
||||
# same base layers
|
||||
local_images.extend(upload_task(args=self.upload_tasks.pop()))
|
||||
|
||||
with self._get_executor() as p:
|
||||
for result in p.map(upload_task, self.upload_tasks):
|
||||
local_images.extend(result)
|
||||
|
@ -2105,7 +2205,7 @@ class UploadTask(object):
|
|||
|
||||
def __init__(self, image_name, pull_source, push_destination,
|
||||
append_tag, modify_role, modify_vars, dry_run, cleanup,
|
||||
multi_arch, lock=None):
|
||||
multi_arch):
|
||||
self.image_name = image_name
|
||||
self.pull_source = pull_source
|
||||
self.push_destination = push_destination
|
||||
|
@ -2115,7 +2215,6 @@ class UploadTask(object):
|
|||
self.dry_run = dry_run
|
||||
self.cleanup = cleanup
|
||||
self.multi_arch = multi_arch
|
||||
self.lock = lock
|
||||
|
||||
if ':' in image_name:
|
||||
image = image_name.rpartition(':')[0]
|
||||
|
|
|
@ -378,7 +378,7 @@ def container_images_prepare(template_file=DEFAULT_TEMPLATE_FILE,
|
|||
del(entry['services'])
|
||||
|
||||
params.update(
|
||||
detect_insecure_registries(params))
|
||||
detect_insecure_registries(params, lock=lock))
|
||||
|
||||
return_data = {}
|
||||
if output_env_file:
|
||||
|
@ -388,7 +388,7 @@ def container_images_prepare(template_file=DEFAULT_TEMPLATE_FILE,
|
|||
return return_data
|
||||
|
||||
|
||||
def detect_insecure_registries(params):
|
||||
def detect_insecure_registries(params, lock=None):
|
||||
"""Detect insecure registries in image parameters
|
||||
|
||||
:param params: dict of container image parameters
|
||||
|
@ -396,7 +396,7 @@ def detect_insecure_registries(params):
|
|||
merged into other parameters
|
||||
"""
|
||||
insecure = set()
|
||||
uploader = image_uploader.ImageUploadManager().uploader('python')
|
||||
uploader = image_uploader.ImageUploadManager(lock=lock).uploader('python')
|
||||
for image in params.values():
|
||||
host = image.split('/')[0]
|
||||
if uploader.is_insecure_registry(host):
|
||||
|
|
|
@ -90,7 +90,7 @@ class TestImageExport(base.TestCase):
|
|||
}
|
||||
calc_digest = hashlib.sha256()
|
||||
layer_stream = io.BytesIO(blob_compressed)
|
||||
layer_digest = image_export.export_stream(
|
||||
layer_digest, _ = image_export.export_stream(
|
||||
target_url, layer, layer_stream, verify_digest=False
|
||||
)
|
||||
self.assertEqual(compressed_digest, layer_digest)
|
||||
|
@ -163,7 +163,8 @@ class TestImageExport(base.TestCase):
|
|||
target_blob_path = os.path.join(target_blob_dir, 'sha256:1234.gz')
|
||||
|
||||
# call with missing source, no change
|
||||
image_export.cross_repo_mount(target_url, image_layers, source_layers)
|
||||
image_export.cross_repo_mount(target_url, image_layers, source_layers,
|
||||
uploaded_layers={})
|
||||
self.assertFalse(os.path.exists(source_blob_path))
|
||||
self.assertFalse(os.path.exists(target_blob_path))
|
||||
|
||||
|
@ -173,7 +174,8 @@ class TestImageExport(base.TestCase):
|
|||
self.assertTrue(os.path.exists(source_blob_path))
|
||||
|
||||
# call with existing source
|
||||
image_export.cross_repo_mount(target_url, image_layers, source_layers)
|
||||
image_export.cross_repo_mount(target_url, image_layers, source_layers,
|
||||
uploaded_layers={})
|
||||
self.assertTrue(os.path.exists(target_blob_path))
|
||||
with open(target_blob_path, 'r') as f:
|
||||
self.assertEqual('blob', f.read())
|
||||
|
|
|
@ -1318,8 +1318,7 @@ class TestPythonImageUploader(base.TestCase):
|
|||
source_session=source_session,
|
||||
target_session=target_session,
|
||||
source_layers=['sha256:aaa', 'sha256:bbb', 'sha256:ccc'],
|
||||
multi_arch=False,
|
||||
lock=None
|
||||
multi_arch=False
|
||||
)
|
||||
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
|
@ -1553,8 +1552,7 @@ class TestPythonImageUploader(base.TestCase):
|
|||
source_session=source_session,
|
||||
target_session=target_session,
|
||||
source_layers=['sha256:aaa', 'sha256:bbb', 'sha256:ccc'],
|
||||
multi_arch=False,
|
||||
lock=None
|
||||
multi_arch=False
|
||||
)
|
||||
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
|
@ -1685,8 +1683,7 @@ class TestPythonImageUploader(base.TestCase):
|
|||
source_session=source_session,
|
||||
target_session=target_session,
|
||||
source_layers=['sha256:aaa', 'sha256:bbb', 'sha256:ccc'],
|
||||
multi_arch=False,
|
||||
lock=None
|
||||
multi_arch=False
|
||||
)
|
||||
_copy_registry_to_local.assert_called_once_with(unmodified_target_url)
|
||||
run_modify_playbook.assert_called_once_with(
|
||||
|
@ -1816,7 +1813,8 @@ class TestPythonImageUploader(base.TestCase):
|
|||
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
'PythonImageUploader._upload_url')
|
||||
def test_copy_layer_registry_to_registry(self, _upload_url):
|
||||
@mock.patch('tripleo_common.utils.image.uploaded_layers_details')
|
||||
def test_copy_layer_registry_to_registry(self, global_check, _upload_url):
|
||||
_upload_url.return_value = 'https://192.168.2.1:5000/v2/upload'
|
||||
source_url = urlparse('docker://docker.io/t/nova-api:latest')
|
||||
target_url = urlparse('docker://192.168.2.1:5000/t/nova-api:latest')
|
||||
|
@ -1835,6 +1833,7 @@ class TestPythonImageUploader(base.TestCase):
|
|||
layer = layer_entry['digest']
|
||||
|
||||
# layer already exists at destination
|
||||
global_check.return_value = (None, None)
|
||||
self.requests.head(
|
||||
'https://192.168.2.1:5000/v2/t/nova-api/blobs/%s' % blob_digest,
|
||||
status_code=200
|
||||
|
@ -2022,8 +2021,9 @@ class TestPythonImageUploader(base.TestCase):
|
|||
@mock.patch('subprocess.Popen')
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
'PythonImageUploader._upload_url')
|
||||
def test_copy_layer_local_to_registry(self, _upload_url, mock_popen,
|
||||
mock_exists):
|
||||
@mock.patch('tripleo_common.utils.image.uploaded_layers_details')
|
||||
def test_copy_layer_local_to_registry(self, global_check, _upload_url,
|
||||
mock_popen, mock_exists):
|
||||
mock_exists.return_value = True
|
||||
_upload_url.return_value = 'https://192.168.2.1:5000/v2/upload'
|
||||
target_url = urlparse('docker://192.168.2.1:5000/t/nova-api:latest')
|
||||
|
@ -2048,6 +2048,7 @@ class TestPythonImageUploader(base.TestCase):
|
|||
}
|
||||
|
||||
# layer already exists at destination
|
||||
global_check.return_value = (None, None)
|
||||
self.requests.head(
|
||||
'https://192.168.2.1:5000/v2/t/'
|
||||
'nova-api/blobs/%s' % compressed_digest,
|
||||
|
@ -2117,6 +2118,7 @@ class TestPythonImageUploader(base.TestCase):
|
|||
layer
|
||||
)
|
||||
|
||||
@mock.patch('tripleo_common.utils.image.uploaded_layers_details')
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
'PythonImageUploader._image_manifest_config')
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
|
@ -2127,11 +2129,12 @@ class TestPythonImageUploader(base.TestCase):
|
|||
'PythonImageUploader._upload_url')
|
||||
def test_copy_local_to_registry(self, _upload_url, _containers_json,
|
||||
_copy_layer_local_to_registry,
|
||||
_image_manifest_config):
|
||||
_image_manifest_config, _global_check):
|
||||
source_url = urlparse('containers-storage:/t/nova-api:latest')
|
||||
target_url = urlparse('docker://192.168.2.1:5000/t/nova-api:latest')
|
||||
target_session = requests.Session()
|
||||
_upload_url.return_value = 'https://192.168.2.1:5000/v2/upload'
|
||||
_global_check.return_value = (None, None)
|
||||
layers = [{
|
||||
"compressed-diff-digest": "sha256:aeb786",
|
||||
"compressed-size": 74703002,
|
||||
|
|
|
@ -0,0 +1,26 @@
|
|||
# Copyright 2019 Red Hat, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
def uploaded_layers_details(uploaded_layers, layer, scope):
|
||||
known_path = None
|
||||
known_layer = None
|
||||
image = None
|
||||
if layer:
|
||||
known_layer = uploaded_layers.get(layer, None)
|
||||
if known_layer and scope in known_layer:
|
||||
known_path = known_layer[scope].get('path', None)
|
||||
image = known_layer[scope].get('ref', None)
|
||||
return (known_path, image)
|
|
@ -23,6 +23,7 @@ from tripleo_common.utils.locks import base
|
|||
class ProcessLock(base.BaseLock):
|
||||
# the manager cannot live in __init__
|
||||
_mgr = multiprocessing.Manager()
|
||||
_global_view = _mgr.dict()
|
||||
|
||||
def __init__(self):
|
||||
self._lock = self._mgr.Lock()
|
||||
|
|
Loading…
Reference in New Issue