Merge "Run backup-restore operations on native thread"

Zuul 2018-01-23 22:09:09 +00:00 committed by Gerrit Code Review
commit f81bb3c77d
6 changed files with 104 additions and 39 deletions

View File

@ -76,12 +76,18 @@ class ChunkedBackupDriver(driver.BackupDriver):
try:
if algorithm.lower() in ('none', 'off', 'no'):
return None
elif algorithm.lower() in ('zlib', 'gzip'):
if algorithm.lower() in ('zlib', 'gzip'):
import zlib as compressor
return compressor
result = compressor
elif algorithm.lower() in ('bz2', 'bzip2'):
import bz2 as compressor
return compressor
result = compressor
else:
result = None
if result:
# NOTE(geguileo): Compression/Decompression starves
# greenthreads so we use a native thread instead.
return eventlet.tpool.Proxy(result)
except ImportError:
pass
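For illustration, a minimal sketch (not part of this change; zlib is used as the example algorithm) of how eventlet.tpool.Proxy offloads the wrapped module's calls onto a native thread, which is what the NOTE above relies on:

    import zlib

    from eventlet import tpool

    # Wrapping the module means calls to its callable attributes, e.g.
    # compress(), are executed on the tpool native-thread pool instead of
    # blocking the calling greenthread.
    compressor = tpool.Proxy(zlib)

    compressed = compressor.compress(b'\0' * 1024)
    assert zlib.decompress(compressed) == b'\0' * 1024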
@ -105,6 +111,16 @@ class ChunkedBackupDriver(driver.BackupDriver):
self._get_compressor(CONF.backup_compression_algorithm)
self.support_force_delete = True
def _get_object_writer(self, container, object_name, extra_metadata=None):
"""Return writer proxy-wrapped to execute methods in native thread."""
writer = self.get_object_writer(container, object_name, extra_metadata)
return eventlet.tpool.Proxy(writer)
def _get_object_reader(self, container, object_name, extra_metadata=None):
"""Return reader proxy-wrapped to execute methods in native thread."""
reader = self.get_object_reader(container, object_name, extra_metadata)
return eventlet.tpool.Proxy(reader)
# To create your own "chunked" backup driver, implement the following
# abstract methods.
@ -222,7 +238,7 @@ class ChunkedBackupDriver(driver.BackupDriver):
metadata_json = json.dumps(metadata, sort_keys=True, indent=2)
if six.PY3:
metadata_json = metadata_json.encode('utf-8')
with self.get_object_writer(container, filename) as writer:
with self._get_object_writer(container, filename) as writer:
writer.write(metadata_json)
LOG.debug('_write_metadata finished. Metadata: %s.', metadata_json)
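The write above now goes through the proxy returned by _get_object_writer. A rough, illustrative sketch of why the existing with-statement keeps working (a local file stands in for a real backend object writer, and it assumes an eventlet release whose tpool.Proxy supports the context-manager protocol, which this change relies on):

    from eventlet import tpool

    # Proxy-wrap a plain file object as a stand-in for an object writer.
    writer = tpool.Proxy(open('/tmp/example-object', 'wb'))
    with writer:
        # Calls made on the proxy, including write(), are dispatched to a
        # native thread, so the backup greenthread keeps yielding.
        writer.write(b'{"example": "metadata"}')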
@ -243,7 +259,7 @@ class ChunkedBackupDriver(driver.BackupDriver):
sha256file_json = json.dumps(sha256file, sort_keys=True, indent=2)
if six.PY3:
sha256file_json = sha256file_json.encode('utf-8')
with self.get_object_writer(container, filename) as writer:
with self._get_object_writer(container, filename) as writer:
writer.write(sha256file_json)
LOG.debug('_write_sha256file finished.')
@ -253,7 +269,7 @@ class ChunkedBackupDriver(driver.BackupDriver):
LOG.debug('_read_metadata started, container name: %(container)s, '
'metadata filename: %(filename)s.',
{'container': container, 'filename': filename})
with self.get_object_reader(container, filename) as reader:
with self._get_object_reader(container, filename) as reader:
metadata_json = reader.read()
if six.PY3:
metadata_json = metadata_json.decode('utf-8')
@ -267,7 +283,7 @@ class ChunkedBackupDriver(driver.BackupDriver):
LOG.debug('_read_sha256file started, container name: %(container)s, '
'sha256 filename: %(filename)s.',
{'container': container, 'filename': filename})
with self.get_object_reader(container, filename) as reader:
with self._get_object_reader(container, filename) as reader:
sha256file_json = reader.read()
if six.PY3:
sha256file_json = sha256file_json.decode('utf-8')
@ -327,7 +343,7 @@ class ChunkedBackupDriver(driver.BackupDriver):
algorithm, output_data = self._prepare_output_data(data)
obj[object_name]['compression'] = algorithm
LOG.debug('About to put_object')
with self.get_object_writer(
with self._get_object_writer(
container, object_name, extra_metadata=extra_metadata
) as writer:
writer.write(output_data)
@ -349,8 +365,7 @@ class ChunkedBackupDriver(driver.BackupDriver):
data_size_bytes = len(data)
# Execute compression in native thread so it doesn't prevent
# cooperative greenthread switching.
compressed_data = eventlet.tpool.execute(self.compressor.compress,
data)
compressed_data = self.compressor.compress(data)
comp_size_bytes = len(compressed_data)
algorithm = CONF.backup_compression_algorithm.lower()
if comp_size_bytes >= data_size_bytes:
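Both the removed tpool.execute call and the new plain method call end up on a native thread, because self.compressor is now a tpool.Proxy. A small sketch of the equivalence (zlib as an example; identifiers are illustrative, not from this commit):

    import zlib

    from eventlet import tpool

    data = b'\0' * 1024

    # Before: a single call dispatched explicitly to the thread pool.
    before = tpool.execute(zlib.compress, data)

    # After: the compressor object is already a tpool.Proxy, so an ordinary
    # method call is dispatched to a native thread transparently.
    after = tpool.Proxy(zlib).compress(data)

    assert before == after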
@ -618,7 +633,7 @@ class ChunkedBackupDriver(driver.BackupDriver):
'volume_id': volume_id,
})
with self.get_object_reader(
with self._get_object_reader(
container, object_name,
extra_metadata=extra_metadata) as reader:
body = reader.read()

View File

@ -374,12 +374,24 @@ class BackupDriver(base.Base):
@abc.abstractmethod
def backup(self, backup, volume_file, backup_metadata=False):
"""Start a backup of a specified volume."""
"""Start a backup of a specified volume.
Some I/O operations may block greenthreads, so in order to prevent
starvation, the volume_file parameter will be a proxy that executes all
methods in native threads, so the method implementation doesn't need to
worry about that.
"""
return
@abc.abstractmethod
def restore(self, backup, volume_id, volume_file):
"""Restore a saved backup."""
"""Restore a saved backup.
Some I/O operations may block greenthreads, so in order to prevent
starvation, the volume_file parameter will be a proxy that executes all
methods in native threads, so the method implementation doesn't need to
worry about that.
"""
return
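A hypothetical driver-side sketch of what these docstrings mean in practice (the helper and its names below are illustrative, not part of Cinder): since the manager hands the driver a tpool.Proxy around the device file, blocking reads already run on native threads and the driver code stays plain.

    def copy_volume_chunks(volume_file, store_chunk, chunk_size=64 * 1024):
        """Read the proxied volume_file in chunks and hand each to store_chunk."""
        while True:
            # volume_file is a tpool.Proxy, so this blocking read() runs on a
            # native thread and does not starve other greenthreads.
            chunk = volume_file.read(chunk_size)
            if not chunk:
                break
            store_chunk(chunk)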
@abc.abstractmethod

View File

@ -32,6 +32,8 @@ Volume backups can be created, restored, deleted and listed.
"""
import os
from eventlet import tpool
from oslo_config import cfg
from oslo_log import log as logging
from oslo_log import versionutils
@ -82,6 +84,12 @@ CONF.import_opt('num_volume_device_scan_tries', 'cinder.volume.driver')
QUOTAS = quota.QUOTAS
# TODO(geguileo): Once Eventlet issue #432 gets fixed we can just tpool.execute
# the whole call to the driver's backup and restore methods instead of proxy
# wrapping the device_file and having the drivers also proxy wrap their
# writes/reads and the compression/decompression calls.
# (https://github.com/eventlet/eventlet/issues/432)
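A sketch of the simplification this TODO describes (hypothetical; it only becomes possible once eventlet issue #432 is fixed): dispatch the whole driver call to a native thread in one shot instead of proxy wrapping device_file, the object readers/writers, and the compressor.

    from eventlet import tpool

    def _run_backup_natively(backup_service, backup, device_file):
        # One tpool.execute around the entire driver call; no per-object
        # tpool.Proxy wrapping would be needed anymore.
        return tpool.execute(backup_service.backup, backup, device_file)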
class BackupManager(manager.ThreadPoolManager):
"""Manages backup of block storage devices."""
@ -407,6 +415,10 @@ class BackupManager(manager.ThreadPoolManager):
backup_service = self.get_backup_driver(context)
properties = utils.brick_get_connector_properties()
# NOTE(geguileo): Not all I/O disk operations properly do greenthread
# context switching and may end up blocking the greenthread, so we go
# with native threads proxy-wrapping the device file object.
try:
backup_device = self.volume_rpcapi.get_backup_device(context,
backup,
@ -422,16 +434,16 @@ class BackupManager(manager.ThreadPoolManager):
if backup_device.secure_enabled:
with open(device_path) as device_file:
updates = backup_service.backup(
backup, device_file)
backup, tpool.Proxy(device_file))
else:
with utils.temporary_chown(device_path):
with open(device_path) as device_file:
updates = backup_service.backup(
backup, device_file)
backup, tpool.Proxy(device_file))
# device_path is already file-like so no need to open it
else:
updates = backup_service.backup(
backup, device_path)
updates = backup_service.backup(backup,
tpool.Proxy(device_path))
finally:
self._detach_device(context, attach_info,
@ -533,21 +545,27 @@ class BackupManager(manager.ThreadPoolManager):
self.volume_rpcapi.secure_file_operations_enabled(context,
volume))
attach_info = self._attach_device(context, volume, properties)
# NOTE(geguileo): Not all I/O disk operations properly do greenthread
# context switching and may end up blocking the greenthread, so we go
# with native threads proxy-wrapping the device file object.
try:
device_path = attach_info['device']['path']
if (isinstance(device_path, six.string_types) and
not os.path.isdir(device_path)):
if secure_enabled:
with open(device_path, 'wb') as device_file:
backup_service.restore(backup, volume.id, device_file)
backup_service.restore(backup, volume.id,
tpool.Proxy(device_file))
else:
with utils.temporary_chown(device_path):
with open(device_path, 'wb') as device_file:
backup_service.restore(backup, volume.id,
device_file)
tpool.Proxy(device_file))
# device_path is already file-like so no need to open it
else:
backup_service.restore(backup, volume.id, device_path)
backup_service.restore(backup, volume.id,
tpool.Proxy(device_path))
finally:
self._detach_device(context, attach_info, volume, properties,
force=True)

View File

@ -28,6 +28,7 @@ import tempfile
import threading
import zlib
from eventlet import tpool
import mock
from oslo_utils import units
@ -551,8 +552,10 @@ class GoogleBackupDriverTestCase(test.TestCase):
self.assertIsNone(compressor)
compressor = service._get_compressor('zlib')
self.assertEqual(zlib, compressor)
self.assertIsInstance(compressor, tpool.Proxy)
compressor = service._get_compressor('bz2')
self.assertEqual(bz2, compressor)
self.assertIsInstance(compressor, tpool.Proxy)
self.assertRaises(ValueError, service._get_compressor, 'fake')
@gcs_client
@ -562,17 +565,17 @@ class GoogleBackupDriverTestCase(test.TestCase):
thread_dict = {}
original_compress = zlib.compress
def my_compress(data, *args, **kwargs):
def my_compress(data):
thread_dict['compress'] = threading.current_thread()
return original_compress(data)
self.mock_object(zlib, 'compress', side_effect=my_compress)
service = google_dr.GoogleBackupDriver(self.ctxt)
# Set up buffer of 128 zeroed bytes
fake_data = b'\0' * 128
with mock.patch.object(service.compressor, 'compress',
side_effect=my_compress):
result = service._prepare_output_data(fake_data)
result = service._prepare_output_data(fake_data)
self.assertEqual('zlib', result[0])
self.assertGreater(len(fake_data), len(result[1]))

View File

@ -25,6 +25,7 @@ import tempfile
import threading
import zlib
from eventlet import tpool
import mock
from os_brick.remotefs import remotefs as remotefs_brick
from oslo_config import cfg
@ -149,6 +150,10 @@ class BackupNFSSwiftBasedTestCase(test.TestCase):
self.volume_file.write(bytes([65] * data_size))
self.volume_file.seek(0)
def _store_thread(self, *args, **kwargs):
self.thread_dict['thread'] = threading.current_thread()
return self.thread_original_method(*args, **kwargs)
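The helper above records which thread actually ran the patched call. An illustrative, standalone version of the same pattern (outside the test class, zlib as the example) shows why the later assertNotEqual against threading.current_thread() proves the work moved off the caller's OS thread:

    import threading
    import zlib

    from eventlet import tpool

    thread_dict = {}
    original_compress = zlib.compress

    def recording_compress(data, *args, **kwargs):
        # Remember which OS thread executed the compression.
        thread_dict['thread'] = threading.current_thread()
        return original_compress(data, *args, **kwargs)

    zlib.compress = recording_compress
    try:
        tpool.Proxy(zlib).compress(b'\0' * 128)
    finally:
        zlib.compress = original_compress

    # The proxied call ran on a tpool worker thread, not the caller's thread.
    assert thread_dict['thread'] is not threading.current_thread()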
def setUp(self):
super(BackupNFSSwiftBasedTestCase, self).setUp()
@ -173,6 +178,9 @@ class BackupNFSSwiftBasedTestCase(test.TestCase):
self.volume_file.write(os.urandom(1024))
self.size_volume_file += 1024
# Use dictionary to share data between threads
self.thread_dict = {}
def test_backup_uncompressed(self):
volume_id = fake.VOLUME_ID
self._create_backup_db_entry(volume_id=volume_id)
@ -573,7 +581,9 @@ class BackupNFSSwiftBasedTestCase(test.TestCase):
restored_file.name))
def test_restore_bz2(self):
self.thread_original_method = bz2.decompress
volume_id = fake.VOLUME_ID
self.mock_object(bz2, 'decompress', side_effect=self._store_thread)
self._create_backup_db_entry(volume_id=volume_id)
self.flags(backup_compression_algorithm='bz2')
@ -591,7 +601,12 @@ class BackupNFSSwiftBasedTestCase(test.TestCase):
self.assertTrue(filecmp.cmp(self.volume_file.name,
restored_file.name))
self.assertNotEqual(threading.current_thread(),
self.thread_dict['thread'])
def test_restore_zlib(self):
self.thread_original_method = zlib.decompress
self.mock_object(zlib, 'decompress', side_effect=self._store_thread)
volume_id = fake.VOLUME_ID
self._create_backup_db_entry(volume_id=volume_id)
@ -610,6 +625,9 @@ class BackupNFSSwiftBasedTestCase(test.TestCase):
self.assertTrue(filecmp.cmp(self.volume_file.name,
restored_file.name))
self.assertNotEqual(threading.current_thread(),
self.thread_dict['thread'])
def test_restore_delta(self):
volume_id = fake.VOLUME_ID
@ -672,8 +690,10 @@ class BackupNFSSwiftBasedTestCase(test.TestCase):
self.assertIsNone(compressor)
compressor = service._get_compressor('zlib')
self.assertEqual(compressor, zlib)
self.assertIsInstance(compressor, tpool.Proxy)
compressor = service._get_compressor('bz2')
self.assertEqual(compressor, bz2)
self.assertIsInstance(compressor, tpool.Proxy)
self.assertRaises(ValueError, service._get_compressor, 'fake')
def create_buffer(self, size):
@ -688,24 +708,18 @@ class BackupNFSSwiftBasedTestCase(test.TestCase):
def test_prepare_output_data_effective_compression(self):
"""Test compression works on a native thread."""
# Use dictionary to share data between threads
thread_dict = {}
original_compress = zlib.compress
def my_compress(data, *args, **kwargs):
thread_dict['compress'] = threading.current_thread()
return original_compress(data)
self.thread_original_method = zlib.compress
self.mock_object(zlib, 'compress', side_effect=self._store_thread)
service = nfs.NFSBackupDriver(self.ctxt)
fake_data = self.create_buffer(128)
with mock.patch.object(service.compressor, 'compress',
side_effect=my_compress):
result = service._prepare_output_data(fake_data)
result = service._prepare_output_data(fake_data)
self.assertEqual('zlib', result[0])
self.assertGreater(len(fake_data), len(result[1]))
self.assertNotEqual(threading.current_thread(),
thread_dict['compress'])
self.thread_dict['thread'])
def test_prepare_output_data_no_compresssion(self):
self.flags(backup_compression_algorithm='none')

View File

@ -27,6 +27,7 @@ import tempfile
import threading
import zlib
from eventlet import tpool
import mock
from oslo_config import cfg
from swiftclient import client as swift
@ -821,8 +822,10 @@ class BackupSwiftTestCase(test.TestCase):
self.assertIsNone(compressor)
compressor = service._get_compressor('zlib')
self.assertEqual(zlib, compressor)
self.assertIsInstance(compressor, tpool.Proxy)
compressor = service._get_compressor('bz2')
self.assertEqual(bz2, compressor)
self.assertIsInstance(compressor, tpool.Proxy)
self.assertRaises(ValueError, service._get_compressor, 'fake')
def test_prepare_output_data_effective_compression(self):
@ -831,17 +834,17 @@ class BackupSwiftTestCase(test.TestCase):
thread_dict = {}
original_compress = zlib.compress
def my_compress(data, *args, **kwargs):
def my_compress(data):
thread_dict['compress'] = threading.current_thread()
return original_compress(data)
self.mock_object(zlib, 'compress', side_effect=my_compress)
service = swift_dr.SwiftBackupDriver(self.ctxt)
# Set up buffer of 128 zeroed bytes
fake_data = b'\0' * 128
with mock.patch.object(service.compressor, 'compress',
side_effect=my_compress):
result = service._prepare_output_data(fake_data)
result = service._prepare_output_data(fake_data)
self.assertEqual('zlib', result[0])
self.assertGreater(len(fake_data), len(result[1]))