2344 lines
87 KiB
Python
2344 lines
87 KiB
Python
# Copyright 2015 Red Hat, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
|
|
import base64
|
|
from concurrent import futures
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import random
|
|
import re
|
|
import requests
|
|
from requests import auth as requests_auth
|
|
from requests.adapters import HTTPAdapter
|
|
import shutil
|
|
import six
|
|
from six.moves.urllib import parse
|
|
import socket
|
|
import subprocess
|
|
import tempfile
|
|
import tenacity
|
|
import yaml
|
|
|
|
from oslo_concurrency import processutils
|
|
from oslo_log import log as logging
|
|
from tripleo_common.actions import ansible
|
|
from tripleo_common.image.base import BaseImageManager
|
|
from tripleo_common.image.exception import ImageNotFoundException
|
|
from tripleo_common.image.exception import ImageUploaderException
|
|
from tripleo_common.image.exception import ImageUploaderThreadException
|
|
from tripleo_common.image import image_export
|
|
from tripleo_common.utils import image as image_utils
|
|
from tripleo_common.utils.locks import threadinglock
|
|
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
# Registries used to seed BaseImageUploader.secure_registries.
SECURE_REGISTRIES = (
    'trunk.registry.rdoproject.org',
    'docker.io',
    'registry-1.docker.io',
)

# Registries used to seed BaseImageUploader.no_verify_registries.
NO_VERIFY_REGISTRIES = ()

# Cleanup modes accepted by the upload tasks.
CLEANUP_FULL = 'full'
CLEANUP_PARTIAL = 'partial'
CLEANUP_NONE = 'none'
CLEANUP = (CLEANUP_FULL, CLEANUP_PARTIAL, CLEANUP_NONE)

# Registry v2 API path templates; they are appended after ".../v2".
CALL_PING = '/'
CALL_MANIFEST = '%(image)s/manifests/%(tag)s'
CALL_BLOB = '%(image)s/blobs/%(digest)s'
CALL_UPLOAD = '%(image)s/blobs/uploads/'
CALL_TAGS = '%(image)s/tags/list'
CALL_CATALOG = '/_catalog'
CALL_TYPES = (
    CALL_PING,
    CALL_MANIFEST,
    CALL_BLOB,
    CALL_UPLOAD,
    CALL_TAGS,
    CALL_CATALOG,
)

# Media types exchanged with the registry.
MEDIA_MANIFEST_V1 = 'application/vnd.docker.distribution.manifest.v1+json'
MEDIA_MANIFEST_V1_SIGNED = (
    'application/vnd.docker.distribution.manifest.v1+prettyjws')
MEDIA_MANIFEST_V2 = 'application/vnd.docker.distribution.manifest.v2+json'
MEDIA_MANIFEST_V2_LIST = (
    'application/vnd.docker.distribution.manifest.list.v2+json')
MEDIA_CONFIG = 'application/vnd.docker.container.image.v1+json'
MEDIA_BLOB = 'application/vnd.docker.image.rootfs.diff.tar'
MEDIA_BLOB_COMPRESSED = 'application/vnd.docker.image.rootfs.diff.tar.gzip'
MEDIA_TYPES = (
    MEDIA_MANIFEST_V1,
    MEDIA_MANIFEST_V1_SIGNED,
    MEDIA_MANIFEST_V2,
    MEDIA_MANIFEST_V2_LIST,
    MEDIA_CONFIG,
    MEDIA_BLOB,
    MEDIA_BLOB_COMPRESSED,
)

# Uploader used when a config item does not name one.
DEFAULT_UPLOADER = 'python'
|
|
|
|
|
|
def get_undercloud_registry():
    """Return the ``address:8787`` location of the undercloud registry.

    The short local hostname with a ``.ctlplane`` suffix is resolved with
    ``getent hosts``. When that lookup succeeds the second
    whitespace-separated field of its output is used as the address; when
    it fails a warning is logged and ``localhost`` is used instead.
    """
    short_name = socket.gethostname().split('.')[0]
    ctlplane_hostname = '.'.join([short_name, 'ctlplane'])

    process = subprocess.Popen(['getent', 'hosts', ctlplane_hostname],
                               stdout=subprocess.PIPE,
                               universal_newlines=True)
    out, _ = process.communicate()

    if process.returncode == 0:
        address = out.split()[1]
    else:
        LOG.warning('No entry for %s in /etc/hosts. Falling back to use the '
                    'default (localhost) undercloud registry.'
                    % ctlplane_hostname)
        address = 'localhost'

    return '%s:%s' % (address, '8787')
|
|
|
|
|
|
class MakeSession(object):
    """Build ``requests`` sessions with one shared transport setup.

    Every session gets the requested ``verify`` setting and a single
    ``HTTPAdapter`` (``max_retries=8``, 24 pooled connections, non-blocking
    pool) mounted for both ``http://`` and ``https://``.

    The object can be used as a context manager, in which case the session
    is closed on exit, or through :meth:`create`, in which case the caller
    owns the session and is responsible for closing it.
    """

    def __init__(self, verify=True):
        session = requests.Session()
        session.verify = verify
        adapter = HTTPAdapter(
            max_retries=8,
            pool_connections=24,
            pool_maxsize=24,
            pool_block=False
        )
        for prefix in ('http://', 'https://'):
            session.mount(prefix, adapter)
        self.session = session

    def create(self):
        """Return the configured session without entering a ``with`` block."""
        return self.__enter__()

    def __enter__(self):
        return self.session

    def __exit__(self, *args, **kwargs):
        self.session.close()
|
|
|
|
|
|
class ImageUploadManager(BaseImageManager):
    """Manage the uploading of image files

    Manage the uploading of images from a config file specified in YAML
    syntax. Multiple config files can be specified. They will be merged.
    """

    def __init__(self, config_files=None,
                 dry_run=False, cleanup=CLEANUP_FULL,
                 mirrors=None, registry_credentials=None,
                 multi_arch=False, lock=None):
        """Create the manager and one uploader per supported type.

        :param config_files: list of config file paths (defaults to ``[]``)
        :param dry_run: stored and passed on to every upload task
        :param cleanup: cleanup mode passed on to every upload task
        :param mirrors: optional dict merged into each uploader's
                        ``mirrors`` mapping
        :param registry_credentials: optional dict shaped like
                                     ``{registry: {username: password}}``;
                                     validated before being handed to the
                                     uploaders
        :param multi_arch: default for config items without a
                           ``multi_arch`` key
        :param lock: passed to the python uploader's
                     ``init_global_state()``
        :raises TypeError: when ``registry_credentials`` is malformed
        """
        if config_files is None:
            config_files = []
        super(ImageUploadManager, self).__init__(config_files)
        self.uploaders = {
            'skopeo': SkopeoImageUploader(),
            'python': PythonImageUploader()
        }
        self.uploaders['python'].init_global_state(lock)
        self.dry_run = dry_run
        self.cleanup = cleanup
        if mirrors:
            for uploader in self.uploaders.values():
                if hasattr(uploader, 'mirrors'):
                    uploader.mirrors.update(mirrors)
        if registry_credentials:
            self.validate_registry_credentials(registry_credentials)
            for uploader in self.uploaders.values():
                uploader.registry_credentials = registry_credentials
        self.multi_arch = multi_arch

    @staticmethod
    def validate_registry_credentials(creds_data):
        """Check the shape of a registry credentials mapping.

        :param creds_data: expected to be
                           ``{registry: {username: password}}`` with string
                           registry, username and password and exactly one
                           username per registry
        :raises TypeError: on the first entry that does not match
        """
        if not isinstance(creds_data, dict):
            raise TypeError('Credentials data must be a dict')
        for registry, cred_entry in creds_data.items():
            if not isinstance(cred_entry, dict) or len(cred_entry) != 1:
                raise TypeError('Credentials entry must be '
                                'a dict with a single item')
            if not isinstance(registry, six.string_types):
                raise TypeError('Key must be a registry host string: %s' %
                                registry)
            username, password = next(iter(cred_entry.items()))
            if not (isinstance(username, six.string_types) and
                    isinstance(password, six.string_types)):
                raise TypeError('Username and password must be strings: %s' %
                                username)

    def discover_image_tag(self, image, tag_from_label=None,
                           username=None, password=None):
        """Delegate tag discovery to the default uploader."""
        uploader = self.uploader(DEFAULT_UPLOADER)
        return uploader.discover_image_tag(
            image, tag_from_label=tag_from_label,
            username=username, password=password)

    def uploader(self, uploader):
        """Return the uploader registered under the name ``uploader``.

        :raises ImageUploaderException: when the name is not registered
        """
        if uploader not in self.uploaders:
            raise ImageUploaderException('Unknown image uploader type')
        return self.uploaders[uploader]

    def get_uploader(self, uploader):
        """Alias of :meth:`uploader`."""
        return self.uploader(uploader)

    @staticmethod
    def get_push_destination(item):
        """Return the registry a config item should be pushed to.

        A missing/falsy ``push_destination``, or the boolean ``True``,
        selects the discovered undercloud registry; any other value is
        returned unchanged.
        """
        push_destination = item.get('push_destination')
        if not push_destination or isinstance(push_destination, bool):
            return get_undercloud_registry()
        return push_destination

    def upload(self):
        """Start the upload process

        Builds one upload task per configured item, queues each task on the
        uploader that item selected, then runs every uploader.

        :returns: the list of parsed config items, each updated in place
                  with its resolved ``push_destination`` (simply to make
                  test validation easier)
        :raises ImageUploaderException: when an item names an unknown
                                        uploader
        """

        LOG.info('Using config files: %s' % self.config_files)

        uploads = self.load_config_files(self.UPLOADS) or []
        container_images = self.load_config_files(self.CONTAINER_IMAGES) or []
        upload_images = uploads + container_images

        # Each task is kept together with the uploader chosen for its item.
        # Previously the tasks were queued after the loop through the leaked
        # loop variable, so every task landed on the uploader of the *last*
        # item regardless of its own ``uploader`` setting.
        tasks = []
        for item in upload_images:
            image_name = item.get('imagename')
            uploader_name = item.get('uploader', DEFAULT_UPLOADER)
            pull_source = item.get('pull_source')
            push_destination = self.get_push_destination(item)

            # This updates the parsed upload_images dict with real values
            item['push_destination'] = push_destination
            append_tag = item.get('modify_append_tag')
            modify_role = item.get('modify_role')
            modify_vars = item.get('modify_vars')
            multi_arch = item.get('multi_arch', self.multi_arch)

            uploader = self.uploader(uploader_name)
            tasks.append((uploader, UploadTask(
                image_name, pull_source, push_destination,
                append_tag, modify_role, modify_vars, self.dry_run,
                self.cleanup, multi_arch)))

        # NOTE(mwhahaha): We want to randomize the upload process because of
        # the shared nature of container layers. Because we multiprocess the
        # handling of containers, if performed in an alphabetical order (the
        # default) we end up duplicating fetching of container layers. Things
        # like cinder-volume and cinder-backup share almost all of the same
        # layers so when they are fetched at the same time, we will duplicate
        # the processing. By randomizing the list we will reduce the amount
        # of duplicating that occurs. In my testing I went from ~30mins to
        # ~20mins to run. In the future this could be improved if we added
        # some locking to the container fetching based on layer hashes but
        # will require a significant rewrite.
        random.shuffle(tasks)
        for uploader, task in tasks:
            uploader.add_upload_task(task)

        for uploader in self.uploaders.values():
            uploader.run_tasks()

        return upload_images  # simply to make test validation easier
|
|
|
|
|
|
class BaseImageUploader(object):
|
|
|
|
mirrors = {}
|
|
insecure_registries = set()
|
|
no_verify_registries = set(NO_VERIFY_REGISTRIES)
|
|
secure_registries = set(SECURE_REGISTRIES)
|
|
export_registries = set()
|
|
push_registries = set()
|
|
|
|
def __init__(self):
|
|
self.upload_tasks = []
|
|
# A mapping of layer hashs to the image which first copied that
|
|
# layer to the target
|
|
self.image_layers = {}
|
|
self.registry_credentials = {}
|
|
|
|
@classmethod
def init_registries_cache(cls):
    """Reset the class-level registry caches to their initial contents.

    The caches are emptied in place; the secure and no-verify sets are then
    re-seeded from SECURE_REGISTRIES and NO_VERIFY_REGISTRIES.
    """
    emptied = (
        cls.insecure_registries,
        cls.mirrors,
        cls.export_registries,
        cls.push_registries,
    )
    for cache in emptied:
        cache.clear()

    reseeded = (
        (cls.no_verify_registries, NO_VERIFY_REGISTRIES),
        (cls.secure_registries, SECURE_REGISTRIES),
    )
    for cache, defaults in reseeded:
        cache.clear()
        cache.update(defaults)
