Run backup-restore operations on native thread

During backup and restore of huge volumes, file read/write operations
hold the CPU, which leads to thread starvation and causes the cinder
backup service to be reported as down, because DB operations are impacted.
The proposed change is to run CPU- and file-intensive operations such as
read, write, compress and decompress on a native thread.

Change-Id: I1f1d9c0d6e3f04f1ecd5ef7c5d813005ee116409
Closes-Bug: #1692775
Co-Authored-By: Gorka Eguileor <geguileo@redhat.com>
This commit is contained in:
Chhavi Agarwal
2017-11-07 07:05:49 -05:00
committed by Gorka Eguileor
parent 8f3dfd7299
commit dd556fa755
6 changed files with 104 additions and 39 deletions
+26 -11
View File
@@ -76,12 +76,18 @@ class ChunkedBackupDriver(driver.BackupDriver):
try:
if algorithm.lower() in ('none', 'off', 'no'):
return None
elif algorithm.lower() in ('zlib', 'gzip'):
if algorithm.lower() in ('zlib', 'gzip'):
import zlib as compressor
return compressor
result = compressor
elif algorithm.lower() in ('bz2', 'bzip2'):
import bz2 as compressor
return compressor
result = compressor
else:
result = None
if result:
# NOTE(geguileo): Compression/Decompression starves
# greenthreads so we use a native thread instead.
return eventlet.tpool.Proxy(result)
except ImportError:
pass
@@ -105,6 +111,16 @@ class ChunkedBackupDriver(driver.BackupDriver):
self._get_compressor(CONF.backup_compression_algorithm)
self.support_force_delete = True
def _get_object_writer(self, container, object_name, extra_metadata=None):
    """Build the driver's object writer and hand it back proxy-wrapped.

    The writer obtained from ``get_object_writer`` is wrapped in an
    ``eventlet.tpool.Proxy`` so that its methods are executed in a native
    thread.

    :param container: forwarded unchanged to ``get_object_writer``
    :param object_name: forwarded unchanged to ``get_object_writer``
    :param extra_metadata: forwarded unchanged to ``get_object_writer``
    :returns: an ``eventlet.tpool.Proxy`` around the writer
    """
    return eventlet.tpool.Proxy(
        self.get_object_writer(container, object_name, extra_metadata))
def _get_object_reader(self, container, object_name, extra_metadata=None):
    """Build the driver's object reader and hand it back proxy-wrapped.

    The reader obtained from ``get_object_reader`` is wrapped in an
    ``eventlet.tpool.Proxy`` so that its methods are executed in a native
    thread.

    :param container: forwarded unchanged to ``get_object_reader``
    :param object_name: forwarded unchanged to ``get_object_reader``
    :param extra_metadata: forwarded unchanged to ``get_object_reader``
    :returns: an ``eventlet.tpool.Proxy`` around the reader
    """
    return eventlet.tpool.Proxy(
        self.get_object_reader(container, object_name, extra_metadata))
