From 30c2289c9b0456d3783f01e3d65985ed1b09976a Mon Sep 17 00:00:00 2001
From: Gorka Eguileor
Date: Thu, 27 Feb 2020 16:24:09 +0100
Subject: [PATCH] Backup: Limit number of concurrent operations

This patch adds the backup_max_operations configuration option that
limits the number of concurrent backup and restore operations.

These operations can consume large amounts of RAM, so we should limit
them according to our system resources to avoid processes being killed
because we run out of memory.

The default is 15 concurrent operations, which could use up to 85 GB of
RAM (in the case of Ceph, which uses the maximum amount of memory for
this operation). More details are provided in the documentation.

Closes-Bug: #1865011
Co-authored-by: Rajat Dhasmana
Change-Id: Ica4e0cea67bc894f61a57c7898977951ce3a3633
---
 cinder/backup/manager.py                      |  4 ++
 cinder/cmd/backup.py                          | 34 +++++++-----
 cinder/opts.py                                |  2 +-
 cinder/tests/unit/backup/test_backup.py       | 28 ++++++++++
 cinder/tests/unit/test_cmd.py                 |  8 ++-
 cinder/tests/unit/test_utils.py               | 41 +++++++++++++++
 cinder/utils.py                               | 52 +++++++++++++++++++
 .../admin/blockstorage-volume-backups.rst     | 46 ++++++++++++++++
 ...ackup_max_operations-27753c748ba1dc1a.yaml |  6 +++
 9 files changed, 207 insertions(+), 14 deletions(-)
 create mode 100644 releasenotes/notes/backup_max_operations-27753c748ba1dc1a.yaml

diff --git a/cinder/backup/manager.py b/cinder/backup/manager.py
index 203ce129871..ea2e44ae40e 100644
--- a/cinder/backup/manager.py
+++ b/cinder/backup/manager.py
@@ -31,6 +31,7 @@ Volume backups can be created, restored, deleted and listed.
 
 """
 
+import contextlib
 import os
 
 from castellan import key_manager
@@ -127,6 +128,7 @@ class BackupManager(manager.SchedulerDependentManager):
         self.is_initialized = False
         self._set_tpool_size(CONF.backup_native_threads_pool_size)
         self._process_number = kwargs.get('process_number', 1)
+        self._semaphore = kwargs.get('semaphore', contextlib.suppress())
         self.driver_name = CONF.backup_driver
         if self.driver_name in MAPPING:
             new_name = MAPPING[self.driver_name]
@@ -330,6 +332,7 @@ class BackupManager(manager.SchedulerDependentManager):
         if backup.temp_snapshot_id:
             self._delete_temp_snapshot(ctxt, backup)
 
+    @utils.limit_operations
     def create_backup(self, context, backup):
         """Create volume backups using configured backup service."""
         volume_id = backup.volume_id
@@ -519,6 +522,7 @@ class BackupManager(manager.SchedulerDependentManager):
 
         return False
 
+    @utils.limit_operations
     def restore_backup(self, context, backup, volume_id):
         """Restore volume backups from configured backup service."""
         LOG.info('Restore backup started, backup: %(backup_id)s '
diff --git a/cinder/cmd/backup.py b/cinder/cmd/backup.py
index 3b39f9aa3c0..d741c84a3c3 100644
--- a/cinder/cmd/backup.py
+++ b/cinder/cmd/backup.py
@@ -21,9 +21,10 @@ import logging as python_logging
 import shlex
 import sys
 
-# NOTE(geguileo): Monkey patching must go before OSLO.log import, otherwise
-# OSLO.context will not use greenthread thread local and all greenthreads will
-# share the same context.
+# NOTE: Monkey patching must go before OSLO.log import, otherwise OSLO.context
+# will not use greenthread thread local and all greenthreads will share the
+# same context. It's also a good idea to monkey patch everything before
+# loading multiprocessing
 import eventlet
 eventlet.monkey_patch()
 # Monkey patch the original current_thread to use the up-to-date _active
@@ -53,12 +54,18 @@ from cinder import version
 
 CONF = cfg.CONF
 
-backup_workers_opt = cfg.IntOpt(
-    'backup_workers',
-    default=1, min=1, max=processutils.get_worker_count(),
-    help='Number of backup processes to launch. Improves performance with '
-         'concurrent backups.')
-CONF.register_opt(backup_workers_opt)
+backup_cmd_opts = [
+    cfg.IntOpt('backup_workers',
+               default=1, min=1, max=processutils.get_worker_count(),
+               help='Number of backup processes to launch. '
+                    'Improves performance with concurrent backups.'),
+    cfg.IntOpt('backup_max_operations',
+               default=15,
+               min=0,
+               help='Maximum number of concurrent memory-heavy operations: '
+                    'backup and restore. A value of 0 means unlimited.'),
+]
+CONF.register_opts(backup_cmd_opts)
 
 LOG = None
 
@@ -69,12 +76,13 @@ LOG = None
 _EXTRA_DEFAULT_LOG_LEVELS = ['swiftclient=WARN']
 
 
-def _launch_backup_process(launcher, num_process):
+def _launch_backup_process(launcher, num_process, _semaphore):
     try:
         server = service.Service.create(binary='cinder-backup',
                                         coordination=True,
                                         service_name='backup',
-                                        process_number=num_process)
+                                        process_number=num_process,
+                                        semaphore=_semaphore)
     except Exception:
         LOG.exception('Backup service %s failed to start.', CONF.host)
         sys.exit(1)
@@ -101,11 +109,13 @@ def main():
     gmr.TextGuruMeditation.setup_autorun(version, conf=CONF)
     global LOG
     LOG = logging.getLogger(__name__)
+    semaphore = utils.semaphore_factory(CONF.backup_max_operations,
+                                        CONF.backup_workers)
 
     LOG.info('Backup running with %s processes.', CONF.backup_workers)
     launcher = service.get_launcher()
 
     for i in range(1, CONF.backup_workers + 1):
-        _launch_backup_process(launcher, i)
+        _launch_backup_process(launcher, i, semaphore)
 
     launcher.wait()
diff --git a/cinder/opts.py b/cinder/opts.py
index b63a9d22815..bd6855d07f2 100644
--- a/cinder/opts.py
+++ b/cinder/opts.py
@@ -210,7 +210,7 @@ def list_opts():
                 cinder_backup_drivers_swift.swiftbackup_service_opts,
                 cinder_backup_drivers_tsm.tsm_opts,
                 cinder_backup_manager.backup_manager_opts,
-                [cinder_cmd_backup.backup_workers_opt],
+                cinder_cmd_backup.backup_cmd_opts,
                 [cinder_cmd_volume.cluster_opt],
                 cinder_common_config.api_opts,
                 cinder_common_config.core_opts,
diff --git a/cinder/tests/unit/backup/test_backup.py b/cinder/tests/unit/backup/test_backup.py
index 58de2a53bd2..5b2db50595b 100644
--- a/cinder/tests/unit/backup/test_backup.py
+++ b/cinder/tests/unit/backup/test_backup.py
@@ -1869,6 +1869,34 @@ class BackupTestCase(BaseBackupTest):
         self.assertEqual(100, tpool._nthreads)
         self.assertListEqual([], tpool._threads)
 
+    @mock.patch('cinder.backup.manager.BackupManager._run_restore')
+    def test_backup_max_operations_restore(self, mock_restore):
+        mock_sem = self.mock_object(self.backup_mgr, '_semaphore')
+        vol_id = self._create_volume_db_entry(
+            status=fields.VolumeStatus.RESTORING_BACKUP)
+        backup = self._create_backup_db_entry(
+            volume_id=vol_id, status=fields.BackupStatus.RESTORING)
+
+        self.backup_mgr.restore_backup(self.ctxt, backup, vol_id)
+
+        self.assertEqual(1, mock_sem.__enter__.call_count)
+        self.assertEqual(1, mock_restore.call_count)
+        self.assertEqual(1, mock_sem.__exit__.call_count)
+
+    @mock.patch('cinder.backup.manager.BackupManager._run_backup')
+    def test_backup_max_operations_backup(self, mock_backup):
+        mock_sem = self.mock_object(self.backup_mgr, '_semaphore')
+        vol_id = self._create_volume_db_entry(
+            status=fields.VolumeStatus.BACKING_UP)
+        backup = self._create_backup_db_entry(
+            volume_id=vol_id, status=fields.BackupStatus.CREATING)
+
+        self.backup_mgr.create_backup(self.ctxt, backup)
+
+        self.assertEqual(1, mock_sem.__enter__.call_count)
+        self.assertEqual(1, mock_backup.call_count)
+        self.assertEqual(1, mock_sem.__exit__.call_count)
+
 
 @ddt.ddt
 class BackupAPITestCase(BaseBackupTest):
diff --git a/cinder/tests/unit/test_cmd.py b/cinder/tests/unit/test_cmd.py
index 5ff34beb6e8..49665ea601a 100644
--- a/cinder/tests/unit/test_cmd.py
+++ b/cinder/tests/unit/test_cmd.py
@@ -96,25 +96,31 @@ class TestCinderBackupCmd(test.TestCase):
         super(TestCinderBackupCmd, self).setUp()
         sys.argv = ['cinder-backup']
 
+    @mock.patch('cinder.utils.Semaphore')
     @mock.patch('cinder.service.get_launcher')
     @mock.patch('cinder.service.Service.create')
     @mock.patch('cinder.utils.monkey_patch')
     @mock.patch('oslo_log.log.setup')
     def test_main_multiprocess(self, log_setup, monkey_patch, service_create,
-                               get_launcher):
+                               get_launcher, mock_semaphore):
         CONF.set_override('backup_workers', 2)
+        mock_semaphore.side_effect = [mock.sentinel.semaphore1,
+                                      mock.sentinel.semaphore2]
 
         cinder_backup.main()
 
         self.assertEqual('cinder', CONF.project)
         self.assertEqual(CONF.version, version.version_string())
 
+        # Both calls must receive the same semaphore
         c1 = mock.call(binary=constants.BACKUP_BINARY,
                        coordination=True,
                        process_number=1,
+                       semaphore=mock.sentinel.semaphore1,
                        service_name='backup')
         c2 = mock.call(binary=constants.BACKUP_BINARY,
                        coordination=True,
                        process_number=2,
+                       semaphore=mock.sentinel.semaphore1,
                        service_name='backup')
         service_create.assert_has_calls([c1, c2])
diff --git a/cinder/tests/unit/test_utils.py b/cinder/tests/unit/test_utils.py
index 469506d0b3e..91d4a97480a 100644
--- a/cinder/tests/unit/test_utils.py
+++ b/cinder/tests/unit/test_utils.py
@@ -1670,3 +1670,44 @@ class TestAutoMaxOversubscriptionRatio(test.TestCase):
         if result is not None:
             result = round(result, 2)
         self.assertEqual(expected_result, result)
+
+
+@ddt.ddt
+class LimitOperationsTestCase(test.TestCase):
+    @ddt.data(1, 5)
+    @mock.patch('contextlib.suppress')
+    def test_semaphore_factory_no_limit(self, processes, mock_suppress):
+        res = utils.semaphore_factory(0, processes)
+        mock_suppress.assert_called_once_with()
+        self.assertEqual(mock_suppress.return_value, res)
+
+    @mock.patch('eventlet.Semaphore')
+    def test_semaphore_factory_with_limit(self, mock_semaphore):
+        max_operations = 15
+        res = utils.semaphore_factory(max_operations, 1)
+        mock_semaphore.assert_called_once_with(max_operations)
+        self.assertEqual(mock_semaphore.return_value, res)
+
+    @mock.patch('cinder.utils.Semaphore')
+    def test_semaphore_factory_with_limit_and_workers(self, mock_semaphore):
+        max_operations = 15
+        processes = 5
+        res = utils.semaphore_factory(max_operations, processes)
+        mock_semaphore.assert_called_once_with(max_operations)
+        self.assertEqual(mock_semaphore.return_value, res)
+
+    @mock.patch('multiprocessing.Semaphore')
+    @mock.patch('eventlet.tpool.execute')
+    def test_semaphore(self, mock_exec, mock_semaphore):
+        limit = 15
+        res = utils.Semaphore(limit)
+        self.assertEqual(limit, res.limit)
+
+        mocked_semaphore = mock_semaphore.return_value
+        self.assertEqual(mocked_semaphore, res.semaphore)
+        mock_semaphore.assert_called_once_with(limit)
+
+        with res:
+            mock_exec.assert_called_once_with(mocked_semaphore.__enter__)
+            mocked_semaphore.__exit__.assert_not_called()
+        mocked_semaphore.__exit__.assert_called_once_with(None, None, None)
diff --git a/cinder/utils.py b/cinder/utils.py
index 74f285a198f..1ca58ffd983 100644
--- a/cinder/utils.py
+++ b/cinder/utils.py
@@ -25,6 +25,7 @@ import functools
 import inspect
 import logging as py_logging
 import math
+import multiprocessing
 import operator
 import os
 import pyclbr
@@ -37,6 +38,8 @@ import time
 import types
 
 from castellan import key_manager
+import eventlet
+from eventlet import tpool
 from os_brick import encryptors
 from os_brick.initiator import connector
 from oslo_concurrency import lockutils
@@ -1066,3 +1069,52 @@ def create_ordereddict(adict):
     """Given a dict, return a sorted OrderedDict."""
     return OrderedDict(sorted(adict.items(),
                               key=operator.itemgetter(0)))
+
+
+class Semaphore(object):
+    """Custom semaphore to work around eventlet issues with multiprocessing."""
+    def __init__(self, limit):
+        self.limit = limit
+        self.semaphore = multiprocessing.Semaphore(limit)
+
+    def __enter__(self):
+        # Eventlet does not work with multiprocessing's Semaphore, so we have
+        # to execute it in a native thread to avoid getting blocked when
+        # trying to acquire the semaphore.
+        return tpool.execute(self.semaphore.__enter__)
+
+    def __exit__(self, *args):
+        # Don't use a native thread for exit, as it would only add overhead.
+        return self.semaphore.__exit__(*args)
+
+
+def semaphore_factory(limit, concurrent_processes):
+    """Get a semaphore to limit concurrent operations.
+
+    The semaphore depends on the limit we want to set and the number of
+    concurrent processes that need to be limited.
+    """
+    # A limit of 0 means no limit, so we won't use a semaphore
+    if limit:
+        # If we only have 1 process we can use eventlet's Semaphore
+        if concurrent_processes == 1:
+            return eventlet.Semaphore(limit)
+        # Use our own Semaphore for interprocess limits, because eventlet
+        # blocks with the standard multiprocessing one
+        return Semaphore(limit)
+    return contextlib.suppress()
+
+
+def limit_operations(func):
+    """Decorator to limit the number of concurrent operations.
+
+    This method decorator expects the self instance object to have a
+    _semaphore attribute holding an initialized semaphore.
+
+    We can get the appropriate semaphore with the semaphore_factory method.
+    """
+    @functools.wraps(func)
+    def wrapper(self, *args, **kwargs):
+        with self._semaphore:
+            return func(self, *args, **kwargs)
+    return wrapper
diff --git a/doc/source/admin/blockstorage-volume-backups.rst b/doc/source/admin/blockstorage-volume-backups.rst
index 00e49837bfb..f19d3f10724 100644
--- a/doc/source/admin/blockstorage-volume-backups.rst
+++ b/doc/source/admin/blockstorage-volume-backups.rst
@@ -215,3 +215,49 @@ to avoid any confusion on whether the restore was successful or not.
 destination volume is useless, as there is no way of knowing how much data,
 or if any, was actually restored, hence our recommendation of using the
 "error" state.
+
+backup_max_operations
+---------------------
+
+This configuration option lets us set the maximum number of operations,
+backup and restore, that can be performed concurrently.
+
+This option has a default value of 15, which means that we can have 15
+concurrent backups, or 15 concurrent restores, or any combination of backups
+and restores as long as the sum of the two operations doesn't exceed 15.
+
+The concurrency limit set by this option is also enforced when we run
+multiple processes for the same backup service using the ``backup_workers``
+configuration option.
+It is not a per-process restriction but a global one for the whole backup
+service: the limit is not ``backup_max_operations`` operations per process,
+but ``backup_max_operations`` operations in total across all the processes
+running for the same backup service.
+
+Backup and restore operations are both CPU and memory intensive. Thanks to
+this option we can limit their concurrency and prevent service disruptions,
+such as Out of Memory (OOM) kills caused by too many concurrent requests, as
+well as deliberate DoS attacks.
+
+The amount of memory (RAM) used during the operation depends on the
+configured chunk size as well as the compression ratio achieved on the data
+during the operation.
+
+Example:
+
+   Let's have a look at how much memory would be needed if we use the default
+   backup chunk size (~1.86 GB) while doing a restore to an RBD volume from a
+   non-Ceph backend (Swift, NFS, etc.).
+
+   In a restore operation the worst-case scenario, from the memory point of
+   view, is when the compression ratio is close to 0% (the compressed data
+   chunk is almost the same size as the uncompressed data).
+
+   In this case the memory usage would be ~5.58 GB of data for each chunk:
+   ~5.58 GB = read buffer + decompressed buffer + write buffer used by the
+   librbd library = ~1.86 GB + ~1.86 GB + ~1.86 GB
+
+   For 15 concurrent restore operations, the cinder-backup service would
+   require ~83.7 GB of memory.
+
+Similar calculations can be done for environment-specific scenarios, and this
+configuration option can then be set accordingly.
diff --git a/releasenotes/notes/backup_max_operations-27753c748ba1dc1a.yaml b/releasenotes/notes/backup_max_operations-27753c748ba1dc1a.yaml
new file mode 100644
index 00000000000..16164e1006c
--- /dev/null
+++ b/releasenotes/notes/backup_max_operations-27753c748ba1dc1a.yaml
@@ -0,0 +1,6 @@
+---
+features:
+  - |
+    We can now limit the number of concurrent backup/restore operations that
+    a Cinder backup service can perform using the ``backup_max_operations``
+    configuration option.
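
The memory figures in the documentation hunk above come from a simple
multiplication that operators can redo for their own environment. A minimal
sketch of that calculation follows; the chunk size, buffer count, and
operation count are the illustrative values from the documentation example,
not values read from a running deployment:

    # Rough worst-case memory estimate for concurrent restores to RBD.
    # All figures mirror the documentation example above (illustrative only).
    chunk_size_gb = 1.86       # default backup chunk size (~1.86 GB)
    buffers_per_restore = 3    # read + decompressed + librbd write buffers
    max_operations = 15        # backup_max_operations default

    per_restore_gb = chunk_size_gb * buffers_per_restore   # ~5.58 GB
    worst_case_gb = per_restore_gb * max_operations         # ~83.7 GB
    print("Worst-case memory usage: ~%.1f GB" % worst_case_gb)

Plugging in a site-specific chunk size and the expected compression ratio
gives a reasonable starting value for ``backup_max_operations``.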