Log exceptions when checking status
This change adds logging whenever a request has a status code of >= 300. The check_status method will now create an SHA1 ID from the request URL and log the status, reason, text, and headers (in debug) whenever a request has a return code of >= 300. This will allow us to better debug image processing issues, even when multiprocessing requests. Deployers will now have the ability to track down issues to specific images and endpoints. Session creation was moved into a single object class to ensure we're managing sessions in a uniform way. Change-Id: If4e84a0273e295267248559c63ae994a5a826004 Signed-off-by: Kevin Carter <kecarter@redhat.com>
This commit is contained in:
parent
314917b2a9
commit
e06cf4482d
|
@ -17,6 +17,7 @@ import collections
|
|||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import requests
|
||||
import shutil
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
@ -76,7 +77,7 @@ def image_tag_from_url(image_url):
|
|||
|
||||
|
||||
def export_stream(target_url, layer, layer_stream, verify_digest=True):
|
||||
image, tag = image_tag_from_url(target_url)
|
||||
image, _ = image_tag_from_url(target_url)
|
||||
digest = layer['digest']
|
||||
blob_dir_path = os.path.join(IMAGE_EXPORT_DIR, 'v2', image, 'blobs')
|
||||
make_dir(blob_dir_path)
|
||||
|
@ -86,37 +87,54 @@ def export_stream(target_url, layer, layer_stream, verify_digest=True):
|
|||
|
||||
length = 0
|
||||
calc_digest = hashlib.sha256()
|
||||
|
||||
try:
|
||||
with open(blob_path, 'w+b') as f:
|
||||
with open(blob_path, 'wb') as f:
|
||||
for chunk in layer_stream:
|
||||
if not chunk:
|
||||
break
|
||||
f.write(chunk)
|
||||
calc_digest.update(chunk)
|
||||
length += len(chunk)
|
||||
|
||||
layer_digest = 'sha256:%s' % calc_digest.hexdigest()
|
||||
LOG.debug('Calculated layer digest: %s' % layer_digest)
|
||||
|
||||
if verify_digest:
|
||||
if digest != layer_digest:
|
||||
raise IOError('Expected digest %s '
|
||||
'does not match calculated %s' %
|
||||
(digest, layer_digest))
|
||||
else:
|
||||
# if the original layer is uncompressed
|
||||
# the digest may change on export
|
||||
expected_blob_path = os.path.join(
|
||||
blob_dir_path, '%s.gz' % layer_digest)
|
||||
if blob_path != expected_blob_path:
|
||||
os.rename(blob_path, expected_blob_path)
|
||||
|
||||
except Exception as e:
|
||||
LOG.error('Error while writing blob %s' % blob_path)
|
||||
# cleanup blob file
|
||||
write_error = 'Write Failure: {}'.format(str(e))
|
||||
LOG.error(write_error)
|
||||
if os.path.isfile(blob_path):
|
||||
os.remove(blob_path)
|
||||
raise e
|
||||
LOG.error('Broken layer found and removed: %s' % blob_path)
|
||||
raise IOError(write_error)
|
||||
else:
|
||||
LOG.info('Layer written successfully: %s' % blob_path)
|
||||
|
||||
layer_digest = 'sha256:%s' % calc_digest.hexdigest()
|
||||
LOG.debug('Provided layer digest: %s' % digest)
|
||||
LOG.debug('Calculated layer digest: %s' % layer_digest)
|
||||
|
||||
if verify_digest:
|
||||
if digest != layer_digest:
|
||||
hash_request_id = hashlib.sha1(str(target_url.geturl()).encode())
|
||||
error_msg = (
|
||||
'Image ID: %s, Expected digest "%s" does not match'
|
||||
' calculated digest "%s", Blob path "%s". Blob'
|
||||
' path will be cleaned up.' % (
|
||||
hash_request_id.hexdigest(),
|
||||
digest,
|
||||
layer_digest,
|
||||
blob_path
|
||||
)
|
||||
)
|
||||
LOG.error(error_msg)
|
||||
if os.path.isfile(blob_path):
|
||||
os.remove(blob_path)
|
||||
raise requests.exceptions.HTTPError(error_msg)
|
||||
else:
|
||||
# if the original layer is uncompressed
|
||||
# the digest may change on export
|
||||
expected_blob_path = os.path.join(
|
||||
blob_dir_path, '%s.gz' % layer_digest
|
||||
)
|
||||
if blob_path != expected_blob_path:
|
||||
os.rename(blob_path, expected_blob_path)
|
||||
|
||||
layer['digest'] = layer_digest
|
||||
layer['size'] = length
|
||||
|
|
|
@ -22,6 +22,7 @@ import os
|
|||
import re
|
||||
import requests
|
||||
from requests import auth as requests_auth
|
||||
from requests.adapters import HTTPAdapter
|
||||
import shutil
|
||||
import six
|
||||
from six.moves.urllib import parse
|
||||
|
@ -106,6 +107,38 @@ def get_undercloud_registry():
|
|||
return '%s:%s' % (common_utils.bracket_ipv6(addr), '8787')
|
||||
|
||||
|
||||
class MakeSession(object):
|
||||
"""Class method to uniformly create sessions.
|
||||
|
||||
Sessions created by this class will retry on errors with an exponential
|
||||
backoff before raising an exception. Because our primary interaction is
|
||||
with the container registries the adapter will also retry on 401 and
|
||||
404. This is being done because registries commonly return 401 when an
|
||||
image is not found, which is commonly a cache miss. See the adapter
|
||||
definitions for more on retry details.
|
||||
"""
|
||||
def __init__(self, verify=True):
|
||||
self.session = requests.Session()
|
||||
self.session.verify = verify
|
||||
adapter = HTTPAdapter(
|
||||
max_retries=8,
|
||||
pool_connections=24,
|
||||
pool_maxsize=24,
|
||||
pool_block=False
|
||||
)
|
||||
self.session.mount('http://', adapter)
|
||||
self.session.mount('https://', adapter)
|
||||
|
||||
def create(self):
|
||||
return self.__enter__()
|
||||
|
||||
def __enter__(self):
|
||||
return self.session
|
||||
|
||||
def __exit__(self, *args, **kwargs):
|
||||
self.session.close()
|
||||
|
||||
|
||||
class ImageUploadManager(BaseImageManager):
|
||||
"""Manage the uploading of image files
|
||||
|
||||
|
@ -128,14 +161,16 @@ class ImageUploadManager(BaseImageManager):
|
|||
self.cleanup = cleanup
|
||||
if mirrors:
|
||||
for uploader in self.uploaders.values():
|
||||
uploader.mirrors.update(mirrors)
|
||||
if hasattr(uploader, 'mirrors'):
|
||||
uploader.mirrors.update(mirrors)
|
||||
if registry_credentials:
|
||||
self.validate_registry_credentials(registry_credentials)
|
||||
for uploader in self.uploaders.values():
|
||||
uploader.registry_credentials = registry_credentials
|
||||
self.multi_arch = multi_arch
|
||||
|
||||
def validate_registry_credentials(self, creds_data):
|
||||
@staticmethod
|
||||
def validate_registry_credentials(creds_data):
|
||||
if not isinstance(creds_data, dict):
|
||||
raise TypeError('Credentials data must be a dict')
|
||||
for registry, cred_entry in creds_data.items():
|
||||
|
@ -166,7 +201,8 @@ class ImageUploadManager(BaseImageManager):
|
|||
def get_uploader(self, uploader):
|
||||
return self.uploader(uploader)
|
||||
|
||||
def get_push_destination(self, item):
|
||||
@staticmethod
|
||||
def get_push_destination(item):
|
||||
push_destination = item.get('push_destination')
|
||||
if not push_destination:
|
||||
return get_undercloud_registry()
|
||||
|
@ -256,15 +292,15 @@ class BaseImageUploader(object):
|
|||
def run_modify_playbook(cls, modify_role, modify_vars,
|
||||
source_image, target_image, append_tag,
|
||||
container_build_tool='buildah'):
|
||||
vars = {}
|
||||
run_vars = {}
|
||||
if modify_vars:
|
||||
vars.update(modify_vars)
|
||||
vars['source_image'] = source_image
|
||||
vars['target_image'] = target_image
|
||||
vars['modified_append_tag'] = append_tag
|
||||
vars['container_build_tool'] = container_build_tool
|
||||
run_vars.update(modify_vars)
|
||||
run_vars['source_image'] = source_image
|
||||
run_vars['target_image'] = target_image
|
||||
run_vars['modified_append_tag'] = append_tag
|
||||
run_vars['container_build_tool'] = container_build_tool
|
||||
LOG.info('Playbook variables: \n%s' % yaml.safe_dump(
|
||||
vars, default_flow_style=False))
|
||||
run_vars, default_flow_style=False))
|
||||
playbook = [{
|
||||
'hosts': 'localhost',
|
||||
'tasks': [{
|
||||
|
@ -272,7 +308,7 @@ class BaseImageUploader(object):
|
|||
'import_role': {
|
||||
'name': modify_role
|
||||
},
|
||||
'vars': vars
|
||||
'vars': run_vars
|
||||
}]
|
||||
}]
|
||||
LOG.info('Playbook: \n%s' % yaml.safe_dump(
|
||||
|
@ -350,11 +386,15 @@ class BaseImageUploader(object):
|
|||
session=None):
|
||||
netloc = image_url.netloc
|
||||
image, tag = self._image_tag_from_url(image_url)
|
||||
self.is_insecure_registry(netloc)
|
||||
self.is_insecure_registry(registry_host=netloc)
|
||||
url = self._build_url(image_url, path='/')
|
||||
verify = (netloc not in self.no_verify_registries)
|
||||
if not session:
|
||||
session = MakeSession(verify=verify).create()
|
||||
else:
|
||||
session.headers.pop('Authorization', None)
|
||||
session.verify = verify
|
||||
|
||||
session = requests.Session()
|
||||
session.verify = (netloc not in self.no_verify_registries)
|
||||
r = session.get(url, timeout=30)
|
||||
LOG.debug('%s status code %s' % (url, r.status_code))
|
||||
if r.status_code == 200:
|
||||
|
@ -376,12 +416,20 @@ class BaseImageUploader(object):
|
|||
token_param['service'] = re.search(
|
||||
'service="(.*?)"', www_auth).group(1)
|
||||
token_param['scope'] = 'repository:%s:pull' % image[1:]
|
||||
|
||||
auth = None
|
||||
if username:
|
||||
auth = requests_auth.HTTPBasicAuth(username, password)
|
||||
LOG.debug('Token parameters: params {}'.format(token_param))
|
||||
rauth = session.get(realm, params=token_param, auth=auth, timeout=30)
|
||||
rauth.raise_for_status()
|
||||
session.headers['Authorization'] = 'Bearer %s' % rauth.json()['token']
|
||||
hash_request_id = hashlib.sha1(str(rauth.url).encode())
|
||||
LOG.info(
|
||||
'Session authenticated: id {}'.format(
|
||||
hash_request_id.hexdigest()
|
||||
)
|
||||
)
|
||||
setattr(session, 'reauthenticate', self.authenticate)
|
||||
setattr(
|
||||
session,
|
||||
|
@ -396,14 +444,93 @@ class BaseImageUploader(object):
|
|||
return session
|
||||
|
||||
@staticmethod
|
||||
def check_status(session, request):
|
||||
if hasattr(session, 'reauthenticate'):
|
||||
if request.status_code == 401:
|
||||
session.reauthenticate(**session.auth_args)
|
||||
if hasattr(request, 'text'):
|
||||
raise requests.exceptions.HTTPError(request.text)
|
||||
else:
|
||||
raise SystemError()
|
||||
def _get_response_text(response, encoding='utf-8', force_encoding=False):
|
||||
"""Return request response text
|
||||
|
||||
We need to set the encoding for the response other wise it
|
||||
will attempt to detect the encoding which is very time consuming.
|
||||
See https://github.com/psf/requests/issues/4235 for additional
|
||||
context.
|
||||
|
||||
:param: response: requests Respoinse object
|
||||
:param: encoding: encoding to set if not currently set
|
||||
:param: force_encoding: set response encoding always
|
||||
"""
|
||||
|
||||
if force_encoding or not response.encoding:
|
||||
response.encoding = encoding
|
||||
return response.text
|
||||
|
||||
@staticmethod
|
||||
def check_status(session, request, allow_reauth=True):
|
||||
hash_request_id = hashlib.sha1(str(request.url).encode())
|
||||
request_id = hash_request_id.hexdigest()
|
||||
text = getattr(request, 'text', 'unknown')
|
||||
reason = getattr(request, 'reason', 'unknown')
|
||||
status_code = getattr(request, 'status_code', None)
|
||||
headers = getattr(request, 'headers', {})
|
||||
session_headers = getattr(session, 'headers', {})
|
||||
|
||||
if status_code >= 300:
|
||||
LOG.info(
|
||||
'Non-2xx: id {}, status {}, reason {}, text {}'.format(
|
||||
request_id,
|
||||
status_code,
|
||||
reason,
|
||||
text
|
||||
)
|
||||
)
|
||||
|
||||
if status_code == 401:
|
||||
LOG.warning(
|
||||
'Failure: id {}, status {}, reason {} text {}'.format(
|
||||
request_id,
|
||||
status_code,
|
||||
reason,
|
||||
text
|
||||
)
|
||||
)
|
||||
LOG.debug(
|
||||
'Request headers after 401: id {}, headers {}'.format(
|
||||
request_id,
|
||||
headers
|
||||
)
|
||||
)
|
||||
LOG.debug(
|
||||
'Session headers after 401: id {}, headers {}'.format(
|
||||
request_id,
|
||||
session_headers
|
||||
)
|
||||
)
|
||||
|
||||
www_auth = headers.get(
|
||||
'www-authenticate',
|
||||
headers.get(
|
||||
'Www-Authenticate'
|
||||
)
|
||||
)
|
||||
if www_auth:
|
||||
error = None
|
||||
if 'error=' in www_auth:
|
||||
error = re.search('error="(.*?)"', www_auth).group(1)
|
||||
LOG.warning(
|
||||
'Error detected in auth headers: error {}'.format(
|
||||
error
|
||||
)
|
||||
)
|
||||
if error == 'invalid_token' and allow_reauth:
|
||||
if hasattr(session, 'reauthenticate'):
|
||||
reauth = int(session.headers.get('_TripleOReAuth', 0))
|
||||
reauth += 1
|
||||
session.headers['_TripleOReAuth'] = str(reauth)
|
||||
LOG.warning(
|
||||
'Re-authenticating: id {}, count {}'.format(
|
||||
request_id,
|
||||
reauth
|
||||
)
|
||||
)
|
||||
session.reauthenticate(**session.auth_args)
|
||||
|
||||
request.raise_for_status()
|
||||
|
||||
@classmethod
|
||||
|
@ -413,10 +540,10 @@ class BaseImageUploader(object):
|
|||
mirror = cls.mirrors[netloc]
|
||||
return '%sv2%s' % (mirror, path)
|
||||
else:
|
||||
if netloc in cls.insecure_registries:
|
||||
scheme = 'http'
|
||||
else:
|
||||
if not cls.is_insecure_registry(registry_host=netloc):
|
||||
scheme = 'https'
|
||||
else:
|
||||
scheme = 'http'
|
||||
if netloc == 'docker.io':
|
||||
netloc = 'registry-1.docker.io'
|
||||
return '%s://%s/v2%s' % (scheme, netloc, path)
|
||||
|
@ -469,7 +596,7 @@ class BaseImageUploader(object):
|
|||
tags_r = tags_f.result()
|
||||
cls.check_status(session=session, request=tags_r)
|
||||
|
||||
manifest_str = manifest_r.text
|
||||
manifest_str = cls._get_response_text(manifest_r)
|
||||
|
||||
if 'Docker-Content-Digest' in manifest_r.headers:
|
||||
digest = manifest_r.headers['Docker-Content-Digest']
|
||||
|
@ -524,7 +651,7 @@ class BaseImageUploader(object):
|
|||
}
|
||||
|
||||
def list(self, registry, session=None):
|
||||
self.is_insecure_registry(registry)
|
||||
self.is_insecure_registry(registry_host=registry)
|
||||
url = self._image_to_url(registry)
|
||||
catalog_url = self._build_url(
|
||||
url, CALL_CATALOG
|
||||
|
@ -537,7 +664,7 @@ class BaseImageUploader(object):
|
|||
else:
|
||||
raise ImageUploaderException(
|
||||
'Image registry made invalid response: %s' %
|
||||
(catalog_resp.status_code)
|
||||
catalog_resp.status_code
|
||||
)
|
||||
|
||||
tags_get_args = []
|
||||
|
@ -601,7 +728,7 @@ class BaseImageUploader(object):
|
|||
fallback_tag=None):
|
||||
labels = i.get('Labels', {})
|
||||
|
||||
if(hasattr(labels, 'keys')):
|
||||
if hasattr(labels, 'keys'):
|
||||
label_keys = ', '.join(labels.keys())
|
||||
else:
|
||||
label_keys = ""
|
||||
|
@ -626,7 +753,7 @@ class BaseImageUploader(object):
|
|||
)
|
||||
else:
|
||||
tag_label = None
|
||||
if(isinstance(labels, dict)):
|
||||
if isinstance(labels, dict):
|
||||
tag_label = labels.get(tag_from_label)
|
||||
if tag_label is None:
|
||||
if fallback_tag:
|
||||
|
@ -651,7 +778,7 @@ class BaseImageUploader(object):
|
|||
|
||||
# prime self.insecure_registries by testing every image
|
||||
for url in image_urls:
|
||||
self.is_insecure_registry(url)
|
||||
self.is_insecure_registry(registry_host=url)
|
||||
|
||||
discover_args = []
|
||||
for image in images:
|
||||
|
@ -667,7 +794,7 @@ class BaseImageUploader(object):
|
|||
def discover_image_tag(self, image, tag_from_label=None,
|
||||
fallback_tag=None, username=None, password=None):
|
||||
image_url = self._image_to_url(image)
|
||||
self.is_insecure_registry(image_url.netloc)
|
||||
self.is_insecure_registry(registry_host=image_url.netloc)
|
||||
session = self.authenticate(
|
||||
image_url, username=username, password=password)
|
||||
|
||||
|
@ -680,7 +807,7 @@ class BaseImageUploader(object):
|
|||
images_with_labels = []
|
||||
for image in images:
|
||||
url = self._image_to_url(image)
|
||||
self.is_insecure_registry(url.netloc)
|
||||
self.is_insecure_registry(registry_host=url.netloc)
|
||||
session = self.authenticate(
|
||||
url, username=username, password=password)
|
||||
image_labels = self._image_labels(
|
||||
|
@ -699,19 +826,23 @@ class BaseImageUploader(object):
|
|||
# prime insecure_registries
|
||||
if task.pull_source:
|
||||
self.is_insecure_registry(
|
||||
self._image_to_url(task.pull_source).netloc)
|
||||
registry_host=self._image_to_url(task.pull_source).netloc
|
||||
)
|
||||
else:
|
||||
self.is_insecure_registry(
|
||||
self._image_to_url(task.image_name).netloc)
|
||||
registry_host=self._image_to_url(task.image_name).netloc
|
||||
)
|
||||
self.is_insecure_registry(
|
||||
self._image_to_url(task.push_destination).netloc)
|
||||
registry_host=self._image_to_url(task.push_destination).netloc
|
||||
)
|
||||
self.upload_tasks.append((self, task))
|
||||
|
||||
def is_insecure_registry(self, registry_host):
|
||||
if registry_host in self.secure_registries:
|
||||
@classmethod
|
||||
def is_insecure_registry(cls, registry_host):
|
||||
if registry_host in cls.secure_registries:
|
||||
return False
|
||||
if (registry_host in self.insecure_registries or
|
||||
registry_host in self.no_verify_registries):
|
||||
if (registry_host in cls.insecure_registries or
|
||||
registry_host in cls.no_verify_registries):
|
||||
return True
|
||||
with requests.Session() as s:
|
||||
try:
|
||||
|
@ -722,7 +853,7 @@ class BaseImageUploader(object):
|
|||
try:
|
||||
s.get('https://%s/v2' % registry_host, timeout=30,
|
||||
verify=False)
|
||||
self.no_verify_registries.add(registry_host)
|
||||
cls.no_verify_registries.add(registry_host)
|
||||
# Techinically these type of registries are insecure when
|
||||
# the container engine tries to do a pull. The python
|
||||
# uploader ignores the certificate problem, but they are
|
||||
|
@ -731,14 +862,14 @@ class BaseImageUploader(object):
|
|||
return True
|
||||
except requests.exceptions.SSLError:
|
||||
# So nope, it's really not a certificate verification issue
|
||||
self.insecure_registries.add(registry_host)
|
||||
cls.insecure_registries.add(registry_host)
|
||||
return True
|
||||
except Exception:
|
||||
# for any other error assume it is a secure registry, because:
|
||||
# - it is secure registry
|
||||
# - the host is not accessible
|
||||
pass
|
||||
self.secure_registries.add(registry_host)
|
||||
cls.secure_registries.add(registry_host)
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
|
@ -897,9 +1028,8 @@ class SkopeoImageUploader(BaseImageUploader):
|
|||
(' '.join(cmd), err))
|
||||
return out
|
||||
|
||||
@classmethod
|
||||
def _delete(cls, image_url, session=None):
|
||||
insecure = cls.is_insecure_registry(image_url.netloc)
|
||||
def _delete(self, image_url, session=None):
|
||||
insecure = self.is_insecure_registry(registry_host=image_url.netloc)
|
||||
image = image_url.geturl()
|
||||
LOG.info('Deleting %s' % image)
|
||||
cmd = ['skopeo', 'delete']
|
||||
|
@ -937,14 +1067,11 @@ class SkopeoImageUploader(BaseImageUploader):
|
|||
|
||||
# Pull a single image first, to avoid duplicate pulls of the
|
||||
# same base layers
|
||||
uploader, first_task = self.upload_tasks.pop()
|
||||
result = uploader.upload_image(first_task)
|
||||
local_images.extend(result)
|
||||
local_images.extend(upload_task(args=self.upload_tasks.pop()))
|
||||
|
||||
# workers will be half the CPU count, to a minimum of 2
|
||||
workers = max(2, processutils.get_worker_count() // 2)
|
||||
workers = max(2, (processutils.get_worker_count() - 1))
|
||||
p = futures.ThreadPoolExecutor(max_workers=workers)
|
||||
|
||||
for result in p.map(upload_task, self.upload_tasks):
|
||||
local_images.extend(result)
|
||||
LOG.info('result %s' % local_images)
|
||||
|
@ -1021,7 +1148,11 @@ class PythonImageUploader(BaseImageUploader):
|
|||
if not t.modify_role:
|
||||
LOG.warning('Completed upload for image %s' % t.image_name)
|
||||
else:
|
||||
# Copy ummodified from target to local
|
||||
LOG.info(
|
||||
'Copy ummodified imagename: "{}" from target to local'.format(
|
||||
t.image_name
|
||||
)
|
||||
)
|
||||
self._copy_registry_to_local(t.target_image_source_tag_url)
|
||||
|
||||
if t.cleanup in (CLEANUP_FULL, CLEANUP_PARTIAL):
|
||||
|
@ -1073,7 +1204,7 @@ class PythonImageUploader(BaseImageUploader):
|
|||
return False
|
||||
|
||||
# detect if the registry is push-capable by requesting an upload URL.
|
||||
image, tag = cls._image_tag_from_url(image_url)
|
||||
image, _ = cls._image_tag_from_url(image_url)
|
||||
upload_req_url = cls._build_url(
|
||||
image_url,
|
||||
path=CALL_UPLOAD % {'image': image})
|
||||
|
@ -1111,12 +1242,12 @@ class PythonImageUploader(BaseImageUploader):
|
|||
if r.status_code in (403, 404):
|
||||
raise ImageNotFoundException('Not found image: %s' % url)
|
||||
cls.check_status(session=session, request=r)
|
||||
return r.text
|
||||
return cls._get_response_text(r)
|
||||
|
||||
def _collect_manifests_layers(cls, image_url, session,
|
||||
def _collect_manifests_layers(self, image_url, session,
|
||||
manifests_str, layers,
|
||||
multi_arch):
|
||||
manifest_str = cls._fetch_manifest(
|
||||
manifest_str = self._fetch_manifest(
|
||||
image_url,
|
||||
session=session,
|
||||
multi_arch=multi_arch
|
||||
|
@ -1133,7 +1264,7 @@ class PythonImageUploader(BaseImageUploader):
|
|||
for man in manifest.get('manifests', []):
|
||||
# replace image tag with the manifest hash in the list
|
||||
man_url = parse.urlparse('%s@%s' % (image, man['digest']))
|
||||
cls._collect_manifests_layers(
|
||||
self._collect_manifests_layers(
|
||||
man_url, session, manifests_str, layers,
|
||||
multi_arch=False
|
||||
)
|
||||
|
@ -1182,6 +1313,8 @@ class PythonImageUploader(BaseImageUploader):
|
|||
chunk_size = 2 ** 20
|
||||
with session.get(
|
||||
source_blob_url, stream=True, timeout=30) as blob_req:
|
||||
# TODO(aschultz): unsure if necessary or if only when using .text
|
||||
blob_req.encoding = 'utf-8'
|
||||
cls.check_status(session=session, request=blob_req)
|
||||
for data in blob_req.iter_content(chunk_size):
|
||||
if not data:
|
||||
|
@ -1268,6 +1401,12 @@ class PythonImageUploader(BaseImageUploader):
|
|||
|
||||
for source_manifest in source_manifests:
|
||||
manifest = json.loads(source_manifest)
|
||||
LOG.debug(
|
||||
'Current image manifest: [%s]' % json.dumps(
|
||||
manifest,
|
||||
indent=4
|
||||
)
|
||||
)
|
||||
config_str = None
|
||||
if manifest.get('mediaType') == MEDIA_MANIFEST_V2:
|
||||
config_digest = manifest['config']['digest']
|
||||
|
@ -1275,11 +1414,16 @@ class PythonImageUploader(BaseImageUploader):
|
|||
|
||||
parts['digest'] = config_digest
|
||||
source_config_url = cls._build_url(
|
||||
source_url, CALL_BLOB % parts)
|
||||
source_url,
|
||||
CALL_BLOB % parts
|
||||
)
|
||||
|
||||
r = source_session.get(source_config_url, timeout=30)
|
||||
cls.check_status(session=source_session, request=r)
|
||||
config_str = r.text
|
||||
cls.check_status(
|
||||
session=source_session,
|
||||
request=r
|
||||
)
|
||||
config_str = cls._get_response_text(r)
|
||||
manifest['config']['size'] = len(config_str)
|
||||
manifest['config']['mediaType'] = MEDIA_CONFIG
|
||||
|
||||
|
@ -1361,7 +1505,7 @@ class PythonImageUploader(BaseImageUploader):
|
|||
}
|
||||
)
|
||||
if r.status_code == 400:
|
||||
LOG.error(r.text)
|
||||
LOG.error(cls._get_response_text(r))
|
||||
raise ImageUploaderException('Pushing manifest failed')
|
||||
cls.check_status(session=target_session, request=r)
|
||||
|
||||
|
@ -1374,25 +1518,34 @@ class PythonImageUploader(BaseImageUploader):
|
|||
def _copy_registry_to_local(cls, source_url):
|
||||
cls._assert_scheme(source_url, 'docker')
|
||||
pull_source = source_url.netloc + source_url.path
|
||||
LOG.info('Pulling %s' % pull_source)
|
||||
cmd = ['buildah', 'pull']
|
||||
cmd = ['buildah', '--debug', 'pull']
|
||||
|
||||
if source_url.netloc in [cls.insecure_registries,
|
||||
cls.no_verify_registries]:
|
||||
cmd.append('--tls-verify=false')
|
||||
|
||||
cmd.append(pull_source)
|
||||
LOG.info('Pulling %s' % pull_source)
|
||||
LOG.info('Running %s' % ' '.join(cmd))
|
||||
env = os.environ.copy()
|
||||
process = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE,
|
||||
universal_newlines=True)
|
||||
|
||||
process = subprocess.Popen(
|
||||
cmd,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
universal_newlines=True,
|
||||
close_fds=True
|
||||
)
|
||||
out, err = process.communicate()
|
||||
LOG.info(out)
|
||||
if process.returncode != 0:
|
||||
LOG.error('Error pulling image:\n%s\n%s' %
|
||||
(' '.join(cmd), err))
|
||||
raise ImageUploaderException('Pulling image failed')
|
||||
error_msg = (
|
||||
'Pulling image failed: cmd "{}", stdout "{}",'
|
||||
' stderr "{}"'.format(
|
||||
' '.join(cmd),
|
||||
out,
|
||||
err
|
||||
)
|
||||
)
|
||||
LOG.error(error_msg)
|
||||
raise ImageUploaderException(error_msg)
|
||||
return out
|
||||
|
||||
@classmethod
|
||||
|
@ -1654,7 +1807,7 @@ class PythonImageUploader(BaseImageUploader):
|
|||
config = json.loads(config_str)
|
||||
|
||||
layers = [l['digest'] for l in manifest['layers']]
|
||||
i, tag = cls._image_tag_from_url(image_url)
|
||||
i, _ = cls._image_tag_from_url(image_url)
|
||||
digest = image['digest']
|
||||
created = image['created']
|
||||
labels = config['config']['Labels']
|
||||
|
@ -1718,14 +1871,11 @@ class PythonImageUploader(BaseImageUploader):
|
|||
|
||||
# Pull a single image first, to avoid duplicate pulls of the
|
||||
# same base layers
|
||||
uploader, first_task = self.upload_tasks.pop()
|
||||
result = uploader.upload_image(first_task)
|
||||
local_images.extend(result)
|
||||
local_images.extend(upload_task(args=self.upload_tasks.pop()))
|
||||
|
||||
# workers will be half the CPU count, to a minimum of 2
|
||||
workers = max(2, processutils.get_worker_count() // 2)
|
||||
# workers will the CPU count minus 1, with a minimum of 2
|
||||
workers = max(2, (processutils.get_worker_count() - 1))
|
||||
p = futures.ThreadPoolExecutor(max_workers=workers)
|
||||
|
||||
for result in p.map(upload_task, self.upload_tasks):
|
||||
local_images.extend(result)
|
||||
LOG.info('result %s' % local_images)
|
||||
|
@ -1773,7 +1923,8 @@ class UploadTask(object):
|
|||
self.source_image_url = image_to_url(self.source_image)
|
||||
self.target_image_url = image_to_url(self.target_image)
|
||||
self.target_image_source_tag_url = image_to_url(
|
||||
self.target_image_source_tag)
|
||||
self.target_image_source_tag
|
||||
)
|
||||
|
||||
|
||||
def upload_task(args):
|
||||
|
|
|
@ -17,6 +17,7 @@ import hashlib
|
|||
import io
|
||||
import json
|
||||
import os
|
||||
import requests
|
||||
import shutil
|
||||
import six
|
||||
from six.moves.urllib.parse import urlparse
|
||||
|
@ -116,7 +117,8 @@ class TestImageExport(base.TestCase):
|
|||
}
|
||||
calc_digest = hashlib.sha256()
|
||||
layer_stream = io.BytesIO(blob_compressed)
|
||||
self.assertRaises(IOError, image_export.export_stream,
|
||||
self.assertRaises(requests.exceptions.HTTPError,
|
||||
image_export.export_stream,
|
||||
target_url, layer, layer_stream,
|
||||
verify_digest=True)
|
||||
blob_dir = os.path.join(image_export.IMAGE_EXPORT_DIR,
|
||||
|
|
|
@ -53,6 +53,8 @@ class TestImageUploadManager(base.TestCase):
|
|||
files.append('testfile')
|
||||
self.filelist = files
|
||||
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
'BaseImageUploader.check_status')
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
'PythonImageUploader._fetch_manifest')
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
|
@ -75,7 +77,8 @@ class TestImageUploadManager(base.TestCase):
|
|||
'get_undercloud_registry', return_value='192.0.2.0:8787')
|
||||
def test_file_parsing(self, mock_gur, mockioctl, mockpath,
|
||||
mock_images_match, mock_is_insecure, mock_inspect,
|
||||
mock_auth, mock_copy, mock_manifest):
|
||||
mock_auth, mock_copy, mock_manifest,
|
||||
check_status):
|
||||
|
||||
mock_manifest.return_value = '{"layers": []}'
|
||||
mock_inspect.return_value = {}
|
||||
|
@ -541,7 +544,7 @@ class TestBaseImageUploader(base.TestCase):
|
|||
insecure_reg.add('registry-1.docker.io')
|
||||
secure_reg.add('192.0.2.1:8787')
|
||||
self.assertEqual(
|
||||
'http://registry-1.docker.io/v2/t/nova-api/manifests/latest',
|
||||
'https://registry-1.docker.io/v2/t/nova-api/manifests/latest',
|
||||
build(url2, '/t/nova-api/manifests/latest')
|
||||
)
|
||||
self.assertEqual(
|
||||
|
@ -884,6 +887,8 @@ class TestSkopeoImageUploader(base.TestCase):
|
|||
self.uploader._copy.retry.sleep = mock.Mock()
|
||||
self.uploader._inspect.retry.sleep = mock.Mock()
|
||||
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
'BaseImageUploader.check_status')
|
||||
@mock.patch('os.environ')
|
||||
@mock.patch('subprocess.Popen')
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
|
@ -891,7 +896,7 @@ class TestSkopeoImageUploader(base.TestCase):
|
|||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
'BaseImageUploader.authenticate')
|
||||
def test_upload_image(self, mock_auth, mock_inspect,
|
||||
mock_popen, mock_environ):
|
||||
mock_popen, mock_environ, check_status):
|
||||
mock_process = mock.Mock()
|
||||
mock_process.communicate.return_value = ('copy complete', '')
|
||||
mock_process.returncode = 0
|
||||
|
@ -1163,10 +1168,11 @@ class TestPythonImageUploader(base.TestCase):
|
|||
u._copy_layer_local_to_registry.retry.sleep = mock.Mock()
|
||||
u._copy_layer_registry_to_registry.retry.sleep = mock.Mock()
|
||||
u._copy_registry_to_registry.retry.sleep = mock.Mock()
|
||||
u._copy_registry_to_local.retry.sleep = mock.Mock()
|
||||
u._copy_local_to_registry.retry.sleep = mock.Mock()
|
||||
self.requests = self.useFixture(rm_fixture.Fixture())
|
||||
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
'BaseImageUploader.check_status')
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
'PythonImageUploader.authenticate')
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
|
@ -1177,7 +1183,7 @@ class TestPythonImageUploader(base.TestCase):
|
|||
'PythonImageUploader._copy_registry_to_registry')
|
||||
def test_upload_image(
|
||||
self, _copy_registry_to_registry, _cross_repo_mount,
|
||||
_fetch_manifest, authenticate):
|
||||
_fetch_manifest, authenticate, check_status):
|
||||
|
||||
target_session = mock.Mock()
|
||||
source_session = mock.Mock()
|
||||
|
@ -1260,6 +1266,8 @@ class TestPythonImageUploader(base.TestCase):
|
|||
multi_arch=False
|
||||
)
|
||||
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
'BaseImageUploader.check_status')
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
'PythonImageUploader.authenticate')
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
|
@ -1270,7 +1278,7 @@ class TestPythonImageUploader(base.TestCase):
|
|||
'PythonImageUploader._copy_registry_to_registry')
|
||||
def test_authenticate_upload_image(
|
||||
self, _copy_registry_to_registry, _cross_repo_mount,
|
||||
_fetch_manifest, authenticate):
|
||||
_fetch_manifest, authenticate, check_status):
|
||||
|
||||
self.uploader.registry_credentials = {
|
||||
'docker.io': {'my_username': 'my_password'},
|
||||
|
@ -1332,6 +1340,8 @@ class TestPythonImageUploader(base.TestCase):
|
|||
),
|
||||
])
|
||||
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
'BaseImageUploader.check_status')
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
'PythonImageUploader.authenticate')
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
|
@ -1342,7 +1352,7 @@ class TestPythonImageUploader(base.TestCase):
|
|||
'PythonImageUploader._copy_registry_to_registry')
|
||||
def test_insecure_registry(
|
||||
self, _copy_registry_to_registry, _cross_repo_mount,
|
||||
_fetch_manifest, authenticate):
|
||||
_fetch_manifest, authenticate, check_status):
|
||||
target_session = mock.Mock()
|
||||
source_session = mock.Mock()
|
||||
authenticate.side_effect = [
|
||||
|
@ -1399,6 +1409,8 @@ class TestPythonImageUploader(base.TestCase):
|
|||
),
|
||||
])
|
||||
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
'BaseImageUploader.check_status')
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
'PythonImageUploader.authenticate')
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
|
@ -1409,7 +1421,7 @@ class TestPythonImageUploader(base.TestCase):
|
|||
'PythonImageUploader._copy_registry_to_registry')
|
||||
def test_upload_image_v1_manifest(
|
||||
self, _copy_registry_to_registry, _cross_repo_mount,
|
||||
_fetch_manifest, authenticate):
|
||||
_fetch_manifest, authenticate, check_status):
|
||||
|
||||
target_session = mock.Mock()
|
||||
source_session = mock.Mock()
|
||||
|
@ -1488,6 +1500,8 @@ class TestPythonImageUploader(base.TestCase):
|
|||
multi_arch=False
|
||||
)
|
||||
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
'BaseImageUploader.check_status')
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
'PythonImageUploader.authenticate')
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
|
@ -1507,7 +1521,8 @@ class TestPythonImageUploader(base.TestCase):
|
|||
def test_upload_image_modify(
|
||||
self, _copy_local_to_registry, run_modify_playbook,
|
||||
_copy_registry_to_local, _copy_registry_to_registry,
|
||||
_cross_repo_mount, _fetch_manifest, _image_exists, authenticate):
|
||||
_cross_repo_mount, _fetch_manifest, _image_exists, authenticate,
|
||||
check_status):
|
||||
|
||||
_image_exists.return_value = False
|
||||
target_session = mock.Mock()
|
||||
|
@ -1632,7 +1647,9 @@ class TestPythonImageUploader(base.TestCase):
|
|||
session=target_session
|
||||
)
|
||||
|
||||
def test_fetch_manifest(self):
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
'BaseImageUploader.check_status')
|
||||
def test_fetch_manifest(self, check_status):
|
||||
url = urlparse('docker://docker.io/t/nova-api:tripleo-current')
|
||||
manifest = '{"layers": []}'
|
||||
session = mock.Mock()
|
||||
|
@ -1651,8 +1668,9 @@ class TestPythonImageUploader(base.TestCase):
|
|||
'.manifest.v2+json'
|
||||
}
|
||||
)
|
||||
|
||||
def test_upload_url(self):
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
'BaseImageUploader.check_status')
|
||||
def test_upload_url(self, check_status):
|
||||
# test with previous request
|
||||
previous_request = mock.Mock()
|
||||
previous_request.headers = {
|
||||
|
@ -1775,12 +1793,15 @@ class TestPythonImageUploader(base.TestCase):
|
|||
'docker'
|
||||
)
|
||||
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
'BaseImageUploader.check_status')
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
'PythonImageUploader._upload_url')
|
||||
@mock.patch('tripleo_common.image.image_uploader.'
|
||||
'PythonImageUploader.'
|
||||
'_copy_layer_registry_to_registry')
|
||||
def test_copy_registry_to_registry(self, _copy_layer, _upload_url):
|
||||
def test_copy_registry_to_registry(self, _copy_layer, _upload_url,
|
||||
check_status):
|
||||
source_url = urlparse('docker://docker.io/t/nova-api:latest')
|
||||
target_url = urlparse('docker://192.168.2.1:5000/t/nova-api:latest')
|
||||
_upload_url.return_value = 'https://192.168.2.1:5000/v2/upload'
|
||||
|
@ -1844,7 +1865,6 @@ class TestPythonImageUploader(base.TestCase):
|
|||
params={'digest': 'sha256:1234'},
|
||||
timeout=30
|
||||
),
|
||||
mock.call().raise_for_status(),
|
||||
mock.call(
|
||||
'https://192.168.2.1:5000/v2/t/nova-api/manifests/latest',
|
||||
data=mock.ANY,
|
||||
|
@ -1854,7 +1874,6 @@ class TestPythonImageUploader(base.TestCase):
|
|||
},
|
||||
timeout=30
|
||||
),
|
||||
mock.call().raise_for_status(),
|
||||
])
|
||||
put_manifest = json.loads(
|
||||
target_session.put.call_args[1]['data'].decode('utf-8')
|
||||
|
|
Loading…
Reference in New Issue