|
|
|
|
def cleanup(self):
    """Do nothing; the base uploader has nothing to clean up."""
    return None
|
|
|
|
def run_tasks(self):
    """Do nothing; the base uploader does not process queued tasks."""
    return None
|
|
|
|
def credentials_for_registry(self, registry):
|
|
creds = self.registry_credentials.get(registry)
|
|
if not creds:
|
|
return None, None
|
|
username, password = next(iter(creds.items()))
|
|
return username, password
|
|
|
|
@classmethod
def run_modify_playbook(cls, modify_role, modify_vars,
                        source_image, target_image, append_tag,
                        container_build_tool='buildah'):
    """Run ``modify_role`` against localhost through a generated playbook.

    :param modify_role: name of the ansible role to import
    :param modify_vars: optional extra role variables; the four variables
                        set here override same-named entries
    :param source_image: exposed to the role as ``source_image``
    :param target_image: exposed to the role as ``target_image``
    :param append_tag: exposed to the role as ``modified_append_tag``
    :param container_build_tool: exposed to the role under the same name
    :raises ImageUploaderException: when the playbook run raises
                                    ``ProcessExecutionError``
    """
    run_vars = dict(modify_vars) if modify_vars else {}
    run_vars.update({
        'source_image': source_image,
        'target_image': target_image,
        'modified_append_tag': append_tag,
        'container_build_tool': container_build_tool,
    })
    LOG.info('Playbook variables: \n%s' % yaml.safe_dump(
        run_vars, default_flow_style=False))

    role_task = {
        'name': 'Import role %s' % modify_role,
        'import_role': {
            'name': modify_role
        },
        'vars': run_vars
    }
    playbook = [{
        'hosts': 'localhost',
        'gather_facts': 'no',
        'tasks': [role_task]
    }]
    LOG.info('Playbook: \n%s' % yaml.safe_dump(
        playbook, default_flow_style=False))

    work_dir = tempfile.mkdtemp(prefix='tripleo-modify-image-playbook-')
    try:
        action = ansible.AnsiblePlaybookAction(
            playbook=playbook,
            work_dir=work_dir,
            verbosity=1,
            extra_env_variables=dict(os.environ),
            override_ansible_cfg=(
                "[defaults]\n"
                "stdout_callback=yaml\n"
            )
        )
        result = action.run(None)
        log_path = result.get('log_path')
        if log_path and os.path.isfile(log_path):
            # Relay the playbook log into our own log, line by line
            with open(log_path) as log_file:
                for log_line in log_file:
                    LOG.info(log_line.rstrip())
        # The work dir is only removed on success; on a process failure
        # it is kept and its path is logged below.
        shutil.rmtree(work_dir)
    except processutils.ProcessExecutionError as e:
        LOG.error('%s\nError running playbook in directory: %s'
                  % (e.stdout, work_dir))
        raise ImageUploaderException(
            'Modifying image %s failed' % target_image)
|
|
|
|
@classmethod
|
|
def _images_match(cls, image1, image2, session1=None):
|
|
try:
|
|
image1_digest = cls._image_digest(image1, session=session1)
|
|
except Exception:
|
|
return False
|
|
try:
|
|
image2_digest = cls._image_digest(image2)
|
|
except Exception:
|
|
return False
|
|
|
|
# missing digest, no way to know if they match
|
|
if not image1_digest or not image2_digest:
|
|
return False
|
|
return image1_digest == image2_digest
|
|
|
|
@classmethod
|
|
def _image_digest(cls, image, session=None):
|
|
image_url = cls._image_to_url(image)
|
|
i = cls._inspect(image_url, session)
|
|
return i.get('Digest')
|
|
|
|
@classmethod
|
|
def _image_labels(cls, image_url, session=None):
|
|
i = cls._inspect(image_url, session)
|
|
return i.get('Labels', {}) or {}
|
|
|
|
@classmethod
|
|
def _image_exists(cls, image, session=None):
|
|
try:
|
|
cls._image_digest(
|
|
image, session=session)
|
|
except ImageNotFoundException:
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
@tenacity.retry(  # Retry up to 5 times with jittered exponential backoff
    reraise=True,
    retry=tenacity.retry_if_exception_type(
        requests.exceptions.RequestException
    ),
    wait=tenacity.wait_random_exponential(multiplier=1, max=10),
    stop=tenacity.stop_after_attempt(5)
)
def authenticate(self, image_url, username=None, password=None,
                 session=None):
    """Return a session that is authorised to pull ``image_url``.

    The registry's ``/v2/`` endpoint is probed first. A 200 response means
    no token is needed. A 401 response must carry a ``Bearer``
    ``www-authenticate`` challenge; a pull-scoped token is then requested
    from the challenge's realm and stored in the session's
    ``Authorization`` header.

    :param image_url: parsed image URL
    :param username: optional user for HTTP basic auth on the token call
    :param password: password for ``username``
    :param session: existing session to (re)authenticate; a new one is
                    created when omitted
    :returns: the session; after a token exchange it also carries
              ``reauthenticate`` and ``auth_args`` so that
              ``check_status`` can renew the token
    :raises ImageUploaderException: when the 401 challenge is missing, is
                                    not a Bearer challenge or has no realm
    :raises requests.exceptions.HTTPError: on other HTTP error statuses
    """
    netloc = image_url.netloc
    image, tag = self._image_tag_from_url(image_url)
    self.is_insecure_registry(registry_host=netloc)
    url = self._build_url(image_url, path='/')
    verify = (netloc not in self.no_verify_registries)
    if not session:
        session = MakeSession(verify=verify).create()
    else:
        # Re-used session: drop the previous token before probing again
        session.headers.pop('Authorization', None)
        session.verify = verify

    r = session.get(url, timeout=30)
    LOG.debug('%s status code %s' % (url, r.status_code))
    if r.status_code == 200:
        return session
    if r.status_code != 401:
        r.raise_for_status()
    if 'www-authenticate' not in r.headers:
        raise ImageUploaderException(
            'Unknown authentication method for headers: %s' % r.headers)

    www_auth = r.headers['www-authenticate']
    if not www_auth.startswith('Bearer '):
        raise ImageUploaderException(
            'Unknown www-authenticate value: %s' % www_auth)

    # A challenge without a quoted realm used to crash with an
    # AttributeError on the failed regex match; report it like any other
    # challenge we cannot handle.
    realm_match = re.search(r'realm="(.*?)"', www_auth)
    if realm_match is None:
        raise ImageUploaderException(
            'Unknown www-authenticate value: %s' % www_auth)
    realm = realm_match.group(1)

    token_param = {}
    service_match = re.search(r'service="(.*?)"', www_auth)
    if service_match is not None:
        token_param['service'] = service_match.group(1)
    # image is the URL path without the tag; strip its leading "/"
    token_param['scope'] = 'repository:%s:pull' % image[1:]

    auth = None
    if username:
        auth = requests_auth.HTTPBasicAuth(username, password)
    LOG.debug('Token parameters: params {}'.format(token_param))
    rauth = session.get(realm, params=token_param, auth=auth, timeout=30)
    rauth.raise_for_status()
    session.headers['Authorization'] = 'Bearer %s' % rauth.json()['token']
    hash_request_id = hashlib.sha1(str(rauth.url).encode())
    LOG.debug(
        'Session authenticated: id {}'.format(
            hash_request_id.hexdigest()
        )
    )
    # Remember how to repeat this call so check_status() can renew the
    # token on the same session.
    setattr(session, 'reauthenticate', self.authenticate)
    setattr(
        session,
        'auth_args',
        dict(
            image_url=image_url,
            username=username,
            password=password,
            session=session
        )
    )
    return session
|
|
|
|
@staticmethod
|
|
def _get_response_text(response, encoding='utf-8', force_encoding=False):
|
|
"""Return request response text
|
|
|
|
We need to set the encoding for the response other wise it
|
|
will attempt to detect the encoding which is very time consuming.
|
|
See https://github.com/psf/requests/issues/4235 for additional
|
|
context.
|
|
|
|
:param: response: requests Respoinse object
|
|
:param: encoding: encoding to set if not currently set
|
|
:param: force_encoding: set response encoding always
|
|
"""
|
|
|
|
if force_encoding or not response.encoding:
|
|
response.encoding = encoding
|
|
return response.text
|
|
|
|
@staticmethod
def check_status(session, request, allow_reauth=True):
    """Log a failed registry response and raise for HTTP error statuses.

    On a 401 whose ``www-authenticate`` header reports
    ``error="invalid_token"`` the session is re-authenticated in place
    (when it carries ``reauthenticate``/``auth_args``) before the error is
    raised; this method does not retry the request itself.

    :param session: session the request was made with
    :param request: the response object to check (despite the name)
    :param allow_reauth: set to ``False`` to skip re-authentication
    :raises requests.exceptions.HTTPError: via ``raise_for_status()``
    """
    request_id = hashlib.sha1(str(request.url).encode()).hexdigest()
    text = getattr(request, 'text', 'unknown')
    reason = getattr(request, 'reason', 'unknown')
    status_code = getattr(request, 'status_code', None)
    headers = getattr(request, 'headers', {})
    session_headers = getattr(session, 'headers', {})

    # status_code falls back to None above; comparing None with an int
    # raises TypeError on Python 3, so guard the ordering comparison.
    if status_code is not None and status_code >= 300:
        LOG.info(
            'Non-2xx: id {}, status {}, reason {}, text {}'.format(
                request_id,
                status_code,
                reason,
                text
            )
        )

    if status_code == 401:
        LOG.warning(
            'Failure: id {}, status {}, reason {} text {}'.format(
                request_id,
                status_code,
                reason,
                text
            )
        )
        LOG.debug(
            'Request headers after 401: id {}, headers {}'.format(
                request_id,
                headers
            )
        )
        LOG.debug(
            'Session headers after 401: id {}, headers {}'.format(
                request_id,
                session_headers
            )
        )

        www_auth = headers.get(
            'www-authenticate',
            headers.get(
                'Www-Authenticate'
            )
        )
        if www_auth:
            error = None
            # An "error=" attribute without a quoted value used to crash
            # on the failed match; it is now treated as "no error given".
            error_match = re.search(r'error="(.*?)"', www_auth)
            if error_match is not None:
                error = error_match.group(1)
                LOG.warning(
                    'Error detected in auth headers: error {}'.format(
                        error
                    )
                )
            if error == 'invalid_token' and allow_reauth:
                if hasattr(session, 'reauthenticate'):
                    # The attempt counter is kept in a session header
                    reauth = int(session.headers.get('_TripleOReAuth', 0))
                    reauth += 1
                    session.headers['_TripleOReAuth'] = str(reauth)
                    LOG.warning(
                        'Re-authenticating: id {}, count {}'.format(
                            request_id,
                            reauth
                        )
                    )
                    session.reauthenticate(**session.auth_args)

    request.raise_for_status()
