Concurrent upload container images
This change uses multiprocessing to speed the pull/push of the container images for the command "overcloud container image upload" Closes-Bug: #1733740 Change-Id: I70cb86de4e0be5a8994ca1936e3b26749b6dde59
This commit is contained in:
parent
3c92369e5c
commit
aa1651a61e
@ -17,6 +17,7 @@
|
||||
import abc
|
||||
import json
|
||||
import logging
|
||||
import multiprocessing
|
||||
import netifaces
|
||||
import six
|
||||
import time
|
||||
@ -30,6 +31,9 @@ from tripleo_common.image.base import BaseImageManager
|
||||
from tripleo_common.image.exception import ImageUploaderException
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ImageUploadManager(BaseImageManager):
|
||||
"""Manage the uploading of image files
|
||||
|
||||
@ -55,7 +59,7 @@ class ImageUploadManager(BaseImageManager):
|
||||
def upload(self):
|
||||
"""Start the upload process"""
|
||||
|
||||
self.logger.info('Using config files: %s' % self.config_files)
|
||||
LOG.info('Using config files: %s' % self.config_files)
|
||||
|
||||
uploads = self.load_config_files(self.UPLOADS) or []
|
||||
container_images = self.load_config_files(self.CONTAINER_IMAGES) or []
|
||||
@ -72,15 +76,11 @@ class ImageUploadManager(BaseImageManager):
|
||||
# This updates the parsed upload_images dict with real values
|
||||
item['push_destination'] = push_destination
|
||||
|
||||
self.logger.info('imagename: %s' % image_name)
|
||||
|
||||
self.uploader(uploader).upload_image(
|
||||
self.uploader(uploader).add_upload_task(
|
||||
image_name, pull_source, push_destination)
|
||||
|
||||
# Do cleanup after all the uploads so common layers don't get deleted
|
||||
# repeatedly
|
||||
for uploader in self.uploaders.values():
|
||||
uploader.cleanup()
|
||||
uploader.run_tasks()
|
||||
|
||||
return upload_images # simply to make test validation easier
|
||||
|
||||
@ -104,8 +104,13 @@ class ImageUploader(object):
|
||||
raise ImageUploaderException('Unknown image uploader type')
|
||||
|
||||
@abc.abstractmethod
|
||||
def upload_image(self, image_name, pull_source, push_destination):
|
||||
"""Upload a disk image"""
|
||||
def run_tasks(self):
|
||||
"""Run all tasks"""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def add_upload_task(self, image_name, pull_source, push_destination):
|
||||
"""Add an upload task to be executed later"""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
@ -122,12 +127,12 @@ class ImageUploader(object):
|
||||
class DockerImageUploader(ImageUploader):
|
||||
"""Upload images using docker push"""
|
||||
|
||||
logger = logging.getLogger(__name__ + '.DockerImageUploader')
|
||||
|
||||
def __init__(self):
|
||||
self.local_images = set()
|
||||
self.upload_tasks = []
|
||||
|
||||
def upload_image(self, image_name, pull_source, push_destination):
|
||||
@staticmethod
|
||||
def upload_image(image_name, pull_source, push_destination):
|
||||
LOG.info('imagename: %s' % image_name)
|
||||
dockerc = Client(base_url='unix://var/run/docker.sock', version='auto')
|
||||
if ':' in image_name:
|
||||
image = image_name.rpartition(':')[0]
|
||||
@ -140,44 +145,48 @@ class DockerImageUploader(ImageUploader):
|
||||
else:
|
||||
repo = image
|
||||
|
||||
self._pull_retry(dockerc, repo, tag=tag)
|
||||
DockerImageUploader._pull_retry(dockerc, repo, tag=tag)
|
||||
|
||||
full_image = repo + ':' + tag
|
||||
self.local_images.add(full_image)
|
||||
|
||||
new_repo = push_destination + '/' + repo.partition('/')[2]
|
||||
full_new_repo = new_repo + ':' + tag
|
||||
|
||||
response = dockerc.tag(image=full_image, repository=new_repo,
|
||||
tag=tag, force=True)
|
||||
self.logger.debug(response)
|
||||
LOG.debug(response)
|
||||
|
||||
response = [line for line in dockerc.push(new_repo,
|
||||
tag=tag, stream=True)]
|
||||
self.local_images.add(full_new_repo)
|
||||
self.logger.debug(response)
|
||||
LOG.debug(response)
|
||||
|
||||
self.logger.info('Completed upload for docker image %s' % image_name)
|
||||
LOG.info('Completed upload for docker image %s' % image_name)
|
||||
return full_image, full_new_repo
|
||||
|
||||
@staticmethod
|
||||
def _pull(dockerc, image, tag=None):
|
||||
LOG.debug('Pulling %s' % image)
|
||||
|
||||
def _pull(self, dockerc, image, tag=None):
|
||||
self.logger.debug('Pulling %s' % image)
|
||||
for line in dockerc.pull(image, tag=tag, stream=True):
|
||||
status = json.loads(line)
|
||||
if 'error' in status:
|
||||
self.logger.warning('docker pull failed: %s' % status['error'])
|
||||
LOG.warning('docker pull failed: %s' % status['error'])
|
||||
return 1
|
||||
self.logger.debug(status.get('status'))
|
||||
LOG.debug(status.get('status'))
|
||||
return 0
|
||||
|
||||
def _pull_retry(self, dockerc, image, tag=None):
|
||||
@staticmethod
|
||||
def _pull_retry(dockerc, image, tag=None):
|
||||
retval = -1
|
||||
count = 0
|
||||
while retval != 0:
|
||||
if count >= 5:
|
||||
raise ImageUploaderException('Could not pull image %s' % image)
|
||||
count += 1
|
||||
retval = self._pull(dockerc, image, tag)
|
||||
retval = DockerImageUploader._pull(dockerc, image, tag)
|
||||
if retval != 0:
|
||||
time.sleep(3)
|
||||
self.logger.warning('retrying pulling image: %s' % image)
|
||||
LOG.warning('retrying pulling image: %s' % image)
|
||||
|
||||
def discover_image_tag(self, image, tag_from_label=None):
|
||||
dockerc = Client(base_url='unix://var/run/docker.sock', version='auto')
|
||||
@ -190,7 +199,7 @@ class DockerImageUploader(ImageUploader):
|
||||
tag = 'latest'
|
||||
image = '%s:%s' % (image_name, tag)
|
||||
|
||||
self._pull_retry(dockerc, image)
|
||||
DockerImageUploader._pull_retry(dockerc, image)
|
||||
i = dockerc.inspect_image(image)
|
||||
labels = i['Config']['Labels']
|
||||
|
||||
@ -211,19 +220,47 @@ class DockerImageUploader(ImageUploader):
|
||||
# confirm the tag exists by pulling it, which should be fast
|
||||
# because that image has just been pulled
|
||||
versioned_image = '%s:%s' % (image_name, tag_label)
|
||||
self._pull_retry(dockerc, versioned_image)
|
||||
DockerImageUploader._pull_retry(dockerc, versioned_image)
|
||||
return tag_label
|
||||
|
||||
def cleanup(self):
|
||||
def cleanup(self, local_images):
|
||||
dockerc = Client(base_url='unix://var/run/docker.sock', version='auto')
|
||||
|
||||
for image in sorted(self.local_images):
|
||||
self.logger.info('Removing local copy of %s' % image)
|
||||
for image in sorted(local_images):
|
||||
if not image:
|
||||
continue
|
||||
LOG.info('Removing local copy of %s' % image)
|
||||
try:
|
||||
dockerc.remove_image(image)
|
||||
except docker.errors.APIError as e:
|
||||
if e.explanation:
|
||||
self.logger.warning(e.explanation)
|
||||
LOG.warning(e.explanation)
|
||||
else:
|
||||
self.logger.warning(e)
|
||||
self.local_images.clear()
|
||||
LOG.warning(e)
|
||||
|
||||
def add_upload_task(self, image_name, pull_source, push_destination):
|
||||
self.upload_tasks.append((image_name, pull_source, push_destination))
|
||||
|
||||
def run_tasks(self):
|
||||
if not self.upload_tasks:
|
||||
return
|
||||
local_images = []
|
||||
|
||||
# Pull a single image first, to avoid duplicate pulls of the
|
||||
# same base layers
|
||||
first = self.upload_tasks.pop()
|
||||
result = self.upload_image(*first)
|
||||
local_images.extend(result)
|
||||
|
||||
p = multiprocessing.Pool(4)
|
||||
|
||||
for result in p.map(docker_upload, self.upload_tasks):
|
||||
local_images.extend(result)
|
||||
LOG.info('result %s' % local_images)
|
||||
|
||||
# Do cleanup after all the uploads so common layers don't get deleted
|
||||
# repeatedly
|
||||
self.cleanup(local_images)
|
||||
|
||||
|
||||
def docker_upload(args):
|
||||
return DockerImageUploader.upload_image(*args)
|
||||
|
Loading…
Reference in New Issue
Block a user