Merge "Backup: Limit number of concurent operations"
This commit is contained in:
commit
f154bc431f
|
@@ -31,6 +31,7 @@ Volume backups can be created, restored, deleted and listed.

 """

+import contextlib
 import os

 from castellan import key_manager
@@ -127,6 +128,7 @@ class BackupManager(manager.SchedulerDependentManager):
         self.is_initialized = False
         self._set_tpool_size(CONF.backup_native_threads_pool_size)
         self._process_number = kwargs.get('process_number', 1)
+        self._semaphore = kwargs.get('semaphore', contextlib.suppress())
         self.driver_name = CONF.backup_driver
         if self.driver_name in MAPPING:
             new_name = MAPPING[self.driver_name]
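The default value here is worth a note: ``contextlib.suppress()`` called with no arguments is a context manager that does nothing on enter or exit, so a BackupManager created without an explicit semaphore runs with no concurrency limit. A minimal standalone sketch of that behavior:

    import contextlib

    # With no exception classes passed, suppress() is a no-op context
    # manager, so it can stand in for a semaphore when there is no limit.
    noop = contextlib.suppress()
    with noop:
        print("enter/exit acquire nothing and block nothing")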
@@ -330,6 +332,7 @@ class BackupManager(manager.SchedulerDependentManager):
         if backup.temp_snapshot_id:
             self._delete_temp_snapshot(ctxt, backup)

+    @utils.limit_operations
     def create_backup(self, context, backup):
         """Create volume backups using configured backup service."""
         volume_id = backup.volume_id
@@ -519,6 +522,7 @@ class BackupManager(manager.SchedulerDependentManager):

         return False

+    @utils.limit_operations
     def restore_backup(self, context, backup, volume_id):
         """Restore volume backups from configured backup service."""
         LOG.info('Restore backup started, backup: %(backup_id)s '
@@ -21,9 +21,10 @@ import logging as python_logging
 import shlex
 import sys

-# NOTE(geguileo): Monkey patching must go before OSLO.log import, otherwise
-# OSLO.context will not use greenthread thread local and all greenthreads will
-# share the same context.
+# NOTE: Monkey patching must go before OSLO.log import, otherwise OSLO.context
+# will not use greenthread thread local and all greenthreads will share the
+# same context. It's also a good idea to monkey patch everything before
+# loading multiprocessing
 import eventlet
 eventlet.monkey_patch()
 # Monkey patch the original current_thread to use the up-to-date _active
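The reworded comment captures an ordering constraint: eventlet must monkey patch the standard library before oslo.log (and now multiprocessing) are imported, or greenthreads end up sharing thread-local state. A schematic of the required order, assuming a standalone entry point:

    import eventlet
    eventlet.monkey_patch()  # patch the stdlib first

    # Only after patching is it safe to load these:
    import multiprocessing           # noqa: E402
    from oslo_log import log         # noqa: E402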
@@ -53,12 +54,18 @@ from cinder import version

 CONF = cfg.CONF

-backup_workers_opt = cfg.IntOpt(
-    'backup_workers',
-    default=1, min=1, max=processutils.get_worker_count(),
-    help='Number of backup processes to launch. Improves performance with '
-         'concurrent backups.')
-CONF.register_opt(backup_workers_opt)
+backup_cmd_opts = [
+    cfg.IntOpt('backup_workers',
+               default=1, min=1, max=processutils.get_worker_count(),
+               help='Number of backup processes to launch. '
+                    'Improves performance with concurrent backups.'),
+    cfg.IntOpt('backup_max_operations',
+               default=15,
+               min=0,
+               help='Maximum number of concurrent memory heavy operations: '
+                    'backup and restore. Value of 0 means unlimited'),
+]
+CONF.register_opts(backup_cmd_opts)

 LOG = None
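Both options are registered on the default group, so they live in the ``[DEFAULT]`` section of ``cinder.conf``. An illustrative configuration (the values are examples, not recommendations):

    [DEFAULT]
    # Run 4 cinder-backup worker processes.
    backup_workers = 4
    # Allow at most 10 concurrent backup + restore operations service-wide;
    # 0 would remove the limit entirely.
    backup_max_operations = 10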
@@ -69,12 +76,13 @@ LOG = None
 _EXTRA_DEFAULT_LOG_LEVELS = ['swiftclient=WARN']


-def _launch_backup_process(launcher, num_process):
+def _launch_backup_process(launcher, num_process, _semaphore):
     try:
         server = service.Service.create(binary='cinder-backup',
                                         coordination=True,
                                         service_name='backup',
-                                        process_number=num_process)
+                                        process_number=num_process,
+                                        semaphore=_semaphore)
     except Exception:
         LOG.exception('Backup service %s failed to start.', CONF.host)
         sys.exit(1)
@@ -101,11 +109,13 @@ def main():
     gmr.TextGuruMeditation.setup_autorun(version, conf=CONF)
     global LOG
     LOG = logging.getLogger(__name__)
+    semaphore = utils.semaphore_factory(CONF.backup_max_operations,
+                                        CONF.backup_workers)

     LOG.info('Backup running with %s processes.', CONF.backup_workers)
     launcher = service.get_launcher()

     for i in range(1, CONF.backup_workers + 1):
-        _launch_backup_process(launcher, i)
+        _launch_backup_process(launcher, i, semaphore)

     launcher.wait()
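Note that ``main()`` builds a single semaphore before launching any worker, so every process counts operations against the same limit. Why the object must exist before forking can be seen with plain ``multiprocessing`` (a standalone sketch, not Cinder code):

    import multiprocessing
    import time

    def worker(sem, n):
        with sem:  # children inherit the parent's semaphore
            print('worker %d holds one of 2 slots' % n)
            time.sleep(0.1)

    if __name__ == '__main__':
        sem = multiprocessing.Semaphore(2)  # created once, pre-fork
        procs = [multiprocessing.Process(target=worker, args=(sem, i))
                 for i in range(5)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()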
@@ -218,7 +218,7 @@
         cinder_backup_drivers_swift.swiftbackup_service_opts,
         cinder_backup_drivers_tsm.tsm_opts,
         cinder_backup_manager.backup_manager_opts,
-        [cinder_cmd_backup.backup_workers_opt],
+        cinder_cmd_backup.backup_cmd_opts,
         [cinder_cmd_volume.cluster_opt],
         cinder_common_config.api_opts,
         cinder_common_config.core_opts,
@@ -1870,6 +1870,34 @@ class BackupTestCase(BaseBackupTest):
         self.assertEqual(100, tpool._nthreads)
         self.assertListEqual([], tpool._threads)

+    @mock.patch('cinder.backup.manager.BackupManager._run_restore')
+    def test_backup_max_operations_restore(self, mock_restore):
+        mock_sem = self.mock_object(self.backup_mgr, '_semaphore')
+        vol_id = self._create_volume_db_entry(
+            status=fields.VolumeStatus.RESTORING_BACKUP)
+        backup = self._create_backup_db_entry(
+            volume_id=vol_id, status=fields.BackupStatus.RESTORING)
+
+        self.backup_mgr.restore_backup(self.ctxt, backup, vol_id)
+
+        self.assertEqual(1, mock_sem.__enter__.call_count)
+        self.assertEqual(1, mock_restore.call_count)
+        self.assertEqual(1, mock_sem.__exit__.call_count)
+
+    @mock.patch('cinder.backup.manager.BackupManager._run_backup')
+    def test_backup_max_operations_backup(self, mock_backup):
+        mock_sem = self.mock_object(self.backup_mgr, '_semaphore')
+        vol_id = self._create_volume_db_entry(
+            status=fields.VolumeStatus.BACKING_UP)
+        backup = self._create_backup_db_entry(
+            volume_id=vol_id, status=fields.BackupStatus.CREATING)
+
+        self.backup_mgr.create_backup(self.ctxt, backup)
+
+        self.assertEqual(1, mock_sem.__enter__.call_count)
+        self.assertEqual(1, mock_backup.call_count)
+        self.assertEqual(1, mock_sem.__exit__.call_count)
+

 @ddt.ddt
 class BackupAPITestCase(BaseBackupTest):
@@ -96,25 +96,31 @@ class TestCinderBackupCmd(test.TestCase):
         super(TestCinderBackupCmd, self).setUp()
         sys.argv = ['cinder-backup']

+    @mock.patch('cinder.utils.Semaphore')
     @mock.patch('cinder.service.get_launcher')
     @mock.patch('cinder.service.Service.create')
     @mock.patch('cinder.utils.monkey_patch')
     @mock.patch('oslo_log.log.setup')
     def test_main_multiprocess(self, log_setup, monkey_patch, service_create,
-                               get_launcher):
+                               get_launcher, mock_semaphore):
         CONF.set_override('backup_workers', 2)
+        mock_semaphore.side_effect = [mock.sentinel.semaphore1,
+                                      mock.sentinel.semaphore2]
         cinder_backup.main()

         self.assertEqual('cinder', CONF.project)
         self.assertEqual(CONF.version, version.version_string())

+        # Both calls must receive the same semaphore
         c1 = mock.call(binary=constants.BACKUP_BINARY,
                        coordination=True,
                        process_number=1,
+                       semaphore=mock.sentinel.semaphore1,
                        service_name='backup')
         c2 = mock.call(binary=constants.BACKUP_BINARY,
                        coordination=True,
                        process_number=2,
+                       semaphore=mock.sentinel.semaphore1,
                        service_name='backup')
         service_create.assert_has_calls([c1, c2])
@@ -1670,3 +1670,44 @@ class TestAutoMaxOversubscriptionRatio(test.TestCase):
         if result is not None:
             result = round(result, 2)
         self.assertEqual(expected_result, result)
+
+
+@ddt.ddt
+class LimitOperationsTestCase(test.TestCase):
+    @ddt.data(1, 5)
+    @mock.patch('contextlib.suppress')
+    def test_semaphore_factory_no_limit(self, processes, mock_suppress):
+        res = utils.semaphore_factory(0, processes)
+        mock_suppress.assert_called_once_with()
+        self.assertEqual(mock_suppress.return_value, res)
+
+    @mock.patch('eventlet.Semaphore')
+    def test_semaphore_factory_with_limit(self, mock_semaphore):
+        max_operations = 15
+        res = utils.semaphore_factory(max_operations, 1)
+        mock_semaphore.assert_called_once_with(max_operations)
+        self.assertEqual(mock_semaphore.return_value, res)
+
+    @mock.patch('cinder.utils.Semaphore')
+    def test_semaphore_factory_with_limit_and_workers(self, mock_semaphore):
+        max_operations = 15
+        processes = 5
+        res = utils.semaphore_factory(max_operations, processes)
+        mock_semaphore.assert_called_once_with(max_operations)
+        self.assertEqual(mock_semaphore.return_value, res)
+
+    @mock.patch('multiprocessing.Semaphore')
+    @mock.patch('eventlet.tpool.execute')
+    def test_semaphore(self, mock_exec, mock_semaphore):
+        limit = 15
+        res = utils.Semaphore(limit)
+        self.assertEqual(limit, res.limit)
+
+        mocked_semaphore = mock_semaphore.return_value
+        self.assertEqual(mocked_semaphore, res.semaphore)
+        mock_semaphore.assert_called_once_with(limit)
+
+        with res:
+            mock_exec.assert_called_once_with(mocked_semaphore.__enter__)
+            mocked_semaphore.__exit__.assert_not_called()
+        mocked_semaphore.__exit__.assert_called_once_with(None, None, None)
@@ -25,6 +25,7 @@ import functools
 import inspect
 import logging as py_logging
 import math
+import multiprocessing
 import operator
 import os
 import pyclbr
@@ -37,6 +38,8 @@ import time
 import types

 from castellan import key_manager
+import eventlet
+from eventlet import tpool
 from os_brick import encryptors
 from os_brick.initiator import connector
 from oslo_concurrency import lockutils
@@ -1066,3 +1069,52 @@ def create_ordereddict(adict):
     """Given a dict, return a sorted OrderedDict."""
     return OrderedDict(sorted(adict.items(),
                               key=operator.itemgetter(0)))
+
+
+class Semaphore(object):
+    """Custom semaphore to work around eventlet issues with multiprocessing."""
+    def __init__(self, limit):
+        self.limit = limit
+        self.semaphore = multiprocessing.Semaphore(limit)
+
+    def __enter__(self):
+        # Eventlet does not work with multiprocessing's Semaphore, so we have
+        # to execute it in a native thread to avoid getting blocked when trying
+        # to acquire the semaphore.
+        return tpool.execute(self.semaphore.__enter__)
+
+    def __exit__(self, *args):
+        # Don't use native thread for exit, as it will only add overhead
+        return self.semaphore.__exit__(*args)
+
+
+def semaphore_factory(limit, concurrent_processes):
+    """Get a semaphore to limit concurrent operations.
+
+    The semaphore depends on the limit we want to set and the concurrent
+    processes that need to be limited.
+    """
+    # Limit of 0 is no limit, so we won't use a semaphore
+    if limit:
+        # If we only have 1 process we can use eventlet's Semaphore
+        if concurrent_processes == 1:
+            return eventlet.Semaphore(limit)
+        # Use our own Semaphore for interprocess because eventlet blocks with
+        # the standard one
+        return Semaphore(limit)
+    return contextlib.suppress()
+
+
+def limit_operations(func):
+    """Decorator to limit the number of concurrent operations.
+
+    This method decorator expects to have a _semaphore attribute holding an
+    initialized semaphore in the self instance object.
+
+    We can get the appropriate semaphore with the semaphore_factory method.
+    """
+    @functools.wraps(func)
+    def wrapper(self, *args, **kwargs):
+        with self._semaphore:
+            return func(self, *args, **kwargs)
+    return wrapper
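Putting the pieces from cinder/utils.py together, this is the shape of the pattern in a self-contained sketch; ``threading.Semaphore`` stands in for the eventlet and multiprocessing variants, and ``ToyManager`` is a hypothetical stand-in for ``BackupManager``:

    import contextlib
    import functools
    import threading

    def limit_operations(func):
        # Same shape as cinder.utils.limit_operations: route every call
        # through the semaphore stored on the instance.
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            with self._semaphore:
                return func(self, *args, **kwargs)
        return wrapper

    class ToyManager(object):
        def __init__(self, semaphore=None):
            # No semaphore means no limit, courtesy of suppress().
            self._semaphore = semaphore or contextlib.suppress()

        @limit_operations
        def create_backup(self, backup_id):
            print('backing up %s' % backup_id)

    mgr = ToyManager(threading.Semaphore(2))  # at most 2 concurrent ops
    mgr.create_backup('backup-1')

The enter/exit asymmetry in the real ``Semaphore`` class follows from its own comments: acquiring can block, which would freeze the eventlet hub, so it is pushed to a native thread via ``tpool.execute``; releasing returns immediately, so a native thread would only add overhead.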
@@ -215,3 +215,49 @@ to avoid any confusion on whether the restore was successful or not.
 destination volume is useless, as there is no way of knowing how much data,
 or if any, was actually restored, hence our recommendation of using the
 "error" state.
+
+backup_max_operations
+---------------------
+
+This configuration option lets us set the maximum number of operations,
+backup and restore, that can be performed concurrently.
+
+This option has a default value of 15, which means that we can have 15
+concurrent backups, or 15 concurrent restores, or any combination of backups
+and restores as long as the two operations combined don't exceed 15.
+
+The concurrency limit set by this configuration option is also enforced when
+we run multiple processes for the same backup service using the
+``backup_workers`` configuration option. It is not a per-process restriction
+but a global limit for the service: rather than allowing
+``backup_max_operations`` on each one of the processes, it applies to all the
+running processes of the same backup service combined.
+
+Backup and restore operations are both CPU and memory intensive, but thanks to
+this option we can limit the concurrency and prevent DoS attacks, or plain
+service disruptions, caused by many concurrent requests that lead to Out of
+Memory (OOM) kills.
+
+The amount of memory (RAM) used during an operation depends on the configured
+chunk size as well as the compression ratio achieved on the data during the
+operation.
+
+Example:
+
+  Let's have a look at how much memory would be needed if we use the default
+  backup chunk size (~1.86 GB) while doing a restore to an RBD volume from a
+  non-Ceph backend (Swift, NFS, etc.).
+
+  In a restore operation the worst case scenario, from the memory point of
+  view, is when the compression ratio is close to 0% (the compressed data
+  chunk is almost the same size as the uncompressed data).
+
+  In this case the memory usage would be ~5.58 GB of data for each chunk:
+  ~5.58 GB = read buffer + decompressed buffer + write buffer used by the
+  librbd library = ~1.86 GB + 1.86 GB + 1.86 GB
+
+  For 15 concurrent restore operations, the cinder-backup service would
+  require ~83.7 GB of memory.
+
+Similar calculations can be done for environment-specific scenarios, and this
+config option can be set accordingly.
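Spelled out as arithmetic (numbers taken from the example above):

    chunk_gb = 1.86        # default backup chunk size, approximate
    buffers = 3            # read + decompressed + librbd write buffer
    per_restore_gb = buffers * chunk_gb  # ~5.58 GB per restore
    print(15 * per_restore_gb)           # ~83.7 GB for 15 concurrent restores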
@@ -0,0 +1,6 @@
+---
+features:
+  - |
+    We can now limit the number of concurrent backup/restore operations that a
+    Cinder backup service can perform using the ``backup_max_operations``
+    configuration option.