|
|
|
|
@classmethod
|
|
def _build_url(cls, url, path):
|
|
netloc = url.netloc
|
|
if netloc in cls.mirrors:
|
|
mirror = cls.mirrors[netloc]
|
|
return '%sv2%s' % (mirror, path)
|
|
else:
|
|
if (cls.is_insecure_registry(registry_host=netloc) and
|
|
netloc not in cls.no_verify_registries):
|
|
scheme = 'http'
|
|
else:
|
|
scheme = 'https'
|
|
if netloc == 'docker.io':
|
|
netloc = 'registry-1.docker.io'
|
|
return '%s://%s/v2%s' % (scheme, netloc, path)
|
|
|
|
@classmethod
|
|
def _image_tag_from_url(cls, image_url):
|
|
if '@' in image_url.path:
|
|
parts = image_url.path.split('@')
|
|
else:
|
|
parts = image_url.path.split(':')
|
|
tag = parts[-1]
|
|
image = ':'.join(parts[:-1])
|
|
return image, tag
|
|
|
|
@classmethod
@tenacity.retry(  # Retry up to 5 times with jittered exponential backoff
    reraise=True,
    retry=tenacity.retry_if_exception_type(
        requests.exceptions.RequestException
    ),
    wait=tenacity.wait_random_exponential(multiplier=1, max=10),
    stop=tenacity.stop_after_attempt(5)
)
def _inspect(cls, image_url, session=None):
    """Fetch manifest, tag list and config of an image from its registry.

    :param image_url: parsed image URL
    :param session: session used for every request
    :returns: dict with the keys ``Name``, ``Tag``, ``Digest``,
              ``RepoTags``, ``Created``, ``DockerVersion``, ``Labels``,
              ``Architecture``, ``Os`` and ``Layers``
    :raises ImageNotFoundException: when the manifest request returns
                                    403 or 404
    """
    image, tag = cls._image_tag_from_url(image_url)
    parts = {'image': image, 'tag': tag}

    manifest_url = cls._build_url(image_url, CALL_MANIFEST % parts)
    tags_url = cls._build_url(image_url, CALL_TAGS % parts)

    manifest_r = session.get(manifest_url,
                             headers={'Accept': MEDIA_MANIFEST_V2},
                             timeout=30)
    if manifest_r.status_code in (403, 404):
        raise ImageNotFoundException('Not found image: %s' %
                                     image_url.geturl())
    cls.check_status(session=session, request=manifest_r)

    tags_r = session.get(tags_url, timeout=30)
    cls.check_status(session=session, request=tags_r)

    manifest_str = cls._get_response_text(manifest_r)

    if 'Docker-Content-Digest' not in manifest_r.headers:
        # The registry didn't supply the manifest digest, so calculate it
        digest = 'sha256:%s' % hashlib.sha256(
            manifest_str.encode('utf-8')).hexdigest()
    else:
        digest = manifest_r.headers['Docker-Content-Digest']

    manifest = json.loads(manifest_str)

    if manifest.get('schemaVersion', 2) == 1:
        # Schema 1 embeds the config in the history and lists layers in
        # the reverse of the order used for schema 2.
        config = json.loads(manifest['history'][0]['v1Compatibility'])
        layers = [entry['blobSum'] for entry in manifest['fsLayers']]
        layers.reverse()
    else:
        layers = [entry['digest'] for entry in manifest['layers']]

        # Schema 2 stores the config as a separate blob
        config_ref = manifest['config']
        parts['digest'] = config_ref['digest']
        config_r = session.get(
            cls._build_url(image_url, CALL_BLOB % parts),
            headers={'Accept': config_ref['mediaType']},
            timeout=30)
        cls.check_status(session=session, request=config_r)
        config = config_r.json()

    tags = tags_r.json()['tags']

    labels = config['config'].get('Labels', {})
    # NOTE: labels can be null
    if labels is None:
        labels = {}

    return {
        'Name': '%s%s' % (image_url.netloc, image),
        'Tag': tag,
        'Digest': digest,
        'RepoTags': tags,
        'Created': config['created'],
        'DockerVersion': config.get('docker_version', ''),
        'Labels': labels,
        'Architecture': config['architecture'],
        'Os': config['os'],
        'Layers': layers,
    }
|
|
|
|
def list(self, registry, session=None):
    """List every ``image:tag`` reference a registry's catalog exposes.

    :param registry: registry host, used as prefix of the returned names
    :param session: session used for the catalog and tag requests
    :returns: list of ``<registry>/<repo>:<tag>`` strings; empty when the
              catalog endpoint answers 404
    :raises ImageUploaderException: on any catalog status other than
                                    200 or 404
    """
    self.is_insecure_registry(registry_host=registry)
    catalog_url = self._build_url(self._image_to_url(registry), CALL_CATALOG)
    catalog_resp = session.get(catalog_url, timeout=30)

    if catalog_resp.status_code in [404]:
        # just return since the catalog returned a 404
        LOG.debug('catalog_url return 404')
        return []
    if catalog_resp.status_code not in [200]:
        raise ImageUploaderException(
            'Image registry made invalid response: %s' %
            catalog_resp.status_code
        )
    catalog = catalog_resp.json()

    tags_get_args = [
        (self, '%s/%s' % (registry, repo), session)
        for repo in catalog.get('repositories', [])
    ]

    images = []
    # Tag lists are fetched concurrently, one call per repository
    with futures.ThreadPoolExecutor(max_workers=16) as pool:
        for image, tags in pool.map(tags_for_image, tags_get_args):
            for tag in tags or []:
                images.append('%s:%s' % (image, tag))
    return images
|
|
|
|
def inspect(self, image, session=None):
|
|
image_url = self._image_to_url(image)
|
|
return self._inspect(image_url, session)
|
|
|
|
def delete(self, image, session=None):
|
|
image_url = self._image_to_url(image)
|
|
return self._delete(image_url, session)
|
|
|
|
@classmethod
|
|
def _delete(cls, image, session=None):
|
|
raise NotImplementedError()
|
|
|
|
@classmethod
@tenacity.retry(  # Retry up to 5 times with jittered exponential backoff
    reraise=True,
    retry=tenacity.retry_if_exception_type(
        requests.exceptions.RequestException
    ),
    wait=tenacity.wait_random_exponential(multiplier=1, max=10),
    stop=tenacity.stop_after_attempt(5)
)
def _tags_for_image(cls, image, session):
    """Fetch the tag list of one repository.

    :param image: ``<registry>/<repo>`` reference without a tag
    :param session: session used for the request
    :returns: ``(image, tags)``; ``tags`` is empty on a 403/404 response
              or when the response has no ``tags`` key
    """
    url = cls._image_to_url(image)
    tags_url = cls._build_url(url, CALL_TAGS % {'image': url.path})
    r = session.get(tags_url, timeout=30)
    if r.status_code in (403, 404):
        return image, []
    return image, r.json().get('tags', [])
|
|
|
|
@classmethod
|
|
def _image_to_url(cls, image):
|
|
if '://' not in image:
|
|
image = 'docker://' + image
|
|
url = parse.urlparse(image)
|
|
return url
|
|
|
|
@classmethod
|
|
def _discover_tag_from_inspect(cls, i, image, tag_from_label=None,
|
|
fallback_tag=None):
|
|
labels = i.get('Labels', {})
|
|
|
|
if hasattr(labels, 'keys'):
|
|
label_keys = ', '.join(labels.keys())
|
|
else:
|
|
label_keys = ""
|
|
|
|
if not tag_from_label:
|
|
raise ImageUploaderException(
|
|
'No label specified. Available labels: %s' % label_keys
|
|
)
|
|
|
|
if "{" in tag_from_label:
|
|
try:
|
|
tag_label = tag_from_label.format(**labels)
|
|
except ValueError as e:
|
|
raise ImageUploaderException(e)
|
|
except KeyError as e:
|
|
if fallback_tag:
|
|
tag_label = fallback_tag
|
|
else:
|
|
raise ImageUploaderException(
|
|
'Image %s %s. Available labels: %s' %
|
|
(image, e, label_keys)
|
|
)
|
|
else:
|
|
tag_label = None
|
|
if isinstance(labels, dict):
|
|
tag_label = labels.get(tag_from_label)
|
|
if tag_label is None:
|
|
if fallback_tag:
|
|
tag_label = fallback_tag
|
|
else:
|
|
raise ImageUploaderException(
|
|
'Image %s has no label %s. Available labels: %s' %
|
|
(image, tag_from_label, label_keys)
|
|
)
|
|
|
|
# confirm the tag exists by checking for an entry in RepoTags
|
|
repo_tags = i.get('RepoTags', [])
|
|
if tag_label not in repo_tags:
|
|
raise ImageUploaderException(
|
|
'Image %s has no tag %s.\nAvailable tags: %s' %
|
|
(image, tag_label, ', '.join(repo_tags))
|
|
)
|
|
return tag_label
|
|
|
|
def discover_image_tags(self, images, tag_from_label=None):
    """Discover the versioned tag of several images concurrently.

    :param images: iterable of image references
    :param tag_from_label: label name or template forwarded to the
                           per-image discovery
    :returns: dict mapping each image to its versioned image, as yielded
              by ``discover_tag_from_inspect``
    """
    image_urls = [self._image_to_url(i) for i in images]

    # prime self.insecure_registries by testing every image. The host
    # (netloc) must be passed here: handing over the whole parsed URL made
    # the probe's string formatting fail, which was swallowed and left the
    # URL object cached as a "secure registry" without any host being
    # tested.
    for url in image_urls:
        self.is_insecure_registry(registry_host=url.netloc)

    discover_args = [(self, image, tag_from_label) for image in images]

    versioned_images = {}
    with futures.ThreadPoolExecutor(max_workers=16) as p:
        for image, versioned_image in p.map(discover_tag_from_inspect,
                                            discover_args):
            versioned_images[image] = versioned_image
    return versioned_images
|
|
|
|
def discover_image_tag(self, image, tag_from_label=None,
                       fallback_tag=None, username=None, password=None):
    """Discover the versioned tag of a single image.

    :param image: image reference
    :param tag_from_label: label name or template used to derive the tag
    :param fallback_tag: tag used when the label cannot be resolved
    :param username: optional registry user
    :param password: password for ``username``
    :returns: the discovered tag
    :raises ImageUploaderException: when authentication is rejected with
                                    a 401, or the tag cannot be derived
    """
    image_url = self._image_to_url(image)
    self.is_insecure_registry(registry_host=image_url.netloc)
    try:
        session = self.authenticate(
            image_url, username=username, password=password)
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 401:
            raise ImageUploaderException(
                'Unable to authenticate. This may indicate '
                'missing registry credentials or the provided '
                'container or namespace does not exist. %s' % e)
        raise

    # The session was created for this call only; release its pooled
    # connections once the inspect data has been fetched.
    try:
        i = self._inspect(image_url, session)
    finally:
        session.close()
    return self._discover_tag_from_inspect(i, image, tag_from_label,
                                           fallback_tag)
|
|
|
|
def filter_images_with_labels(self, images, labels,
                              username=None, password=None):
    """Return the images that carry every one of the given labels.

    :param images: iterable of image references
    :param labels: iterable of label names that must all be present
    :param username: optional registry user
    :param password: password for ``username``
    :returns: list of the matching images, in input order
    :raises ImageUploaderException: when authentication is rejected with
                                    a 401
    """
    wanted = set(labels)
    images_with_labels = []
    for image in images:
        url = self._image_to_url(image)
        self.is_insecure_registry(registry_host=url.netloc)
        try:
            session = self.authenticate(
                url, username=username, password=password)
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 401:
                raise ImageUploaderException(
                    'Unable to authenticate. This may indicate '
                    'missing registry credentials or the provided '
                    'container or namespace does not exist. %s' % e)
            raise
        # A fresh session is created for every image; close it as soon as
        # the labels are read so connections do not pile up over the loop.
        try:
            image_labels = self._image_labels(
                url, session=session)
        finally:
            session.close()
        if wanted.issubset(set(image_labels)):
            images_with_labels.append(image)

    return images_with_labels
