diff --git a/tripleo_common/image/image_export.py b/tripleo_common/image/image_export.py index 10d60c2b3..b67a7103a 100644 --- a/tripleo_common/image/image_export.py +++ b/tripleo_common/image/image_export.py @@ -83,40 +83,48 @@ def export_stream(target_url, layer, layer_stream, verify_digest=True): make_dir(blob_dir_path) blob_path = os.path.join(blob_dir_path, '%s.gz' % digest) - LOG.debug('export layer to %s' % blob_path) + LOG.debug('[%s] Export layer to %s' % (image, blob_path)) length = 0 calc_digest = hashlib.sha256() try: with open(blob_path, 'wb') as f: + count = 0 for chunk in layer_stream: + count += 1 if not chunk: break + LOG.debug('[%s] Writing chunk %i for %s' % + (image, count, digest)) f.write(chunk) calc_digest.update(chunk) length += len(chunk) + LOG.debug('[%s] Written %i bytes for %s' % + (image, length, digest)) except Exception as e: - write_error = 'Write Failure: {}'.format(str(e)) + write_error = '[{}] Write Failure: {}'.format(image, str(e)) LOG.error(write_error) if os.path.isfile(blob_path): os.remove(blob_path) - LOG.error('Broken layer found and removed: %s' % blob_path) + LOG.error('[%s] Broken layer found and removed %s' % + (image, blob_path)) raise IOError(write_error) else: - LOG.info('Layer written successfully: %s' % blob_path) + LOG.info('[%s] Layer written successfully %s' % (image, blob_path)) layer_digest = 'sha256:%s' % calc_digest.hexdigest() - LOG.debug('Provided layer digest: %s' % digest) - LOG.debug('Calculated layer digest: %s' % layer_digest) + LOG.debug('[%s] Provided digest: %s, Calculated digest: %s' % + (image, digest, layer_digest)) if verify_digest: if digest != layer_digest: hash_request_id = hashlib.sha1(str(target_url.geturl()).encode()) error_msg = ( - 'Image ID: %s, Expected digest "%s" does not match' + '[%s] Image ID: %s, Expected digest "%s" does not match' ' calculated digest "%s", Blob path "%s". Blob' ' path will be cleaned up.' % ( + image, hash_request_id.hexdigest(), digest, layer_digest, @@ -138,6 +146,7 @@ def export_stream(target_url, layer, layer_stream, verify_digest=True): layer['digest'] = layer_digest layer['size'] = length + LOG.debug('[%s] Done exporting image layer %s' % (image, digest)) return layer_digest @@ -151,7 +160,7 @@ def cross_repo_mount(target_image_url, image_layers, source_layers): dir_path = os.path.join(IMAGE_EXPORT_DIR, 'v2', image, 'blobs') blob_path = os.path.join(dir_path, '%s.gz' % layer) if not os.path.exists(blob_path): - LOG.debug('Layer not found: %s' % blob_path) + LOG.debug('[%s] Layer not found: %s' % (image, blob_path)) continue target_image, tag = image_tag_from_url(target_image_url) @@ -161,7 +170,8 @@ def cross_repo_mount(target_image_url, image_layers, source_layers): target_blob_path = os.path.join(target_dir_path, '%s.gz' % layer) if os.path.exists(target_blob_path): continue - LOG.debug('Linking layers: %s -> %s' % (blob_path, target_blob_path)) + LOG.debug('[%s] Linking layers: %s -> %s' % + (image, blob_path, target_blob_path)) # make a hard link so the layers can have independent lifecycles os.link(blob_path, target_blob_path) @@ -286,7 +296,7 @@ def build_tags_list(image): IMAGE_EXPORT_DIR, 'v2', image, 'manifests') tags_dir_path = os.path.join(IMAGE_EXPORT_DIR, 'v2', image, 'tags') tags_list_path = os.path.join(tags_dir_path, 'list') - LOG.debug('Rebuilding %s' % tags_dir_path) + LOG.debug('[%s] Rebuilding %s' % (image, tags_dir_path)) make_dir(tags_dir_path) tags = [] for f in os.listdir(manifests_path): @@ -330,13 +340,14 @@ def delete_image(image_url): manifest_symlink_path = os.path.join(manifests_path, tag) if os.path.exists(manifest_symlink_path): - LOG.debug('Deleting legacy tag symlink %s' % manifest_symlink_path) + LOG.debug('[%s] Deleting legacy tag symlink %s' % + (image, manifest_symlink_path)) os.remove(manifest_symlink_path) type_map_path = os.path.join(manifests_path, '%s%s' % (tag, TYPE_MAP_EXTENSION)) if os.path.exists(type_map_path): - LOG.debug('Deleting typemap file %s' % type_map_path) + LOG.debug('[%s] Deleting typemap file %s' % (image, type_map_path)) os.remove(type_map_path) build_tags_list(image) @@ -358,7 +369,7 @@ def delete_image(image_url): # delete list of manifest_dir_path without symlinks for manifest_dir in delete_manifest_dirs: - LOG.debug('Deleting manifest %s' % manifest_dir) + LOG.debug('[%s] Deleting manifest %s' % (image, manifest_dir)) shutil.rmtree(manifest_dir) # load all remaining manifests and build the set of of in-use blobs, @@ -392,14 +403,14 @@ def delete_image(image_url): for b in os.listdir(blobs_path)]) delete_blobs = all_blobs.difference(reffed_blobs) for blob in delete_blobs: - LOG.debug('Deleting layer blob %s' % blob) + LOG.debug('[%s] Deleting layer blob %s' % (image, blob)) os.remove(blob) # if no files left in manifests_path, delete the whole image remaining = os.listdir(manifests_path) if not remaining or remaining == ['.htaccess']: image_path = os.path.join(IMAGE_EXPORT_DIR, 'v2', image) - LOG.debug('Deleting image directory %s' % image_path) + LOG.debug('[%s] Deleting image directory %s' % (image, image_path)) shutil.rmtree(image_path) # rebuild the catalog for the current image list diff --git a/tripleo_common/image/image_uploader.py b/tripleo_common/image/image_uploader.py index 14d06741c..f253c4acf 100644 --- a/tripleo_common/image/image_uploader.py +++ b/tripleo_common/image/image_uploader.py @@ -446,7 +446,7 @@ class BaseImageUploader(object): rauth.raise_for_status() session.headers['Authorization'] = 'Bearer %s' % rauth.json()['token'] hash_request_id = hashlib.sha1(str(rauth.url).encode()) - LOG.info( + LOG.debug( 'Session authenticated: id {}'.format( hash_request_id.hexdigest() ) @@ -918,7 +918,7 @@ class BaseImageUploader(object): for layer in source_layers: if layer in image_layers: existing_name = image_layers[layer].path.split(':')[0][1:] - LOG.info('Cross repository blob mount %s from %s' % + LOG.info('[%s] Cross repository blob mount from %s' % (layer, existing_name)) data = { 'mount': layer, @@ -934,7 +934,7 @@ class SkopeoImageUploader(BaseImageUploader): def upload_image(self, task): t = task - LOG.info('imagename: %s' % t.image_name) + LOG.info('[%s] Got imagename' % t.image_name) source_image_local_url = parse.urlparse('containers-storage:%s' % t.source_image) @@ -958,12 +958,12 @@ class SkopeoImageUploader(BaseImageUploader): image_exists = self._image_exists(t.target_image, target_session) except Exception: - LOG.warning('Failed to check if the target ' - 'image %s exists' % t.target_image) + LOG.warning('[%s] Failed to check if the target ' + 'image exists' % t.target_image) pass if t.modify_role and image_exists: - LOG.warning('Skipping upload for modified image %s' % - t.target_image) + LOG.warning('[%s] Skipping upload for modified ' + 'image' % t.target_image) target_session.close() return [] @@ -984,8 +984,8 @@ class SkopeoImageUploader(BaseImageUploader): t.target_image_url, self.image_layers, source_layers, session=target_session) except Exception: - LOG.error('Failed uploading the target ' - 'image %s' % t.target_image) + LOG.error('[%s] Failed uploading the target ' + 'image' % t.target_image) raise finally: source_session.close() @@ -1017,14 +1017,14 @@ class SkopeoImageUploader(BaseImageUploader): ) for layer in source_layers: self.image_layers.setdefault(layer, t.target_image_url) - LOG.warning('Completed modify and upload for image %s' % - t.image_name) + LOG.warning('[%s] Completed modify and upload for ' + 'image' % t.image_name) else: self._copy( t.source_image_url, t.target_image_url, ) - LOG.warning('Completed upload for image %s' % t.image_name) + LOG.warning('[%s] Completed upload for image' % t.image_name) for layer in source_layers: self.image_layers.setdefault(layer, t.target_image_url) return to_cleanup @@ -1066,7 +1066,7 @@ class SkopeoImageUploader(BaseImageUploader): def _delete(self, image_url, session=None): insecure = self.is_insecure_registry(registry_host=image_url.netloc) image = image_url.geturl() - LOG.info('Deleting %s' % image) + LOG.info('[%s] Deleting image' % image) cmd = ['skopeo', 'delete'] if insecure: @@ -1091,7 +1091,7 @@ class SkopeoImageUploader(BaseImageUploader): for image in sorted(local_images): if not image: continue - LOG.warning('Removing local copy of %s' % image) + LOG.warning('[%s] Removing local copy of image' % image) image_url = parse.urlparse('containers-storage:%s' % image) self._delete(image_url) @@ -1128,30 +1128,30 @@ class PythonImageUploader(BaseImageUploader): wait=tenacity.wait_random_exponential(multiplier=1, max=10) ) def _layer_fetch_lock(cls, layer): - LOG.debug('Locking layer %s' % layer) + LOG.debug('[%s] Locking layer' % layer) while layer in cls.uploader_lock_info: - LOG.debug('%s is being fetched by another thread' % layer) + LOG.debug('[%s] Layer is being fetched by another thread' % layer) time.sleep(0.5) - LOG.debug('Starting acquire for lock %s' % layer) + LOG.debug('[%s] Starting acquire for lock' % layer) with cls.uploader_lock: if layer in cls.uploader_lock_info: - LOG.debug('Collision for lock %s' % layer) + LOG.debug('[%s] Collision for lock' % layer) raise ImageUploaderThreadException('layer conflict') - LOG.debug('Acquired for lock %s' % layer) + LOG.debug('[%s] Acquired for lock' % layer) cls.uploader_lock_info.add(layer) - LOG.debug('Updated lock info %s' % layer) - LOG.debug('Got lock on layer %s' % layer) + LOG.debug('[%s] Updated lock info' % layer) + LOG.debug('[%s] Got lock on layer' % layer) @classmethod def _layer_fetch_unlock(cls, layer): - LOG.debug('Unlocking layer %s' % layer) - LOG.debug('Starting acquire for lock %s' % layer) + LOG.debug('[%s] Unlocking layer' % layer) + LOG.debug('[%s] Starting acquire for lock' % layer) with cls.uploader_lock: - LOG.debug('Acquired for unlock %s' % layer) + LOG.debug('[%s] Acquired for unlock' % layer) if layer in cls.uploader_lock_info: cls.uploader_lock_info.remove(layer) - LOG.debug('Updated lock info %s' % layer) - LOG.debug('Released lock on layer %s' % layer) + LOG.debug('[%s] Updated lock info' % layer) + LOG.debug('[%s] Released lock on layer' % layer) def upload_image(self, task): """Upload image from a task @@ -1169,7 +1169,7 @@ class PythonImageUploader(BaseImageUploader): :param: task: UploadTask with container information """ t = task - LOG.info('imagename: %s' % t.image_name) + LOG.info('[%s] Starting upload image process' % t.image_name) source_local = t.source_image.startswith('containers-storage:') target_image_local_url = parse.urlparse('containers-storage:%s' % @@ -1188,7 +1188,8 @@ class PythonImageUploader(BaseImageUploader): try: self._detect_target_export(t.target_image_url, target_session) except Exception: - LOG.error('Failed uploading the target image %s' % t.target_image) + LOG.error('[%s] Failed uploading the target ' + 'image' % t.target_image) # Close the session before raising it for more of retrying perhaps target_session.close() raise @@ -1199,8 +1200,8 @@ class PythonImageUploader(BaseImageUploader): raise NotImplementedError('Modify role not implemented for ' 'local containers') if t.cleanup: - LOG.warning('Cleanup has no effect with a local source ' - 'container.') + LOG.warning('[%s] Cleanup has no effect with a local source ' + 'container.' % t.image_name) try: source_local_url = parse.urlparse(t.source_image) @@ -1211,7 +1212,7 @@ class PythonImageUploader(BaseImageUploader): session=target_session ) except Exception: - LOG.warning('Failed copying the target image %s ' + LOG.warning('[%s] Failed copying the target image ' 'to the target registry' % t.target_image) pass target_session.close() @@ -1223,12 +1224,12 @@ class PythonImageUploader(BaseImageUploader): image_exists = self._image_exists(t.target_image, target_session) except Exception: - LOG.warning('Failed to check if the target ' - 'image %s exists' % t.target_image) + LOG.warning('[%s] Failed to check if the target ' + 'image exists' % t.target_image) pass if image_exists: - LOG.warning('Skipping upload for modified image %s' % - t.target_image) + LOG.warning('[%s] Skipping upload for modified image %s' % + (t.image_name, t.target_image)) target_session.close() return [] copy_target_url = t.target_image_source_tag_url @@ -1269,8 +1270,8 @@ class PythonImageUploader(BaseImageUploader): multi_arch=t.multi_arch ) except Exception: - LOG.error('Failed uploading the target ' - 'image %s' % t.target_image) + LOG.error('[%s] Failed uploading the target ' + 'image' % t.target_image) # Close the sessions before raising it for more of # retrying perhaps source_session.close() @@ -1278,13 +1279,10 @@ class PythonImageUploader(BaseImageUploader): raise if not t.modify_role: - LOG.warning('Completed upload for image %s' % t.image_name) + LOG.info('[%s] Completed upload for image' % t.image_name) else: - LOG.info( - 'Copy ummodified imagename: "{}" from target to local'.format( - t.image_name - ) - ) + LOG.info('[%s] Copy ummodified image from target to local' % + t.image_name) try: self._copy_registry_to_local(t.target_image_source_tag_url) @@ -1312,24 +1310,23 @@ class PythonImageUploader(BaseImageUploader): t.target_image_url, session=target_session ) + LOG.info('[%s] Completed modify and upload for image' % + t.image_name) except Exception: - LOG.error('Failed processing the target ' - 'image %s' % t.target_image) + LOG.error('[%s] Failed processing the target ' + 'image' % t.target_image) # Close the sessions before raising it for more of # retrying perhaps source_session.close() target_session.close() raise - LOG.warning('Completed modify and upload for image %s' % - t.image_name) - try: for layer in source_layers: self.image_layers.setdefault(layer, t.target_image_url) except Exception: - LOG.warning('Failed setting default layer for the target ' - 'image %s' % t.target_image) + LOG.warning('[%s] Failed setting default layer %s for the ' + 'target image' % (t.target_image, layer)) pass target_session.close() source_session.close() @@ -1448,7 +1445,6 @@ class PythonImageUploader(BaseImageUploader): ) def _layer_stream_registry(cls, digest, source_url, calc_digest, session): - LOG.debug('Fetching layer: %s' % digest) image, tag = cls._image_tag_from_url(source_url) parts = { 'image': image, @@ -1459,16 +1455,21 @@ class PythonImageUploader(BaseImageUploader): source_url, CALL_BLOB % parts) # NOTE(aschultz): We specify None and let requests figure it out chunk_size = None + LOG.info("[%s] Fetching layer %s from %s" % + (image, digest, source_blob_url)) with session.get( source_blob_url, stream=True, timeout=30) as blob_req: # TODO(aschultz): unsure if necessary or if only when using .text blob_req.encoding = 'utf-8' cls.check_status(session=session, request=blob_req) for data in blob_req.iter_content(chunk_size): + LOG.debug("[%s] Read %i bytes for %s" % + (image, len(data), digest)) if not data: break calc_digest.update(data) yield data + LOG.info("[%s] Done fetching layer %s from registry" % (image, digest)) @classmethod @tenacity.retry( # Retry up to 5 times with jittered exponential backoff @@ -1498,7 +1499,7 @@ class PythonImageUploader(BaseImageUploader): raise digest = layer_entry['digest'] - LOG.debug('Uploading layer: %s' % digest) + LOG.debug('[%s] Uploading layer' % digest) calc_digest = hashlib.sha256() try: @@ -1546,6 +1547,8 @@ class PythonImageUploader(BaseImageUploader): # Upload all layers copy_jobs = [] + jobs_count = 0 + jobs_finished = 0 with futures.ThreadPoolExecutor(max_workers=4) as p: if source_layers: for layer in source_layers: @@ -1556,26 +1559,33 @@ class PythonImageUploader(BaseImageUploader): source_session=source_session, target_session=target_session )) + + jobs_count = len(copy_jobs) + LOG.debug('[%s] Waiting for %i jobs to finish' % + (image, jobs_count)) for job in futures.as_completed(copy_jobs): e = job.exception() if e: raise e - image = job.result() - if image: - LOG.debug('Upload complete for layer: %s' % image) + layer = job.result() + if layer: + LOG.debug('[%s] Upload complete for layer %s' % + (image, layer)) + jobs_finished += 1 + LOG.debug('[%s] Waiting for next job: %i of %i complete' % + (image, jobs_finished, jobs_count)) + + LOG.debug('[%s] Completed %i jobs' % (image, jobs_count)) for source_manifest in source_manifests: manifest = json.loads(source_manifest) - LOG.debug( - 'Current image manifest: [%s]' % json.dumps( - manifest, - indent=4 - ) - ) + LOG.debug('[%s] Current image manifest: [%s]' % + (image, json.dumps(manifest, indent=4))) config_str = None if manifest.get('mediaType') == MEDIA_MANIFEST_V2: config_digest = manifest['config']['digest'] - LOG.debug('Uploading config with digest: %s' % config_digest) + LOG.debug('[%s] Uploading config with digest: %s' % + (image, config_digest)) parts['digest'] = config_digest source_config_url = cls._build_url( @@ -1599,6 +1609,7 @@ class PythonImageUploader(BaseImageUploader): target_session=target_session, multi_arch=multi_arch ) + LOG.debug('[%s] Finished copying image' % image) @classmethod def _copy_manifest_config_to_registry(cls, target_url, @@ -1658,8 +1669,8 @@ class PythonImageUploader(BaseImageUploader): manifest_url = cls._build_url( target_url, CALL_MANIFEST % parts) - LOG.debug('Uploading manifest of type %s to: %s' % ( - manifest_type, manifest_url)) + LOG.debug('[%s] Uploading manifest of type %s to: %s' % + (image, manifest_type, manifest_url)) r = target_session.put( manifest_url, @@ -1730,7 +1741,8 @@ class PythonImageUploader(BaseImageUploader): blob_url = cls._build_url( target_url, CALL_BLOB % parts) if session.head(blob_url, timeout=30).status_code == 200: - LOG.debug('Layer already exists: %s' % l['digest']) + LOG.debug('[%s] Layer already exists: %s' % + (image, l['digest'])) layer['digest'] = l['digest'] if 'size' in l: layer['size'] = l['size'] @@ -1741,7 +1753,7 @@ class PythonImageUploader(BaseImageUploader): @classmethod def _layer_stream_local(cls, layer_id, calc_digest): - LOG.debug('Exporting layer: %s' % layer_id) + LOG.debug('[%s] Exporting layer' % layer_id) tar_split_path = cls._containers_file_path( 'overlay-layers', @@ -1806,7 +1818,7 @@ class PythonImageUploader(BaseImageUploader): return layer_id = layer_entry['id'] - LOG.debug('Uploading layer: %s' % layer_id) + LOG.debug('[%s] Uploading layer' % layer_id) calc_digest = hashlib.sha256() layer_stream = cls._layer_stream_local(layer_id, calc_digest) @@ -1848,7 +1860,7 @@ class PythonImageUploader(BaseImageUploader): length += chunk_length layer_digest = 'sha256:%s' % calc_digest.hexdigest() - LOG.debug('Calculated layer digest: %s' % layer_digest) + LOG.debug('[%s] Calculated layer digest' % layer_digest) upload_url = cls._upload_url( target_url, session, upload_resp) upload_resp = session.put( @@ -1888,6 +1900,8 @@ class PythonImageUploader(BaseImageUploader): # Upload all layers copy_jobs = [] + jobs_count = 0 + jobs_finished = 0 with futures.ThreadPoolExecutor(max_workers=4) as p: for layer in manifest['layers']: layer_entry = layers_by_digest[layer['digest']] @@ -1895,13 +1909,22 @@ class PythonImageUploader(BaseImageUploader): cls._copy_layer_local_to_registry, target_url, session, layer, layer_entry )) + jobs_count = len(copy_jobs) + LOG.debug('[%s] Waiting for %i jobs to finish' % + (name, jobs_count)) for job in futures.as_completed(copy_jobs): e = job.exception() if e: raise e - image = job.result() - if image: - LOG.debug('Upload complete for layer: %s' % image) + layer = job.result() + if layer: + LOG.debug('[%s] Upload complete for layer: %s' % + (name, layer)) + jobs_finished += 1 + LOG.debug('[%s] Waiting for next job: %i of %i complete' % + (name, jobs_finished, jobs_count)) + + LOG.debug('[%s] Completed %i jobs' % (name, jobs_count)) manifest_str = json.dumps(manifest, indent=3) cls._copy_manifest_config_to_registry( @@ -1910,6 +1933,7 @@ class PythonImageUploader(BaseImageUploader): config_str=config_str, target_session=session ) + LOG.debug('[%s] Finished copying' % name) @classmethod def _containers_file_path(cls, *path): @@ -1999,7 +2023,7 @@ class PythonImageUploader(BaseImageUploader): @classmethod def _delete(cls, image_url, session=None): image = image_url.geturl() - LOG.info('Deleting %s' % image) + LOG.info('[%s] Deleting image' % image) if image_url.scheme == 'docker': return cls._delete_from_registry(image_url, session) if image_url.scheme != 'containers-storage': @@ -2024,7 +2048,7 @@ class PythonImageUploader(BaseImageUploader): for image in sorted(local_images): if not image: continue - LOG.warning('Removing local copy of %s' % image) + LOG.info('[%s] Removing local copy of image' % image) image_url = parse.urlparse('containers-storage:%s' % image) self._delete(image_url)