Make executor type dynamic
When we run the tripleo-container-image-prepare script, it performs better under python2 when the process leverages a ProcessPoolExecutor. Rather than using threading, we should be using processes to handle the image upload processing. Currently when we're processing the images, we end up being single threaded due to the GIL when processing the data. By switching to the ProcessPoolExecutor, we eliminate the locking that is occuring during the data processing as it'll be handled in each process. Unfortunately, we cannot leverage the ProcessPoolExecutor when the same code is run under Mistral. In order to make the code work for both methods, we need to make the execution type dynamic. This change creates two types of lock objects that are used to determine what type of executor to ultimately use when processing the images for uploading. Additionally this change limits the number of concurrent image upload processes to 4 if using the ProcessPoolExecutor and caps the number of threads at a max of 8 based on (cpu count / 2) Change-Id: I60507eba9884a0660fe269da5ad27b0e57a70ca8 Related-Bug: #1844446
This commit is contained in:
parent
98f2d962f9
commit
60afc0eec4
@ -22,6 +22,7 @@ import sys
|
|||||||
from tripleo_common import constants
|
from tripleo_common import constants
|
||||||
from tripleo_common.image import image_uploader
|
from tripleo_common.image import image_uploader
|
||||||
from tripleo_common.image import kolla_builder
|
from tripleo_common.image import kolla_builder
|
||||||
|
from tripleo_common.utils.locks import processlock
|
||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
|
|
||||||
@ -131,8 +132,10 @@ if __name__ == '__main__':
|
|||||||
env = yaml.safe_load(f)
|
env = yaml.safe_load(f)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
lock = processlock.ProcessLock()
|
||||||
params = kolla_builder.container_images_prepare_multi(
|
params = kolla_builder.container_images_prepare_multi(
|
||||||
env, roles_data, cleanup=args.cleanup, dry_run=args.dry_run)
|
env, roles_data, cleanup=args.cleanup, dry_run=args.dry_run,
|
||||||
|
lock=lock)
|
||||||
result = yaml.safe_dump(params, default_flow_style=False)
|
result = yaml.safe_dump(params, default_flow_style=False)
|
||||||
log.info(result)
|
log.info(result)
|
||||||
print(result)
|
print(result)
|
||||||
|
@ -30,7 +30,6 @@ from six.moves.urllib import parse
|
|||||||
import subprocess
|
import subprocess
|
||||||
import tempfile
|
import tempfile
|
||||||
import tenacity
|
import tenacity
|
||||||
import threading
|
|
||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
from oslo_concurrency import processutils
|
from oslo_concurrency import processutils
|
||||||
@ -42,6 +41,7 @@ from tripleo_common.image.exception import ImageUploaderException
|
|||||||
from tripleo_common.image.exception import ImageUploaderThreadException
|
from tripleo_common.image.exception import ImageUploaderThreadException
|
||||||
from tripleo_common.image import image_export
|
from tripleo_common.image import image_export
|
||||||
from tripleo_common.utils import common as common_utils
|
from tripleo_common.utils import common as common_utils
|
||||||
|
from tripleo_common.utils.locks import threadinglock
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
@ -152,13 +152,13 @@ class ImageUploadManager(BaseImageManager):
|
|||||||
def __init__(self, config_files=None,
|
def __init__(self, config_files=None,
|
||||||
dry_run=False, cleanup=CLEANUP_FULL,
|
dry_run=False, cleanup=CLEANUP_FULL,
|
||||||
mirrors=None, registry_credentials=None,
|
mirrors=None, registry_credentials=None,
|
||||||
multi_arch=False):
|
multi_arch=False, lock=None):
|
||||||
if config_files is None:
|
if config_files is None:
|
||||||
config_files = []
|
config_files = []
|
||||||
super(ImageUploadManager, self).__init__(config_files)
|
super(ImageUploadManager, self).__init__(config_files)
|
||||||
self.uploaders = {
|
self.uploaders = {
|
||||||
'skopeo': SkopeoImageUploader(),
|
'skopeo': SkopeoImageUploader(),
|
||||||
'python': PythonImageUploader()
|
'python': PythonImageUploader(lock)
|
||||||
}
|
}
|
||||||
self.dry_run = dry_run
|
self.dry_run = dry_run
|
||||||
self.cleanup = cleanup
|
self.cleanup = cleanup
|
||||||
@ -171,6 +171,7 @@ class ImageUploadManager(BaseImageManager):
|
|||||||
for uploader in self.uploaders.values():
|
for uploader in self.uploaders.values():
|
||||||
uploader.registry_credentials = registry_credentials
|
uploader.registry_credentials = registry_credentials
|
||||||
self.multi_arch = multi_arch
|
self.multi_arch = multi_arch
|
||||||
|
self.lock = lock
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def validate_registry_credentials(creds_data):
|
def validate_registry_credentials(creds_data):
|
||||||
@ -243,7 +244,7 @@ class ImageUploadManager(BaseImageManager):
|
|||||||
tasks.append(UploadTask(
|
tasks.append(UploadTask(
|
||||||
image_name, pull_source, push_destination,
|
image_name, pull_source, push_destination,
|
||||||
append_tag, modify_role, modify_vars, self.dry_run,
|
append_tag, modify_role, modify_vars, self.dry_run,
|
||||||
self.cleanup, multi_arch))
|
self.cleanup, multi_arch, self.lock))
|
||||||
|
|
||||||
# NOTE(mwhahaha): We want to randomize the upload process because of
|
# NOTE(mwhahaha): We want to randomize the upload process because of
|
||||||
# the shared nature of container layers. Because we multiprocess the
|
# the shared nature of container layers. Because we multiprocess the
|
||||||
@ -275,12 +276,13 @@ class BaseImageUploader(object):
|
|||||||
export_registries = set()
|
export_registries = set()
|
||||||
push_registries = set()
|
push_registries = set()
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self, lock=None):
|
||||||
self.upload_tasks = []
|
self.upload_tasks = []
|
||||||
# A mapping of layer hashs to the image which first copied that
|
# A mapping of layer hashs to the image which first copied that
|
||||||
# layer to the target
|
# layer to the target
|
||||||
self.image_layers = {}
|
self.image_layers = {}
|
||||||
self.registry_credentials = {}
|
self.registry_credentials = {}
|
||||||
|
self.lock = lock
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def init_registries_cache(cls):
|
def init_registries_cache(cls):
|
||||||
@ -1118,39 +1120,42 @@ class SkopeoImageUploader(BaseImageUploader):
|
|||||||
class PythonImageUploader(BaseImageUploader):
|
class PythonImageUploader(BaseImageUploader):
|
||||||
"""Upload images using a direct implementation of the registry API"""
|
"""Upload images using a direct implementation of the registry API"""
|
||||||
|
|
||||||
uploader_lock = threading.Lock()
|
|
||||||
uploader_lock_info = set()
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
@tenacity.retry( # Retry until we no longer have collisions
|
@tenacity.retry( # Retry until we no longer have collisions
|
||||||
retry=tenacity.retry_if_exception_type(ImageUploaderThreadException),
|
retry=tenacity.retry_if_exception_type(ImageUploaderThreadException),
|
||||||
wait=tenacity.wait_random_exponential(multiplier=1, max=10)
|
wait=tenacity.wait_random_exponential(multiplier=1, max=10)
|
||||||
)
|
)
|
||||||
def _layer_fetch_lock(cls, layer):
|
def _layer_fetch_lock(cls, layer, lock=None):
|
||||||
if layer in cls.uploader_lock_info:
|
if not lock:
|
||||||
|
LOG.warning('No lock information provided for layer %s' % layer)
|
||||||
|
return
|
||||||
|
if layer in lock.objects():
|
||||||
LOG.debug('[%s] Layer is being fetched by another thread' % layer)
|
LOG.debug('[%s] Layer is being fetched by another thread' % layer)
|
||||||
raise ImageUploaderThreadException('layer being fetched')
|
raise ImageUploaderThreadException('layer being fetched')
|
||||||
LOG.debug('[%s] Locking layer' % layer)
|
LOG.debug('Locking layer %s' % layer)
|
||||||
LOG.debug('[%s] Starting acquire for lock' % layer)
|
LOG.debug('Starting acquire for lock %s' % layer)
|
||||||
with cls.uploader_lock:
|
with lock.get_lock():
|
||||||
if layer in cls.uploader_lock_info:
|
if layer in lock.objects():
|
||||||
LOG.debug('[%s] Collision for lock' % layer)
|
LOG.debug('Collision for lock %s' % layer)
|
||||||
raise ImageUploaderThreadException('layer conflict')
|
raise ImageUploaderThreadException('layer conflict')
|
||||||
LOG.debug('[%s] Acquired for lock' % layer)
|
LOG.debug('Acquired for lock %s' % layer)
|
||||||
cls.uploader_lock_info.add(layer)
|
lock.objects().append(layer)
|
||||||
LOG.debug('[%s] Updated lock info' % layer)
|
LOG.debug('Updated lock info %s' % layer)
|
||||||
LOG.debug('[%s] Got lock on layer' % layer)
|
LOG.debug('Got lock on layer %s' % layer)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _layer_fetch_unlock(cls, layer):
|
def _layer_fetch_unlock(cls, layer, lock=None):
|
||||||
LOG.debug('[%s] Unlocking layer' % layer)
|
if not lock:
|
||||||
LOG.debug('[%s] Starting acquire for lock' % layer)
|
LOG.warning('No lock information provided for layer %s' % layer)
|
||||||
with cls.uploader_lock:
|
return
|
||||||
LOG.debug('[%s] Acquired for unlock' % layer)
|
LOG.debug('Unlocking layer %s' % layer)
|
||||||
if layer in cls.uploader_lock_info:
|
LOG.debug('Starting acquire for lock %s' % layer)
|
||||||
cls.uploader_lock_info.remove(layer)
|
with lock.get_lock():
|
||||||
LOG.debug('[%s] Updated lock info' % layer)
|
LOG.debug('Acquired for unlock %s' % layer)
|
||||||
LOG.debug('[%s] Released lock on layer' % layer)
|
if layer in lock.objects():
|
||||||
|
lock.objects().remove(layer)
|
||||||
|
LOG.debug('Updated lock info %s' % layer)
|
||||||
|
LOG.debug('Released lock on layer %s' % layer)
|
||||||
|
|
||||||
def upload_image(self, task):
|
def upload_image(self, task):
|
||||||
"""Upload image from a task
|
"""Upload image from a task
|
||||||
@ -1176,6 +1181,8 @@ class PythonImageUploader(BaseImageUploader):
|
|||||||
if t.dry_run:
|
if t.dry_run:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
|
lock = t.lock
|
||||||
|
|
||||||
target_username, target_password = self.credentials_for_registry(
|
target_username, target_password = self.credentials_for_registry(
|
||||||
t.target_image_url.netloc)
|
t.target_image_url.netloc)
|
||||||
target_session = self.authenticate(
|
target_session = self.authenticate(
|
||||||
@ -1266,7 +1273,8 @@ class PythonImageUploader(BaseImageUploader):
|
|||||||
source_session=source_session,
|
source_session=source_session,
|
||||||
target_session=target_session,
|
target_session=target_session,
|
||||||
source_layers=source_layers,
|
source_layers=source_layers,
|
||||||
multi_arch=t.multi_arch
|
multi_arch=t.multi_arch,
|
||||||
|
lock=lock
|
||||||
)
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.error('[%s] Failed uploading the target '
|
LOG.error('[%s] Failed uploading the target '
|
||||||
@ -1482,19 +1490,20 @@ class PythonImageUploader(BaseImageUploader):
|
|||||||
def _copy_layer_registry_to_registry(cls, source_url, target_url,
|
def _copy_layer_registry_to_registry(cls, source_url, target_url,
|
||||||
layer,
|
layer,
|
||||||
source_session=None,
|
source_session=None,
|
||||||
target_session=None):
|
target_session=None,
|
||||||
|
lock=None):
|
||||||
layer_entry = {'digest': layer}
|
layer_entry = {'digest': layer}
|
||||||
try:
|
try:
|
||||||
cls._layer_fetch_lock(layer)
|
cls._layer_fetch_lock(layer)
|
||||||
if cls._target_layer_exists_registry(
|
if cls._target_layer_exists_registry(
|
||||||
target_url, layer_entry, [layer_entry], target_session):
|
target_url, layer_entry, [layer_entry], target_session):
|
||||||
cls._layer_fetch_unlock(layer)
|
cls._layer_fetch_unlock(layer, lock)
|
||||||
return
|
return
|
||||||
except ImageUploaderThreadException:
|
except ImageUploaderThreadException:
|
||||||
# skip trying to unlock, because that's what threw the exception
|
# skip trying to unlock, because that's what threw the exception
|
||||||
raise
|
raise
|
||||||
except Exception:
|
except Exception:
|
||||||
cls._layer_fetch_unlock(layer)
|
cls._layer_fetch_unlock(layer, lock)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
digest = layer_entry['digest']
|
digest = layer_entry['digest']
|
||||||
@ -1512,7 +1521,7 @@ class PythonImageUploader(BaseImageUploader):
|
|||||||
else:
|
else:
|
||||||
return layer_val
|
return layer_val
|
||||||
finally:
|
finally:
|
||||||
cls._layer_fetch_unlock(layer)
|
cls._layer_fetch_unlock(layer, lock)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _assert_scheme(cls, url, scheme):
|
def _assert_scheme(cls, url, scheme):
|
||||||
@ -1534,7 +1543,8 @@ class PythonImageUploader(BaseImageUploader):
|
|||||||
source_session=None,
|
source_session=None,
|
||||||
target_session=None,
|
target_session=None,
|
||||||
source_layers=None,
|
source_layers=None,
|
||||||
multi_arch=False):
|
multi_arch=False,
|
||||||
|
lock=None):
|
||||||
cls._assert_scheme(source_url, 'docker')
|
cls._assert_scheme(source_url, 'docker')
|
||||||
cls._assert_scheme(target_url, 'docker')
|
cls._assert_scheme(target_url, 'docker')
|
||||||
|
|
||||||
@ -1556,7 +1566,8 @@ class PythonImageUploader(BaseImageUploader):
|
|||||||
source_url, target_url,
|
source_url, target_url,
|
||||||
layer=layer,
|
layer=layer,
|
||||||
source_session=source_session,
|
source_session=source_session,
|
||||||
target_session=target_session
|
target_session=target_session,
|
||||||
|
lock=lock
|
||||||
))
|
))
|
||||||
|
|
||||||
jobs_count = len(copy_jobs)
|
jobs_count = len(copy_jobs)
|
||||||
@ -2051,6 +2062,24 @@ class PythonImageUploader(BaseImageUploader):
|
|||||||
image_url = parse.urlparse('containers-storage:%s' % image)
|
image_url = parse.urlparse('containers-storage:%s' % image)
|
||||||
self._delete(image_url)
|
self._delete(image_url)
|
||||||
|
|
||||||
|
def _get_executor(self):
|
||||||
|
"""Get executor type based on lock object
|
||||||
|
|
||||||
|
We check to see if the lock object is not set or if it is a threading
|
||||||
|
lock. We cannot check if it is a ProcessLock due to the side effect
|
||||||
|
of trying to include ProcessLock when running under Mistral breaks
|
||||||
|
Mistral.
|
||||||
|
"""
|
||||||
|
if not self.lock or isinstance(self.lock, threadinglock.ThreadingLock):
|
||||||
|
# workers will scale from 2 to 8 based on the cpu count // 2
|
||||||
|
workers = min(max(2, processutils.get_worker_count() // 2), 8)
|
||||||
|
return futures.ThreadPoolExecutor(max_workers=workers)
|
||||||
|
else:
|
||||||
|
# there really isn't an improvement with > 4 workers due to the
|
||||||
|
# container layer overlaps. The higher the workers, the more
|
||||||
|
# RAM required which can lead to OOMs. It's best to limit to 4
|
||||||
|
return futures.ProcessPoolExecutor(max_workers=4)
|
||||||
|
|
||||||
def run_tasks(self):
|
def run_tasks(self):
|
||||||
if not self.upload_tasks:
|
if not self.upload_tasks:
|
||||||
return
|
return
|
||||||
@ -2060,9 +2089,7 @@ class PythonImageUploader(BaseImageUploader):
|
|||||||
# same base layers
|
# same base layers
|
||||||
local_images.extend(upload_task(args=self.upload_tasks.pop()))
|
local_images.extend(upload_task(args=self.upload_tasks.pop()))
|
||||||
|
|
||||||
# workers will be half the CPU, with a minimum of 2
|
with self._get_executor() as p:
|
||||||
workers = max(2, processutils.get_worker_count() // 2)
|
|
||||||
with futures.ThreadPoolExecutor(max_workers=workers) as p:
|
|
||||||
for result in p.map(upload_task, self.upload_tasks):
|
for result in p.map(upload_task, self.upload_tasks):
|
||||||
local_images.extend(result)
|
local_images.extend(result)
|
||||||
LOG.info('result %s' % local_images)
|
LOG.info('result %s' % local_images)
|
||||||
@ -2076,7 +2103,7 @@ class UploadTask(object):
|
|||||||
|
|
||||||
def __init__(self, image_name, pull_source, push_destination,
|
def __init__(self, image_name, pull_source, push_destination,
|
||||||
append_tag, modify_role, modify_vars, dry_run, cleanup,
|
append_tag, modify_role, modify_vars, dry_run, cleanup,
|
||||||
multi_arch):
|
multi_arch, lock=None):
|
||||||
self.image_name = image_name
|
self.image_name = image_name
|
||||||
self.pull_source = pull_source
|
self.pull_source = pull_source
|
||||||
self.push_destination = push_destination
|
self.push_destination = push_destination
|
||||||
@ -2086,6 +2113,7 @@ class UploadTask(object):
|
|||||||
self.dry_run = dry_run
|
self.dry_run = dry_run
|
||||||
self.cleanup = cleanup
|
self.cleanup = cleanup
|
||||||
self.multi_arch = multi_arch
|
self.multi_arch = multi_arch
|
||||||
|
self.lock = lock
|
||||||
|
|
||||||
if ':' in image_name:
|
if ':' in image_name:
|
||||||
image = image_name.rpartition(':')[0]
|
image = image_name.rpartition(':')[0]
|
||||||
|
@ -27,6 +27,7 @@ from osc_lib.i18n import _
|
|||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
from tripleo_common.image import base
|
from tripleo_common.image import base
|
||||||
from tripleo_common.image import image_uploader
|
from tripleo_common.image import image_uploader
|
||||||
|
from tripleo_common.utils.locks import threadinglock
|
||||||
|
|
||||||
CONTAINER_IMAGE_PREPARE_PARAM_STR = None
|
CONTAINER_IMAGE_PREPARE_PARAM_STR = None
|
||||||
|
|
||||||
@ -135,7 +136,8 @@ def set_neutron_driver(pd, mapping_args):
|
|||||||
|
|
||||||
|
|
||||||
def container_images_prepare_multi(environment, roles_data, dry_run=False,
|
def container_images_prepare_multi(environment, roles_data, dry_run=False,
|
||||||
cleanup=image_uploader.CLEANUP_FULL):
|
cleanup=image_uploader.CLEANUP_FULL,
|
||||||
|
lock=None):
|
||||||
"""Perform multiple container image prepares and merge result
|
"""Perform multiple container image prepares and merge result
|
||||||
|
|
||||||
Given the full heat environment and roles data, perform multiple image
|
Given the full heat environment and roles data, perform multiple image
|
||||||
@ -146,10 +148,14 @@ def container_images_prepare_multi(environment, roles_data, dry_run=False,
|
|||||||
|
|
||||||
:param environment: Heat environment for deployment
|
:param environment: Heat environment for deployment
|
||||||
:param roles_data: Roles file data used to filter services
|
:param roles_data: Roles file data used to filter services
|
||||||
|
:param lock: a locking object to use when handling uploads
|
||||||
:returns: dict containing merged container image parameters from all
|
:returns: dict containing merged container image parameters from all
|
||||||
prepare operations
|
prepare operations
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
if not lock:
|
||||||
|
lock = threadinglock.ThreadingLock()
|
||||||
|
|
||||||
pd = environment.get('parameter_defaults', {})
|
pd = environment.get('parameter_defaults', {})
|
||||||
cip = pd.get('ContainerImagePrepare')
|
cip = pd.get('ContainerImagePrepare')
|
||||||
# if user does not provide a ContainerImagePrepare, use the defaults.
|
# if user does not provide a ContainerImagePrepare, use the defaults.
|
||||||
@ -207,7 +213,8 @@ def container_images_prepare_multi(environment, roles_data, dry_run=False,
|
|||||||
modify_only_with_labels=modify_only_with_labels,
|
modify_only_with_labels=modify_only_with_labels,
|
||||||
mirrors=mirrors,
|
mirrors=mirrors,
|
||||||
registry_credentials=creds,
|
registry_credentials=creds,
|
||||||
multi_arch=multi_arch
|
multi_arch=multi_arch,
|
||||||
|
lock=lock
|
||||||
)
|
)
|
||||||
env_params.update(prepare_data['image_params'])
|
env_params.update(prepare_data['image_params'])
|
||||||
|
|
||||||
@ -222,7 +229,8 @@ def container_images_prepare_multi(environment, roles_data, dry_run=False,
|
|||||||
cleanup=cleanup,
|
cleanup=cleanup,
|
||||||
mirrors=mirrors,
|
mirrors=mirrors,
|
||||||
registry_credentials=creds,
|
registry_credentials=creds,
|
||||||
multi_arch=multi_arch
|
multi_arch=multi_arch,
|
||||||
|
lock=lock
|
||||||
)
|
)
|
||||||
uploader.upload()
|
uploader.upload()
|
||||||
return env_params
|
return env_params
|
||||||
@ -246,7 +254,7 @@ def container_images_prepare(template_file=DEFAULT_TEMPLATE_FILE,
|
|||||||
append_tag=None, modify_role=None,
|
append_tag=None, modify_role=None,
|
||||||
modify_vars=None, modify_only_with_labels=None,
|
modify_vars=None, modify_only_with_labels=None,
|
||||||
mirrors=None, registry_credentials=None,
|
mirrors=None, registry_credentials=None,
|
||||||
multi_arch=False):
|
multi_arch=False, lock=None):
|
||||||
"""Perform container image preparation
|
"""Perform container image preparation
|
||||||
|
|
||||||
:param template_file: path to Jinja2 file containing all image entries
|
:param template_file: path to Jinja2 file containing all image entries
|
||||||
@ -280,6 +288,8 @@ def container_images_prepare(template_file=DEFAULT_TEMPLATE_FILE,
|
|||||||
value.
|
value.
|
||||||
:param multi_arch: boolean whether to prepare every architecture of
|
:param multi_arch: boolean whether to prepare every architecture of
|
||||||
each image
|
each image
|
||||||
|
|
||||||
|
:param lock: a locking object to use when handling uploads
|
||||||
:returns: dict with entries for the supplied output_env_file or
|
:returns: dict with entries for the supplied output_env_file or
|
||||||
output_images_file
|
output_images_file
|
||||||
"""
|
"""
|
||||||
@ -287,6 +297,9 @@ def container_images_prepare(template_file=DEFAULT_TEMPLATE_FILE,
|
|||||||
if mapping_args is None:
|
if mapping_args is None:
|
||||||
mapping_args = {}
|
mapping_args = {}
|
||||||
|
|
||||||
|
if not lock:
|
||||||
|
lock = threadinglock.ThreadingLock()
|
||||||
|
|
||||||
def ffunc(entry):
|
def ffunc(entry):
|
||||||
imagename = entry.get('imagename', '')
|
imagename = entry.get('imagename', '')
|
||||||
if service_filter is not None:
|
if service_filter is not None:
|
||||||
@ -312,7 +325,8 @@ def container_images_prepare(template_file=DEFAULT_TEMPLATE_FILE,
|
|||||||
manager = image_uploader.ImageUploadManager(
|
manager = image_uploader.ImageUploadManager(
|
||||||
mirrors=mirrors,
|
mirrors=mirrors,
|
||||||
registry_credentials=registry_credentials,
|
registry_credentials=registry_credentials,
|
||||||
multi_arch=multi_arch
|
multi_arch=multi_arch,
|
||||||
|
lock=lock
|
||||||
)
|
)
|
||||||
uploader = manager.uploader('python')
|
uploader = manager.uploader('python')
|
||||||
images = [i.get('imagename', '') for i in result]
|
images = [i.get('imagename', '') for i in result]
|
||||||
|
@ -1315,7 +1315,8 @@ class TestPythonImageUploader(base.TestCase):
|
|||||||
source_session=source_session,
|
source_session=source_session,
|
||||||
target_session=target_session,
|
target_session=target_session,
|
||||||
source_layers=['sha256:aaa', 'sha256:bbb', 'sha256:ccc'],
|
source_layers=['sha256:aaa', 'sha256:bbb', 'sha256:ccc'],
|
||||||
multi_arch=False
|
multi_arch=False,
|
||||||
|
lock=None
|
||||||
)
|
)
|
||||||
|
|
||||||
@mock.patch('tripleo_common.image.image_uploader.'
|
@mock.patch('tripleo_common.image.image_uploader.'
|
||||||
@ -1549,7 +1550,8 @@ class TestPythonImageUploader(base.TestCase):
|
|||||||
source_session=source_session,
|
source_session=source_session,
|
||||||
target_session=target_session,
|
target_session=target_session,
|
||||||
source_layers=['sha256:aaa', 'sha256:bbb', 'sha256:ccc'],
|
source_layers=['sha256:aaa', 'sha256:bbb', 'sha256:ccc'],
|
||||||
multi_arch=False
|
multi_arch=False,
|
||||||
|
lock=None
|
||||||
)
|
)
|
||||||
|
|
||||||
@mock.patch('tripleo_common.image.image_uploader.'
|
@mock.patch('tripleo_common.image.image_uploader.'
|
||||||
@ -1680,7 +1682,8 @@ class TestPythonImageUploader(base.TestCase):
|
|||||||
source_session=source_session,
|
source_session=source_session,
|
||||||
target_session=target_session,
|
target_session=target_session,
|
||||||
source_layers=['sha256:aaa', 'sha256:bbb', 'sha256:ccc'],
|
source_layers=['sha256:aaa', 'sha256:bbb', 'sha256:ccc'],
|
||||||
multi_arch=False
|
multi_arch=False,
|
||||||
|
lock=None
|
||||||
)
|
)
|
||||||
_copy_registry_to_local.assert_called_once_with(unmodified_target_url)
|
_copy_registry_to_local.assert_called_once_with(unmodified_target_url)
|
||||||
run_modify_playbook.assert_called_once_with(
|
run_modify_playbook.assert_called_once_with(
|
||||||
|
@ -928,6 +928,7 @@ class TestPrepare(base.TestCase):
|
|||||||
@mock.patch('tripleo_common.image.image_uploader.ImageUploadManager',
|
@mock.patch('tripleo_common.image.image_uploader.ImageUploadManager',
|
||||||
autospec=True)
|
autospec=True)
|
||||||
def test_container_images_prepare_multi(self, mock_im, mock_cip):
|
def test_container_images_prepare_multi(self, mock_im, mock_cip):
|
||||||
|
mock_lock = mock.MagicMock()
|
||||||
mapping_args = {
|
mapping_args = {
|
||||||
'namespace': 't',
|
'namespace': 't',
|
||||||
'name_prefix': '',
|
'name_prefix': '',
|
||||||
@ -981,7 +982,8 @@ class TestPrepare(base.TestCase):
|
|||||||
},
|
},
|
||||||
]
|
]
|
||||||
|
|
||||||
image_params = kb.container_images_prepare_multi(env, roles_data)
|
image_params = kb.container_images_prepare_multi(env, roles_data,
|
||||||
|
lock=mock_lock)
|
||||||
|
|
||||||
mock_cip.assert_has_calls([
|
mock_cip.assert_has_calls([
|
||||||
mock.call(
|
mock.call(
|
||||||
@ -1004,7 +1006,8 @@ class TestPrepare(base.TestCase):
|
|||||||
registry_credentials={
|
registry_credentials={
|
||||||
'docker.io': {'my_username': 'my_password'}
|
'docker.io': {'my_username': 'my_password'}
|
||||||
},
|
},
|
||||||
multi_arch=False
|
multi_arch=False,
|
||||||
|
lock=mock_lock
|
||||||
),
|
),
|
||||||
mock.call(
|
mock.call(
|
||||||
excludes=['nova', 'neutron'],
|
excludes=['nova', 'neutron'],
|
||||||
@ -1026,7 +1029,8 @@ class TestPrepare(base.TestCase):
|
|||||||
registry_credentials={
|
registry_credentials={
|
||||||
'docker.io': {'my_username': 'my_password'}
|
'docker.io': {'my_username': 'my_password'}
|
||||||
},
|
},
|
||||||
multi_arch=False
|
multi_arch=False,
|
||||||
|
lock=mock_lock
|
||||||
)
|
)
|
||||||
])
|
])
|
||||||
|
|
||||||
@ -1046,6 +1050,7 @@ class TestPrepare(base.TestCase):
|
|||||||
@mock.patch('tripleo_common.image.image_uploader.ImageUploadManager',
|
@mock.patch('tripleo_common.image.image_uploader.ImageUploadManager',
|
||||||
autospec=True)
|
autospec=True)
|
||||||
def test_container_images_prepare_multi_dry_run(self, mock_im, mock_cip):
|
def test_container_images_prepare_multi_dry_run(self, mock_im, mock_cip):
|
||||||
|
mock_lock = mock.MagicMock()
|
||||||
mapping_args = {
|
mapping_args = {
|
||||||
'namespace': 't',
|
'namespace': 't',
|
||||||
'name_prefix': '',
|
'name_prefix': '',
|
||||||
@ -1094,7 +1099,8 @@ class TestPrepare(base.TestCase):
|
|||||||
},
|
},
|
||||||
]
|
]
|
||||||
|
|
||||||
image_params = kb.container_images_prepare_multi(env, roles_data, True)
|
image_params = kb.container_images_prepare_multi(env, roles_data, True,
|
||||||
|
lock=mock_lock)
|
||||||
|
|
||||||
mock_cip.assert_has_calls([
|
mock_cip.assert_has_calls([
|
||||||
mock.call(
|
mock.call(
|
||||||
@ -1113,7 +1119,8 @@ class TestPrepare(base.TestCase):
|
|||||||
modify_vars=None,
|
modify_vars=None,
|
||||||
mirrors={},
|
mirrors={},
|
||||||
registry_credentials=None,
|
registry_credentials=None,
|
||||||
multi_arch=False
|
multi_arch=False,
|
||||||
|
lock=mock_lock
|
||||||
),
|
),
|
||||||
mock.call(
|
mock.call(
|
||||||
excludes=['nova', 'neutron'],
|
excludes=['nova', 'neutron'],
|
||||||
@ -1131,13 +1138,14 @@ class TestPrepare(base.TestCase):
|
|||||||
modify_vars={'foo_version': '1.0.1'},
|
modify_vars={'foo_version': '1.0.1'},
|
||||||
mirrors={},
|
mirrors={},
|
||||||
registry_credentials=None,
|
registry_credentials=None,
|
||||||
multi_arch=False
|
multi_arch=False,
|
||||||
|
lock=mock_lock
|
||||||
)
|
)
|
||||||
])
|
])
|
||||||
|
|
||||||
mock_im.assert_called_once_with(mock.ANY, dry_run=True, cleanup='full',
|
mock_im.assert_called_once_with(mock.ANY, dry_run=True, cleanup='full',
|
||||||
mirrors={}, registry_credentials=None,
|
mirrors={}, registry_credentials=None,
|
||||||
multi_arch=False)
|
multi_arch=False, lock=mock_lock)
|
||||||
|
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
{
|
{
|
||||||
|
0
tripleo_common/utils/locks/__init__.py
Normal file
0
tripleo_common/utils/locks/__init__.py
Normal file
21
tripleo_common/utils/locks/base.py
Normal file
21
tripleo_common/utils/locks/base.py
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
# Copyright 2019 Red Hat, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
|
||||||
|
class BaseLock(object):
|
||||||
|
def get_lock(self):
|
||||||
|
return self._lock
|
||||||
|
|
||||||
|
def objects(self):
|
||||||
|
return self._objects
|
29
tripleo_common/utils/locks/processlock.py
Normal file
29
tripleo_common/utils/locks/processlock.py
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
# Copyright 2019 Red Hat, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
#
|
||||||
|
# NOTE(mwhahaha): this class cannot be imported under Mistral because the
|
||||||
|
# multiprocessor.Manager inclusion breaks things due to the service launching
|
||||||
|
# to handle the multiprocess work.
|
||||||
|
|
||||||
|
import multiprocessing
|
||||||
|
from tripleo_common.utils.locks import base
|
||||||
|
|
||||||
|
|
||||||
|
class ProcessLock(base.BaseLock):
|
||||||
|
# the manager cannot live in __init__
|
||||||
|
_mgr = multiprocessing.Manager()
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self._lock = self._mgr.Lock()
|
||||||
|
self._objects = self._mgr.list()
|
22
tripleo_common/utils/locks/threadinglock.py
Normal file
22
tripleo_common/utils/locks/threadinglock.py
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
# Copyright 2019 Red Hat, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
from tripleo_common.utils.locks import base
|
||||||
|
import threading
|
||||||
|
|
||||||
|
|
||||||
|
class ThreadingLock(base.BaseLock):
|
||||||
|
def __init__(self):
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
self._objects = set()
|
Loading…
Reference in New Issue
Block a user