# To create your own "chunked" backup driver, implement the following
# abstract methods.
@@ -222,7 +238,7 @@ class ChunkedBackupDriver(driver.BackupDriver):
metadata_json = json.dumps(metadata, sort_keys=True, indent=2)
if six.PY3:
metadata_json = metadata_json.encode('utf-8')
with self.get_object_writer(container, filename) as writer:
with self._get_object_writer(container, filename) as writer:
writer.write(metadata_json)
LOG.debug('_write_metadata finished. Metadata: %s.', metadata_json)
@@ -243,7 +259,7 @@ class ChunkedBackupDriver(driver.BackupDriver):
sha256file_json = json.dumps(sha256file, sort_keys=True, indent=2)
if six.PY3:
sha256file_json = sha256file_json.encode('utf-8')
with self.get_object_writer(container, filename) as writer:
with self._get_object_writer(container, filename) as writer:
writer.write(sha256file_json)
LOG.debug('_write_sha256file finished.')
@@ -253,7 +269,7 @@ class ChunkedBackupDriver(driver.BackupDriver):
LOG.debug('_read_metadata started, container name: %(container)s, '
'metadata filename: %(filename)s.',
{'container': container, 'filename': filename})
with self.get_object_reader(container, filename) as reader:
with self._get_object_reader(container, filename) as reader:
metadata_json = reader.read()
if six.PY3:
metadata_json = metadata_json.decode('utf-8')
@@ -267,7 +283,7 @@ class ChunkedBackupDriver(driver.BackupDriver):
LOG.debug('_read_sha256file started, container name: %(container)s, '
'sha256 filename: %(filename)s.',
{'container': container, 'filename': filename})
with self.get_object_reader(container, filename) as reader:
with self._get_object_reader(container, filename) as reader:
sha256file_json = reader.read()
if six.PY3:
sha256file_json = sha256file_json.decode('utf-8')
@@ -327,7 +343,7 @@ class ChunkedBackupDriver(driver.BackupDriver):
algorithm, output_data = self._prepare_output_data(data)
obj[object_name]['compression'] = algorithm
LOG.debug('About to put_object')
with self.get_object_writer(
with self._get_object_writer(
container, object_name, extra_metadata=extra_metadata
) as writer:
writer.write(output_data)
@@ -349,8 +365,7 @@ class ChunkedBackupDriver(driver.BackupDriver):
data_size_bytes = len(data)
# Execute compression in native thread so it doesn't prevent
# cooperative greenthread switching.
compressed_data = eventlet.tpool.execute(self.compressor.compress,
data)
compressed_data = self.compressor.compress(data)
comp_size_bytes = len(compressed_data)
algorithm = CONF.backup_compression_algorithm.lower()
if comp_size_bytes >= data_size_bytes:
@@ -618,7 +633,7 @@ class ChunkedBackupDriver(driver.BackupDriver):
'volume_id': volume_id,
})
with self.get_object_reader(
with self._get_object_reader(
container, object_name,
extra_metadata=extra_metadata) as reader:
body = reader.read()
+14 -2
View File
@@ -374,12 +374,24 @@ class BackupDriver(base.Base):
@abc.abstractmethod
def backup(self, backup, volume_file, backup_metadata=False):
"""Start a backup of a specified volume."""
"""Start a backup of a specified volume.
Some I/O operations may block greenthreads, so in order to prevent
starvation the volume_file parameter will be a proxy that executes all
of its methods in native threads; the method implementation doesn't
need to worry about that.
"""
return
@abc.abstractmethod
def restore(self, backup, volume_id, volume_file):
"""Restore a saved backup."""
"""Restore a saved backup.
Some I/O operations may block greenthreads, so in order to prevent
starvation the volume_file parameter will be a proxy that executes all
of its methods in native threads; the method implementation doesn't
need to worry about that.
"""
return
@abc.abstractmethod
+25 -7
View File
@@ -32,6 +32,8 @@ Volume backups can be created, restored, deleted and listed.
"""
import os
from eventlet import tpool
from oslo_config import cfg
from oslo_log import log as logging
from oslo_log import versionutils
@@ -82,6 +84,12 @@ CONF.import_opt('num_volume_device_scan_tries', 'cinder.volume.driver')
QUOTAS = quota.QUOTAS
# TODO(geguileo): Once Eventlet issue #432 gets fixed we can just tpool.execute
# the whole call to the driver's backup and restore methods instead of proxy
# wrapping the device_file and having the drivers also proxy wrap their
# writes/reads and the compression/decompression calls.
# (https://github.com/eventlet/eventlet/issues/432)
class BackupManager(manager.ThreadPoolManager):
"""Manages backup of block storage devices."""
@@ -408,6 +416,10 @@ class BackupManager(manager.ThreadPoolManager):
backup_service = self.get_backup_driver(context)
properties = utils.brick_get_connector_properties()
# NOTE(geguileo): Not all I/O disk operations properly do greenthread
# context switching and may end up blocking the greenthread, so we go
# with native threads proxy-wrapping the device file object.
try:
backup_device = self.volume_rpcapi.get_backup_device(context,
backup,
@@ -423,16 +435,16 @@ class BackupManager(manager.ThreadPoolManager):
if backup_device.secure_enabled:
with open(device_path) as device_file:
updates = backup_service.backup(
backup, device_file)
backup, tpool.Proxy(device_file))
else:
with utils.temporary_chown(device_path):
with open(device_path) as device_file:
updates = backup_service.backup(
backup, device_file)
backup, tpool.Proxy(device_file))
# device_path is already file-like so no need to open it
else:
updates = backup_service.backup(
backup, device_path)
updates = backup_service.backup(backup,
tpool.Proxy(device_path))
finally:
self._detach_device(context, attach_info,
@@ -534,21 +546,27 @@ class BackupManager(manager.ThreadPoolManager):
self.volume_rpcapi.secure_file_operations_enabled(context,
volume))
attach_info = self._attach_device(context, volume, properties)
# NOTE(geguileo): Not all I/O disk operations properly do greenthread
# context switching and may end up blocking the greenthread, so we go
# with native threads proxy-wrapping the device file object.
try:
device_path = attach_info['device']['path']
if (isinstance(device_path, six.string_types) and
not os.path.isdir(device_path)):
if secure_enabled:
with open(device_path, 'wb') as device_file:
backup_service.restore(backup, volume.id, device_file)
backup_service.restore(backup, volume.id,
tpool.Proxy(device_file))
else:
with utils.temporary_chown(device_path):
with open(device_path, 'wb') as device_file:
backup_service.restore(backup, volume.id,
device_file)
tpool.Proxy(device_file))
# device_path is already file-like so no need to open it
else:
backup_service.restore(backup, volume.id, device_path)
backup_service.restore(backup, volume.id,
tpool.Proxy(device_path))
finally:
self._detach_device(context, attach_info, volume, properties,
force=True)
@@ -28,6 +28,7 @@ import tempfile
import threading
import zlib
from eventlet import tpool
import mock
from oslo_utils import units
@@ -551,8 +552,10 @@ class GoogleBackupDriverTestCase(test.TestCase):
self.assertIsNone(compressor)
compressor = service._get_compressor('zlib')
self.assertEqual(zlib, compressor)
self.assertIsInstance(compressor, tpool.Proxy)
compressor = service._get_compressor('bz2')
self.assertEqual(bz2, compressor)
self.assertIsInstance(compressor, tpool.Proxy)
self.assertRaises(ValueError, service._get_compressor, 'fake')
@gcs_client
@@ -562,17 +565,17 @@ class GoogleBackupDriverTestCase(test.TestCase):
thread_dict = {}
original_compress = zlib.compress
def my_compress(data, *args, **kwargs):
def my_compress(data):
thread_dict['compress'] = threading.current_thread()
return original_compress(data)
self.mock_object(zlib, 'compress', side_effect=my_compress)
service = google_dr.GoogleBackupDriver(self.ctxt)
# Set up buffer of 128 zeroed bytes
fake_data = b'\0' * 128
with mock.patch.object(service.compressor, 'compress',
side_effect=my_compress):
result = service._prepare_output_data(fake_data)
result = service._prepare_output_data(fake_data)
self.assertEqual('zlib', result[0])
self.assertGreater(len(fake_data), len(result[1]))
@@ -25,6 +25,7 @@ import tempfile
import threading
import zlib
from eventlet import tpool
import mock
from os_brick.remotefs import remotefs as remotefs_brick
from oslo_config import cfg
@@ -149,6 +150,10 @@ class BackupNFSSwiftBasedTestCase(test.TestCase):
self.volume_file.write(bytes([65] * data_size))
self.volume_file.seek(0)
def _store_thread(self, *args, **kwargs):
    """Record the calling thread, then delegate to the original method.

    The thread executing this call is saved under
    ``self.thread_dict['thread']``; the call is then forwarded with every
    positional and keyword argument to ``self.thread_original_method`` and
    that method's result is returned unchanged.
    """
    calling_thread = threading.current_thread()
    self.thread_dict['thread'] = calling_thread
    delegate = self.thread_original_method
    return delegate(*args, **kwargs)
def setUp(self):
super(BackupNFSSwiftBasedTestCase, self).setUp()
@@ -173,6 +178,9 @@ class BackupNFSSwiftBasedTestCase(test.TestCase):
self.volume_file.write(os.urandom(1024))
self.size_volume_file += 1024
# Use dictionary to share data between threads
self.thread_dict = {}
def test_backup_uncompressed(self):
volume_id = fake.VOLUME_ID
self._create_backup_db_entry(volume_id=volume_id)
@@ -573,7 +581,9 @@ class BackupNFSSwiftBasedTestCase(test.TestCase):
restored_file.name))
def test_restore_bz2(self):
self.thread_original_method = bz2.decompress
volume_id = fake.VOLUME_ID
self.mock_object(bz2, 'decompress', side_effect=self._store_thread)
self._create_backup_db_entry(volume_id=volume_id)
self.flags(backup_compression_algorithm='bz2')
@@ -591,7 +601,12 @@ class BackupNFSSwiftBasedTestCase(test.TestCase):
self.assertTrue(filecmp.cmp(self.volume_file.name,
restored_file.name))
self.assertNotEqual(threading.current_thread(),
self.thread_dict['thread'])
def test_restore_zlib(self):
self.thread_original_method = zlib.decompress
self.mock_object(zlib, 'decompress', side_effect=self._store_thread)
volume_id = fake.VOLUME_ID
self._create_backup_db_entry(volume_id=volume_id)
@@ -610,6 +625,9 @@ class BackupNFSSwiftBasedTestCase(test.TestCase):
self.assertTrue(filecmp.cmp(self.volume_file.name,
restored_file.name))
self.assertNotEqual(threading.current_thread(),
self.thread_dict['thread'])
def test_restore_delta(self):
volume_id = fake.VOLUME_ID
@@ -672,8 +690,10 @@ class BackupNFSSwiftBasedTestCase(test.TestCase):
self.assertIsNone(compressor)
compressor = service._get_compressor('zlib')
self.assertEqual(compressor, zlib)
self.assertIsInstance(compressor, tpool.Proxy)
compressor = service._get_compressor('bz2')
self.assertEqual(compressor, bz2)
self.assertIsInstance(compressor, tpool.Proxy)
self.assertRaises(ValueError, service._get_compressor, 'fake')
def create_buffer(self, size):
@@ -688,24 +708,18 @@ class BackupNFSSwiftBasedTestCase(test.TestCase):
def test_prepare_output_data_effective_compression(self):
"""Test compression works on a native thread."""
# Use dictionary to share data between threads
thread_dict = {}
original_compress = zlib.compress
def my_compress(data, *args, **kwargs):
thread_dict['compress'] = threading.current_thread()
return original_compress(data)
self.thread_original_method = zlib.compress
self.mock_object(zlib, 'compress', side_effect=self._store_thread)
service = nfs.NFSBackupDriver(self.ctxt)
fake_data = self.create_buffer(128)
with mock.patch.object(service.compressor, 'compress',
side_effect=my_compress):
result = service._prepare_output_data(fake_data)
result = service._prepare_output_data(fake_data)
self.assertEqual('zlib', result[0])
self.assertGreater(len(fake_data), len(result[1]))
self.assertNotEqual(threading.current_thread(),
thread_dict['compress'])
self.thread_dict['thread'])
def test_prepare_output_data_no_compresssion(self):
self.flags(backup_compression_algorithm='none')
@@ -27,6 +27,7 @@ import tempfile
import threading
import zlib
from eventlet import tpool
import mock
from oslo_config import cfg
from swiftclient import client as swift
@@ -821,8 +822,10 @@ class BackupSwiftTestCase(test.TestCase):
self.assertIsNone(compressor)
compressor = service._get_compressor('zlib')
self.assertEqual(zlib, compressor)
self.assertIsInstance(compressor, tpool.Proxy)
compressor = service._get_compressor('bz2')
self.assertEqual(bz2, compressor)
self.assertIsInstance(compressor, tpool.Proxy)
self.assertRaises(ValueError, service._get_compressor, 'fake')
def test_prepare_output_data_effective_compression(self):
@@ -831,17 +834,17 @@ class BackupSwiftTestCase(test.TestCase):
thread_dict = {}
original_compress = zlib.compress
def my_compress(data, *args, **kwargs):
def my_compress(data):
thread_dict['compress'] = threading.current_thread()
return original_compress(data)
self.mock_object(zlib, 'compress', side_effect=my_compress)
service = swift_dr.SwiftBackupDriver(self.ctxt)
# Set up buffer of 128 zeroed bytes
fake_data = b'\0' * 128
with mock.patch.object(service.compressor, 'compress',
side_effect=my_compress):
result = service._prepare_output_data(fake_data)
result = service._prepare_output_data(fake_data)
self.assertEqual('zlib', result[0])
self.assertGreater(len(fake_data), len(result[1]))