Backup: Limit number of concurent operations
This patch adds the backup_max_operations configuration option that limits the number of concurrent backup and restore operations. These operations can consume large amounts of RAM, so we should limit them according to our system resources to avoid processes being killed because we run out of memory. The default is 15 concurrent operations, which could use up to 85GB of RAM(in case of ceph which uses the maximum memory for this operation). More details are added in the document. Closes-Bug: #1865011 Co-authored-by: Rajat Dhasmana <rajatdhasmana@gmail.com> Change-Id: Ica4e0cea67bc894f61a57c7898977951ce3a3633
This commit is contained in:
parent
294f10d7b2
commit
30c2289c9b
@ -31,6 +31,7 @@ Volume backups can be created, restored, deleted and listed.
|
||||
|
||||
"""
|
||||
|
||||
import contextlib
|
||||
import os
|
||||
|
||||
from castellan import key_manager
|
||||
@ -127,6 +128,7 @@ class BackupManager(manager.SchedulerDependentManager):
|
||||
self.is_initialized = False
|
||||
self._set_tpool_size(CONF.backup_native_threads_pool_size)
|
||||
self._process_number = kwargs.get('process_number', 1)
|
||||
self._semaphore = kwargs.get('semaphore', contextlib.suppress())
|
||||
self.driver_name = CONF.backup_driver
|
||||
if self.driver_name in MAPPING:
|
||||
new_name = MAPPING[self.driver_name]
|
||||
@ -330,6 +332,7 @@ class BackupManager(manager.SchedulerDependentManager):
|
||||
if backup.temp_snapshot_id:
|
||||
self._delete_temp_snapshot(ctxt, backup)
|
||||
|
||||
@utils.limit_operations
|
||||
def create_backup(self, context, backup):
|
||||
"""Create volume backups using configured backup service."""
|
||||
volume_id = backup.volume_id
|
||||
@ -519,6 +522,7 @@ class BackupManager(manager.SchedulerDependentManager):
|
||||
|
||||
return False
|
||||
|
||||
@utils.limit_operations
|
||||
def restore_backup(self, context, backup, volume_id):
|
||||
"""Restore volume backups from configured backup service."""
|
||||
LOG.info('Restore backup started, backup: %(backup_id)s '
|
||||
|
@ -21,9 +21,10 @@ import logging as python_logging
|
||||
import shlex
|
||||
import sys
|
||||
|
||||
# NOTE(geguileo): Monkey patching must go before OSLO.log import, otherwise
|
||||
# OSLO.context will not use greenthread thread local and all greenthreads will
|
||||
# share the same context.
|
||||
# NOTE: Monkey patching must go before OSLO.log import, otherwise OSLO.context
|
||||
# will not use greenthread thread local and all greenthreads will share the
|
||||
# same context. It's also a good idea to monkey patch everything before
|
||||
# loading multiprocessing
|
||||
import eventlet
|
||||
eventlet.monkey_patch()
|
||||
# Monkey patch the original current_thread to use the up-to-date _active
|
||||
@ -53,12 +54,18 @@ from cinder import version
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
backup_workers_opt = cfg.IntOpt(
|
||||
'backup_workers',
|
||||
backup_cmd_opts = [
|
||||
cfg.IntOpt('backup_workers',
|
||||
default=1, min=1, max=processutils.get_worker_count(),
|
||||
help='Number of backup processes to launch. Improves performance with '
|
||||
'concurrent backups.')
|
||||
CONF.register_opt(backup_workers_opt)
|
||||
help='Number of backup processes to launch. '
|
||||
'Improves performance with concurrent backups.'),
|
||||
cfg.IntOpt('backup_max_operations',
|
||||
default=15,
|
||||
min=0,
|
||||
help='Maximum number of concurrent memory heavy operations: '
|
||||
'backup and restore. Value of 0 means unlimited'),
|
||||
]
|
||||
CONF.register_opts(backup_cmd_opts)
|
||||
|
||||
LOG = None
|
||||
|
||||
@ -69,12 +76,13 @@ LOG = None
|
||||
_EXTRA_DEFAULT_LOG_LEVELS = ['swiftclient=WARN']
|
||||
|
||||
|
||||
def _launch_backup_process(launcher, num_process):
|
||||
def _launch_backup_process(launcher, num_process, _semaphore):
|
||||
try:
|
||||
server = service.Service.create(binary='cinder-backup',
|
||||
coordination=True,
|
||||
service_name='backup',
|
||||
process_number=num_process)
|
||||
process_number=num_process,
|
||||
semaphore=_semaphore)
|
||||
except Exception:
|
||||
LOG.exception('Backup service %s failed to start.', CONF.host)
|
||||
sys.exit(1)
|
||||
@ -101,11 +109,13 @@ def main():
|
||||
gmr.TextGuruMeditation.setup_autorun(version, conf=CONF)
|
||||
global LOG
|
||||
LOG = logging.getLogger(__name__)
|
||||
semaphore = utils.semaphore_factory(CONF.backup_max_operations,
|
||||
CONF.backup_workers)
|
||||
|
||||
LOG.info('Backup running with %s processes.', CONF.backup_workers)
|
||||
launcher = service.get_launcher()
|
||||
|
||||
for i in range(1, CONF.backup_workers + 1):
|
||||
_launch_backup_process(launcher, i)
|
||||
_launch_backup_process(launcher, i, semaphore)
|
||||
|
||||
launcher.wait()
|
||||
|
@ -210,7 +210,7 @@ def list_opts():
|
||||
cinder_backup_drivers_swift.swiftbackup_service_opts,
|
||||
cinder_backup_drivers_tsm.tsm_opts,
|
||||
cinder_backup_manager.backup_manager_opts,
|
||||
[cinder_cmd_backup.backup_workers_opt],
|
||||
cinder_cmd_backup.backup_cmd_opts,
|
||||
[cinder_cmd_volume.cluster_opt],
|
||||
cinder_common_config.api_opts,
|
||||
cinder_common_config.core_opts,
|
||||
|
@ -1869,6 +1869,34 @@ class BackupTestCase(BaseBackupTest):
|
||||
self.assertEqual(100, tpool._nthreads)
|
||||
self.assertListEqual([], tpool._threads)
|
||||
|
||||
@mock.patch('cinder.backup.manager.BackupManager._run_restore')
|
||||
def test_backup_max_operations_restore(self, mock_restore):
|
||||
mock_sem = self.mock_object(self.backup_mgr, '_semaphore')
|
||||
vol_id = self._create_volume_db_entry(
|
||||
status=fields.VolumeStatus.RESTORING_BACKUP)
|
||||
backup = self._create_backup_db_entry(
|
||||
volume_id=vol_id, status=fields.BackupStatus.RESTORING)
|
||||
|
||||
self.backup_mgr.restore_backup(self.ctxt, backup, vol_id)
|
||||
|
||||
self.assertEqual(1, mock_sem.__enter__.call_count)
|
||||
self.assertEqual(1, mock_restore.call_count)
|
||||
self.assertEqual(1, mock_sem.__exit__.call_count)
|
||||
|
||||
@mock.patch('cinder.backup.manager.BackupManager._run_backup')
|
||||
def test_backup_max_operations_backup(self, mock_backup):
|
||||
mock_sem = self.mock_object(self.backup_mgr, '_semaphore')
|
||||
vol_id = self._create_volume_db_entry(
|
||||
status=fields.VolumeStatus.BACKING_UP)
|
||||
backup = self._create_backup_db_entry(
|
||||
volume_id=vol_id, status=fields.BackupStatus.CREATING)
|
||||
|
||||
self.backup_mgr.create_backup(self.ctxt, backup)
|
||||
|
||||
self.assertEqual(1, mock_sem.__enter__.call_count)
|
||||
self.assertEqual(1, mock_backup.call_count)
|
||||
self.assertEqual(1, mock_sem.__exit__.call_count)
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
class BackupAPITestCase(BaseBackupTest):
|
||||
|
@ -96,25 +96,31 @@ class TestCinderBackupCmd(test.TestCase):
|
||||
super(TestCinderBackupCmd, self).setUp()
|
||||
sys.argv = ['cinder-backup']
|
||||
|
||||
@mock.patch('cinder.utils.Semaphore')
|
||||
@mock.patch('cinder.service.get_launcher')
|
||||
@mock.patch('cinder.service.Service.create')
|
||||
@mock.patch('cinder.utils.monkey_patch')
|
||||
@mock.patch('oslo_log.log.setup')
|
||||
def test_main_multiprocess(self, log_setup, monkey_patch, service_create,
|
||||
get_launcher):
|
||||
get_launcher, mock_semaphore):
|
||||
CONF.set_override('backup_workers', 2)
|
||||
mock_semaphore.side_effect = [mock.sentinel.semaphore1,
|
||||
mock.sentinel.semaphore2]
|
||||
cinder_backup.main()
|
||||
|
||||
self.assertEqual('cinder', CONF.project)
|
||||
self.assertEqual(CONF.version, version.version_string())
|
||||
|
||||
# Both calls must receive the same semaphore
|
||||
c1 = mock.call(binary=constants.BACKUP_BINARY,
|
||||
coordination=True,
|
||||
process_number=1,
|
||||
semaphore=mock.sentinel.semaphore1,
|
||||
service_name='backup')
|
||||
c2 = mock.call(binary=constants.BACKUP_BINARY,
|
||||
coordination=True,
|
||||
process_number=2,
|
||||
semaphore=mock.sentinel.semaphore1,
|
||||
service_name='backup')
|
||||
service_create.assert_has_calls([c1, c2])
|
||||
|
||||
|
@ -1670,3 +1670,44 @@ class TestAutoMaxOversubscriptionRatio(test.TestCase):
|
||||
if result is not None:
|
||||
result = round(result, 2)
|
||||
self.assertEqual(expected_result, result)
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
class LimitOperationsTestCase(test.TestCase):
|
||||
@ddt.data(1, 5)
|
||||
@mock.patch('contextlib.suppress')
|
||||
def test_semaphore_factory_no_limit(self, processes, mock_suppress):
|
||||
res = utils.semaphore_factory(0, processes)
|
||||
mock_suppress.assert_called_once_with()
|
||||
self.assertEqual(mock_suppress.return_value, res)
|
||||
|
||||
@mock.patch('eventlet.Semaphore')
|
||||
def test_semaphore_factory_with_limit(self, mock_semaphore):
|
||||
max_operations = 15
|
||||
res = utils.semaphore_factory(max_operations, 1)
|
||||
mock_semaphore.assert_called_once_with(max_operations)
|
||||
self.assertEqual(mock_semaphore.return_value, res)
|
||||
|
||||
@mock.patch('cinder.utils.Semaphore')
|
||||
def test_semaphore_factory_with_limit_and_workers(self, mock_semaphore):
|
||||
max_operations = 15
|
||||
processes = 5
|
||||
res = utils.semaphore_factory(max_operations, processes)
|
||||
mock_semaphore.assert_called_once_with(max_operations)
|
||||
self.assertEqual(mock_semaphore.return_value, res)
|
||||
|
||||
@mock.patch('multiprocessing.Semaphore')
|
||||
@mock.patch('eventlet.tpool.execute')
|
||||
def test_semaphore(self, mock_exec, mock_semaphore):
|
||||
limit = 15
|
||||
res = utils.Semaphore(limit)
|
||||
self.assertEqual(limit, res.limit)
|
||||
|
||||
mocked_semaphore = mock_semaphore.return_value
|
||||
self.assertEqual(mocked_semaphore, res.semaphore)
|
||||
mock_semaphore.assert_called_once_with(limit)
|
||||
|
||||
with res:
|
||||
mock_exec.assert_called_once_with(mocked_semaphore.__enter__)
|
||||
mocked_semaphore.__exit__.assert_not_called()
|
||||
mocked_semaphore.__exit__.assert_called_once_with(None, None, None)
|
||||
|
@ -25,6 +25,7 @@ import functools
|
||||
import inspect
|
||||
import logging as py_logging
|
||||
import math
|
||||
import multiprocessing
|
||||
import operator
|
||||
import os
|
||||
import pyclbr
|
||||
@ -37,6 +38,8 @@ import time
|
||||
import types
|
||||
|
||||
from castellan import key_manager
|
||||
import eventlet
|
||||
from eventlet import tpool
|
||||
from os_brick import encryptors
|
||||
from os_brick.initiator import connector
|
||||
from oslo_concurrency import lockutils
|
||||
@ -1066,3 +1069,52 @@ def create_ordereddict(adict):
|
||||
"""Given a dict, return a sorted OrderedDict."""
|
||||
return OrderedDict(sorted(adict.items(),
|
||||
key=operator.itemgetter(0)))
|
||||
|
||||
|
||||
class Semaphore(object):
|
||||
"""Custom semaphore to workaround eventlet issues with multiprocessing."""
|
||||
def __init__(self, limit):
|
||||
self.limit = limit
|
||||
self.semaphore = multiprocessing.Semaphore(limit)
|
||||
|
||||
def __enter__(self):
|
||||
# Eventlet does not work with multiprocessing's Semaphore, so we have
|
||||
# to execute it in a native thread to avoid getting blocked when trying
|
||||
# to acquire the semaphore.
|
||||
return tpool.execute(self.semaphore.__enter__)
|
||||
|
||||
def __exit__(self, *args):
|
||||
# Don't use native thread for exit, as it will only add overhead
|
||||
return self.semaphore.__exit__(*args)
|
||||
|
||||
|
||||
def semaphore_factory(limit, concurrent_processes):
|
||||
"""Get a semaphore to limit concurrent operations.
|
||||
|
||||
The semaphore depends on the limit we want to set and the concurrent
|
||||
processes that need to be limited.
|
||||
"""
|
||||
# Limit of 0 is no limit, so we won't use a semaphore
|
||||
if limit:
|
||||
# If we only have 1 process we can use eventlet's Semaphore
|
||||
if concurrent_processes == 1:
|
||||
return eventlet.Semaphore(limit)
|
||||
# Use our own Sempahore for interprocess because eventlet blocks with
|
||||
# the standard one
|
||||
return Semaphore(limit)
|
||||
return contextlib.suppress()
|
||||
|
||||
|
||||
def limit_operations(func):
|
||||
"""Decorator to limit the number of concurrent operations.
|
||||
|
||||
This method decorator expects to have a _semaphore attribute holding an
|
||||
initialized semaphore in the self instance object.
|
||||
|
||||
We can get the appropriate semaphore with the semaphore_factory method.
|
||||
"""
|
||||
@functools.wraps(func)
|
||||
def wrapper(self, *args, **kwargs):
|
||||
with self._semaphore:
|
||||
return func(self, *args, **kwargs)
|
||||
return wrapper
|
||||
|
@ -215,3 +215,49 @@ to avoid any confusion on whether the restore was successful or not.
|
||||
destination volume is useless, as there is no way of knowing how much data,
|
||||
or if any, was actually restored, hence our recommendation of using the
|
||||
"error" state.
|
||||
|
||||
backup_max_operations
|
||||
---------------------
|
||||
|
||||
With this configuration option will let us select the maximum number of
|
||||
operations, backup and restore, that can be performed concurrently.
|
||||
|
||||
This option has a default value of 15, which means that we can have 15
|
||||
concurrent backups, or 15 concurrent restores, or any combination of backups
|
||||
and restores as long as the sum of the 2 operations don't exceed 15.
|
||||
|
||||
The concurrency limitation of this configuration option is also enforced when
|
||||
we run multiple processes for the same backup service using the
|
||||
``backup_workers`` configuration option. It is not a per process restriction,
|
||||
but global to the service, so we won't be able to run ``backup_max_operations``
|
||||
on each one of the processes, but on all the running processes from the same
|
||||
backup service.
|
||||
|
||||
Backups and restore operations are both CPU and memory intensive, but thanks to
|
||||
this option we can limit the concurrency and prevent DoS attacks or just
|
||||
service disruptions caused by many concurrent requests that lead to Out of
|
||||
Memory (OOM) kills.
|
||||
|
||||
The amount of memory (RAM) used during the operation depends on the configured
|
||||
chunk size as well as the compression ratio achieved on the data during the
|
||||
operation.
|
||||
|
||||
Example:
|
||||
|
||||
Let's have a look at how much memory would be needed if we use the default
|
||||
backup chunk size (~1.86 GB) while doing a restore to an RBD volume from a
|
||||
non Ceph backend (Swift, NFS etc).
|
||||
|
||||
In a restore operation the worst case scenario, from the memory point of
|
||||
view, is when the compression ratio is close to 0% (the compressed data chunk
|
||||
is almost the same size as the uncompressed data).
|
||||
|
||||
In this case the memory usage would be ~5.58 GB of data for each chunk:
|
||||
~5.58 GB = read buffer + decompressed buffer + write buffer used by the
|
||||
librbd library = ~1.86 GB + 1.86 GB + 1.86 GB
|
||||
|
||||
For 15 concurrent restore operations, the cinder-backup service will require
|
||||
~83.7 GB of memory.
|
||||
|
||||
Similar calculations can be done for environment specific scenarios and this
|
||||
config option can be set accordingly.
|
||||
|
@ -0,0 +1,6 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
We can now limit the number of concurrent backup/restore operations that a
|
||||
Cinder backup service can perform using the ``backup_max_operations``
|
||||
configuration option.
|
Loading…
x
Reference in New Issue
Block a user