From aa1651a61e349dd50b94c55e2322c576989b0f34 Mon Sep 17 00:00:00 2001 From: Steve Baker Date: Wed, 22 Nov 2017 11:25:35 +1300 Subject: [PATCH] Concurrent upload container images This change uses multiprocessing to speed the pull/push of the container images for the command "overcloud container image upload" Closes-Bug: #1733740 Change-Id: I70cb86de4e0be5a8994ca1936e3b26749b6dde59 --- tripleo_common/image/image_uploader.py | 107 +++++++++++++++++-------- 1 file changed, 72 insertions(+), 35 deletions(-) diff --git a/tripleo_common/image/image_uploader.py b/tripleo_common/image/image_uploader.py index afb555ffa..ade504da9 100644 --- a/tripleo_common/image/image_uploader.py +++ b/tripleo_common/image/image_uploader.py @@ -17,6 +17,7 @@ import abc import json import logging +import multiprocessing import netifaces import six import time @@ -30,6 +31,9 @@ from tripleo_common.image.base import BaseImageManager from tripleo_common.image.exception import ImageUploaderException +LOG = logging.getLogger(__name__) + + class ImageUploadManager(BaseImageManager): """Manage the uploading of image files @@ -55,7 +59,7 @@ class ImageUploadManager(BaseImageManager): def upload(self): """Start the upload process""" - self.logger.info('Using config files: %s' % self.config_files) + LOG.info('Using config files: %s' % self.config_files) uploads = self.load_config_files(self.UPLOADS) or [] container_images = self.load_config_files(self.CONTAINER_IMAGES) or [] @@ -72,15 +76,11 @@ class ImageUploadManager(BaseImageManager): # This updates the parsed upload_images dict with real values item['push_destination'] = push_destination - self.logger.info('imagename: %s' % image_name) - - self.uploader(uploader).upload_image( + self.uploader(uploader).add_upload_task( image_name, pull_source, push_destination) - # Do cleanup after all the uploads so common layers don't get deleted - # repeatedly for uploader in self.uploaders.values(): - uploader.cleanup() + uploader.run_tasks() return upload_images # simply to make test validation easier @@ -104,8 +104,13 @@ class ImageUploader(object): raise ImageUploaderException('Unknown image uploader type') @abc.abstractmethod - def upload_image(self, image_name, pull_source, push_destination): - """Upload a disk image""" + def run_tasks(self): + """Run all tasks""" + pass + + @abc.abstractmethod + def add_upload_task(self, image_name, pull_source, push_destination): + """Add an upload task to be executed later""" pass @abc.abstractmethod @@ -122,12 +127,12 @@ class ImageUploader(object): class DockerImageUploader(ImageUploader): """Upload images using docker push""" - logger = logging.getLogger(__name__ + '.DockerImageUploader') - def __init__(self): - self.local_images = set() + self.upload_tasks = [] - def upload_image(self, image_name, pull_source, push_destination): + @staticmethod + def upload_image(image_name, pull_source, push_destination): + LOG.info('imagename: %s' % image_name) dockerc = Client(base_url='unix://var/run/docker.sock', version='auto') if ':' in image_name: image = image_name.rpartition(':')[0] @@ -140,44 +145,48 @@ class DockerImageUploader(ImageUploader): else: repo = image - self._pull_retry(dockerc, repo, tag=tag) + DockerImageUploader._pull_retry(dockerc, repo, tag=tag) full_image = repo + ':' + tag - self.local_images.add(full_image) + new_repo = push_destination + '/' + repo.partition('/')[2] full_new_repo = new_repo + ':' + tag + response = dockerc.tag(image=full_image, repository=new_repo, tag=tag, force=True) - self.logger.debug(response) + LOG.debug(response) response = [line for line in dockerc.push(new_repo, tag=tag, stream=True)] - self.local_images.add(full_new_repo) - self.logger.debug(response) + LOG.debug(response) - self.logger.info('Completed upload for docker image %s' % image_name) + LOG.info('Completed upload for docker image %s' % image_name) + return full_image, full_new_repo + + @staticmethod + def _pull(dockerc, image, tag=None): + LOG.debug('Pulling %s' % image) - def _pull(self, dockerc, image, tag=None): - self.logger.debug('Pulling %s' % image) for line in dockerc.pull(image, tag=tag, stream=True): status = json.loads(line) if 'error' in status: - self.logger.warning('docker pull failed: %s' % status['error']) + LOG.warning('docker pull failed: %s' % status['error']) return 1 - self.logger.debug(status.get('status')) + LOG.debug(status.get('status')) return 0 - def _pull_retry(self, dockerc, image, tag=None): + @staticmethod + def _pull_retry(dockerc, image, tag=None): retval = -1 count = 0 while retval != 0: if count >= 5: raise ImageUploaderException('Could not pull image %s' % image) count += 1 - retval = self._pull(dockerc, image, tag) + retval = DockerImageUploader._pull(dockerc, image, tag) if retval != 0: time.sleep(3) - self.logger.warning('retrying pulling image: %s' % image) + LOG.warning('retrying pulling image: %s' % image) def discover_image_tag(self, image, tag_from_label=None): dockerc = Client(base_url='unix://var/run/docker.sock', version='auto') @@ -190,7 +199,7 @@ class DockerImageUploader(ImageUploader): tag = 'latest' image = '%s:%s' % (image_name, tag) - self._pull_retry(dockerc, image) + DockerImageUploader._pull_retry(dockerc, image) i = dockerc.inspect_image(image) labels = i['Config']['Labels'] @@ -211,19 +220,47 @@ class DockerImageUploader(ImageUploader): # confirm the tag exists by pulling it, which should be fast # because that image has just been pulled versioned_image = '%s:%s' % (image_name, tag_label) - self._pull_retry(dockerc, versioned_image) + DockerImageUploader._pull_retry(dockerc, versioned_image) return tag_label - def cleanup(self): + def cleanup(self, local_images): dockerc = Client(base_url='unix://var/run/docker.sock', version='auto') - - for image in sorted(self.local_images): - self.logger.info('Removing local copy of %s' % image) + for image in sorted(local_images): + if not image: + continue + LOG.info('Removing local copy of %s' % image) try: dockerc.remove_image(image) except docker.errors.APIError as e: if e.explanation: - self.logger.warning(e.explanation) + LOG.warning(e.explanation) else: - self.logger.warning(e) - self.local_images.clear() + LOG.warning(e) + + def add_upload_task(self, image_name, pull_source, push_destination): + self.upload_tasks.append((image_name, pull_source, push_destination)) + + def run_tasks(self): + if not self.upload_tasks: + return + local_images = [] + + # Pull a single image first, to avoid duplicate pulls of the + # same base layers + first = self.upload_tasks.pop() + result = self.upload_image(*first) + local_images.extend(result) + + p = multiprocessing.Pool(4) + + for result in p.map(docker_upload, self.upload_tasks): + local_images.extend(result) + LOG.info('result %s' % local_images) + + # Do cleanup after all the uploads so common layers don't get deleted + # repeatedly + self.cleanup(local_images) + + +def docker_upload(args): + return DockerImageUploader.upload_image(*args)