|
|
|
|
def add_upload_task(self, task):
|
|
if task.modify_role and task.multi_arch:
|
|
raise ImageUploaderException(
|
|
'Cannot run a modify role on multi-arch image %s' %
|
|
task.image_name
|
|
)
|
|
# prime insecure_registries
|
|
if task.pull_source:
|
|
self.is_insecure_registry(
|
|
registry_host=self._image_to_url(task.pull_source).netloc
|
|
)
|
|
else:
|
|
self.is_insecure_registry(
|
|
registry_host=self._image_to_url(task.image_name).netloc
|
|
)
|
|
self.is_insecure_registry(
|
|
registry_host=self._image_to_url(task.push_destination).netloc
|
|
)
|
|
self.upload_tasks.append((self, task))
|
|
|
|
@classmethod
|
|
def is_insecure_registry(cls, registry_host):
|
|
if registry_host in cls.secure_registries:
|
|
return False
|
|
if (registry_host in cls.insecure_registries or
|
|
registry_host in cls.no_verify_registries):
|
|
return True
|
|
with requests.Session() as s:
|
|
try:
|
|
s.get('https://%s/v2' % registry_host, timeout=30)
|
|
except requests.exceptions.SSLError:
|
|
# Might be just a TLS certificate validation issue
|
|
# Just retry without the verification
|
|
try:
|
|
s.get('https://%s/v2' % registry_host, timeout=30,
|
|
verify=False)
|
|
cls.no_verify_registries.add(registry_host)
|
|
# Techinically these type of registries are insecure when
|
|
# the container engine tries to do a pull. The python
|
|
# uploader ignores the certificate problem, but they are
|
|
# still inscure so we return True here while we'll still
|
|
# use https when we access the registry. LP#1833751
|
|
return True
|
|
except requests.exceptions.SSLError:
|
|
# So nope, it's really not a certificate verification issue
|
|
cls.insecure_registries.add(registry_host)
|
|
return True
|
|
except Exception:
|
|
# for any other error assume it is a secure registry, because:
|
|
# - it is secure registry
|
|
# - the host is not accessible
|
|
pass
|
|
cls.secure_registries.add(registry_host)
|
|
return False
|
|
|
|
@classmethod
@tenacity.retry(  # Retry up to 5 times with jittered exponential backoff
    reraise=True,
    retry=tenacity.retry_if_exception_type(
        requests.exceptions.RequestException
    ),
    wait=tenacity.wait_random_exponential(multiplier=1, max=10),
    stop=tenacity.stop_after_attempt(5)
)
def _cross_repo_mount(cls, target_image_url, image_layers,
                      source_layers, session):
    """Mount already-known layers into the target repository.

    For export registries the work is delegated to
    ``image_export.cross_repo_mount`` and the linked layers are tracked
    with local scope. Otherwise a mount request is posted to the target
    registry for every source layer whose origin repository is known.

    :param target_image_url: parsed URL of the image being uploaded
    :param image_layers: mapping of layer to the parsed URL of the image
                         that first copied it
    :param source_layers: layers of the source image
    :param session: session used for the mount requests
    """
    netloc = target_image_url.netloc
    name = target_image_url.path.split(':')[0][1:]

    if netloc in cls.export_registries:
        linked_layers = image_export.cross_repo_mount(
            target_image_url, image_layers, source_layers,
            uploaded_layers=cls._global_view_proxy())
        # track linked layers globally for future references
        for layer, info in linked_layers.items():
            cls._track_uploaded_layers(
                layer, known_path=info['known_path'],
                image_ref=info['ref_image'], scope='local')
        return

    scheme = 'http' if netloc in cls.insecure_registries else 'https'
    url = '%s://%s/v2/%s/blobs/uploads/' % (scheme, netloc, name)

    for layer in source_layers:
        known_path, existing_name = image_utils.uploaded_layers_details(
            cls._global_view_proxy(), layer, scope='remote')
        if not existing_name:
            if layer not in image_layers:
                # Nothing known about this layer, so nothing to mount
                continue
            existing_name = image_layers[layer].path.split(':')[0][1:]
        if existing_name != name:
            LOG.debug('[%s] Layer %s ref. by image %s already exists '
                      'at %s' % (name, layer, existing_name, known_path))
        LOG.info('[%s] Cross repository blob mount from %s' %
                 (layer, existing_name))
        mount_request = {
            'mount': layer,
            'from': existing_name
        }
        r = session.post(url, data=mount_request, timeout=30)
        cls.check_status(session=session, request=r)
        LOG.debug('%s %s' % (r.status_code, r.reason))
|
|
|
|
|
|
class SkopeoImageUploader(BaseImageUploader):
    """Upload images using skopeo copy"""

    def upload_image(self, task):
        """Copy the image described by ``task`` to its target registry.

        When ``task.modify_role`` is set, the source image is copied to
        local containers storage, modified with the modify playbook and the
        result is copied to the target registry. Otherwise the image is
        copied from the source registry straight to the target registry.

        :param task: upload task with the source/target image information
        :returns: list of local image names for ``cleanup`` to remove
        """
        t = task
        LOG.info('[%s] Got imagename' % t.image_name)

        source_image_local_url = parse.urlparse('containers-storage:%s'
                                                % t.source_image)

        target_image_local_url = parse.urlparse('containers-storage:%s' %
                                                t.target_image)

        if t.dry_run:
            return []

        target_username, target_password = self.credentials_for_registry(
            t.target_image_url.netloc)
        target_session = self.authenticate(
            t.target_image_url,
            username=target_username,
            password=target_password
        )

        image_exists = False
        try:
            image_exists = self._image_exists(t.target_image,
                                              target_session)
        except Exception:
            # best effort: a failed check is handled as "does not exist"
            LOG.warning('[%s] Failed to check if the target '
                        'image exists' % t.target_image)
        if t.modify_role and image_exists:
            LOG.warning('[%s] Skipping upload for modified '
                        'image' % t.target_image)
            target_session.close()
            return []

        # Keep the target session open yet
        source_username, source_password = self.credentials_for_registry(
            t.source_image_url.netloc)
        source_session = self.authenticate(
            t.source_image_url,
            username=source_username,
            password=source_password
        )
        try:
            source_inspect = self._inspect(
                t.source_image_url,
                session=source_session)
            source_layers = source_inspect.get('Layers', [])
            self._cross_repo_mount(
                t.target_image_url, self.image_layers, source_layers,
                session=target_session)
        except Exception:
            LOG.error('[%s] Failed uploading the target '
                      'image' % t.target_image)
            raise
        finally:
            # the copies below are done by skopeo, not by these sessions
            source_session.close()
            target_session.close()

        to_cleanup = []

        if t.modify_role:

            # Copy from source registry to local storage
            self._copy(
                t.source_image_url,
                source_image_local_url,
            )
            if t.cleanup in (CLEANUP_FULL, CLEANUP_PARTIAL):
                to_cleanup = [t.source_image]

            self.run_modify_playbook(
                t.modify_role, t.modify_vars, t.source_image,
                t.target_image_source_tag, t.append_tag,
                container_build_tool='buildah')
            if t.cleanup == CLEANUP_FULL:
                to_cleanup.append(t.target_image)

            # Copy from local storage to target registry
            self._copy(
                target_image_local_url,
                t.target_image_url,
            )
            for layer in source_layers:
                self.image_layers.setdefault(layer, t.target_image_url)
            LOG.warning('[%s] Completed modify and upload for '
                        'image' % t.image_name)
        else:
            self._copy(
                t.source_image_url,
                t.target_image_url,
            )
            LOG.warning('[%s] Completed upload for image' % t.image_name)
        for layer in source_layers:
            self.image_layers.setdefault(layer, t.target_image_url)
        return to_cleanup

    @classmethod
    @tenacity.retry(  # Retry up to 5 times with jittered exponential backoff
        reraise=True,
        wait=tenacity.wait_random_exponential(multiplier=1, max=10),
        stop=tenacity.stop_after_attempt(5)
    )
    def _copy(cls, source_url, target_url):
        """Run ``skopeo copy`` from ``source_url`` to ``target_url``.

        TLS verification is disabled for the source and/or the destination
        when its host is listed in ``cls.insecure_registries`` or
        ``cls.no_verify_registries``.

        :param source_url: parsed URL of the image to copy
        :param target_url: parsed URL to copy the image to
        :returns: the standard output of the skopeo command
        :raises ImageUploaderException: if skopeo exits with a non-zero
                                        return code
        """
        source = source_url.geturl()
        target = target_url.geturl()
        LOG.info('Copying from %s to %s' % (source, target))
        cmd = ['skopeo', 'copy']

        # NOTE: each collection has to be tested on its own; testing
        # "netloc in [a, b]" compares the host with the collections
        # themselves and never matches.
        if (source_url.netloc in cls.insecure_registries or
                source_url.netloc in cls.no_verify_registries):
            cmd.append('--src-tls-verify=false')

        if (target_url.netloc in cls.insecure_registries or
                target_url.netloc in cls.no_verify_registries):
            cmd.append('--dest-tls-verify=false')

        cmd.append(source)
        cmd.append(target)
        LOG.info('Running %s' % ' '.join(cmd))
        env = os.environ.copy()
        try:
            # stderr is captured so it can be reported on failure
            process = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE,
                                       universal_newlines=True)

            out, err = process.communicate()
            LOG.info(out)
            if process.returncode != 0:
                raise ImageUploaderException('Error copying image:\n%s\n%s' %
                                             (' '.join(cmd), err))
        except KeyboardInterrupt:
            raise Exception('Action interrupted with ctrl+c')
        return out

    def _delete(self, image_url, session=None):
        """Run ``skopeo delete`` for ``image_url``.

        :param image_url: parsed URL of the image to delete
        :param session: unused by this implementation
        :returns: the standard output of the skopeo command
        :raises ImageUploaderException: if skopeo exits with a non-zero
                                        return code
        """
        insecure = self.is_insecure_registry(registry_host=image_url.netloc)
        image = image_url.geturl()
        LOG.info('[%s] Deleting image' % image)
        cmd = ['skopeo', 'delete']

        if insecure:
            cmd.append('--tls-verify=false')
        cmd.append(image)
        LOG.info('Running %s' % ' '.join(cmd))
        env = os.environ.copy()
        try:
            # stderr is captured so it can be reported on failure
            process = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE,
                                       universal_newlines=True)

            out, err = process.communicate()
            # universal_newlines=True already yields text, no decoding needed
            LOG.info(out)
            if process.returncode != 0:
                raise ImageUploaderException('Error deleting image:\n%s\n%s' %
                                             (' '.join(cmd), err))
        except KeyboardInterrupt:
            raise Exception('Action interrupted with ctrl+c')
        return out

    def cleanup(self, local_images):
        """Delete the given images from local containers storage.

        :param local_images: iterable of local image names; empty entries
                             are ignored
        :returns: an empty list when there is nothing to clean up,
                  otherwise None
        """
        if not local_images:
            return []

        for image in sorted(local_images):
            if not image:
                continue
            LOG.warning('[%s] Removing local copy of image' % image)
            image_url = parse.urlparse('containers-storage:%s' % image)
            self._delete(image_url)

    def run_tasks(self):
        """Run all queued upload tasks, then clean up the local images."""
        if not self.upload_tasks:
            return
        local_images = []

        # Pull a single image first, to avoid duplicate pulls of the
        # same base layers
        local_images.extend(upload_task(args=self.upload_tasks.pop()))

        # workers will be one less than the worker count reported by
        # processutils, to a minimum of 2
        workers = max(2, (processutils.get_worker_count() - 1))
        with futures.ThreadPoolExecutor(max_workers=workers) as p:
            for result in p.map(upload_task, self.upload_tasks):
                local_images.extend(result)
        LOG.info('result %s' % local_images)

        # Do cleanup after all the uploads so common layers don't get deleted
        # repeatedly
        self.cleanup(local_images)
|
|
|
|
|
|
class PythonImageUploader(BaseImageUploader):
|
|
"""Upload images using a direct implementation of the registry API"""
|
|
|
|
uploaded_layers = {} # provides global view for multi-threading workers
|
|
lock = None # provides global locking info plus global view, if MP is used
|
|
|
|
@classmethod
def init_global_state(cls, lock):
    """Remember ``lock`` as the class-wide lock, unless one is already set.

    :param lock: lock object to be stored in ``cls.lock``
    """
    if cls.lock:
        # keep the lock that was registered first
        return
    cls.lock = lock
|
|
|
|
@classmethod
@tenacity.retry(  # Retry until we no longer have collisions
    retry=tenacity.retry_if_exception_type(ImageUploaderThreadException),
    wait=tenacity.wait_random_exponential(multiplier=1, max=10)
)
def _layer_fetch_lock(cls, layer):
    """Register ``layer`` as being fetched by the calling worker.

    The call is retried, with a random exponential wait and no stop
    condition, for as long as ImageUploaderThreadException is raised,
    i.e. while the layer is listed in ``cls.lock.objects()``.

    Nothing is registered when no lock was configured or when the layer
    is already tracked (with both a path and an image reference) in the
    'local' or 'remote' scope of the global view.

    :param layer: digest of the layer to lock
    :raises ImageUploaderThreadException: if the layer is already listed
                                          as being fetched
    """
    if not cls.lock:
        LOG.warning('No lock information provided for layer %s' % layer)
        return
    # cheap check done without taking the lock
    if layer in cls.lock.objects():
        LOG.debug('[%s] Layer is being fetched by another thread' % layer)
        raise ImageUploaderThreadException('layer being fetched')
    known_path, image = image_utils.uploaded_layers_details(
        cls._global_view_proxy(), layer, scope='local')
    if not known_path or not image:
        # not fully known locally, look at the remote scope instead
        known_path, image = image_utils.uploaded_layers_details(
            cls._global_view_proxy(), layer, scope='remote')
    if image and known_path:
        # already processed layers needs no further locking
        return
    with cls.lock.get_lock():
        # check again now that the lock is held, the layer may have been
        # registered since the first check
        if layer in cls.lock.objects():
            LOG.debug('Collision for lock %s' % layer)
            raise ImageUploaderThreadException('layer conflict')
        cls.lock.objects().append(layer)
        LOG.debug('Got lock on layer %s' % layer)
