Add I/O Semaphore to limit concurrent disk ops

Introduce an I/O semaphore to limit the number of concurrent
disk-IO-intensive operations. This could reduce disk contention from
image operations like image download, image format conversion, snapshot
extraction, etc.

The new config option max_concurrent_disk_ops can be set in nova.conf
per compute host and is virt-driver-agnostic. It defaults to 0,
which means no limit.

blueprint: io-semaphore-for-concurrent-disk-ops
Change-Id: I897999e8a4601694213f068367eae9608cdc7bbb
Signed-off-by: Jack Ding <jack.ding@windriver.com>
This commit is contained in:
Jack Ding 2018-10-02 11:59:37 -04:00
parent 7217e38baf
commit 728f20e8f4
13 changed files with 121 additions and 36 deletions

View File

@ -1192,6 +1192,15 @@ class ComputeManager(manager.Manager):
nova.conf.neutron.register_dynamic_opts(CONF)
# one-time initialization
if CONF.compute.max_concurrent_disk_ops != 0:
compute_utils.disk_ops_semaphore = \
eventlet.semaphore.BoundedSemaphore(
CONF.compute.max_concurrent_disk_ops)
else:
compute_utils.disk_ops_semaphore = \
compute_utils.UnlimitedSemaphore()
self.driver.init_host(host=self.host)
context = nova.context.get_admin_context()
instances = objects.InstanceList.get_by_host(

View File

@ -54,6 +54,11 @@ from nova.virt import driver
CONF = nova.conf.CONF
LOG = log.getLogger(__name__)
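# This semaphore is used to enforce a limit on the number of disk-IO-intensive
# operations (image downloads, image conversions) running at any given time.
# It is None at import time and is initialized in ComputeManager.init_host(),
# so it must not be used as a context manager before init_host() has run.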
disk_ops_semaphore = None
def exception_to_dict(fault, message=None):
"""Converts exceptions to a dict for use in notifications."""

View File

@ -727,6 +727,15 @@ Related options:
failing if ``vif_plugging_is_fatal`` is True, or simply continuing with the
live migration
"""),
cfg.IntOpt('max_concurrent_disk_ops',
default=0,
min=0,
help="""
Number of concurrent disk-IO-intensive operations (glance image downloads,
image format conversions, etc.) that we will do in parallel. If this is set
too high then response time suffers.
The default value of 0 means no limit.
"""),
]
interval_opts = [

View File

@ -62,6 +62,7 @@ from nova.compute import manager
from nova.compute import power_state
from nova.compute import provider_tree
from nova.compute import task_states
from nova.compute import utils as compute_utils
from nova.compute import vm_states
import nova.conf
from nova import context
@ -18575,13 +18576,16 @@ class LibvirtDriverTestCase(test.NoDBTestCase, TraitsComparisonMixin):
self.drvr._wait_for_running({'name': 'else',
'uuid': 'other_uuid'})
@mock.patch.object(compute_utils, 'disk_ops_semaphore')
@mock.patch('nova.utils.execute')
@mock.patch('os.rename')
def test_disk_raw_to_qcow2(self, mock_rename, mock_execute):
def test_disk_raw_to_qcow2(self, mock_rename, mock_execute,
mock_disk_op_sema):
path = '/test/disk'
_path_qcow = path + '_qcow'
self.drvr._disk_raw_to_qcow2(path)
mock_disk_op_sema.__enter__.assert_called_once()
mock_execute.assert_has_calls([
mock.call('qemu-img', 'convert', '-f', 'raw',
'-O', 'qcow2', path, _path_qcow)])
@ -21499,6 +21503,7 @@ class LibvirtVolumeSnapshotTestCase(test.NoDBTestCase):
return mock_domain, guest
@mock.patch.object(compute_utils, 'disk_ops_semaphore')
@mock.patch.object(host.Host, "has_min_version",
mock.Mock(return_value=True))
@mock.patch("nova.virt.libvirt.guest.Guest.is_active",
@ -21507,7 +21512,8 @@ class LibvirtVolumeSnapshotTestCase(test.NoDBTestCase):
return_value=mock.Mock(file_format="fake_fmt"))
@mock.patch('nova.utils.execute')
def test_volume_snapshot_delete_when_dom_not_running(self, mock_execute,
mock_qemu_img_info):
mock_qemu_img_info,
mock_disk_op_sema):
"""Deleting newest snapshot of a file-based image when the domain is
not running should trigger a blockRebase using qemu-img not libvirt.
In this test, we rebase the image with another image as backing file.
@ -21523,11 +21529,13 @@ class LibvirtVolumeSnapshotTestCase(test.NoDBTestCase):
self.volume_uuid, snapshot_id,
self.delete_info_1)
mock_disk_op_sema.__enter__.assert_called_once()
mock_qemu_img_info.assert_called_once_with("snap.img")
mock_execute.assert_called_once_with('qemu-img', 'rebase',
'-b', 'snap.img', '-F',
'fake_fmt', 'disk1_file')
@mock.patch.object(compute_utils, 'disk_ops_semaphore')
@mock.patch.object(host.Host, "has_min_version",
mock.Mock(return_value=True))
@mock.patch("nova.virt.libvirt.guest.Guest.is_active",
@ -21536,7 +21544,7 @@ class LibvirtVolumeSnapshotTestCase(test.NoDBTestCase):
return_value=mock.Mock(file_format="fake_fmt"))
@mock.patch('nova.utils.execute')
def test_volume_snapshot_delete_when_dom_not_running_and_no_rebase_base(
self, mock_execute, mock_qemu_img_info):
self, mock_execute, mock_qemu_img_info, mock_disk_op_sema):
"""Deleting newest snapshot of a file-based image when the domain is
not running should trigger a blockRebase using qemu-img not libvirt.
In this test, the image is rebased onto no backing file (i.e.
@ -21553,6 +21561,7 @@ class LibvirtVolumeSnapshotTestCase(test.NoDBTestCase):
self.volume_uuid, snapshot_id,
self.delete_info_3)
mock_disk_op_sema.__enter__.assert_called_once()
self.assertEqual(0, mock_qemu_img_info.call_count)
mock_execute.assert_called_once_with('qemu-img', 'rebase',
'-b', '', 'disk1_file')
@ -21944,10 +21953,13 @@ class _BaseSnapshotTests(test.NoDBTestCase):
recv_meta = self.image_service.create(self.context, sent_meta)
return recv_meta
@mock.patch.object(compute_utils, 'disk_ops_semaphore',
new_callable=compute_utils.UnlimitedSemaphore)
@mock.patch.object(host.Host, 'has_min_version')
@mock.patch.object(imagebackend.Image, 'resolve_driver_format')
@mock.patch.object(host.Host, '_get_domain')
def _snapshot(self, image_id, mock_get_domain, mock_resolve, mock_version):
def _snapshot(self, image_id, mock_get_domain, mock_resolve, mock_version,
mock_disk_op_sema):
mock_get_domain.return_value = FakeVirtDomain()
driver = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
driver.snapshot(self.context, self.instance_ref, image_id,

View File

@ -27,6 +27,7 @@ from oslo_utils import imageutils
from oslo_utils import units
from oslo_utils import uuidutils
from nova.compute import utils as compute_utils
import nova.conf
from nova import context
from nova import exception
@ -673,13 +674,14 @@ class LvmTestCase(_ImageTestCase, test.NoDBTestCase):
self.LV = '%s_%s' % (self.INSTANCE['uuid'], self.NAME)
self.PATH = os.path.join('/dev', self.VG, self.LV)
@mock.patch.object(compute_utils, 'disk_ops_semaphore')
@mock.patch('nova.privsep.utils.supports_direct_io', return_value=True)
@mock.patch.object(imagebackend.lvm, 'create_volume')
@mock.patch.object(imagebackend.disk, 'get_disk_size',
return_value=TEMPLATE_SIZE)
@mock.patch('nova.privsep.qemu.convert_image')
def _create_image(self, sparse, mock_convert_image, mock_get, mock_create,
mock_ignored):
mock_ignored, mock_disk_op_sema):
fn = mock.MagicMock()
image = self.image_class(self.INSTANCE, self.NAME)
@ -695,6 +697,7 @@ class LvmTestCase(_ImageTestCase, test.NoDBTestCase):
path = '/dev/%s/%s_%s' % (self.VG, self.INSTANCE.uuid, self.NAME)
mock_convert_image.assert_called_once_with(
self.TEMPLATE_PATH, path, None, 'raw', CONF.instances_path)
mock_disk_op_sema.__enter__.assert_called_once()
@mock.patch.object(imagebackend.lvm, 'create_volume')
def _create_image_generated(self, sparse, mock_create):
@ -708,6 +711,7 @@ class LvmTestCase(_ImageTestCase, test.NoDBTestCase):
self.SIZE, sparse=sparse)
fn.assert_called_once_with(target=self.PATH, ephemeral_size=None)
@mock.patch.object(compute_utils, 'disk_ops_semaphore')
@mock.patch('nova.privsep.utils.supports_direct_io', return_value=True)
@mock.patch.object(imagebackend.disk, 'resize2fs')
@mock.patch.object(imagebackend.lvm, 'create_volume')
@ -715,7 +719,8 @@ class LvmTestCase(_ImageTestCase, test.NoDBTestCase):
return_value=TEMPLATE_SIZE)
@mock.patch('nova.privsep.qemu.convert_image')
def _create_image_resize(self, sparse, mock_convert_image, mock_get,
mock_create, mock_resize, mock_ignored):
mock_create, mock_resize, mock_ignored,
mock_disk_op_sema):
fn = mock.MagicMock()
fn(target=self.TEMPLATE_PATH)
image = self.image_class(self.INSTANCE, self.NAME)
@ -727,6 +732,7 @@ class LvmTestCase(_ImageTestCase, test.NoDBTestCase):
mock_convert_image.assert_called_once_with(
self.TEMPLATE_PATH, self.PATH, None, 'raw',
CONF.instances_path)
mock_disk_op_sema.__enter__.assert_called_once()
mock_resize.assert_called_once_with(self.PATH, run_as_root=True)
@mock.patch.object(imagebackend.fileutils, 'ensure_tree')
@ -945,13 +951,14 @@ class EncryptedLvmTestCase(_ImageTestCase, test.NoDBTestCase):
mock.Mock()),
mock.patch.object(self.libvirt_utils, 'remove_logical_volumes',
mock.Mock()),
mock.patch('nova.privsep.qemu.convert_image')):
mock.patch('nova.privsep.qemu.convert_image'),
mock.patch.object(compute_utils, 'disk_ops_semaphore')):
fn = mock.Mock()
image = self.image_class(self.INSTANCE, self.NAME)
image.create_image(fn, self.TEMPLATE_PATH, self.TEMPLATE_SIZE,
context=self.CONTEXT)
compute_utils.disk_ops_semaphore.__enter__.assert_called_once()
fn.assert_called_with(context=self.CONTEXT,
target=self.TEMPLATE_PATH)
self.lvm.create_volume.assert_called_with(self.VG,
@ -1021,13 +1028,14 @@ class EncryptedLvmTestCase(_ImageTestCase, test.NoDBTestCase):
mock.Mock()),
mock.patch.object(self.libvirt_utils, 'remove_logical_volumes',
mock.Mock()),
mock.patch('nova.privsep.qemu.convert_image')):
mock.patch('nova.privsep.qemu.convert_image'),
mock.patch.object(compute_utils, 'disk_ops_semaphore')):
fn = mock.Mock()
image = self.image_class(self.INSTANCE, self.NAME)
image.create_image(fn, self.TEMPLATE_PATH, self.SIZE,
context=self.CONTEXT)
compute_utils.disk_ops_semaphore.__enter__.assert_called_once()
fn.assert_called_with(context=self.CONTEXT,
target=self.TEMPLATE_PATH)
self.disk.get_disk_size.assert_called_with(self.TEMPLATE_PATH)

View File

@ -25,6 +25,7 @@ from oslo_utils import fileutils
from oslo_utils.fixture import uuidsentinel as uuids
import six
from nova.compute import utils as compute_utils
from nova import context
from nova import exception
from nova import objects
@ -518,7 +519,9 @@ disk size: 4.4M
finally:
os.unlink(dst_path)
def _do_test_extract_snapshot(self, mock_execute, src_format='qcow2',
@mock.patch.object(compute_utils, 'disk_ops_semaphore')
def _do_test_extract_snapshot(self, mock_execute, mock_disk_op_sema,
src_format='qcow2',
dest_format='raw', out_format='raw'):
libvirt_utils.extract_snapshot('/path/to/disk/image', src_format,
'/extracted/snap', dest_format)
@ -528,6 +531,7 @@ disk size: 4.4M
qemu_img_cmd += ('-c',)
qemu_img_cmd += ('/path/to/disk/image', '/extracted/snap')
mock_execute.assert_called_once_with(*qemu_img_cmd)
mock_disk_op_sema.__enter__.assert_called_once()
@mock.patch.object(utils, 'execute')
def test_extract_snapshot_raw(self, mock_execute):
@ -636,9 +640,11 @@ disk size: 4.4M
mock_images.assert_called_once_with(
_context, image_id, target, trusted_certs)
@mock.patch.object(compute_utils, 'disk_ops_semaphore')
@mock.patch('nova.privsep.utils.supports_direct_io', return_value=True)
@mock.patch('nova.privsep.qemu.unprivileged_convert_image')
def test_fetch_raw_image(self, mock_convert_image, mock_direct_io):
def test_fetch_raw_image(self, mock_convert_image, mock_direct_io,
mock_disk_op_sema):
def fake_rename(old, new):
self.executes.append(('mv', old, new))
@ -693,6 +699,7 @@ disk size: 4.4M
('mv', 't.qcow2.converted', 't.qcow2')]
images.fetch_to_raw(context, image_id, target)
self.assertEqual(self.executes, expected_commands)
mock_disk_op_sema.__enter__.assert_called_once()
mock_convert_image.assert_called_with(
't.qcow2.part', 't.qcow2.converted', 'qcow2', 'raw',
CONF.instances_path)

View File

@ -18,6 +18,7 @@ import mock
from oslo_concurrency import processutils
import six
from nova.compute import utils as compute_utils
from nova import exception
from nova import test
from nova import utils
@ -49,16 +50,19 @@ class QemuTestCase(test.NoDBTestCase):
self.assertTrue(image_info)
self.assertTrue(str(image_info))
@mock.patch.object(compute_utils, 'disk_ops_semaphore')
@mock.patch('nova.privsep.utils.supports_direct_io', return_value=True)
@mock.patch.object(processutils, 'execute',
side_effect=processutils.ProcessExecutionError)
def test_convert_image_with_errors(self, mocked_execute, mock_direct_io):
def test_convert_image_with_errors(self, mocked_execute, mock_direct_io,
mock_disk_op_sema):
self.assertRaises(exception.ImageUnacceptable,
images.convert_image,
'/path/that/does/not/exist',
'/other/path/that/does/not/exist',
'qcow2',
'raw')
mock_disk_op_sema.__enter__.assert_called_once()
@mock.patch.object(utils, 'execute')
@mock.patch.object(os.path, 'exists', return_value=True)
@ -101,22 +105,28 @@ class QemuTestCase(test.NoDBTestCase):
images.fetch_to_raw,
None, 'href123', '/no/path')
@mock.patch.object(compute_utils, 'disk_ops_semaphore')
@mock.patch('nova.privsep.utils.supports_direct_io', return_value=True)
@mock.patch('oslo_concurrency.processutils.execute')
def test_convert_image_with_direct_io_support(self, mock_execute,
mock_direct_io):
mock_direct_io,
mock_disk_op_sema):
images._convert_image('source', 'dest', 'in_format', 'out_format',
run_as_root=False)
expected = ('qemu-img', 'convert', '-t', 'none', '-O', 'out_format',
'-f', 'in_format', 'source', 'dest')
mock_disk_op_sema.__enter__.assert_called_once()
self.assertTupleEqual(expected, mock_execute.call_args[0])
@mock.patch.object(compute_utils, 'disk_ops_semaphore')
@mock.patch('nova.privsep.utils.supports_direct_io', return_value=False)
@mock.patch('oslo_concurrency.processutils.execute')
def test_convert_image_without_direct_io_support(self, mock_execute,
mock_direct_io):
mock_direct_io,
mock_disk_op_sema):
images._convert_image('source', 'dest', 'in_format', 'out_format',
run_as_root=False)
expected = ('qemu-img', 'convert', '-t', 'writethrough',
'-O', 'out_format', '-f', 'in_format', 'source', 'dest')
mock_disk_op_sema.__enter__.assert_called_once()
self.assertTupleEqual(expected, mock_execute.call_args[0])

View File

@ -27,6 +27,7 @@ import six
from nova.compute import flavors
from nova.compute import power_state
from nova.compute import utils as compute_utils
import nova.conf
from nova import context
from nova import exception
@ -132,6 +133,7 @@ class LookupTestCase(VMUtilsTestBase):
class GenerateConfigDriveTestCase(VMUtilsTestBase):
@mock.patch.object(compute_utils, 'disk_ops_semaphore')
@mock.patch.object(vm_utils, 'safe_find_sr')
@mock.patch.object(vm_utils, "create_vdi", return_value='vdi_ref')
@mock.patch.object(vm_utils.instance_metadata, "InstanceMetadata")
@ -144,7 +146,7 @@ class GenerateConfigDriveTestCase(VMUtilsTestBase):
def test_no_admin_pass(self, mock_tmpdir, mock_create_vbd, mock_size,
mock_stream, mock_execute, mock_builder,
mock_instance_metadata, mock_create_vdi,
mock_find_sr):
mock_find_sr, mock_disk_op_sema):
mock_tmpdir.return_value.__enter__.return_value = '/mock'
@ -153,7 +155,7 @@ class GenerateConfigDriveTestCase(VMUtilsTestBase):
vm_utils.generate_configdrive('session', 'context', 'instance',
'vm_ref', 'userdevice',
'network_info')
mock_disk_op_sema.__enter__.assert_called_once()
mock_size.assert_called_with('/mock/configdrive.vhd')
mock_open.assert_called_with('/mock/configdrive.vhd')
mock_execute.assert_called_with('qemu-img', 'convert', '-Ovpc',

View File

@ -28,6 +28,7 @@ from oslo_utils import fileutils
from oslo_utils import imageutils
from oslo_utils import units
from nova.compute import utils as compute_utils
import nova.conf
from nova import exception
from nova.i18n import _
@ -118,12 +119,13 @@ def convert_image_unsafe(source, dest, out_format, run_as_root=False):
def _convert_image(source, dest, in_format, out_format, run_as_root):
try:
if not run_as_root:
nova.privsep.qemu.unprivileged_convert_image(
source, dest, in_format, out_format, CONF.instances_path)
else:
nova.privsep.qemu.convert_image(
source, dest, in_format, out_format, CONF.instances_path)
with compute_utils.disk_ops_semaphore:
if not run_as_root:
nova.privsep.qemu.unprivileged_convert_image(
source, dest, in_format, out_format, CONF.instances_path)
else:
nova.privsep.qemu.convert_image(
source, dest, in_format, out_format, CONF.instances_path)
except processutils.ProcessExecutionError as exp:
msg = (_("Unable to convert image to %(format)s: %(exp)s") %
@ -133,8 +135,9 @@ def _convert_image(source, dest, in_format, out_format, run_as_root):
def fetch(context, image_href, path, trusted_certs=None):
with fileutils.remove_path_on_error(path):
IMAGE_API.download(context, image_href, dest_path=path,
trusted_certs=trusted_certs)
with compute_utils.disk_ops_semaphore:
IMAGE_API.download(context, image_href, dest_path=path,
trusted_certs=trusted_certs)
def get_info(context, image_href):

View File

@ -1967,10 +1967,12 @@ class LibvirtDriver(driver.ComputeDriver):
update_task_state(task_state=task_states.IMAGE_UPLOADING,
expected_state=task_states.IMAGE_PENDING_UPLOAD)
with libvirt_utils.file_open(out_path, 'rb') as image_file:
self._image_api.update(context,
image_id,
metadata,
image_file)
# execute operation with disk concurrency semaphore
with compute_utils.disk_ops_semaphore:
self._image_api.update(context,
image_id,
metadata,
image_file)
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_("Failed to snapshot image"))
@ -2425,8 +2427,10 @@ class LibvirtDriver(driver.ComputeDriver):
qemu_img_extra_arg = ['-F', b_file_fmt]
qemu_img_extra_arg.append(active_disk_object.source_path)
utils.execute("qemu-img", "rebase", "-b", backing_file,
*qemu_img_extra_arg)
# execute operation with disk concurrency semaphore
with compute_utils.disk_ops_semaphore:
utils.execute("qemu-img", "rebase", "-b", backing_file,
*qemu_img_extra_arg)
def _volume_snapshot_delete(self, context, instance, volume_id,
snapshot_id, delete_info=None):
@ -8361,8 +8365,10 @@ class LibvirtDriver(driver.ComputeDriver):
def _disk_raw_to_qcow2(path):
"""Converts a raw disk to qcow2."""
path_qcow = path + '_qcow'
utils.execute('qemu-img', 'convert', '-f', 'raw',
'-O', 'qcow2', path, path_qcow)
# execute operation with disk concurrency semaphore
with compute_utils.disk_ops_semaphore:
utils.execute('qemu-img', 'convert', '-f', 'raw',
'-O', 'qcow2', path, path_qcow)
os.rename(path_qcow, path)
def finish_migration(self, context, migration, instance, disk_info,

View File

@ -26,6 +26,7 @@ from oslo_concurrency import processutils
from oslo_log import log as logging
from oslo_utils import fileutils
from nova.compute import utils as compute_utils
import nova.conf
from nova.i18n import _
from nova.objects import fields as obj_fields
@ -315,7 +316,9 @@ def extract_snapshot(disk_path, source_fmt, out_path, dest_fmt):
qemu_img_cmd += ('-c',)
qemu_img_cmd += (disk_path, out_path)
utils.execute(*qemu_img_cmd)
# execute operation with disk concurrency semaphore
with compute_utils.disk_ops_semaphore:
utils.execute(*qemu_img_cmd)
def load_file(path):

View File

@ -46,6 +46,7 @@ import six.moves.urllib.request as urlrequest
from nova.api.metadata import base as instance_metadata
from nova.compute import power_state
from nova.compute import task_states
from nova.compute import utils as compute_utils
import nova.conf
from nova import exception
from nova.i18n import _
@ -1153,8 +1154,9 @@ def generate_configdrive(session, context, instance, vm_ref, userdevice,
cdb.make_drive(tmp_file)
# XAPI can only import a VHD file, so convert to vhd format
vhd_file = '%s.vhd' % tmp_file
utils.execute('qemu-img', 'convert', '-Ovpc', tmp_file,
vhd_file)
with compute_utils.disk_ops_semaphore:
utils.execute('qemu-img', 'convert', '-Ovpc', tmp_file,
vhd_file)
vhd_file_size = os.path.getsize(vhd_file)
with open(vhd_file) as file_obj:
volume_utils.stream_to_vdi(

View File

@ -0,0 +1,9 @@
---
features:
- |
Introduced a new config option ``[compute]/max_concurrent_disk_ops`` to
reduce disk contention by specifying the maximum number of concurrent
disk-IO-intensive operations per compute service. This would include
operations such as image download, image format conversion, snapshot
extraction, etc.
The default value is 0, which means that there is no limit.