tripleo-common/tripleo_common/image/image_uploader.py

569 lines
20 KiB
Python

# Copyright 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import abc
from concurrent import futures
import json
import netifaces
import os
import requests
import shutil
import six
from six.moves import urllib
import subprocess
import tempfile
import tenacity
import yaml
import docker
try:
from docker import APIClient as Client
except ImportError:
from docker import Client
from oslo_concurrency import processutils
from oslo_log import log as logging
from tripleo_common.actions import ansible
from tripleo_common.image.base import BaseImageManager
from tripleo_common.image.exception import ImageNotFoundException
from tripleo_common.image.exception import ImageUploaderException
LOG = logging.getLogger(__name__)
SECURE_REGISTRIES = (
'trunk.registry.rdoproject.org',
'docker.io',
)
CLEANUP = (
CLEANUP_FULL, CLEANUP_PARTIAL, CLEANUP_NONE
) = (
'full', 'partial', 'none'
)
def get_undercloud_registry():
addr = 'localhost'
if 'br-ctlplane' in netifaces.interfaces():
addrs = netifaces.ifaddresses('br-ctlplane')
if netifaces.AF_INET in addrs and addrs[netifaces.AF_INET]:
addr = addrs[netifaces.AF_INET][0].get('addr', 'localhost')
return '%s:%s' % (addr, '8787')
class ImageUploadManager(BaseImageManager):
"""Manage the uploading of image files
Manage the uploading of images from a config file specified in YAML
syntax. Multiple config files can be specified. They will be merged.
"""
def __init__(self, config_files=None, verbose=False, debug=False,
dry_run=False, cleanup=CLEANUP_FULL):
if config_files is None:
config_files = []
super(ImageUploadManager, self).__init__(config_files)
self.uploaders = {}
self.dry_run = dry_run
self.cleanup = cleanup
def discover_image_tag(self, image, tag_from_label=None):
uploader = self.uploader('docker')
return uploader.discover_image_tag(
image, tag_from_label=tag_from_label)
def uploader(self, uploader):
if uploader not in self.uploaders:
self.uploaders[uploader] = ImageUploader.get_uploader(uploader)
return self.uploaders[uploader]
def get_push_destination(self, item):
push_destination = item.get('push_destination')
if not push_destination:
return get_undercloud_registry()
# If set to True, use discovered undercloud registry
if isinstance(push_destination, bool):
return get_undercloud_registry()
return push_destination
def upload(self):
"""Start the upload process"""
LOG.info('Using config files: %s' % self.config_files)
uploads = self.load_config_files(self.UPLOADS) or []
container_images = self.load_config_files(self.CONTAINER_IMAGES) or []
upload_images = uploads + container_images
for item in upload_images:
image_name = item.get('imagename')
uploader = item.get('uploader', 'docker')
pull_source = item.get('pull_source')
push_destination = self.get_push_destination(item)
# This updates the parsed upload_images dict with real values
item['push_destination'] = push_destination
append_tag = item.get('modify_append_tag')
modify_role = item.get('modify_role')
modify_vars = item.get('modify_vars')
self.uploader(uploader).add_upload_task(
image_name, pull_source, push_destination,
append_tag, modify_role, modify_vars, self.dry_run,
self.cleanup)
for uploader in self.uploaders.values():
uploader.run_tasks()
return upload_images # simply to make test validation easier
@six.add_metaclass(abc.ABCMeta)
class ImageUploader(object):
"""Base representation of an image uploading method"""
@staticmethod
def get_uploader(uploader):
if uploader == 'docker':
return DockerImageUploader()
raise ImageUploaderException('Unknown image uploader type')
@abc.abstractmethod
def run_tasks(self):
"""Run all tasks"""
pass
@abc.abstractmethod
def add_upload_task(self, image_name, pull_source, push_destination,
append_tag, modify_role, modify_vars, dry_run,
cleanup):
"""Add an upload task to be executed later"""
pass
@abc.abstractmethod
def discover_image_tag(self, image, tag_from_label=None):
"""Discover a versioned tag for an image"""
pass
@abc.abstractmethod
def cleanup(self):
"""Remove unused images or temporary files from upload"""
pass
@abc.abstractmethod
def is_insecure_registry(self, registry_host):
"""Detect whether a registry host is not configured with TLS"""
pass
class DockerImageUploader(ImageUploader):
"""Upload images using docker push"""
def __init__(self):
self.upload_tasks = []
self.secure_registries = set(SECURE_REGISTRIES)
self.insecure_registries = set()
@staticmethod
def run_modify_playbook(modify_role, modify_vars,
source_image, target_image, append_tag):
vars = {}
if modify_vars:
vars.update(modify_vars)
vars['source_image'] = source_image
vars['target_image'] = target_image
vars['modified_append_tag'] = append_tag
LOG.debug('Playbook variables: \n%s' % yaml.safe_dump(
vars, default_flow_style=False))
playbook = [{
'hosts': 'localhost',
'tasks': [{
'name': 'Import role %s' % modify_role,
'import_role': {
'name': modify_role
},
'vars': vars
}]
}]
LOG.debug('Playbook: \n%s' % yaml.safe_dump(
playbook, default_flow_style=False))
work_dir = tempfile.mkdtemp(prefix='tripleo-modify-image-playbook-')
try:
action = ansible.AnsiblePlaybookAction(
playbook=playbook,
work_dir=work_dir
)
result = action.run(None)
LOG.debug(result.get('stdout', ''))
finally:
shutil.rmtree(work_dir)
@staticmethod
def upload_image(image_name, pull_source, push_destination,
insecure_registries, append_tag, modify_role,
modify_vars, dry_run, cleanup):
LOG.info('imagename: %s' % image_name)
if ':' in image_name:
image = image_name.rpartition(':')[0]
source_tag = image_name.rpartition(':')[2]
else:
image = image_name
source_tag = 'latest'
if pull_source:
repo = pull_source + '/' + image
else:
repo = image
source_image = repo + ':' + source_tag
target_image_no_tag = push_destination + '/' + repo.partition('/')[2]
append_tag = append_tag or ''
target_tag = source_tag + append_tag
target_image_source_tag = target_image_no_tag + ':' + source_tag
target_image = target_image_no_tag + ':' + target_tag
if dry_run:
return []
if modify_role:
if DockerImageUploader._image_exists(target_image,
insecure_registries):
LOG.warning('Skipping upload for modified image %s' %
target_image)
return []
else:
if DockerImageUploader._images_match(source_image, target_image,
insecure_registries):
LOG.warning('Skipping upload for image %s' % image_name)
return []
dockerc = Client(base_url='unix://var/run/docker.sock', version='auto')
DockerImageUploader._pull(dockerc, repo, tag=source_tag)
if modify_role:
DockerImageUploader.run_modify_playbook(
modify_role, modify_vars, source_image,
target_image_source_tag, append_tag)
# raise an exception if the playbook didn't tag
# the expected target image
dockerc.inspect_image(target_image)
else:
response = dockerc.tag(
image=source_image, repository=target_image_no_tag,
tag=target_tag, force=True
)
LOG.debug(response)
DockerImageUploader._push(dockerc, target_image_no_tag, tag=target_tag)
LOG.warning('Completed upload for image %s' % image_name)
if cleanup == CLEANUP_NONE:
return []
if cleanup == CLEANUP_PARTIAL:
return [source_image]
return [source_image, target_image]
@staticmethod
@tenacity.retry( # Retry up to 5 times with jittered exponential backoff
reraise=True,
wait=tenacity.wait_random_exponential(multiplier=1, max=10),
stop=tenacity.stop_after_attempt(5)
)
def _pull(dockerc, image, tag=None):
LOG.info('Pulling %s' % image)
for line in dockerc.pull(image, tag=tag, stream=True):
status = json.loads(line)
if 'error' in status:
LOG.warning('docker pull failed: %s' % status['error'])
raise ImageUploaderException('Could not pull image %s' % image)
@staticmethod
@tenacity.retry( # Retry up to 5 times with jittered exponential backoff
reraise=True,
wait=tenacity.wait_random_exponential(multiplier=1, max=10),
stop=tenacity.stop_after_attempt(5)
)
def _push(dockerc, image, tag=None):
LOG.info('Pushing %s' % image)
for line in dockerc.push(image, tag=tag, stream=True):
status = json.loads(line)
if 'error' in status:
LOG.warning('docker push failed: %s' % status['error'])
raise ImageUploaderException('Could not push image %s' % image)
@staticmethod
def _images_match(image1, image2, insecure_registries):
try:
image1_digest = DockerImageUploader._image_digest(
image1, insecure_registries)
except Exception:
return False
try:
image2_digest = DockerImageUploader._image_digest(
image2, insecure_registries)
except Exception:
return False
# missing digest, no way to know if they match
if not image1_digest or not image2_digest:
return False
return image1_digest == image2_digest
@staticmethod
def _image_digest(image, insecure_registries):
image_url = DockerImageUploader._image_to_url(image)
insecure = image_url.netloc in insecure_registries
i = DockerImageUploader._inspect(image_url.geturl(), insecure)
return i.get('Digest')
@staticmethod
def _image_labels(image, insecure):
image_url = DockerImageUploader._image_to_url(image)
i = DockerImageUploader._inspect(image_url.geturl(), insecure)
return i.get('Labels', {}) or {}
@staticmethod
def _image_exists(image, insecure_registries):
try:
DockerImageUploader._image_digest(image, insecure_registries)
except ImageNotFoundException:
return False
else:
return True
@staticmethod
@tenacity.retry( # Retry up to 5 times with jittered exponential backoff
reraise=True,
retry=tenacity.retry_if_exception_type(ImageUploaderException),
wait=tenacity.wait_random_exponential(multiplier=1, max=10),
stop=tenacity.stop_after_attempt(5)
)
def _inspect(image, insecure=False):
cmd = ['skopeo', 'inspect']
if insecure:
cmd.append('--tls-verify=false')
cmd.append(image)
LOG.info('Running %s' % ' '.join(cmd))
env = os.environ.copy()
try:
process = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out, err = process.communicate()
if process.returncode != 0:
not_found_msgs = (
'manifest unknown',
# returned by docker.io
'requested access to the resource is denied'
)
if any(n in err for n in not_found_msgs):
raise ImageNotFoundException('Not found image: %s\n%s' %
(image, err))
raise ImageUploaderException('Error inspecting image: %s\n%s' %
(image, err))
except KeyboardInterrupt:
raise Exception('Action interrupted with ctrl+c')
return json.loads(out)
@staticmethod
def _image_to_url(image):
if '://' not in image:
image = 'docker://' + image
return urllib.parse.urlparse(image)
@staticmethod
def _discover_tag_from_inspect(i, image, tag_from_label=None,
fallback_tag=None):
labels = i.get('Labels', {})
label_keys = ', '.join(labels.keys())
if not tag_from_label:
raise ImageUploaderException(
'No label specified. Available labels: %s' % label_keys
)
if "{" in tag_from_label:
try:
tag_label = tag_from_label.format(**labels)
except ValueError as e:
raise ImageUploaderException(e)
except KeyError as e:
if fallback_tag:
tag_label = fallback_tag
else:
raise ImageUploaderException(
'Image %s %s. Available labels: %s' %
(image, e, label_keys)
)
else:
tag_label = labels.get(tag_from_label)
if tag_label is None:
if fallback_tag:
tag_label = fallback_tag
else:
raise ImageUploaderException(
'Image %s has no label %s. Available labels: %s' %
(image, tag_from_label, label_keys)
)
# confirm the tag exists by checking for an entry in RepoTags
repo_tags = i.get('RepoTags', [])
if tag_label not in repo_tags:
raise ImageUploaderException(
'Image %s has no tag %s.\nAvailable tags: %s' %
(image, tag_label, ', '.join(repo_tags))
)
return tag_label
def discover_image_tags(self, images, tag_from_label=None):
image_urls = [self._image_to_url(i) for i in images]
# prime self.insecure_registries by testing every image
for url in image_urls:
self.is_insecure_registry(url.netloc)
discover_args = []
for image in images:
discover_args.append((image, tag_from_label,
self.insecure_registries))
p = futures.ThreadPoolExecutor(max_workers=16)
versioned_images = {}
for image, versioned_image in p.map(discover_tag_from_inspect,
discover_args):
versioned_images[image] = versioned_image
return versioned_images
def discover_image_tag(self, image, tag_from_label=None,
fallback_tag=None):
image_url = self._image_to_url(image)
insecure = self.is_insecure_registry(image_url.netloc)
i = self._inspect(image_url.geturl(), insecure)
return self._discover_tag_from_inspect(i, image, tag_from_label,
fallback_tag)
def filter_images_with_labels(self, images, labels):
images_with_labels = []
for image in images:
url = self._image_to_url(image)
image_labels = self._image_labels(url.geturl(),
self.is_insecure_registry(
url.netloc))
if set(labels).issubset(set(image_labels)):
images_with_labels.append(image)
return images_with_labels
def cleanup(self, local_images):
if not local_images:
return []
dockerc = Client(base_url='unix://var/run/docker.sock', version='auto')
for image in sorted(local_images):
if not image:
continue
LOG.warning('Removing local copy of %s' % image)
try:
dockerc.remove_image(image)
except docker.errors.APIError as e:
if e.explanation:
LOG.error(e.explanation)
else:
LOG.error(e)
def add_upload_task(self, image_name, pull_source, push_destination,
append_tag, modify_role, modify_vars, dry_run,
cleanup):
# prime self.insecure_registries
if pull_source:
self.is_insecure_registry(self._image_to_url(pull_source).netloc)
else:
self.is_insecure_registry(self._image_to_url(image_name).netloc)
self.is_insecure_registry(self._image_to_url(push_destination).netloc)
self.upload_tasks.append((image_name, pull_source, push_destination,
self.insecure_registries, append_tag,
modify_role, modify_vars, dry_run, cleanup))
def run_tasks(self):
if not self.upload_tasks:
return
local_images = []
# Pull a single image first, to avoid duplicate pulls of the
# same base layers
first = self.upload_tasks.pop()
result = self.upload_image(*first)
local_images.extend(result)
# workers will be based on CPU count with a min 4, max 12
workers = min(12, max(4, processutils.get_worker_count()))
p = futures.ThreadPoolExecutor(max_workers=workers)
for result in p.map(docker_upload, self.upload_tasks):
local_images.extend(result)
LOG.info('result %s' % local_images)
# Do cleanup after all the uploads so common layers don't get deleted
# repeatedly
self.cleanup(local_images)
def is_insecure_registry(self, registry_host):
if registry_host in self.secure_registries:
return False
if registry_host in self.insecure_registries:
return True
try:
requests.get('https://%s/v2' % registry_host)
except requests.exceptions.SSLError:
self.insecure_registries.add(registry_host)
return True
except Exception:
# for any other error assume it is a secure registry, because:
# - it is secure registry
# - the host is not accessible
pass
self.secure_registries.add(registry_host)
return False
def docker_upload(args):
return DockerImageUploader.upload_image(*args)
def discover_tag_from_inspect(args):
image, tag_from_label, insecure_registries = args
image_url = DockerImageUploader._image_to_url(image)
insecure = image_url.netloc in insecure_registries
i = DockerImageUploader._inspect(image_url.geturl(), insecure)
if ':' in image_url.path:
# break out the tag from the url to be the fallback tag
path = image.rpartition(':')
fallback_tag = path[2]
image = path[0]
else:
fallback_tag = None
return image, DockerImageUploader._discover_tag_from_inspect(
i, image, tag_from_label, fallback_tag)