|
|
|
|
@classmethod
def _layer_fetch_unlock(cls, layer):
    """Remove ``layer`` from the list of layers being fetched.

    Does nothing, apart from logging a warning, when no lock was
    configured.

    :param layer: digest of the layer to unlock
    """
    if not cls.lock:
        LOG.warning('No lock information provided for layer %s' % layer)
        return
    with cls.lock.get_lock():
        # drop every occurrence, the layer may be listed more than once
        while layer in cls.lock.objects():
            cls.lock.objects().remove(layer)
        LOG.debug('Released lock on layer %s' % layer)
|
|
|
|
@classmethod
def _global_view_proxy(cls, value=None, forget=False):
    """Read or update the view of the layers processed so far.

    The view is kept in ``cls.uploaded_layers`` and, when the lock object
    has a ``_global_view`` attribute, in ``cls.lock._global_view`` too.

    :param value: with ``forget`` set, the key to remove from the view;
                  otherwise a mapping merged into the view with
                  ``update``. When empty or None the view is returned.
    :param forget: remove ``value`` from the view instead of adding it
    :returns: None when no lock was configured or when ``value`` is
              given. Otherwise the view: a copy of
              ``cls.uploaded_layers`` updated with
              ``cls.lock._global_view`` if the latter exists, else
              ``cls.uploaded_layers`` itself.
    """
    if not cls.lock:
        LOG.warning('No lock information provided for value %s' % value)
        return
    with cls.lock.get_lock():
        if value and forget:
            cls.uploaded_layers.pop(value, None)
            if hasattr(cls.lock, '_global_view'):
                cls.lock._global_view.pop(value, None)
        elif value:
            cls.uploaded_layers.update(value)
            if hasattr(cls.lock, '_global_view'):
                cls.lock._global_view.update(value)

    if not value:
        # return global view consolidated among MP/MT workers state
        if hasattr(cls.lock, '_global_view'):
            consolidated_view = cls.uploaded_layers.copy()
            consolidated_view.update(cls.lock._global_view)
            return consolidated_view
        else:
            return cls.uploaded_layers
|
|
|
|
@classmethod
def _track_uploaded_layers(cls, layer, known_path=None, image_ref=None,
                           forget=False, scope='remote'):
    """Add a processed layer to the global view, or remove it from it.

    :param layer: digest of the layer
    :param known_path: path recorded for the layer
    :param image_ref: image reference recorded for the layer
    :param forget: remove the layer from the view instead of adding it
    :param scope: scope the layer is recorded for
    """
    if not forget:
        LOG.debug('Tracking processed layer %s for %s scope'
                  % (layer, scope))
        entry = {'ref': image_ref, 'path': known_path}
        cls._global_view_proxy(value={layer: {scope: entry}})
        return

    LOG.debug('Untracking processed layer %s for any scope' % layer)
    cls._global_view_proxy(value=layer, forget=True)
|
|
|
|
def upload_image(self, task):
    """Upload image from a task

    This function takes an UploadTask and pushes it to the appropriate
    target destinations. It should be noted that if the source container
    is prefixed with 'containers-storage:' instead of 'docker://' or no
    prefix, this process will assume that the source container is already
    local to the system. The local container upload does not currently
    support any of the modification actions. In order to run the
    modification actions on a container prior to upload, the source must
    be a remote image. Additionally, cleanup has no effect when
    uploading a local image as well.

    :param: task: UploadTask with container information
    :returns: list of local image names to be cleaned up by the caller
    :raises ImageUploaderException: if authenticating against the target
                                    or source registry is rejected with
                                    a 401 response
    :raises NotImplementedError: if a modify role is requested for a
                                 local source container
    """
    t = task
    LOG.info('[%s] Starting upload image process' % t.image_name)

    source_local = t.source_image.startswith('containers-storage:')
    target_image_local_url = parse.urlparse('containers-storage:%s' %
                                            t.target_image)
    if t.dry_run:
        return []

    target_username, target_password = self.credentials_for_registry(
        t.target_image_url.netloc)
    try:
        target_session = self.authenticate(
            t.target_image_url,
            username=target_username,
            password=target_password
        )
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 401:
            raise ImageUploaderException(
                'Unable to authenticate. This may indicate '
                'missing registry credentials or the provided '
                'container or namespace does not exist. %s' % e)
        raise

    try:
        self._detect_target_export(t.target_image_url, target_session)
    except Exception:
        LOG.error('[%s] Failed uploading the target '
                  'image' % t.target_image)
        # Close the session before raising it for more of retrying perhaps
        target_session.close()
        raise

    if source_local:
        if t.modify_role:
            target_session.close()
            raise NotImplementedError('Modify role not implemented for '
                                      'local containers')
        if t.cleanup:
            LOG.warning('[%s] Cleanup has no effect with a local source '
                        'container.' % t.image_name)

        try:
            source_local_url = parse.urlparse(t.source_image)
            # Copy from local storage to target registry
            self._copy_local_to_registry(
                source_local_url,
                t.target_image_url,
                session=target_session
            )
        except Exception:
            # best effort: a failed local copy is only reported
            LOG.warning('[%s] Failed copying the target image '
                        'to the target registry' % t.target_image)
        target_session.close()
        return []

    if t.modify_role:
        image_exists = False
        try:
            image_exists = self._image_exists(t.target_image,
                                              target_session)
        except Exception:
            # best effort: a failed check is handled as "does not exist"
            LOG.warning('[%s] Failed to check if the target '
                        'image exists' % t.target_image)
        if image_exists:
            LOG.warning('[%s] Skipping upload for modified image %s' %
                        (t.image_name, t.target_image))
            target_session.close()
            return []
        copy_target_url = t.target_image_source_tag_url
    else:
        copy_target_url = t.target_image_url
    # Keep the target session open yet

    source_username, source_password = self.credentials_for_registry(
        t.source_image_url.netloc)
    try:
        source_session = self.authenticate(
            t.source_image_url,
            username=source_username,
            password=source_password
        )
    except requests.exceptions.HTTPError as e:
        # The target session is still open at this point, do not leak it
        target_session.close()
        if e.response.status_code == 401:
            raise ImageUploaderException(
                'Unable to authenticate. This may indicate '
                'missing registry credentials or the provided '
                'container or namespace does not exist. %s' % e)
        raise
    except Exception:
        # Same for any other failure to open the source session
        target_session.close()
        raise

    source_layers = []
    manifests_str = []
    try:
        self._collect_manifests_layers(
            t.source_image_url, source_session,
            manifests_str, source_layers,
            t.multi_arch
        )

        self._cross_repo_mount(
            copy_target_url, self.image_layers, source_layers,
            session=target_session)
        to_cleanup = []

        # Copy unmodified images from source to target
        self._copy_registry_to_registry(
            t.source_image_url,
            copy_target_url,
            source_manifests=manifests_str,
            source_session=source_session,
            target_session=target_session,
            source_layers=source_layers,
            multi_arch=t.multi_arch
        )
    except Exception:
        LOG.error('[%s] Failed uploading the target '
                  'image' % t.target_image)
        # Close the sessions before raising it for more of
        # retrying perhaps
        source_session.close()
        target_session.close()
        raise

    if not t.modify_role:
        LOG.info('[%s] Completed upload for image' % t.image_name)
    else:
        LOG.info('[%s] Copy ummodified image from target to local' %
                 t.image_name)
        try:
            self._copy_registry_to_local(t.target_image_source_tag_url)

            if t.cleanup in (CLEANUP_FULL, CLEANUP_PARTIAL):
                to_cleanup.append(t.target_image_source_tag)

            self.run_modify_playbook(
                t.modify_role,
                t.modify_vars,
                t.target_image_source_tag,
                t.target_image_source_tag,
                t.append_tag,
                container_build_tool='buildah')
            if t.cleanup == CLEANUP_FULL:
                to_cleanup.append(t.target_image)

            # cross-repo mount the unmodified image to the modified image
            self._cross_repo_mount(
                t.target_image_url, self.image_layers, source_layers,
                session=target_session)

            # Copy from local storage to target registry
            self._copy_local_to_registry(
                target_image_local_url,
                t.target_image_url,
                session=target_session
            )
            LOG.info('[%s] Completed modify and upload for image' %
                     t.image_name)
        except Exception:
            LOG.error('[%s] Failed processing the target '
                      'image' % t.target_image)
            # Close the sessions before raising it for more of
            # retrying perhaps
            source_session.close()
            target_session.close()
            raise

    try:
        for layer in source_layers:
            self.image_layers.setdefault(layer, t.target_image_url)
    except Exception:
        # best effort: failing to record a layer does not fail the upload
        LOG.warning('[%s] Failed setting default layer %s for the '
                    'target image' % (t.target_image, layer))
    target_session.close()
    source_session.close()
    return to_cleanup
|
|
|
|
@classmethod
@tenacity.retry(  # Retry up to 5 times with jittered exponential backoff
    reraise=True,
    retry=tenacity.retry_if_exception_type(
        requests.exceptions.RequestException
    ),
    wait=tenacity.wait_random_exponential(multiplier=1, max=10),
    stop=tenacity.stop_after_attempt(5)
)
def _detect_target_export(cls, image_url, session):
    """Tell whether the registry of ``image_url`` is an export target.

    The answer is cached per registry host in ``cls.export_registries``
    and ``cls.push_registries``. For a host not seen yet, an upload URL
    is requested; a 501, 403, 404 or 405 response marks the host as an
    export registry.

    :param image_url: parsed URL of the target image
    :param session: session used to talk to the target registry
    :returns: True for an export registry, False for a push registry
    """
    registry = image_url.netloc
    if registry in cls.export_registries:
        return True
    if registry in cls.push_registries:
        return False

    # detect if the registry is push-capable by requesting an upload URL.
    image_name, _tag = cls._image_tag_from_url(image_url)
    probe_url = cls._build_url(
        image_url,
        path=CALL_UPLOAD % {'image': image_name})
    probe = session.post(probe_url, timeout=30)
    if probe.status_code in (501, 403, 404, 405):
        cls.export_registries.add(registry)
        return True

    cls.check_status(session=session, request=probe)
    cls.push_registries.add(registry)
    return False
|
|
|
|
@classmethod
@tenacity.retry(  # Retry up to 5 times with jittered exponential backoff
    reraise=True,
    retry=tenacity.retry_if_exception_type(
        requests.exceptions.RequestException
    ),
    wait=tenacity.wait_random_exponential(multiplier=1, max=10),
    stop=tenacity.stop_after_attempt(5)
)
def _fetch_manifest(cls, url, session, multi_arch):
    """Fetch the manifest of an image as text.

    :param url: parsed URL of the image
    :param session: session used to talk to the registry
    :param multi_arch: request a manifest list instead of a single
                       manifest
    :returns: the text of the manifest response
    :raises ImageNotFoundException: if the registry answers 403 or 404
    """
    image, tag = cls._image_tag_from_url(url)
    manifest_url = cls._build_url(
        url, CALL_MANIFEST % {'image': image, 'tag': tag}
    )
    accepted = MEDIA_MANIFEST_V2_LIST if multi_arch else MEDIA_MANIFEST_V2
    response = session.get(
        manifest_url, headers={'Accept': accepted}, timeout=30)
    if response.status_code in (403, 404):
        raise ImageNotFoundException('Not found image: %s' % manifest_url)
    cls.check_status(session=session, request=response)
    return cls._get_response_text(response)
|
|
|
|
def _collect_manifests_layers(self, image_url, session,
                              manifests_str, layers,
                              multi_arch):
    """Collect the manifests and layer digests of an image.

    The fetched manifest text is appended to ``manifests_str`` and the
    digests of its layers are added to ``layers``. For a manifest list,
    every listed manifest is collected recursively.

    :param image_url: parsed URL of the image
    :param session: session used to talk to the registry
    :param manifests_str: list extended in place with manifest texts
    :param layers: list extended in place with layer digests
    :param multi_arch: request a manifest list instead of a single
                       manifest
    """
    raw_manifest = self._fetch_manifest(
        image_url,
        session=session,
        multi_arch=multi_arch
    )
    manifests_str.append(raw_manifest)
    manifest = json.loads(raw_manifest)

    if manifest.get('schemaVersion', 2) == 1:
        # schema 1 layers are added in the reverse of their listed order
        digests = [entry['blobSum'] for entry in manifest['fsLayers']]
        digests.reverse()
        layers.extend(digests)
        return

    media_type = manifest.get('mediaType')
    if media_type == MEDIA_MANIFEST_V2:
        layers.extend(entry['digest'] for entry in manifest['layers'])
    elif media_type == MEDIA_MANIFEST_V2_LIST:
        image = image_url.geturl().rpartition(':')[0]
        for entry in manifest.get('manifests', []):
            # replace image tag with the manifest hash in the list
            entry_url = parse.urlparse('%s@%s' % (image, entry['digest']))
            self._collect_manifests_layers(
                entry_url, session, manifests_str, layers,
                multi_arch=False
            )
