diff --git a/tripleo_common/image/image_export.py b/tripleo_common/image/image_export.py index d289f87c8..79de0b1f7 100644 --- a/tripleo_common/image/image_export.py +++ b/tripleo_common/image/image_export.py @@ -21,6 +21,7 @@ import requests import shutil from oslo_log import log as logging +from tripleo_common.utils import image as image_utils LOG = logging.getLogger(__name__) @@ -151,39 +152,68 @@ def export_stream(target_url, layer, layer_stream, verify_digest=True): ) if blob_path != expected_blob_path: os.rename(blob_path, expected_blob_path) + blob_path = expected_blob_path layer['digest'] = layer_digest layer['size'] = length LOG.debug('[%s] Done exporting image layer %s' % (image, digest)) - return layer_digest + return (layer_digest, blob_path) -def cross_repo_mount(target_image_url, image_layers, source_layers): - for layer in source_layers: - if layer not in image_layers: - continue - - image_url = image_layers[layer] - image, tag = image_tag_from_url(image_url) - dir_path = os.path.join(IMAGE_EXPORT_DIR, 'v2', image, 'blobs') - blob_path = os.path.join(dir_path, '%s.gz' % layer) - if not os.path.exists(blob_path): - LOG.debug('[%s] Layer not found: %s' % (image, blob_path)) - continue - - target_image, tag = image_tag_from_url(target_image_url) - target_dir_path = os.path.join( - IMAGE_EXPORT_DIR, 'v2', target_image, 'blobs') - make_dir(target_dir_path) - target_blob_path = os.path.join(target_dir_path, '%s.gz' % layer) - if os.path.exists(target_blob_path): - continue +def layer_cross_link(layer, image, blob_path, target_image_url): + target_image, _ = image_tag_from_url(target_image_url) + target_dir_path = os.path.join( + IMAGE_EXPORT_DIR, 'v2', target_image, 'blobs') + make_dir(target_dir_path) + target_blob_path = os.path.join(target_dir_path, '%s.gz' % layer) + if not os.path.exists(target_blob_path): LOG.debug('[%s] Linking layers: %s -> %s' % (image, blob_path, target_blob_path)) # make a hard link so the layers can have independent lifecycles os.link(blob_path, target_blob_path) +def cross_repo_mount(target_image_url, image_layers, source_layers, + uploaded_layers=None): + linked_layers = {} + target_image, _ = image_tag_from_url(target_image_url) + for layer in source_layers: + known_path, ref_image = image_utils.uploaded_layers_details( + uploaded_layers, layer, scope='local') + + if layer not in image_layers and not ref_image: + continue + + image_url = image_layers.get(layer, None) + if image_url: + image, _ = image_tag_from_url(image_url) + else: + image = ref_image + if not image: + continue + + if known_path and ref_image: + blob_path = known_path + image = ref_image + if ref_image != image: + LOG.debug('[%s] Layer ref. by image %s already exists ' + 'at %s' % (image, ref_image, known_path)) + else: + LOG.debug('[%s] Layer already exists at %s' + % (image, known_path)) + else: + dir_path = os.path.join(IMAGE_EXPORT_DIR, 'v2', image, 'blobs') + blob_path = os.path.join(dir_path, '%s.gz' % layer) + if not os.path.exists(blob_path): + LOG.debug('[%s] Layer not found: %s' % (image, blob_path)) + continue + + layer_cross_link(layer, image, blob_path, target_image_url) + linked_layers.update({layer: {'known_path': blob_path, + 'ref_image': image}}) + return linked_layers + + def export_manifest_config(target_url, manifest_str, manifest_type, diff --git a/tripleo_common/image/image_uploader.py b/tripleo_common/image/image_uploader.py index d798712a0..2a6143393 100644 --- a/tripleo_common/image/image_uploader.py +++ b/tripleo_common/image/image_uploader.py @@ -40,6 +40,7 @@ from tripleo_common.image.exception import ImageNotFoundException from tripleo_common.image.exception import ImageUploaderException from tripleo_common.image.exception import ImageUploaderThreadException from tripleo_common.image import image_export +from tripleo_common.utils import image as image_utils from tripleo_common.utils.locks import threadinglock @@ -164,8 +165,9 @@ class ImageUploadManager(BaseImageManager): super(ImageUploadManager, self).__init__(config_files) self.uploaders = { 'skopeo': SkopeoImageUploader(), - 'python': PythonImageUploader(lock) + 'python': PythonImageUploader() } + self.uploaders['python'].init_global_state(lock) self.dry_run = dry_run self.cleanup = cleanup if mirrors: @@ -177,7 +179,6 @@ class ImageUploadManager(BaseImageManager): for uploader in self.uploaders.values(): uploader.registry_credentials = registry_credentials self.multi_arch = multi_arch - self.lock = lock @staticmethod def validate_registry_credentials(creds_data): @@ -250,7 +251,7 @@ class ImageUploadManager(BaseImageManager): tasks.append(UploadTask( image_name, pull_source, push_destination, append_tag, modify_role, modify_vars, self.dry_run, - self.cleanup, multi_arch, self.lock)) + self.cleanup, multi_arch)) # NOTE(mwhahaha): We want to randomize the upload process because of # the shared nature of container layers. Because we multiprocess the @@ -282,13 +283,12 @@ class BaseImageUploader(object): export_registries = set() push_registries = set() - def __init__(self, lock=None): + def __init__(self): self.upload_tasks = [] # A mapping of layer hashs to the image which first copied that # layer to the target self.image_layers = {} self.registry_credentials = {} - self.lock = lock @classmethod def init_registries_cache(cls): @@ -912,8 +912,14 @@ class BaseImageUploader(object): name = target_image_url.path.split(':')[0][1:] export = netloc in cls.export_registries if export: - image_export.cross_repo_mount( - target_image_url, image_layers, source_layers) + linked_layers = image_export.cross_repo_mount( + target_image_url, image_layers, source_layers, + uploaded_layers=cls._global_view_proxy()) + # track linked layers globally for future references + for layer, info in linked_layers.items(): + cls._track_uploaded_layers( + layer, known_path=info['known_path'], + image_ref=info['ref_image'], scope='local') return if netloc in cls.insecure_registries: @@ -923,17 +929,24 @@ class BaseImageUploader(object): url = '%s://%s/v2/%s/blobs/uploads/' % (scheme, netloc, name) for layer in source_layers: - if layer in image_layers: + known_path, existing_name = image_utils.uploaded_layers_details( + cls._global_view_proxy(), layer, scope='remote') + if layer not in image_layers and not existing_name: + continue + if not existing_name: existing_name = image_layers[layer].path.split(':')[0][1:] - LOG.info('[%s] Cross repository blob mount from %s' % - (layer, existing_name)) - data = { - 'mount': layer, - 'from': existing_name - } - r = session.post(url, data=data, timeout=30) - cls.check_status(session=session, request=r) - LOG.debug('%s %s' % (r.status_code, r.reason)) + if existing_name != name: + LOG.debug('[%s] Layer %s ref. by image %s already exists ' + 'at %s' % (name, layer, existing_name, known_path)) + LOG.info('[%s] Cross repository blob mount from %s' % + (layer, existing_name)) + data = { + 'mount': layer, + 'from': existing_name + } + r = session.post(url, data=data, timeout=30) + cls.check_status(session=session, request=r) + LOG.debug('%s %s' % (r.status_code, r.reason)) class SkopeoImageUploader(BaseImageUploader): @@ -1124,43 +1137,87 @@ class SkopeoImageUploader(BaseImageUploader): class PythonImageUploader(BaseImageUploader): """Upload images using a direct implementation of the registry API""" + uploaded_layers = {} # provides global view for multi-threading workers + lock = None # provides global locking info plus global view, if MP is used + + @classmethod + def init_global_state(cls, lock): + if not cls.lock: + cls.lock = lock + @classmethod @tenacity.retry( # Retry until we no longer have collisions retry=tenacity.retry_if_exception_type(ImageUploaderThreadException), wait=tenacity.wait_random_exponential(multiplier=1, max=10) ) - def _layer_fetch_lock(cls, layer, lock=None): - if not lock: + def _layer_fetch_lock(cls, layer): + if not cls.lock: LOG.warning('No lock information provided for layer %s' % layer) return - if layer in lock.objects(): + if layer in cls.lock.objects(): LOG.debug('[%s] Layer is being fetched by another thread' % layer) raise ImageUploaderThreadException('layer being fetched') - LOG.debug('Locking layer %s' % layer) - LOG.debug('Starting acquire for lock %s' % layer) - with lock.get_lock(): - if layer in lock.objects(): + known_path, image = image_utils.uploaded_layers_details( + cls._global_view_proxy(), layer, scope='local') + if not known_path or not image: + known_path, image = image_utils.uploaded_layers_details( + cls._global_view_proxy(), layer, scope='remote') + if image and known_path: + # already processed layers needs no further locking + return + with cls.lock.get_lock(): + if layer in cls.lock.objects(): LOG.debug('Collision for lock %s' % layer) raise ImageUploaderThreadException('layer conflict') - LOG.debug('Acquired for lock %s' % layer) - lock.objects().append(layer) - LOG.debug('Updated lock info %s' % layer) + cls.lock.objects().append(layer) LOG.debug('Got lock on layer %s' % layer) @classmethod - def _layer_fetch_unlock(cls, layer, lock=None): - if not lock: + def _layer_fetch_unlock(cls, layer): + if not cls.lock: LOG.warning('No lock information provided for layer %s' % layer) return - LOG.debug('Unlocking layer %s' % layer) - LOG.debug('Starting acquire for lock %s' % layer) - with lock.get_lock(): - LOG.debug('Acquired for unlock %s' % layer) - while layer in lock.objects(): - lock.objects().remove(layer) - LOG.debug('Updated lock info %s' % layer) + with cls.lock.get_lock(): + while layer in cls.lock.objects(): + cls.lock.objects().remove(layer) LOG.debug('Released lock on layer %s' % layer) + @classmethod + def _global_view_proxy(cls, value=None, forget=False): + if not cls.lock: + LOG.warning('No lock information provided for value %s' % value) + return + with cls.lock.get_lock(): + if value and forget: + cls.uploaded_layers.pop(value, None) + if hasattr(cls.lock, '_global_view'): + cls.lock._global_view.pop(value, None) + elif value: + cls.uploaded_layers.update(value) + if hasattr(cls.lock, '_global_view'): + cls.lock._global_view.update(value) + + if not value: + # return global view consolidated among MP/MT workers state + if hasattr(cls.lock, '_global_view'): + consolidated_view = cls.uploaded_layers.copy() + consolidated_view.update(cls.lock._global_view) + return consolidated_view + else: + return cls.uploaded_layers + + @classmethod + def _track_uploaded_layers(cls, layer, known_path=None, image_ref=None, + forget=False, scope='remote'): + if forget: + LOG.debug('Untracking processed layer %s for any scope' % layer) + cls._global_view_proxy(value=layer, forget=True) + else: + LOG.debug('Tracking processed layer %s for %s scope' + % (layer, scope)) + cls._global_view_proxy( + value={layer: {scope: {'ref': image_ref, 'path': known_path}}}) + def upload_image(self, task): """Upload image from a task @@ -1185,8 +1242,6 @@ class PythonImageUploader(BaseImageUploader): if t.dry_run: return [] - lock = t.lock - target_username, target_password = self.credentials_for_registry( t.target_image_url.netloc) target_session = self.authenticate( @@ -1277,8 +1332,7 @@ class PythonImageUploader(BaseImageUploader): source_session=source_session, target_session=target_session, source_layers=source_layers, - multi_arch=t.multi_arch, - lock=lock + multi_arch=t.multi_arch ) except Exception: LOG.error('[%s] Failed uploading the target ' @@ -1494,38 +1548,58 @@ class PythonImageUploader(BaseImageUploader): def _copy_layer_registry_to_registry(cls, source_url, target_url, layer, source_session=None, - target_session=None, - lock=None): + target_session=None): layer_entry = {'digest': layer} try: - cls._layer_fetch_lock(layer, lock) + cls._layer_fetch_lock(layer) if cls._target_layer_exists_registry( target_url, layer_entry, [layer_entry], target_session): - cls._layer_fetch_unlock(layer, lock) + cls._layer_fetch_unlock(layer) + return + known_path, ref_image = image_utils.uploaded_layers_details( + cls._global_view_proxy(), layer, scope='local') + if known_path and ref_image: + # cross-link target from local source, skip fetching it again + image_export.layer_cross_link( + layer, ref_image, known_path, target_url) + cls._layer_fetch_unlock(layer) return except ImageUploaderThreadException: # skip trying to unlock, because that's what threw the exception raise except Exception: - cls._layer_fetch_unlock(layer, lock) + cls._layer_fetch_unlock(layer) raise digest = layer_entry['digest'] LOG.debug('[%s] Uploading layer' % digest) calc_digest = hashlib.sha256() + known_path = None + layer_val = None try: layer_stream = cls._layer_stream_registry( digest, source_url, calc_digest, source_session) - layer_val = cls._copy_stream_to_registry( + layer_val, known_path = cls._copy_stream_to_registry( target_url, layer_entry, calc_digest, layer_stream, target_session) + except (IOError, requests.exceptions.HTTPError): + cls._track_uploaded_layers(layer, forget=True, scope='remote') + LOG.error('[%s] Failed processing layer for the target ' + 'image %s' % (layer, target_url.geturl())) + raise except Exception: raise else: + if layer_val and known_path: + image_ref = target_url.path.split(':')[0][1:] + uploaded = parse.urlparse(known_path).scheme + cls._track_uploaded_layers( + layer_val, known_path=known_path, image_ref=image_ref, + scope=('remote' if uploaded else 'local')) return layer_val finally: - cls._layer_fetch_unlock(layer, lock) + cls._layer_fetch_unlock(layer) @classmethod def _assert_scheme(cls, url, scheme): @@ -1547,8 +1621,7 @@ class PythonImageUploader(BaseImageUploader): source_session=None, target_session=None, source_layers=None, - multi_arch=False, - lock=None): + multi_arch=False): cls._assert_scheme(source_url, 'docker') cls._assert_scheme(target_url, 'docker') @@ -1570,8 +1643,7 @@ class PythonImageUploader(BaseImageUploader): source_url, target_url, layer=layer, source_session=source_session, - target_session=target_session, - lock=lock + target_session=target_session )) jobs_count = len(copy_jobs) @@ -1740,27 +1812,40 @@ class PythonImageUploader(BaseImageUploader): def _target_layer_exists_registry(cls, target_url, layer, check_layers, session): image, tag = cls._image_tag_from_url(target_url) + norm_image = (image[1:] if image.startswith('/') else image) parts = { 'image': image, 'tag': tag } - # Do a HEAD call for the supplied digests - # to see if the layer is already in the registry + layer_found = None + # Check in global view or do a HEAD call for the supplied + # digests to see if the layer is already in the registry for l in check_layers: if not l: continue - parts['digest'] = l['digest'] - blob_url = cls._build_url( - target_url, CALL_BLOB % parts) - if session.head(blob_url, timeout=30).status_code == 200: - LOG.debug('[%s] Layer already exists: %s' % - (image, l['digest'])) - layer['digest'] = l['digest'] - if 'size' in l: - layer['size'] = l['size'] - if 'mediaType' in l: - layer['mediaType'] = l['mediaType'] - return True + known_path, ref_image = image_utils.uploaded_layers_details( + cls._global_view_proxy(), l['digest'], scope='remote') + if ref_image == norm_image: + LOG.debug('[%s] Layer %s already exists at %s' % + (image, l['digest'], known_path)) + layer_found = l + break + else: + parts['digest'] = l['digest'] + blob_url = cls._build_url( + target_url, CALL_BLOB % parts) + if session.head(blob_url, timeout=30).status_code == 200: + LOG.debug('[%s] Layer already exists: %s' % + (image, l['digest'])) + layer_found = l + break + if layer_found: + layer['digest'] = layer_found['digest'] + if 'size' in layer_found: + layer['size'] = layer_found['size'] + if 'mediaType' in layer_found: + layer['mediaType'] = layer_found['mediaType'] + return True return False @classmethod @@ -1807,8 +1892,8 @@ class PythonImageUploader(BaseImageUploader): def _copy_layer_local_to_registry(cls, target_url, session, layer, layer_entry): - # Do a HEAD call for the compressed-diff-digest and diff-digest - # to see if the layer is already in the registry + # Check in global view or do a HEAD call for the compressed-diff-digest + # and diff-digest to see if the layer is already in the registry check_layers = [] compressed_digest = layer_entry.get('compressed-diff-digest') if compressed_digest: @@ -1833,10 +1918,29 @@ class PythonImageUploader(BaseImageUploader): LOG.debug('[%s] Uploading layer' % layer_id) calc_digest = hashlib.sha256() - layer_stream = cls._layer_stream_local(layer_id, calc_digest) - return cls._copy_stream_to_registry(target_url, layer, calc_digest, - layer_stream, session, - verify_digest=False) + known_path = None + layer_val = None + try: + layer_stream = cls._layer_stream_local(layer_id, calc_digest) + layer_val, known_path = cls._copy_stream_to_registry( + target_url, layer, calc_digest, layer_stream, session, + verify_digest=False) + except (IOError, requests.exceptions.HTTPError): + cls._track_uploaded_layers( + layer['digest'], forget=True, scope='remote') + LOG.error('[%s] Failed processing layer for the target ' + 'image %s' % (layer['digest'], target_url.geturl())) + raise + except Exception: + raise + else: + if layer_val and known_path: + image_ref = target_url.path.split(':')[0][1:] + uploaded = parse.urlparse(known_path).scheme + cls._track_uploaded_layers( + layer_val, known_path=known_path, image_ref=image_ref, + scope=('remote' if uploaded else 'local')) + return layer_val @classmethod def _copy_stream_to_registry(cls, target_url, layer, calc_digest, @@ -1885,7 +1989,7 @@ class PythonImageUploader(BaseImageUploader): cls.check_status(session=session, request=upload_resp) layer['digest'] = layer_digest layer['size'] = length - return layer_digest + return (layer_digest, cls._build_url(target_url, target_url.path)) @classmethod @tenacity.retry( # Retry up to 5 times with jittered exponential backoff @@ -2087,10 +2191,6 @@ class PythonImageUploader(BaseImageUploader): return local_images = [] - # Pull a single image first, to avoid duplicate pulls of the - # same base layers - local_images.extend(upload_task(args=self.upload_tasks.pop())) - with self._get_executor() as p: for result in p.map(upload_task, self.upload_tasks): local_images.extend(result) @@ -2105,7 +2205,7 @@ class UploadTask(object): def __init__(self, image_name, pull_source, push_destination, append_tag, modify_role, modify_vars, dry_run, cleanup, - multi_arch, lock=None): + multi_arch): self.image_name = image_name self.pull_source = pull_source self.push_destination = push_destination @@ -2115,7 +2215,6 @@ class UploadTask(object): self.dry_run = dry_run self.cleanup = cleanup self.multi_arch = multi_arch - self.lock = lock if ':' in image_name: image = image_name.rpartition(':')[0] diff --git a/tripleo_common/image/kolla_builder.py b/tripleo_common/image/kolla_builder.py index 0f9201998..d3ea7ec11 100644 --- a/tripleo_common/image/kolla_builder.py +++ b/tripleo_common/image/kolla_builder.py @@ -378,7 +378,7 @@ def container_images_prepare(template_file=DEFAULT_TEMPLATE_FILE, del(entry['services']) params.update( - detect_insecure_registries(params)) + detect_insecure_registries(params, lock=lock)) return_data = {} if output_env_file: @@ -388,7 +388,7 @@ def container_images_prepare(template_file=DEFAULT_TEMPLATE_FILE, return return_data -def detect_insecure_registries(params): +def detect_insecure_registries(params, lock=None): """Detect insecure registries in image parameters :param params: dict of container image parameters @@ -396,7 +396,7 @@ def detect_insecure_registries(params): merged into other parameters """ insecure = set() - uploader = image_uploader.ImageUploadManager().uploader('python') + uploader = image_uploader.ImageUploadManager(lock=lock).uploader('python') for image in params.values(): host = image.split('/')[0] if uploader.is_insecure_registry(host): diff --git a/tripleo_common/tests/image/test_image_export.py b/tripleo_common/tests/image/test_image_export.py index 0e7169c46..11f1c9ad2 100644 --- a/tripleo_common/tests/image/test_image_export.py +++ b/tripleo_common/tests/image/test_image_export.py @@ -90,7 +90,7 @@ class TestImageExport(base.TestCase): } calc_digest = hashlib.sha256() layer_stream = io.BytesIO(blob_compressed) - layer_digest = image_export.export_stream( + layer_digest, _ = image_export.export_stream( target_url, layer, layer_stream, verify_digest=False ) self.assertEqual(compressed_digest, layer_digest) @@ -163,7 +163,8 @@ class TestImageExport(base.TestCase): target_blob_path = os.path.join(target_blob_dir, 'sha256:1234.gz') # call with missing source, no change - image_export.cross_repo_mount(target_url, image_layers, source_layers) + image_export.cross_repo_mount(target_url, image_layers, source_layers, + uploaded_layers={}) self.assertFalse(os.path.exists(source_blob_path)) self.assertFalse(os.path.exists(target_blob_path)) @@ -173,7 +174,8 @@ class TestImageExport(base.TestCase): self.assertTrue(os.path.exists(source_blob_path)) # call with existing source - image_export.cross_repo_mount(target_url, image_layers, source_layers) + image_export.cross_repo_mount(target_url, image_layers, source_layers, + uploaded_layers={}) self.assertTrue(os.path.exists(target_blob_path)) with open(target_blob_path, 'r') as f: self.assertEqual('blob', f.read()) diff --git a/tripleo_common/tests/image/test_image_uploader.py b/tripleo_common/tests/image/test_image_uploader.py index a7d0d10ad..3f7d8fa8f 100644 --- a/tripleo_common/tests/image/test_image_uploader.py +++ b/tripleo_common/tests/image/test_image_uploader.py @@ -1318,8 +1318,7 @@ class TestPythonImageUploader(base.TestCase): source_session=source_session, target_session=target_session, source_layers=['sha256:aaa', 'sha256:bbb', 'sha256:ccc'], - multi_arch=False, - lock=None + multi_arch=False ) @mock.patch('tripleo_common.image.image_uploader.' @@ -1553,8 +1552,7 @@ class TestPythonImageUploader(base.TestCase): source_session=source_session, target_session=target_session, source_layers=['sha256:aaa', 'sha256:bbb', 'sha256:ccc'], - multi_arch=False, - lock=None + multi_arch=False ) @mock.patch('tripleo_common.image.image_uploader.' @@ -1685,8 +1683,7 @@ class TestPythonImageUploader(base.TestCase): source_session=source_session, target_session=target_session, source_layers=['sha256:aaa', 'sha256:bbb', 'sha256:ccc'], - multi_arch=False, - lock=None + multi_arch=False ) _copy_registry_to_local.assert_called_once_with(unmodified_target_url) run_modify_playbook.assert_called_once_with( @@ -1816,7 +1813,8 @@ class TestPythonImageUploader(base.TestCase): @mock.patch('tripleo_common.image.image_uploader.' 'PythonImageUploader._upload_url') - def test_copy_layer_registry_to_registry(self, _upload_url): + @mock.patch('tripleo_common.utils.image.uploaded_layers_details') + def test_copy_layer_registry_to_registry(self, global_check, _upload_url): _upload_url.return_value = 'https://192.168.2.1:5000/v2/upload' source_url = urlparse('docker://docker.io/t/nova-api:latest') target_url = urlparse('docker://192.168.2.1:5000/t/nova-api:latest') @@ -1835,6 +1833,7 @@ class TestPythonImageUploader(base.TestCase): layer = layer_entry['digest'] # layer already exists at destination + global_check.return_value = (None, None) self.requests.head( 'https://192.168.2.1:5000/v2/t/nova-api/blobs/%s' % blob_digest, status_code=200 @@ -2022,8 +2021,9 @@ class TestPythonImageUploader(base.TestCase): @mock.patch('subprocess.Popen') @mock.patch('tripleo_common.image.image_uploader.' 'PythonImageUploader._upload_url') - def test_copy_layer_local_to_registry(self, _upload_url, mock_popen, - mock_exists): + @mock.patch('tripleo_common.utils.image.uploaded_layers_details') + def test_copy_layer_local_to_registry(self, global_check, _upload_url, + mock_popen, mock_exists): mock_exists.return_value = True _upload_url.return_value = 'https://192.168.2.1:5000/v2/upload' target_url = urlparse('docker://192.168.2.1:5000/t/nova-api:latest') @@ -2048,6 +2048,7 @@ class TestPythonImageUploader(base.TestCase): } # layer already exists at destination + global_check.return_value = (None, None) self.requests.head( 'https://192.168.2.1:5000/v2/t/' 'nova-api/blobs/%s' % compressed_digest, @@ -2117,6 +2118,7 @@ class TestPythonImageUploader(base.TestCase): layer ) + @mock.patch('tripleo_common.utils.image.uploaded_layers_details') @mock.patch('tripleo_common.image.image_uploader.' 'PythonImageUploader._image_manifest_config') @mock.patch('tripleo_common.image.image_uploader.' @@ -2127,11 +2129,12 @@ class TestPythonImageUploader(base.TestCase): 'PythonImageUploader._upload_url') def test_copy_local_to_registry(self, _upload_url, _containers_json, _copy_layer_local_to_registry, - _image_manifest_config): + _image_manifest_config, _global_check): source_url = urlparse('containers-storage:/t/nova-api:latest') target_url = urlparse('docker://192.168.2.1:5000/t/nova-api:latest') target_session = requests.Session() _upload_url.return_value = 'https://192.168.2.1:5000/v2/upload' + _global_check.return_value = (None, None) layers = [{ "compressed-diff-digest": "sha256:aeb786", "compressed-size": 74703002, diff --git a/tripleo_common/utils/image.py b/tripleo_common/utils/image.py new file mode 100644 index 000000000..49f02b80f --- /dev/null +++ b/tripleo_common/utils/image.py @@ -0,0 +1,26 @@ +# Copyright 2019 Red Hat, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +def uploaded_layers_details(uploaded_layers, layer, scope): + known_path = None + known_layer = None + image = None + if layer: + known_layer = uploaded_layers.get(layer, None) + if known_layer and scope in known_layer: + known_path = known_layer[scope].get('path', None) + image = known_layer[scope].get('ref', None) + return (known_path, image) diff --git a/tripleo_common/utils/locks/processlock.py b/tripleo_common/utils/locks/processlock.py index c927eb0c2..c30cda847 100644 --- a/tripleo_common/utils/locks/processlock.py +++ b/tripleo_common/utils/locks/processlock.py @@ -23,6 +23,7 @@ from tripleo_common.utils.locks import base class ProcessLock(base.BaseLock): # the manager cannot live in __init__ _mgr = multiprocessing.Manager() + _global_view = _mgr.dict() def __init__(self): self._lock = self._mgr.Lock()