Improving debug logging

The multi-processing output is really hard to follow. This change adds
debug statements around the job handling and upload processes, and tags
log messages with the image or layer being processed.
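
To make the intent concrete, here is a minimal, hypothetical sketch of the
logging pattern this change applies (stdlib logging plus concurrent.futures;
upload_layer/upload_image are illustrative stand-ins, not the real uploader
API): each message is prefixed with the image being processed so interleaved
output from concurrent jobs can be attributed, and job completion is counted
as futures finish.

    import logging
    from concurrent import futures

    logging.basicConfig(level=logging.DEBUG)
    LOG = logging.getLogger(__name__)

    def upload_layer(image, layer):
        # The '[image]' prefix lets interleaved output from concurrent
        # workers be attributed to a single upload.
        LOG.debug('[%s] Uploading layer %s' % (image, layer))
        return layer

    def upload_image(image, layers):
        jobs_finished = 0
        with futures.ThreadPoolExecutor(max_workers=4) as p:
            copy_jobs = [p.submit(upload_layer, image, l) for l in layers]
            jobs_count = len(copy_jobs)
            LOG.debug('[%s] Waiting for %i jobs to finish' %
                      (image, jobs_count))
            for job in futures.as_completed(copy_jobs):
                layer = job.result()  # re-raises any worker exception
                jobs_finished += 1
                LOG.debug('[%s] Waiting for next job: %i of %i complete' %
                          (image, jobs_finished, jobs_count))
        LOG.debug('[%s] Completed %i jobs' % (image, jobs_count))

    upload_image('nova-api', ['sha256:aaa', 'sha256:bbb'])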

Co-authored-by: Bogdan Dobrelya <bdobreli@redhat.com>
Change-Id: Ic043d6431cff80d7b9ee83e5f53f8bbf093ddefd
Author: Alex Schultz, 2019-09-25 13:37:40 -06:00 (committed by Bogdan Dobrelya)
parent 23614b4144
commit 58abba685e
2 changed files with 123 additions and 88 deletions

@@ -83,40 +83,48 @@ def export_stream(target_url, layer, layer_stream, verify_digest=True):
make_dir(blob_dir_path)
blob_path = os.path.join(blob_dir_path, '%s.gz' % digest)
LOG.debug('export layer to %s' % blob_path)
LOG.debug('[%s] Export layer to %s' % (image, blob_path))
length = 0
calc_digest = hashlib.sha256()
try:
with open(blob_path, 'wb') as f:
count = 0
for chunk in layer_stream:
count += 1
if not chunk:
break
LOG.debug('[%s] Writing chunk %i for %s' %
(image, count, digest))
f.write(chunk)
calc_digest.update(chunk)
length += len(chunk)
LOG.debug('[%s] Written %i bytes for %s' %
(image, length, digest))
except Exception as e:
write_error = 'Write Failure: {}'.format(str(e))
write_error = '[{}] Write Failure: {}'.format(image, str(e))
LOG.error(write_error)
if os.path.isfile(blob_path):
os.remove(blob_path)
LOG.error('Broken layer found and removed: %s' % blob_path)
LOG.error('[%s] Broken layer found and removed %s' %
(image, blob_path))
raise IOError(write_error)
else:
LOG.info('Layer written successfully: %s' % blob_path)
LOG.info('[%s] Layer written successfully %s' % (image, blob_path))
layer_digest = 'sha256:%s' % calc_digest.hexdigest()
LOG.debug('Provided layer digest: %s' % digest)
LOG.debug('Calculated layer digest: %s' % layer_digest)
LOG.debug('[%s] Provided digest: %s, Calculated digest: %s' %
(image, digest, layer_digest))
if verify_digest:
if digest != layer_digest:
hash_request_id = hashlib.sha1(str(target_url.geturl()).encode())
error_msg = (
'Image ID: %s, Expected digest "%s" does not match'
'[%s] Image ID: %s, Expected digest "%s" does not match'
' calculated digest "%s", Blob path "%s". Blob'
' path will be cleaned up.' % (
image,
hash_request_id.hexdigest(),
digest,
layer_digest,
@@ -138,6 +146,7 @@ def export_stream(target_url, layer, layer_stream, verify_digest=True):
layer['digest'] = layer_digest
layer['size'] = length
LOG.debug('[%s] Done exporting image layer %s' % (image, digest))
return layer_digest
@@ -151,7 +160,7 @@ def cross_repo_mount(target_image_url, image_layers, source_layers):
dir_path = os.path.join(IMAGE_EXPORT_DIR, 'v2', image, 'blobs')
blob_path = os.path.join(dir_path, '%s.gz' % layer)
if not os.path.exists(blob_path):
LOG.debug('Layer not found: %s' % blob_path)
LOG.debug('[%s] Layer not found: %s' % (image, blob_path))
continue
target_image, tag = image_tag_from_url(target_image_url)
@@ -161,7 +170,8 @@ def cross_repo_mount(target_image_url, image_layers, source_layers):
target_blob_path = os.path.join(target_dir_path, '%s.gz' % layer)
if os.path.exists(target_blob_path):
continue
LOG.debug('Linking layers: %s -> %s' % (blob_path, target_blob_path))
LOG.debug('[%s] Linking layers: %s -> %s' %
(image, blob_path, target_blob_path))
# make a hard link so the layers can have independent lifecycles
os.link(blob_path, target_blob_path)
@@ -286,7 +296,7 @@ def build_tags_list(image):
IMAGE_EXPORT_DIR, 'v2', image, 'manifests')
tags_dir_path = os.path.join(IMAGE_EXPORT_DIR, 'v2', image, 'tags')
tags_list_path = os.path.join(tags_dir_path, 'list')
LOG.debug('Rebuilding %s' % tags_dir_path)
LOG.debug('[%s] Rebuilding %s' % (image, tags_dir_path))
make_dir(tags_dir_path)
tags = []
for f in os.listdir(manifests_path):
@@ -330,13 +340,14 @@ def delete_image(image_url):
manifest_symlink_path = os.path.join(manifests_path, tag)
if os.path.exists(manifest_symlink_path):
LOG.debug('Deleting legacy tag symlink %s' % manifest_symlink_path)
LOG.debug('[%s] Deleting legacy tag symlink %s' %
(image, manifest_symlink_path))
os.remove(manifest_symlink_path)
type_map_path = os.path.join(manifests_path, '%s%s' %
(tag, TYPE_MAP_EXTENSION))
if os.path.exists(type_map_path):
LOG.debug('Deleting typemap file %s' % type_map_path)
LOG.debug('[%s] Deleting typemap file %s' % (image, type_map_path))
os.remove(type_map_path)
build_tags_list(image)
@@ -358,7 +369,7 @@ def delete_image(image_url):
# delete the manifest dirs that no symlinks reference
for manifest_dir in delete_manifest_dirs:
LOG.debug('Deleting manifest %s' % manifest_dir)
LOG.debug('[%s] Deleting manifest %s' % (image, manifest_dir))
shutil.rmtree(manifest_dir)
# load all remaining manifests and build the set of in-use blobs,
@@ -392,14 +403,14 @@ def delete_image(image_url):
for b in os.listdir(blobs_path)])
delete_blobs = all_blobs.difference(reffed_blobs)
for blob in delete_blobs:
LOG.debug('Deleting layer blob %s' % blob)
LOG.debug('[%s] Deleting layer blob %s' % (image, blob))
os.remove(blob)
# if no files left in manifests_path, delete the whole image
remaining = os.listdir(manifests_path)
if not remaining or remaining == ['.htaccess']:
image_path = os.path.join(IMAGE_EXPORT_DIR, 'v2', image)
LOG.debug('Deleting image directory %s' % image_path)
LOG.debug('[%s] Deleting image directory %s' % (image, image_path))
shutil.rmtree(image_path)
# rebuild the catalog for the current image list

@@ -446,7 +446,7 @@ class BaseImageUploader(object):
rauth.raise_for_status()
session.headers['Authorization'] = 'Bearer %s' % rauth.json()['token']
hash_request_id = hashlib.sha1(str(rauth.url).encode())
LOG.info(
LOG.debug(
'Session authenticated: id {}'.format(
hash_request_id.hexdigest()
)
@@ -918,7 +918,7 @@ class BaseImageUploader(object):
for layer in source_layers:
if layer in image_layers:
existing_name = image_layers[layer].path.split(':')[0][1:]
LOG.info('Cross repository blob mount %s from %s' %
LOG.info('[%s] Cross repository blob mount from %s' %
(layer, existing_name))
data = {
'mount': layer,
@@ -934,7 +934,7 @@ class SkopeoImageUploader(BaseImageUploader):
def upload_image(self, task):
t = task
LOG.info('imagename: %s' % t.image_name)
LOG.info('[%s] Got imagename' % t.image_name)
source_image_local_url = parse.urlparse('containers-storage:%s'
% t.source_image)
@@ -958,12 +958,12 @@ class SkopeoImageUploader(BaseImageUploader):
image_exists = self._image_exists(t.target_image,
target_session)
except Exception:
LOG.warning('Failed to check if the target '
'image %s exists' % t.target_image)
LOG.warning('[%s] Failed to check if the target '
'image exists' % t.target_image)
pass
if t.modify_role and image_exists:
LOG.warning('Skipping upload for modified image %s' %
t.target_image)
LOG.warning('[%s] Skipping upload for modified '
'image' % t.target_image)
target_session.close()
return []
@@ -984,8 +984,8 @@ class SkopeoImageUploader(BaseImageUploader):
t.target_image_url, self.image_layers, source_layers,
session=target_session)
except Exception:
LOG.error('Failed uploading the target '
'image %s' % t.target_image)
LOG.error('[%s] Failed uploading the target '
'image' % t.target_image)
raise
finally:
source_session.close()
@@ -1017,14 +1017,14 @@ class SkopeoImageUploader(BaseImageUploader):
)
for layer in source_layers:
self.image_layers.setdefault(layer, t.target_image_url)
LOG.warning('Completed modify and upload for image %s' %
t.image_name)
LOG.warning('[%s] Completed modify and upload for '
'image' % t.image_name)
else:
self._copy(
t.source_image_url,
t.target_image_url,
)
LOG.warning('Completed upload for image %s' % t.image_name)
LOG.warning('[%s] Completed upload for image' % t.image_name)
for layer in source_layers:
self.image_layers.setdefault(layer, t.target_image_url)
return to_cleanup
@@ -1066,7 +1066,7 @@ class SkopeoImageUploader(BaseImageUploader):
def _delete(self, image_url, session=None):
insecure = self.is_insecure_registry(registry_host=image_url.netloc)
image = image_url.geturl()
LOG.info('Deleting %s' % image)
LOG.info('[%s] Deleting image' % image)
cmd = ['skopeo', 'delete']
if insecure:
@@ -1091,7 +1091,7 @@ class SkopeoImageUploader(BaseImageUploader):
for image in sorted(local_images):
if not image:
continue
LOG.warning('Removing local copy of %s' % image)
LOG.warning('[%s] Removing local copy of image' % image)
image_url = parse.urlparse('containers-storage:%s' % image)
self._delete(image_url)
@@ -1128,30 +1128,30 @@ class PythonImageUploader(BaseImageUploader):
wait=tenacity.wait_random_exponential(multiplier=1, max=10)
)
def _layer_fetch_lock(cls, layer):
LOG.debug('Locking layer %s' % layer)
LOG.debug('[%s] Locking layer' % layer)
while layer in cls.uploader_lock_info:
LOG.debug('%s is being fetched by another thread' % layer)
LOG.debug('[%s] Layer is being fetched by another thread' % layer)
time.sleep(0.5)
LOG.debug('Starting acquire for lock %s' % layer)
LOG.debug('[%s] Starting acquire for lock' % layer)
with cls.uploader_lock:
if layer in cls.uploader_lock_info:
LOG.debug('Collision for lock %s' % layer)
LOG.debug('[%s] Collision for lock' % layer)
raise ImageUploaderThreadException('layer conflict')
LOG.debug('Acquired for lock %s' % layer)
LOG.debug('[%s] Acquired for lock' % layer)
cls.uploader_lock_info.add(layer)
LOG.debug('Updated lock info %s' % layer)
LOG.debug('Got lock on layer %s' % layer)
LOG.debug('[%s] Updated lock info' % layer)
LOG.debug('[%s] Got lock on layer' % layer)
@classmethod
def _layer_fetch_unlock(cls, layer):
LOG.debug('Unlocking layer %s' % layer)
LOG.debug('Starting acquire for lock %s' % layer)
LOG.debug('[%s] Unlocking layer' % layer)
LOG.debug('[%s] Starting acquire for lock' % layer)
with cls.uploader_lock:
LOG.debug('Acquired for unlock %s' % layer)
LOG.debug('[%s] Acquired for unlock' % layer)
if layer in cls.uploader_lock_info:
cls.uploader_lock_info.remove(layer)
LOG.debug('Updated lock info %s' % layer)
LOG.debug('Released lock on layer %s' % layer)
LOG.debug('[%s] Updated lock info' % layer)
LOG.debug('[%s] Released lock on layer' % layer)
def upload_image(self, task):
"""Upload image from a task
@@ -1169,7 +1169,7 @@ class PythonImageUploader(BaseImageUploader):
:param: task: UploadTask with container information
"""
t = task
LOG.info('imagename: %s' % t.image_name)
LOG.info('[%s] Starting upload image process' % t.image_name)
source_local = t.source_image.startswith('containers-storage:')
target_image_local_url = parse.urlparse('containers-storage:%s' %
@@ -1188,7 +1188,8 @@ class PythonImageUploader(BaseImageUploader):
try:
self._detect_target_export(t.target_image_url, target_session)
except Exception:
LOG.error('Failed uploading the target image %s' % t.target_image)
LOG.error('[%s] Failed uploading the target '
'image' % t.target_image)
# Close the session before raising, in case the caller retries
target_session.close()
raise
@@ -1199,8 +1200,8 @@ class PythonImageUploader(BaseImageUploader):
raise NotImplementedError('Modify role not implemented for '
'local containers')
if t.cleanup:
LOG.warning('Cleanup has no effect with a local source '
'container.')
LOG.warning('[%s] Cleanup has no effect with a local source '
'container.' % t.image_name)
try:
source_local_url = parse.urlparse(t.source_image)
@@ -1211,7 +1212,7 @@ class PythonImageUploader(BaseImageUploader):
session=target_session
)
except Exception:
LOG.warning('Failed copying the target image %s '
LOG.warning('[%s] Failed copying the target image '
'to the target registry' % t.target_image)
pass
target_session.close()
@@ -1223,12 +1224,12 @@ class PythonImageUploader(BaseImageUploader):
image_exists = self._image_exists(t.target_image,
target_session)
except Exception:
LOG.warning('Failed to check if the target '
'image %s exists' % t.target_image)
LOG.warning('[%s] Failed to check if the target '
'image exists' % t.target_image)
pass
if image_exists:
LOG.warning('Skipping upload for modified image %s' %
t.target_image)
LOG.warning('[%s] Skipping upload for modified image %s' %
(t.image_name, t.target_image))
target_session.close()
return []
copy_target_url = t.target_image_source_tag_url
@@ -1269,8 +1270,8 @@ class PythonImageUploader(BaseImageUploader):
multi_arch=t.multi_arch
)
except Exception:
LOG.error('Failed uploading the target '
'image %s' % t.target_image)
LOG.error('[%s] Failed uploading the target '
'image' % t.target_image)
# Close the sessions before raising, in case the caller retries
source_session.close()
@@ -1278,13 +1279,10 @@ class PythonImageUploader(BaseImageUploader):
raise
if not t.modify_role:
LOG.warning('Completed upload for image %s' % t.image_name)
LOG.info('[%s] Completed upload for image' % t.image_name)
else:
LOG.info(
'Copy unmodified imagename: "{}" from target to local'.format(
t.image_name
)
)
LOG.info('[%s] Copy unmodified image from target to local' %
t.image_name)
try:
self._copy_registry_to_local(t.target_image_source_tag_url)
@@ -1312,24 +1310,23 @@ class PythonImageUploader(BaseImageUploader):
t.target_image_url,
session=target_session
)
LOG.info('[%s] Completed modify and upload for image' %
t.image_name)
except Exception:
LOG.error('Failed processing the target '
'image %s' % t.target_image)
LOG.error('[%s] Failed processing the target '
'image' % t.target_image)
# Close the sessions before raising, in case the caller retries
source_session.close()
target_session.close()
raise
LOG.warning('Completed modify and upload for image %s' %
t.image_name)
try:
for layer in source_layers:
self.image_layers.setdefault(layer, t.target_image_url)
except Exception:
LOG.warning('Failed setting default layer for the target '
'image %s' % t.target_image)
LOG.warning('[%s] Failed setting default layer %s for the '
'target image' % (t.target_image, layer))
pass
target_session.close()
source_session.close()
@@ -1448,7 +1445,6 @@ class PythonImageUploader(BaseImageUploader):
)
def _layer_stream_registry(cls, digest, source_url, calc_digest,
session):
LOG.debug('Fetching layer: %s' % digest)
image, tag = cls._image_tag_from_url(source_url)
parts = {
'image': image,
@@ -1459,16 +1455,21 @@ class PythonImageUploader(BaseImageUploader):
source_url, CALL_BLOB % parts)
# NOTE(aschultz): We specify None and let requests figure it out
chunk_size = None
LOG.info("[%s] Fetching layer %s from %s" %
(image, digest, source_blob_url))
with session.get(
source_blob_url, stream=True, timeout=30) as blob_req:
# TODO(aschultz): unsure if necessary or if only when using .text
blob_req.encoding = 'utf-8'
cls.check_status(session=session, request=blob_req)
for data in blob_req.iter_content(chunk_size):
LOG.debug("[%s] Read %i bytes for %s" %
(image, len(data), digest))
if not data:
break
calc_digest.update(data)
yield data
LOG.info("[%s] Done fetching layer %s from registry" % (image, digest))
@classmethod
@tenacity.retry( # Retry up to 5 times with jittered exponential backoff
@@ -1498,7 +1499,7 @@ class PythonImageUploader(BaseImageUploader):
raise
digest = layer_entry['digest']
LOG.debug('Uploading layer: %s' % digest)
LOG.debug('[%s] Uploading layer' % digest)
calc_digest = hashlib.sha256()
try:
@@ -1546,6 +1547,8 @@ class PythonImageUploader(BaseImageUploader):
# Upload all layers
copy_jobs = []
jobs_count = 0
jobs_finished = 0
with futures.ThreadPoolExecutor(max_workers=4) as p:
if source_layers:
for layer in source_layers:
@@ -1556,26 +1559,33 @@ class PythonImageUploader(BaseImageUploader):
source_session=source_session,
target_session=target_session
))
jobs_count = len(copy_jobs)
LOG.debug('[%s] Waiting for %i jobs to finish' %
(image, jobs_count))
for job in futures.as_completed(copy_jobs):
e = job.exception()
if e:
raise e
image = job.result()
if image:
LOG.debug('Upload complete for layer: %s' % image)
layer = job.result()
if layer:
LOG.debug('[%s] Upload complete for layer %s' %
(image, layer))
jobs_finished += 1
LOG.debug('[%s] Waiting for next job: %i of %i complete' %
(image, jobs_finished, jobs_count))
LOG.debug('[%s] Completed %i jobs' % (image, jobs_count))
for source_manifest in source_manifests:
manifest = json.loads(source_manifest)
LOG.debug(
'Current image manifest: [%s]' % json.dumps(
manifest,
indent=4
)
)
LOG.debug('[%s] Current image manifest: [%s]' %
(image, json.dumps(manifest, indent=4)))
config_str = None
if manifest.get('mediaType') == MEDIA_MANIFEST_V2:
config_digest = manifest['config']['digest']
LOG.debug('Uploading config with digest: %s' % config_digest)
LOG.debug('[%s] Uploading config with digest: %s' %
(image, config_digest))
parts['digest'] = config_digest
source_config_url = cls._build_url(
@@ -1599,6 +1609,7 @@ class PythonImageUploader(BaseImageUploader):
target_session=target_session,
multi_arch=multi_arch
)
LOG.debug('[%s] Finished copying image' % image)
@classmethod
def _copy_manifest_config_to_registry(cls, target_url,
@@ -1658,8 +1669,8 @@ class PythonImageUploader(BaseImageUploader):
manifest_url = cls._build_url(
target_url, CALL_MANIFEST % parts)
LOG.debug('Uploading manifest of type %s to: %s' % (
manifest_type, manifest_url))
LOG.debug('[%s] Uploading manifest of type %s to: %s' %
(image, manifest_type, manifest_url))
r = target_session.put(
manifest_url,
@@ -1730,7 +1741,8 @@ class PythonImageUploader(BaseImageUploader):
blob_url = cls._build_url(
target_url, CALL_BLOB % parts)
if session.head(blob_url, timeout=30).status_code == 200:
LOG.debug('Layer already exists: %s' % l['digest'])
LOG.debug('[%s] Layer already exists: %s' %
(image, l['digest']))
layer['digest'] = l['digest']
if 'size' in l:
layer['size'] = l['size']
@@ -1741,7 +1753,7 @@ class PythonImageUploader(BaseImageUploader):
@classmethod
def _layer_stream_local(cls, layer_id, calc_digest):
LOG.debug('Exporting layer: %s' % layer_id)
LOG.debug('[%s] Exporting layer' % layer_id)
tar_split_path = cls._containers_file_path(
'overlay-layers',
@@ -1806,7 +1818,7 @@ class PythonImageUploader(BaseImageUploader):
return
layer_id = layer_entry['id']
LOG.debug('Uploading layer: %s' % layer_id)
LOG.debug('[%s] Uploading layer' % layer_id)
calc_digest = hashlib.sha256()
layer_stream = cls._layer_stream_local(layer_id, calc_digest)
@@ -1848,7 +1860,7 @@ class PythonImageUploader(BaseImageUploader):
length += chunk_length
layer_digest = 'sha256:%s' % calc_digest.hexdigest()
LOG.debug('Calculated layer digest: %s' % layer_digest)
LOG.debug('[%s] Calculated layer digest' % layer_digest)
upload_url = cls._upload_url(
target_url, session, upload_resp)
upload_resp = session.put(
@@ -1888,6 +1900,8 @@ class PythonImageUploader(BaseImageUploader):
# Upload all layers
copy_jobs = []
jobs_count = 0
jobs_finished = 0
with futures.ThreadPoolExecutor(max_workers=4) as p:
for layer in manifest['layers']:
layer_entry = layers_by_digest[layer['digest']]
@@ -1895,13 +1909,22 @@ class PythonImageUploader(BaseImageUploader):
cls._copy_layer_local_to_registry,
target_url, session, layer, layer_entry
))
jobs_count = len(copy_jobs)
LOG.debug('[%s] Waiting for %i jobs to finish' %
(name, jobs_count))
for job in futures.as_completed(copy_jobs):
e = job.exception()
if e:
raise e
image = job.result()
if image:
LOG.debug('Upload complete for layer: %s' % image)
layer = job.result()
if layer:
LOG.debug('[%s] Upload complete for layer: %s' %
(name, layer))
jobs_finished += 1
LOG.debug('[%s] Waiting for next job: %i of %i complete' %
(name, jobs_finished, jobs_count))
LOG.debug('[%s] Completed %i jobs' % (name, jobs_count))
manifest_str = json.dumps(manifest, indent=3)
cls._copy_manifest_config_to_registry(
@@ -1910,6 +1933,7 @@ class PythonImageUploader(BaseImageUploader):
config_str=config_str,
target_session=session
)
LOG.debug('[%s] Finished copying' % name)
@classmethod
def _containers_file_path(cls, *path):
@@ -1999,7 +2023,7 @@ class PythonImageUploader(BaseImageUploader):
@classmethod
def _delete(cls, image_url, session=None):
image = image_url.geturl()
LOG.info('Deleting %s' % image)
LOG.info('[%s] Deleting image' % image)
if image_url.scheme == 'docker':
return cls._delete_from_registry(image_url, session)
if image_url.scheme != 'containers-storage':
@@ -2024,7 +2048,7 @@ class PythonImageUploader(BaseImageUploader):
for image in sorted(local_images):
if not image:
continue
LOG.warning('Removing local copy of %s' % image)
LOG.info('[%s] Removing local copy of image' % image)
image_url = parse.urlparse('containers-storage:%s' % image)
self._delete(image_url)