|
|
|
|
@classmethod
@tenacity.retry(  # Retry up to 5 times with jittered exponential backoff
    reraise=True,
    retry=tenacity.retry_if_exception_type(
        requests.exceptions.RequestException
    ),
    wait=tenacity.wait_random_exponential(multiplier=1, max=10),
    stop=tenacity.stop_after_attempt(5)
)
def _upload_url(cls, image_url, session, previous_request=None):
    """Return the URL the next part of a blob upload has to be sent to.

    :param image_url: parsed URL of the target image
    :param session: session used to talk to the target registry
    :param previous_request: response of the previous upload request; its
                             'Location' header is used when available
    :returns: the upload URL
    """
    if previous_request and 'Location' in previous_request.headers:
        return previous_request.headers['Location']

    # no location to continue from, start a new upload
    image_name, _tag = cls._image_tag_from_url(image_url)
    start_url = cls._build_url(
        image_url,
        path=CALL_UPLOAD % {'image': image_name})
    response = session.post(start_url, timeout=30)
    cls.check_status(session=session, request=response)
    return response.headers['Location']
|
|
|
|
@classmethod
@tenacity.retry(  # Retry up to 5 times with jittered exponential backoff
    reraise=True,
    retry=tenacity.retry_if_exception_type(
        requests.exceptions.RequestException
    ),
    wait=tenacity.wait_random_exponential(multiplier=1, max=10),
    stop=tenacity.stop_after_attempt(5)
)
def _layer_stream_registry(cls, digest, source_url, calc_digest,
                           session):
    """Stream a layer blob from a registry, feeding ``calc_digest``.

    This is a generator: every chunk read from the registry is passed to
    ``calc_digest.update`` before being yielded.

    :param digest: digest of the layer to fetch
    :param source_url: parsed URL of the source image
    :param calc_digest: hash object updated with the streamed data
    :param session: session used to talk to the source registry
    """
    image, tag = cls._image_tag_from_url(source_url)
    blob_url = cls._build_url(
        source_url,
        CALL_BLOB % {'image': image, 'tag': tag, 'digest': digest})
    LOG.info("[%s] Fetching layer %s from %s" %
             (image, digest, blob_url))
    with session.get(blob_url, stream=True, timeout=30) as blob_req:
        # TODO(aschultz): unsure if necessary or if only when using .text
        blob_req.encoding = 'utf-8'
        cls.check_status(session=session, request=blob_req)
        # NOTE(aschultz): We specify None and let requests figure it out
        for piece in blob_req.iter_content(None):
            LOG.debug("[%s] Read %i bytes for %s" %
                      (image, len(piece), digest))
            if not piece:
                break
            calc_digest.update(piece)
            yield piece
    LOG.info("[%s] Done fetching layer %s from registry" % (image, digest))
|
|
|
|
@classmethod
@tenacity.retry(  # Retry up to 5 times with jittered exponential backoff
    reraise=True,
    retry=tenacity.retry_if_exception_type(
        IOError
    ),
    wait=tenacity.wait_random_exponential(multiplier=1, max=10),
    stop=tenacity.stop_after_attempt(5)
)
def _copy_layer_registry_to_registry(cls, source_url, target_url,
                                     layer,
                                     source_session=None,
                                     target_session=None):
    """Copy a single layer from the source to the target registry.

    The layer is locked with ``_layer_fetch_lock`` for the duration of
    the copy. Nothing is fetched when the target already has the layer
    or when the layer is known locally and can be cross-linked.

    :param source_url: parsed URL of the source image
    :param target_url: parsed URL of the target image
    :param layer: digest of the layer to copy
    :param source_session: session used to talk to the source registry
    :param target_session: session used to talk to the target registry
    :returns: the value returned by ``_copy_stream_to_registry`` for the
              layer, or None when the layer did not have to be fetched
    """
    layer_entry = {'digest': layer}
    try:
        cls._layer_fetch_lock(layer)
        if cls._target_layer_exists_registry(
                target_url, layer_entry, [layer_entry], target_session):
            cls._layer_fetch_unlock(layer)
            return
        known_path, ref_image = image_utils.uploaded_layers_details(
            cls._global_view_proxy(), layer, scope='local')
        if known_path and ref_image:
            # cross-link target from local source, skip fetching it again
            image_export.layer_cross_link(
                layer, ref_image, known_path, target_url)
            cls._layer_fetch_unlock(layer)
            return
    except ImageUploaderThreadException:
        # skip trying to unlock, because that's what threw the exception
        raise
    except Exception:
        cls._layer_fetch_unlock(layer)
        raise

    # the digest may have been replaced by _target_layer_exists_registry
    digest = layer_entry['digest']
    LOG.debug('[%s] Uploading layer' % digest)

    calc_digest = hashlib.sha256()
    known_path = None
    layer_val = None
    try:
        layer_stream = cls._layer_stream_registry(
            digest, source_url, calc_digest, source_session)
        layer_val, known_path = cls._copy_stream_to_registry(
            target_url, layer_entry, calc_digest, layer_stream,
            target_session)
    except (IOError, requests.exceptions.HTTPError):
        # forget the layer so that it is not considered as processed
        cls._track_uploaded_layers(layer, forget=True, scope='remote')
        LOG.error('[%s] Failed processing layer for the target '
                  'image %s' % (layer, target_url.geturl()))
        raise
    except Exception:
        raise
    else:
        if layer_val and known_path:
            image_ref = target_url.path.split(':')[0][1:]
            # a known path with a URL scheme is recorded as 'remote',
            # one without a scheme as 'local'
            uploaded = parse.urlparse(known_path).scheme
            cls._track_uploaded_layers(
                layer_val, known_path=known_path, image_ref=image_ref,
                scope=('remote' if uploaded else 'local'))
        return layer_val
    finally:
        # runs on success and on failure alike
        cls._layer_fetch_unlock(layer)
|
|
|
|
@classmethod
def _assert_scheme(cls, url, scheme):
    """Ensure that ``url`` uses the expected scheme.

    :param url: parsed URL to check
    :param scheme: the scheme the URL is expected to have
    :raises ImageUploaderException: if the URL has another scheme
    """
    if url.scheme == scheme:
        return
    raise ImageUploaderException(
        'Expected %s scheme: %s' % (scheme, url.geturl()))
|
|
|
|
@classmethod
@tenacity.retry(  # Retry up to 5 times with jittered exponential backoff
    reraise=True,
    retry=tenacity.retry_if_exception_type(
        requests.exceptions.RequestException
    ),
    wait=tenacity.wait_random_exponential(multiplier=1, max=10),
    stop=tenacity.stop_after_attempt(5)
)
def _copy_registry_to_registry(cls, source_url, target_url,
                               source_manifests,
                               source_session=None,
                               target_session=None,
                               source_layers=None,
                               multi_arch=False):
    """Copy an image from the source registry to the target registry.

    The layers are copied first, by a pool of 4 worker threads, then
    every source manifest is sent to the target together with its config
    blob, when it has one.

    :param source_url: parsed 'docker' URL of the source image
    :param target_url: parsed 'docker' URL of the target image
    :param source_manifests: list of manifest texts of the source image
    :param source_session: session used to talk to the source registry
    :param target_session: session used to talk to the target registry
    :param source_layers: digests of the layers to copy
    :param multi_arch: passed on to _copy_manifest_config_to_registry
    :raises ImageUploaderException: if a URL has not the 'docker' scheme
    """
    cls._assert_scheme(source_url, 'docker')
    cls._assert_scheme(target_url, 'docker')

    image, tag = cls._image_tag_from_url(source_url)
    parts = {
        'image': image,
        'tag': tag
    }

    # Upload all layers
    copy_jobs = []
    jobs_count = 0
    jobs_finished = 0
    with futures.ThreadPoolExecutor(max_workers=4) as p:
        if source_layers:
            for layer in source_layers:
                copy_jobs.append(p.submit(
                    cls._copy_layer_registry_to_registry,
                    source_url, target_url,
                    layer=layer,
                    source_session=source_session,
                    target_session=target_session
                ))

        jobs_count = len(copy_jobs)
        LOG.debug('[%s] Waiting for %i jobs to finish' %
                  (image, jobs_count))
        for job in futures.as_completed(copy_jobs):
            # the first failed job aborts the whole copy
            e = job.exception()
            if e:
                raise e
            layer = job.result()
            if layer:
                LOG.debug('[%s] Upload complete for layer %s' %
                          (image, layer))
            jobs_finished += 1
            LOG.debug('[%s] Waiting for next job: %i of %i complete' %
                      (image, jobs_finished, jobs_count))

    LOG.debug('[%s] Completed %i jobs' % (image, jobs_count))

    for source_manifest in source_manifests:
        manifest = json.loads(source_manifest)
        # stays None for manifests that have no config blob to copy
        config_str = None
        if manifest.get('mediaType') == MEDIA_MANIFEST_V2:
            config_digest = manifest['config']['digest']
            LOG.debug('[%s] Uploading config with digest: %s' %
                      (image, config_digest))

            parts['digest'] = config_digest
            source_config_url = cls._build_url(
                source_url,
                CALL_BLOB % parts
            )

            r = source_session.get(source_config_url, timeout=30)
            cls.check_status(
                session=source_session,
                request=r
            )
            config_str = cls._get_response_text(r)
            # NOTE(review): these two updates only change the parsed
            # copy; the original source_manifest text is what gets
            # passed on below
            manifest['config']['size'] = len(config_str)
            manifest['config']['mediaType'] = MEDIA_CONFIG

        cls._copy_manifest_config_to_registry(
            target_url=target_url,
            manifest_str=source_manifest,
            config_str=config_str,
            target_session=target_session,
            multi_arch=multi_arch
        )
    LOG.debug('[%s] Finished copying image' % image)
|
|
|
|
@classmethod
def _copy_manifest_config_to_registry(cls, target_url,
                                      manifest_str,
                                      config_str,
                                      target_session=None,
                                      multi_arch=False):
    """Send a manifest, and its config blob if any, to the target.

    For a target listed in ``cls.export_registries`` the manifest and
    config are handed to ``image_export.export_manifest_config``.
    Otherwise the config is uploaded as a blob and the manifest is PUT to
    the target registry.

    :param target_url: parsed URL of the target image
    :param manifest_str: text of the manifest
    :param config_str: text of the config blob, or None when there is no
                       config to upload
    :param target_session: session used to talk to the target registry
    :param multi_arch: passed on to the manifest export
    :raises ImageUploaderException: if the registry answers 400 to the
                                    manifest upload
    """

    manifest = json.loads(manifest_str)
    if manifest.get('schemaVersion', 2) == 1:
        if 'signatures' in manifest:
            manifest_type = MEDIA_MANIFEST_V1_SIGNED
        else:
            manifest_type = MEDIA_MANIFEST_V1
    else:
        manifest_type = manifest.get(
            'mediaType', MEDIA_MANIFEST_V2)
    manifest_str = json.dumps(manifest, indent=3)

    export = target_url.netloc in cls.export_registries
    if export:
        image_export.export_manifest_config(
            target_url,
            manifest_str,
            manifest_type,
            config_str,
            multi_arch=multi_arch
        )
        return

    if config_str is not None:
        config_digest = manifest['config']['digest']
        # Encode once: the announced length has to be the number of bytes
        # sent, which differs from the number of characters as soon as
        # the config holds non-ASCII text
        config_data = config_str.encode('utf-8')
        # Upload the config json as a blob
        upload_url = cls._upload_url(
            target_url,
            session=target_session)
        r = target_session.put(
            upload_url,
            timeout=30,
            params={
                'digest': config_digest
            },
            data=config_data,
            headers={
                'Content-Length': str(len(config_data)),
                'Content-Type': 'application/octet-stream'
            }
        )
        cls.check_status(session=target_session, request=r)

    # Upload the manifest
    image, tag = cls._image_tag_from_url(target_url)
    parts = {
        'image': image,
        'tag': tag
    }
    manifest_url = cls._build_url(
        target_url, CALL_MANIFEST % parts)

    LOG.debug('[%s] Uploading manifest of type %s to: %s' %
              (image, manifest_type, manifest_url))

    r = target_session.put(
        manifest_url,
        timeout=30,
        data=manifest_str.encode('utf-8'),
        headers={
            'Content-Type': manifest_type
        }
    )
    if r.status_code == 400:
        LOG.error(cls._get_response_text(r))
        raise ImageUploaderException('Pushing manifest failed')
    cls.check_status(session=target_session, request=r)
|
|
|
|
@classmethod
@tenacity.retry(  # Retry up to 5 times with jittered exponential backoff
    reraise=True,
    wait=tenacity.wait_random_exponential(multiplier=1, max=10),
    stop=tenacity.stop_after_attempt(5)
)
def _copy_registry_to_local(cls, source_url):
    """Pull an image from a registry with ``buildah pull``.

    TLS verification is disabled when the registry host is listed in
    ``cls.insecure_registries`` or ``cls.no_verify_registries``.

    :param source_url: parsed 'docker' URL of the image to pull
    :returns: the standard output of the buildah command
    :raises ImageUploaderException: if the URL has not the 'docker'
                                    scheme or if buildah exits with a
                                    non-zero return code
    """
    cls._assert_scheme(source_url, 'docker')
    pull_source = source_url.netloc + source_url.path
    cmd = ['buildah', '--debug', 'pull']

    # NOTE: each collection has to be tested on its own; testing
    # "netloc in [a, b]" compares the host with the collections
    # themselves and never matches.
    if (source_url.netloc in cls.insecure_registries or
            source_url.netloc in cls.no_verify_registries):
        cmd.append('--tls-verify=false')

    cmd.append(pull_source)
    LOG.info('Pulling %s' % pull_source)
    LOG.info('Running %s' % ' '.join(cmd))
    try:
        process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
            close_fds=True
        )
        out, err = process.communicate()
        if process.returncode != 0:
            error_msg = (
                'Pulling image failed: cmd "{}", stdout "{}",'
                ' stderr "{}"'.format(
                    ' '.join(cmd),
                    out,
                    err
                )
            )
            LOG.error(error_msg)
            raise ImageUploaderException(error_msg)
    except KeyboardInterrupt:
        raise Exception('Action interrupted with ctrl+c')
    return out
|
|
|
|
@classmethod
def _target_layer_exists_registry(cls, target_url, layer, check_layers,
                                  session):
    """Tell whether one of ``check_layers`` already exists for the target.

    A candidate is considered as existing when the global view records
    it, in the 'remote' scope, for the target image, or when a HEAD
    request for its blob on the target registry answers 200. The first
    candidate found is copied ('digest', and 'size' / 'mediaType' when
    present) into ``layer``.

    :param target_url: parsed URL of the target image
    :param layer: dict updated in place with the details of the layer
                  found
    :param check_layers: candidate layer dicts, each with a 'digest' key;
                         empty entries are ignored
    :param session: session used to talk to the target registry
    :returns: True if a candidate exists, False otherwise
    """
    image, tag = cls._image_tag_from_url(target_url)
    norm_image = image[1:] if image.startswith('/') else image
    parts = {'image': image, 'tag': tag}

    # Check in global view or do a HEAD call for the supplied
    # digests to see if the layer is already in the registry
    match = None
    for candidate in check_layers:
        if not candidate:
            continue
        known_path, ref_image = image_utils.uploaded_layers_details(
            cls._global_view_proxy(), candidate['digest'], scope='remote')
        if ref_image == norm_image:
            LOG.debug('[%s] Layer %s already exists at %s' %
                      (image, candidate['digest'], known_path))
            match = candidate
            break

        parts['digest'] = candidate['digest']
        blob_url = cls._build_url(target_url, CALL_BLOB % parts)
        head = session.head(blob_url, timeout=30)
        if head.status_code == 200:
            LOG.debug('[%s] Layer already exists: %s' %
                      (image, candidate['digest']))
            match = candidate
            break

    if not match:
        return False

    layer['digest'] = match['digest']
    for optional_key in ('size', 'mediaType'):
        if optional_key in match:
            layer[optional_key] = match[optional_key]
    return True
|
|
|
|
@classmethod
def _layer_stream_local(cls, layer_id, calc_digest):
    """Stream a layer of the local containers storage as a compressed tar.

    This is a generator: the layer is reassembled by ``tar-split asm``
    and every chunk read from it is passed to ``calc_digest.update``
    before being yielded.

    :param layer_id: id of the local layer
    :param calc_digest: hash object updated with the streamed data
    :raises ImageUploaderException: if tar-split exits with a non-zero
                                    return code
    """
    LOG.debug('[%s] Exporting layer' % layer_id)

    split_file = cls._containers_file_path(
        'overlay-layers',
        '%s.tar-split.gz' % layer_id
    )
    diff_dir = cls._containers_file_path(
        'overlay', layer_id, 'diff'
    )
    cmd = ['tar-split', 'asm']
    cmd += ['--input', split_file]
    cmd += ['--path', diff_dir]
    cmd += ['--compress']
    LOG.debug(' '.join(cmd))
    try:
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)

        # read the output 1 MiB at a time until it is exhausted
        chunk_size = 2 ** 20
        data = proc.stdout.read(chunk_size)
        while data:
            calc_digest.update(data)
            yield data
            data = proc.stdout.read(chunk_size)

        proc.wait()
        if proc.returncode != 0:
            raise ImageUploaderException('Extracting layer failed')
    except KeyboardInterrupt:
        raise Exception('Action interrupted with ctrl+c')
|
|
|
|
@classmethod
@tenacity.retry(  # Retry up to 5 times with jittered exponential backoff
    reraise=True,
    retry=tenacity.retry_if_exception_type(
        requests.exceptions.RequestException
    ),
    wait=tenacity.wait_random_exponential(multiplier=1, max=10),
    stop=tenacity.stop_after_attempt(5)
)
def _copy_layer_local_to_registry(cls, target_url,
                                  session, layer, layer_entry):
    """Copy a layer of the local containers storage to the target.

    Nothing is uploaded when the target already has the layer under its
    compressed or uncompressed digest.

    :param target_url: parsed URL of the target image
    :param session: session used to talk to the target registry
    :param layer: dict updated in place with the details of the layer in
                  the target
    :param layer_entry: description of the local layer; 'id' is
                        required, the digest and size keys are optional
    :returns: the value returned by ``_copy_stream_to_registry`` for the
              layer, or None when the layer already exists
    """
    # Check in global view or do a HEAD call for the compressed-diff-digest
    # and diff-digest to see if the layer is already in the registry
    candidates = (
        ('compressed-diff-digest', 'compressed-size',
         MEDIA_BLOB_COMPRESSED),
        ('diff-digest', 'diff-size', MEDIA_BLOB),
    )
    check_layers = []
    for digest_key, size_key, media_type in candidates:
        candidate_digest = layer_entry.get(digest_key)
        if candidate_digest:
            check_layers.append({
                'digest': candidate_digest,
                'size': layer_entry.get(size_key),
                'mediaType': media_type,
            })
    if cls._target_layer_exists_registry(target_url, layer, check_layers,
                                         session):
        return

    layer_id = layer_entry['id']
    LOG.debug('[%s] Uploading layer' % layer_id)

    calc_digest = hashlib.sha256()
    try:
        layer_stream = cls._layer_stream_local(layer_id, calc_digest)
        layer_val, known_path = cls._copy_stream_to_registry(
            target_url, layer, calc_digest, layer_stream, session,
            verify_digest=False)
    except (IOError, requests.exceptions.HTTPError):
        # forget the layer so that it is not considered as processed
        cls._track_uploaded_layers(
            layer['digest'], forget=True, scope='remote')
        LOG.error('[%s] Failed processing layer for the target '
                  'image %s' % (layer['digest'], target_url.geturl()))
        raise

    if layer_val and known_path:
        image_ref = target_url.path.split(':')[0][1:]
        # a known path with a URL scheme is recorded as 'remote',
        # one without a scheme as 'local'
        has_scheme = parse.urlparse(known_path).scheme
        cls._track_uploaded_layers(
            layer_val, known_path=known_path, image_ref=image_ref,
            scope=('remote' if has_scheme else 'local'))
    return layer_val
|
|
|
|
@classmethod
def _copy_stream_to_registry(cls, target_url, layer, calc_digest,
                             layer_stream, session, verify_digest=True):
    """Send a layer stream to the target and record its digest and size.

    When the target netloc is one of ``cls.export_registries`` the stream
    is handed to ``image_export.export_stream`` and its result returned.
    Otherwise every chunk is sent with a PATCH request and the upload is
    completed with a PUT request carrying the calculated digest.

    :param target_url: parsed URL of the target image
    :param layer: layer dict, updated in place with ``mediaType`` and,
                  for registry uploads, ``digest`` and ``size``
    :param calc_digest: hash object whose hexdigest is read once the
                        stream has been consumed
    :param layer_stream: iterable yielding the layer data in chunks
    :param session: session used for the PATCH and PUT requests
    :param verify_digest: forwarded to the export path only
    :returns: for registry uploads, a tuple of the layer digest and the
              URL built from the target path
    """
    layer['mediaType'] = MEDIA_BLOB_COMPRESSED

    if target_url.netloc in cls.export_registries:
        return image_export.export_stream(
            target_url, layer, layer_stream, verify_digest=verify_digest)

    sent = 0
    response = None
    for chunk in layer_stream:
        if not chunk:
            break

        size = len(chunk)
        # The previous response is passed in so the next upload URL can
        # be derived from it
        patch_url = cls._upload_url(target_url, session, response)
        patch_headers = {
            'Content-Length': str(size),
            'Content-Range': '%d-%d' % (sent, sent + size - 1),
            'Content-Type': 'application/octet-stream'
        }
        response = session.patch(
            patch_url,
            timeout=30,
            data=chunk,
            headers=patch_headers
        )
        cls.check_status(session=session, request=response)
        sent += size

    layer_digest = 'sha256:%s' % calc_digest.hexdigest()
    LOG.debug('[%s] Calculated layer digest' % layer_digest)
    put_url = cls._upload_url(target_url, session, response)
    response = session.put(
        put_url,
        timeout=30,
        params={'digest': layer_digest},
    )
    cls.check_status(session=session, request=response)
    layer['digest'] = layer_digest
    layer['size'] = sent
    return (layer_digest, cls._build_url(target_url, target_url.path))
|
|
|
|
@classmethod
@tenacity.retry(  # Retry up to 5 times with jittered exponential backoff
    reraise=True,
    retry=tenacity.retry_if_exception_type(
        requests.exceptions.RequestException
    ),
    wait=tenacity.wait_random_exponential(multiplier=1, max=10),
    stop=tenacity.stop_after_attempt(5)
)
def _copy_local_to_registry(cls, source_url, target_url, session):
    """Copy an image from local containers storage to a registry.

    The manifest layers are uploaded through a pool of four worker
    threads, then the manifest and config are copied to the target.

    :param source_url: parsed URL, must use the containers-storage scheme
    :param target_url: parsed URL, must use the docker scheme
    :param session: session passed to the layer and manifest uploads
    :raises: the first exception reported by a layer upload job
    """
    cls._assert_scheme(source_url, 'containers-storage')
    cls._assert_scheme(target_url, 'docker')

    name = '%s%s' % (source_url.netloc, source_url.path)
    image, manifest, config_str = cls._image_manifest_config(name)

    # Index every local layer entry by each digest it carries
    layers_by_digest = {}
    for entry in cls._containers_json('overlay-layers', 'layers.json'):
        for digest_key in ('diff-digest', 'compressed-diff-digest'):
            if digest_key in entry:
                layers_by_digest[entry[digest_key]] = entry

    # Upload all layers
    jobs_count = 0
    with futures.ThreadPoolExecutor(max_workers=4) as pool:
        copy_jobs = [
            pool.submit(cls._copy_layer_local_to_registry,
                        target_url, session, manifest_layer,
                        layers_by_digest[manifest_layer['digest']])
            for manifest_layer in manifest['layers']
        ]
        jobs_count = len(copy_jobs)
        LOG.debug('[%s] Waiting for %i jobs to finish' %
                  (name, jobs_count))
        finished_jobs = enumerate(futures.as_completed(copy_jobs), 1)
        for jobs_finished, job in finished_jobs:
            error = job.exception()
            if error:
                raise error
            uploaded = job.result()
            if uploaded:
                LOG.debug('[%s] Upload complete for layer: %s' %
                          (name, uploaded))
            LOG.debug('[%s] Waiting for next job: %i of %i complete' %
                      (name, jobs_finished, jobs_count))

    LOG.debug('[%s] Completed %i jobs' % (name, jobs_count))

    cls._copy_manifest_config_to_registry(
        target_url=target_url,
        manifest_str=json.dumps(manifest, indent=3),
        config_str=config_str,
        target_session=session
    )
    LOG.debug('[%s] Finished copying' % name)
|
|
|
|
@classmethod
def _containers_file_path(cls, *path):
    """Build the path of a file under /var/lib/containers/storage/.

    :param path: path components joined below the storage directory
    :returns: the joined path
    :raises: ImageUploaderException when the path does not exist
    """
    full_path = os.path.join('/var/lib/containers/storage/', *path)
    if os.path.exists(full_path):
        return full_path
    raise ImageUploaderException('Missing file %s' % full_path)
|
|
|
|
@classmethod
def _containers_file(cls, *path):
    """Read a containers storage file and return its text content.

    :param path: path components below the containers storage directory
    :returns: the whole file content, read in text mode
    :raises: ImageUploaderException when the file is missing or when
             opening or reading it fails
    """
    full_path = cls._containers_file_path(*path)

    try:
        with open(full_path, 'r') as storage_file:
            contents = storage_file.read()
    except Exception as e:
        # Any open/read failure is reported as an uploader error
        raise ImageUploaderException(e)
    return contents
|
|
|
|
@classmethod
def _containers_json(cls, *path):
    """Read a containers storage file and decode it as JSON.

    :param path: path components below the containers storage directory
    :returns: the decoded JSON document
    """
    raw_document = cls._containers_file(*path)
    return json.loads(raw_document)
|
|
|
|
@classmethod
def _image_manifest_config(cls, name):
    """Load the image record, manifest and config of a local image.

    :param name: image name, compared against the ``names`` of every
                 entry of overlay-images/images.json
    :returns: tuple of (image record, manifest dict, config string); the
              manifest ``config`` section is given ``size`` and
              ``mediaType`` values before it is returned
    :raises: ImageNotFoundException when no entry carries the name
    """
    images = cls._containers_json('overlay-images', 'images.json')
    # First entry having the requested name among its names, else None
    image = next(
        (entry for entry in images
         if any(name == alias for alias in entry.get('names', []))),
        None)
    if not image:
        raise ImageNotFoundException('Not found image: %s' % name)

    image_id = image['id']
    manifest = cls._containers_json('overlay-images', image_id, 'manifest')
    config_section = manifest['config']

    # The config file is named '=' followed by the base64 encoding of
    # the config digest
    encoded_digest = base64.b64encode(six.b(config_section['digest']))
    config_id = '=' + encoded_digest.decode("utf-8")
    config_str = cls._containers_file('overlay-images', image_id,
                                      config_id)
    config_section['size'] = len(config_str)
    config_section['mediaType'] = MEDIA_CONFIG
    return image, manifest, config_str
|
|
|
|
@classmethod
def _inspect(cls, image_url, session=None):
    """Return inspect data for a registry or local storage image.

    ``docker`` URLs are delegated to the parent class implementation;
    ``containers-storage`` URLs are answered from the local image record,
    manifest and config.

    :param image_url: parsed image URL
    :param session: session forwarded to the parent implementation
    :returns: dict with the keys Name, Digest, RepoTags, Created,
              DockerVersion, Labels, Architecture, Os and Layers
    :raises: ImageUploaderException for any other URL scheme
    """
    scheme = image_url.scheme
    if scheme == 'docker':
        return super(PythonImageUploader, cls)._inspect(
            image_url, session=session)
    if scheme != 'containers-storage':
        raise ImageUploaderException('Inspect not implemented for %s' %
                                     image_url.geturl())

    name = '%s%s' % (image_url.netloc, image_url.path)
    image, manifest, config_str = cls._image_manifest_config(name)
    config = json.loads(config_str)

    layer_digests = [entry['digest'] for entry in manifest['layers']]
    image_name, _ = cls._image_tag_from_url(image_url)
    image_digest = image['digest']
    created = image['created']
    labels = config['config'].get('Labels', {})
    # NOTE: labels can be null
    if labels is None:
        labels = {}
    return {
        'Name': image_name,
        'Digest': image_digest,
        'RepoTags': [],
        'Created': created,
        'DockerVersion': '',
        'Labels': labels,
        'Architecture': config['architecture'],
        'Os': config['os'],
        'Layers': layer_digests,
    }
|
|
|
|
@classmethod
def _delete_from_registry(cls, image_url, session=None):
    """Delete an image from an export target.

    :param image_url: parsed URL of the image to delete
    :param session: session passed to the export target detection
    :returns: the result of ``image_export.delete_image``
    :raises: NotImplementedError when the target is not detected as an
             export target
    """
    if cls._detect_target_export(image_url, session):
        return image_export.delete_image(image_url)
    raise NotImplementedError(
        'Deleting not supported via the registry API')
|
|
|
|
@classmethod
def _delete(cls, image_url, session=None):
    """Delete an image from a registry or from local containers storage.

    ``docker`` URLs are delegated to ``_delete_from_registry``;
    ``containers-storage`` URLs are removed by running ``buildah rmi``.
    A non-zero exit status of buildah is logged as a warning and does
    not raise.

    :param image_url: parsed URL of the image to delete
    :param session: session forwarded to the registry deletion
    :returns: the registry deletion result, or the standard output of
              the buildah command
    :raises: ImageUploaderException for any other URL scheme
    """
    image = image_url.geturl()
    LOG.info('[%s] Deleting image' % image)
    if image_url.scheme == 'docker':
        return cls._delete_from_registry(image_url, session)
    if image_url.scheme != 'containers-storage':
        raise ImageUploaderException('Delete not implemented for %s' %
                                     image_url.geturl())
    cmd = ['buildah', 'rmi', image_url.path]
    LOG.info('Running %s' % ' '.join(cmd))
    env = os.environ.copy()
    try:
        # stderr has to be piped as well, otherwise communicate() returns
        # None for it and the warning below cannot show the actual error
        process = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE,
                                   universal_newlines=True)

        out, err = process.communicate()
        LOG.info(out)
        if process.returncode != 0:
            LOG.warning('Error deleting image:\n%s\n%s' %
                        (' '.join(cmd), err))
    except KeyboardInterrupt:
        raise Exception('Action interrupted with ctrl+c')
    return out
|
|
|
|
def cleanup(self, local_images):
    """Delete the local containers-storage copy of each given image.

    :param local_images: image names; empty entries are skipped
    :returns: an empty list when there is nothing to clean up, otherwise
              None
    """
    if not local_images:
        return []

    # filter(None, ...) drops the empty entries, keeping the sorted order
    for image_name in filter(None, sorted(local_images)):
        LOG.info('[%s] Removing local copy of image' % image_name)
        storage_url = parse.urlparse('containers-storage:%s' % image_name)
        self._delete(storage_url)
|
|
|
|
def _get_executor(self):
    """Return the executor matching the type of the lock object.

    A thread pool is returned when no lock is set or when the lock is a
    threading lock; a process pool is returned for any other lock. The
    lock is not compared against ProcessLock because including
    ProcessLock breaks Mistral when running under it.
    """
    lock = self.lock
    threaded = not lock or isinstance(lock, threadinglock.ThreadingLock)
    if not threaded:
        # there really isn't an improvement with > 4 workers due to the
        # container layer overlaps. The higher the workers, the more
        # RAM required which can lead to OOMs. It's best to limit to 4
        return futures.ProcessPoolExecutor(max_workers=4)

    # workers will scale from 2 to 8 based on the cpu count // 2
    half_count = processutils.get_worker_count() // 2
    workers = min(max(2, half_count), 8)
    return futures.ThreadPoolExecutor(max_workers=workers)
|
|
|
|
def run_tasks(self):
    """Run all queued upload tasks, then clean up their local images.

    Does nothing when no upload tasks are queued.
    """
    if not self.upload_tasks:
        return

    local_images = []
    with self._get_executor() as pool:
        for task_images in pool.map(upload_task, self.upload_tasks):
            local_images += task_images
    LOG.info('result %s' % local_images)

    # Do cleanup after all the uploads so common layers don't get deleted
    # repeatedly
    self.cleanup(local_images)
|
|
|
|
|
|
class UploadTask(object):
    """Parameters of one image upload and the references derived from them.

    The constructor stores its arguments and computes the source and
    target image names, tags and parsed URLs.
    """

    def __init__(self, image_name, pull_source, push_destination,
                 append_tag, modify_role, modify_vars, dry_run, cleanup,
                 multi_arch):
        self.image_name = image_name
        self.pull_source = pull_source
        self.push_destination = push_destination
        self.append_tag = append_tag or ''
        self.modify_role = modify_role
        self.modify_vars = modify_vars
        self.dry_run = dry_run
        self.cleanup = cleanup
        self.multi_arch = multi_arch

        # Split on the last ':'; the tag is 'latest' when there is none
        name_part, separator, tag_part = image_name.rpartition(':')
        if separator:
            image = name_part
            self.source_tag = tag_part
        else:
            image = image_name
            self.source_tag = 'latest'

        if not pull_source:
            self.repo = image
        else:
            # prevent a double // in the url which causes auth problems
            # with docker.io
            if pull_source.endswith('/'):
                pull_source = pull_source[:-1]
            self.repo = pull_source + '/' + image

        if push_destination.endswith('/'):
            push_destination = push_destination[:-1]
        # Everything after the first '/' of the repo is kept below the
        # push destination
        repo_remainder = self.repo.partition('/')[2]
        self.target_image_no_tag = push_destination + '/' + repo_remainder
        self.target_tag = self.source_tag + self.append_tag
        self.source_image = self.repo + ':' + self.source_tag
        self.target_image_source_tag = (self.target_image_no_tag + ':' +
                                        self.source_tag)
        self.target_image = self.target_image_no_tag + ':' + self.target_tag

        to_url = BaseImageUploader._image_to_url
        self.source_image_url = to_url(self.source_image)
        self.target_image_url = to_url(self.target_image)
        self.target_image_source_tag_url = to_url(
            self.target_image_source_tag
        )
|
|
|
|
|
|
def upload_task(args):
    """Run one upload; module-level so it can be used with executor map.

    :param args: pair of (uploader, task)
    :returns: the result of ``uploader.upload_image(task)``
    """
    image_uploader, task_spec = args
    return image_uploader.upload_image(task_spec)
|
|
|
|
|
|
def discover_tag_from_inspect(args):
    """Inspect an image and discover its tag from the inspect data.

    :param args: tuple of (uploader, image, tag_from_label)
    :returns: tuple of the image name without its tag and the value
              returned by the uploader's ``_discover_tag_from_inspect``
    :raises: ImageUploaderException when authentication fails with a 401
             status; other HTTP errors are re-raised unchanged
    """
    self, image, tag_from_label = args
    image_url = self._image_to_url(image)
    username, password = self.credentials_for_registry(image_url.netloc)
    try:
        session = self.authenticate(
            image_url, username=username, password=password)
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 401:
            raise ImageUploaderException(
                'Unable to authenticate. This may indicate '
                'missing registry credentials or the provided '
                'container or namespace does not exist. %s' % e)
        raise
    try:
        i = self._inspect(image_url, session=session)
    finally:
        # Close the session even when the inspect call fails, so a
        # failed lookup does not leak it
        session.close()
    if ':' in image_url.path:
        # break out the tag from the url to be the fallback tag
        path = image.rpartition(':')
        fallback_tag = path[2]
        image = path[0]
    else:
        fallback_tag = None
    return image, self._discover_tag_from_inspect(
        i, image, tag_from_label, fallback_tag)
|
|
|
|
|
|
def tags_for_image(args):
    """Look up the tags of an image through the given uploader.

    :param args: tuple of (uploader, image, session)
    :returns: the result of the uploader's ``_tags_for_image``
    """
    uploader, image_name, registry_session = args
    return uploader._tags_for_image(image_name, registry_session)
|