Merge "Limit volume copy bandwidth per backend"

This commit is contained in:
Jenkins 2015-02-27 00:14:01 +00:00 committed by Gerrit Code Review
commit 33ccaf8efc
9 changed files with 303 additions and 217 deletions

View File

@ -40,6 +40,7 @@ from cinder.openstack.common import fileutils
from cinder.openstack.common import imageutils
from cinder.openstack.common import log as logging
from cinder import utils
from cinder.volume import throttling
from cinder.volume import utils as volume_utils
LOG = logging.getLogger(__name__)
@ -62,11 +63,11 @@ def qemu_img_info(path, run_as_root=True):
return imageutils.QemuImgInfo(out)
def convert_image(source, dest, out_format, bps_limit=None, run_as_root=True):
def _convert_image(prefix, source, dest, out_format, run_as_root=True):
"""Convert image to other format."""
cmd = ('qemu-img', 'convert',
'-O', out_format, source, dest)
cmd = prefix + ('qemu-img', 'convert',
'-O', out_format, source, dest)
# Check whether O_DIRECT is supported and set '-t none' if it is
# This is needed to ensure that all data hit the device before
@ -81,16 +82,12 @@ def convert_image(source, dest, out_format, bps_limit=None, run_as_root=True):
volume_utils.check_for_odirect_support(source,
dest,
'oflag=direct')):
cmd = ('qemu-img', 'convert',
'-t', 'none',
'-O', out_format, source, dest)
cmd = prefix + ('qemu-img', 'convert',
'-t', 'none',
'-O', out_format, source, dest)
start_time = timeutils.utcnow()
cgcmd = volume_utils.setup_blkio_cgroup(source, dest, bps_limit)
if cgcmd:
cmd = tuple(cgcmd) + cmd
utils.execute(*cmd, run_as_root=run_as_root)
duration = timeutils.delta_seconds(start_time, timeutils.utcnow())
# NOTE(jdg): use a default of 1, mostly for unit test, but in
@ -110,6 +107,15 @@ def convert_image(source, dest, out_format, bps_limit=None, run_as_root=True):
LOG.info(msg % {"sz": fsz_mb, "mbps": mbps})
def convert_image(source, dest, out_format, run_as_root=True, throttle=None):
if not throttle:
throttle = throttling.Throttle.get_default()
with throttle.subcommand(source, dest) as throttle_cmd:
_convert_image(tuple(throttle_cmd['prefix']),
source, dest,
out_format, run_as_root=run_as_root)
def resize_image(source, size, run_as_root=False):
"""Changes the virtual size of the image."""
cmd = ('qemu-img', 'resize', source, '%sG' % size)
@ -278,7 +284,6 @@ def fetch_to_volume_format(context, image_service,
LOG.debug("%s was %s, converting to %s " % (image_id, fmt,
volume_format))
convert_image(tmp, dest, volume_format,
bps_limit=CONF.volume_copy_bps_limit,
run_as_root=run_as_root)
data = qemu_img_info(dest, run_as_root=run_as_root)
@ -310,7 +315,6 @@ def upload_volume(context, image_service, image_meta, volume_path,
LOG.debug("%s was %s, converting to %s" %
(image_id, volume_format, image_meta['disk_format']))
convert_image(volume_path, tmp, image_meta['disk_format'],
bps_limit=CONF.volume_copy_bps_limit,
run_as_root=run_as_root)
data = qemu_img_info(tmp, run_as_root=run_as_root)

View File

@ -24,6 +24,7 @@ from oslo_utils import units
from cinder import exception
from cinder.image import image_utils
from cinder import test
from cinder.volume import throttling
class TestQemuImgInfo(test.TestCase):
@ -72,23 +73,22 @@ class TestQemuImgInfo(test.TestCase):
class TestConvertImage(test.TestCase):
@mock.patch('cinder.image.image_utils.os.stat')
@mock.patch('cinder.utils.execute')
@mock.patch('cinder.volume.utils.setup_blkio_cgroup',
return_value=(mock.sentinel.cgcmd, ))
@mock.patch('cinder.utils.is_blk_device', return_value=True)
def test_defaults_block_dev(self, mock_isblk, mock_cgroup, mock_exec,
def test_defaults_block_dev(self, mock_isblk, mock_exec,
mock_stat):
source = mock.sentinel.source
dest = mock.sentinel.dest
out_format = mock.sentinel.out_format
cgcmd = mock.sentinel.cgcmd
mock_stat.return_value.st_size = 1048576
throttle = throttling.Throttle(prefix=['cgcmd'])
with mock.patch('cinder.volume.utils.check_for_odirect_support',
return_value=True):
output = image_utils.convert_image(source, dest, out_format)
output = image_utils.convert_image(source, dest, out_format,
throttle=throttle)
self.assertIsNone(output)
mock_exec.assert_called_once_with(cgcmd, 'qemu-img', 'convert',
mock_exec.assert_called_once_with('cgcmd', 'qemu-img', 'convert',
'-t', 'none', '-O', out_format,
source, dest, run_as_root=True)
@ -99,7 +99,7 @@ class TestConvertImage(test.TestCase):
output = image_utils.convert_image(source, dest, out_format)
self.assertIsNone(output)
mock_exec.assert_called_once_with(cgcmd, 'qemu-img', 'convert',
mock_exec.assert_called_once_with('qemu-img', 'convert',
'-O', out_format, source, dest,
run_as_root=True)
@ -107,21 +107,18 @@ class TestConvertImage(test.TestCase):
return_value=True)
@mock.patch('cinder.image.image_utils.os.stat')
@mock.patch('cinder.utils.execute')
@mock.patch('cinder.volume.utils.setup_blkio_cgroup',
return_value=(mock.sentinel.cgcmd, ))
@mock.patch('cinder.utils.is_blk_device', return_value=False)
def test_defaults_not_block_dev(self, mock_isblk, mock_cgroup, mock_exec,
def test_defaults_not_block_dev(self, mock_isblk, mock_exec,
mock_stat, mock_odirect):
source = mock.sentinel.source
dest = mock.sentinel.dest
out_format = mock.sentinel.out_format
cgcmd = mock.sentinel.cgcmd
mock_stat.return_value.st_size = 1048576
output = image_utils.convert_image(source, dest, out_format)
self.assertIsNone(output)
mock_exec.assert_called_once_with(cgcmd, 'qemu-img', 'convert', '-O',
mock_exec.assert_called_once_with('qemu-img', 'convert', '-O',
out_format, source, dest,
run_as_root=True)
@ -339,7 +336,6 @@ class TestUploadVolume(test.TestCase):
'disk_format': mock.sentinel.disk_format}
volume_path = mock.sentinel.volume_path
mock_os.name = 'posix'
mock_conf.volume_copy_bps_limit = mock.sentinel.bps_limit
data = mock_info.return_value
data.file_format = mock.sentinel.disk_format
temp_file = mock_temp.return_value.__enter__.return_value
@ -351,7 +347,6 @@ class TestUploadVolume(test.TestCase):
mock_convert.assert_called_once_with(volume_path,
temp_file,
mock.sentinel.disk_format,
bps_limit=mock.sentinel.bps_limit,
run_as_root=True)
mock_info.assert_called_once_with(temp_file, run_as_root=True)
mock_open.assert_called_once_with(temp_file, 'rb')
@ -430,7 +425,6 @@ class TestUploadVolume(test.TestCase):
'disk_format': mock.sentinel.disk_format}
volume_path = mock.sentinel.volume_path
mock_os.name = 'posix'
mock_conf.volume_copy_bps_limit = mock.sentinel.bps_limit
data = mock_info.return_value
data.file_format = mock.sentinel.other_disk_format
temp_file = mock_temp.return_value.__enter__.return_value
@ -441,7 +435,6 @@ class TestUploadVolume(test.TestCase):
mock_convert.assert_called_once_with(volume_path,
temp_file,
mock.sentinel.disk_format,
bps_limit=mock.sentinel.bps_limit,
run_as_root=True)
mock_info.assert_called_once_with(temp_file, run_as_root=True)
self.assertFalse(image_service.update.called)
@ -544,8 +537,6 @@ class TestFetchToVolumeFormat(test.TestCase):
volume_format = mock.sentinel.volume_format
blocksize = mock.sentinel.blocksize
bps_limit = mock.sentinel.bps_limit
mock_conf.volume_copy_bps_limit = bps_limit
data = mock_info.return_value
data.file_format = volume_format
data.backing_file = None
@ -568,7 +559,6 @@ class TestFetchToVolumeFormat(test.TestCase):
self.assertFalse(mock_repl_xen.called)
self.assertFalse(mock_copy.called)
mock_convert.assert_called_once_with(tmp, dest, volume_format,
bps_limit=bps_limit,
run_as_root=True)
@mock.patch('cinder.image.image_utils.convert_image')
@ -594,8 +584,6 @@ class TestFetchToVolumeFormat(test.TestCase):
size = 4321
run_as_root = mock.sentinel.run_as_root
bps_limit = mock.sentinel.bps_limit
mock_conf.volume_copy_bps_limit = bps_limit
data = mock_info.return_value
data.file_format = volume_format
data.backing_file = None
@ -619,7 +607,6 @@ class TestFetchToVolumeFormat(test.TestCase):
self.assertFalse(mock_repl_xen.called)
self.assertFalse(mock_copy.called)
mock_convert.assert_called_once_with(tmp, dest, volume_format,
bps_limit=bps_limit,
run_as_root=run_as_root)
@mock.patch('cinder.image.image_utils.convert_image')
@ -647,8 +634,6 @@ class TestFetchToVolumeFormat(test.TestCase):
size = 4321
run_as_root = mock.sentinel.run_as_root
bps_limit = mock.sentinel.bps_limit
mock_conf.volume_copy_bps_limit = bps_limit
tmp = mock_temp.return_value.__enter__.return_value
image_service.show.return_value = {'disk_format': 'raw',
'size': 41126400}
@ -695,8 +680,6 @@ class TestFetchToVolumeFormat(test.TestCase):
size = 4321
run_as_root = mock.sentinel.run_as_root
bps_limit = mock.sentinel.bps_limit
mock_conf.volume_copy_bps_limit = bps_limit
tmp = mock_temp.return_value.__enter__.return_value
image_service.show.return_value = {'disk_format': 'not_raw'}
@ -740,8 +723,6 @@ class TestFetchToVolumeFormat(test.TestCase):
size = 4321
run_as_root = mock.sentinel.run_as_root
bps_limit = mock.sentinel.bps_limit
mock_conf.volume_copy_bps_limit = bps_limit
tmp = mock_temp.return_value.__enter__.return_value
image_service.show.return_value = None
@ -783,8 +764,6 @@ class TestFetchToVolumeFormat(test.TestCase):
size = 1234
run_as_root = mock.sentinel.run_as_root
bps_limit = mock.sentinel.bps_limit
mock_conf.volume_copy_bps_limit = bps_limit
data = mock_info.return_value
data.file_format = volume_format
data.backing_file = None
@ -833,8 +812,6 @@ class TestFetchToVolumeFormat(test.TestCase):
size = 4321
run_as_root = mock.sentinel.run_as_root
bps_limit = mock.sentinel.bps_limit
mock_conf.volume_copy_bps_limit = bps_limit
data = mock_info.return_value
data.file_format = None
data.backing_file = None
@ -883,8 +860,6 @@ class TestFetchToVolumeFormat(test.TestCase):
size = 4321
run_as_root = mock.sentinel.run_as_root
bps_limit = mock.sentinel.bps_limit
mock_conf.volume_copy_bps_limit = bps_limit
data = mock_info.return_value
data.file_format = volume_format
data.backing_file = mock.sentinel.backing_file
@ -933,8 +908,6 @@ class TestFetchToVolumeFormat(test.TestCase):
size = 4321
run_as_root = mock.sentinel.run_as_root
bps_limit = mock.sentinel.bps_limit
mock_conf.volume_copy_bps_limit = bps_limit
data = mock_info.return_value
data.file_format = mock.sentinel.file_format
data.backing_file = None
@ -959,7 +932,6 @@ class TestFetchToVolumeFormat(test.TestCase):
self.assertFalse(mock_repl_xen.called)
self.assertFalse(mock_copy.called)
mock_convert.assert_called_once_with(tmp, dest, volume_format,
bps_limit=bps_limit,
run_as_root=run_as_root)
@mock.patch('cinder.image.image_utils.convert_image')
@ -986,8 +958,6 @@ class TestFetchToVolumeFormat(test.TestCase):
size = 4321
run_as_root = mock.sentinel.run_as_root
bps_limit = mock.sentinel.bps_limit
mock_conf.volume_copy_bps_limit = bps_limit
data = mock_info.return_value
data.file_format = volume_format
data.backing_file = None
@ -1011,7 +981,6 @@ class TestFetchToVolumeFormat(test.TestCase):
mock_repl_xen.assert_called_once_with(tmp)
self.assertFalse(mock_copy.called)
mock_convert.assert_called_once_with(tmp, dest, volume_format,
bps_limit=bps_limit,
run_as_root=run_as_root)

View File

@ -238,7 +238,8 @@ class VolumeTestCase(BaseVolumeTestCase):
self.stubs.Set(volutils, 'clear_volume',
lambda a, b, volume_clear=mox.IgnoreArg(),
volume_clear_size=mox.IgnoreArg(),
lvm_type=mox.IgnoreArg(): None)
lvm_type=mox.IgnoreArg(),
throttle=mox.IgnoreArg(): None)
self.stubs.Set(tgt.TgtAdm,
'create_iscsi_target',
self._fake_create_iscsi_target)
@ -2488,7 +2489,7 @@ class VolumeTestCase(BaseVolumeTestCase):
pass
def fake_fetch_to_raw(ctx, image_service, image_id, path, blocksize,
size=None):
size=None, throttle=None):
pass
def fake_clone_image(ctx, volume_ref,

View File

@ -0,0 +1,78 @@
# Copyright (c) 2015 Hitachi Data Systems, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for volume copy throttling helpers."""
import mock
from oslo_config import cfg
from cinder import test
from cinder import utils
from cinder.volume import throttling
CONF = cfg.CONF
class ThrottleTestCase(test.TestCase):
def test_NoThrottle(self):
with throttling.Throttle().subcommand('volume1', 'volume2') as cmd:
self.assertEqual([], cmd['prefix'])
@mock.patch.object(utils, 'get_blkdev_major_minor')
def test_BlkioCgroup(self, mock_major_minor):
def fake_get_blkdev_major_minor(path):
return {'src_volume1': "253:0", 'dst_volume1': "253:1",
'src_volume2': "253:2", 'dst_volume2': "253:3"}[path]
mock_major_minor.side_effect = fake_get_blkdev_major_minor
self.exec_cnt = 0
def fake_execute(*cmd, **kwargs):
cmd_set = ['cgset', '-r',
'blkio.throttle.%s_bps_device=%s %d', 'fake_group']
set_order = [None,
('read', '253:0', 1024),
('write', '253:1', 1024),
# a nested job starts; bps limit are set to the half
('read', '253:0', 512),
('read', '253:2', 512),
('write', '253:1', 512),
('write', '253:3', 512),
# a nested job ends; bps limit is resumed
('read', '253:0', 1024),
('write', '253:1', 1024)]
if set_order[self.exec_cnt] is None:
self.assertEqual(('cgcreate', '-g', 'blkio:fake_group'), cmd)
else:
cmd_set[2] %= set_order[self.exec_cnt]
self.assertEqual(tuple(cmd_set), cmd)
self.exec_cnt += 1
with mock.patch.object(utils, 'execute', side_effect=fake_execute):
throttle = throttling.BlkioCgroup(1024, 'fake_group')
with throttle.subcommand('src_volume1', 'dst_volume1') as cmd:
self.assertEqual(['cgexec', '-g', 'blkio:fake_group'],
cmd['prefix'])
# a nested job
with throttle.subcommand('src_volume2', 'dst_volume2') as cmd:
self.assertEqual(['cgexec', '-g', 'blkio:fake_group'],
cmd['prefix'])

View File

@ -23,6 +23,7 @@ from cinder import exception
from cinder.openstack.common import log as logging
from cinder import test
from cinder import utils
from cinder.volume import throttling
from cinder.volume import utils as volume_utils
@ -380,7 +381,8 @@ class ClearVolumeTestCase(test.TestCase):
self.assertIsNone(output)
mock_copy.assert_called_once_with('/dev/zero', 'volume_path', 1024,
'1M', sync=True,
execute=utils.execute, ionice='-c3')
execute=utils.execute, ionice='-c3',
throttle=None)
@mock.patch('cinder.volume.utils.copy_volume', return_value=None)
@mock.patch('cinder.volume.utils.CONF')
@ -394,7 +396,8 @@ class ClearVolumeTestCase(test.TestCase):
self.assertIsNone(output)
mock_copy.assert_called_once_with('/dev/zero', 'volume_path', 1,
'1M', sync=True,
execute=utils.execute, ionice='-c0')
execute=utils.execute, ionice='-c0',
throttle=None)
@mock.patch('cinder.utils.execute')
@mock.patch('cinder.volume.utils.CONF')
@ -429,8 +432,6 @@ class ClearVolumeTestCase(test.TestCase):
class CopyVolumeTestCase(test.TestCase):
@mock.patch('cinder.volume.utils.setup_blkio_cgroup',
return_value=['cg_cmd'])
@mock.patch('cinder.volume.utils._calculate_count',
return_value=(1234, 5678))
@mock.patch('cinder.volume.utils.check_for_odirect_support',
@ -438,13 +439,14 @@ class CopyVolumeTestCase(test.TestCase):
@mock.patch('cinder.utils.execute')
@mock.patch('cinder.volume.utils.CONF')
def test_copy_volume_dd_iflag_and_oflag(self, mock_conf, mock_exec,
mock_support, mock_count, mock_cg):
mock_conf.volume_copy_bps_limit = 10
mock_support, mock_count):
fake_throttle = throttling.Throttle(['fake_throttle'])
output = volume_utils.copy_volume('/dev/zero', '/dev/null', 1024, 1,
sync=True, execute=utils.execute,
ionice=None)
ionice=None, throttle=fake_throttle)
self.assertIsNone(output)
mock_exec.assert_called_once_with('cg_cmd', 'dd', 'if=/dev/zero',
mock_exec.assert_called_once_with('fake_throttle', 'dd',
'if=/dev/zero',
'of=/dev/null', 'count=5678',
'bs=1234', 'iflag=direct',
'oflag=direct', run_as_root=True)
@ -453,30 +455,28 @@ class CopyVolumeTestCase(test.TestCase):
output = volume_utils.copy_volume('/dev/zero', '/dev/null', 1024, 1,
sync=False, execute=utils.execute,
ionice=None)
ionice=None, throttle=fake_throttle)
self.assertIsNone(output)
mock_exec.assert_called_once_with('cg_cmd', 'dd', 'if=/dev/zero',
mock_exec.assert_called_once_with('fake_throttle', 'dd',
'if=/dev/zero',
'of=/dev/null', 'count=5678',
'bs=1234', 'iflag=direct',
'oflag=direct', run_as_root=True)
@mock.patch('cinder.volume.utils.setup_blkio_cgroup',
return_value=['cg_cmd'])
@mock.patch('cinder.volume.utils._calculate_count',
return_value=(1234, 5678))
@mock.patch('cinder.volume.utils.check_for_odirect_support',
return_value=False)
@mock.patch('cinder.utils.execute')
@mock.patch('cinder.volume.utils.CONF')
def test_copy_volume_dd_no_iflag_or_oflag(self, mock_conf, mock_exec,
mock_support, mock_count,
mock_cg):
mock_conf.volume_copy_bps_limit = 10
def test_copy_volume_dd_no_iflag_or_oflag(self, mock_exec,
mock_support, mock_count):
fake_throttle = throttling.Throttle(['fake_throttle'])
output = volume_utils.copy_volume('/dev/zero', '/dev/null', 1024, 1,
sync=True, execute=utils.execute,
ionice=None)
ionice=None, throttle=fake_throttle)
self.assertIsNone(output)
mock_exec.assert_called_once_with('cg_cmd', 'dd', 'if=/dev/zero',
mock_exec.assert_called_once_with('fake_throttle', 'dd',
'if=/dev/zero',
'of=/dev/null', 'count=5678',
'bs=1234', 'conv=fdatasync',
run_as_root=True)
@ -485,23 +485,20 @@ class CopyVolumeTestCase(test.TestCase):
output = volume_utils.copy_volume('/dev/zero', '/dev/null', 1024, 1,
sync=False, execute=utils.execute,
ionice=None)
ionice=None, throttle=fake_throttle)
self.assertIsNone(output)
mock_exec.assert_called_once_with('cg_cmd', 'dd', 'if=/dev/zero',
mock_exec.assert_called_once_with('fake_throttle', 'dd',
'if=/dev/zero',
'of=/dev/null', 'count=5678',
'bs=1234', run_as_root=True)
@mock.patch('cinder.volume.utils.setup_blkio_cgroup',
return_value=None)
@mock.patch('cinder.volume.utils._calculate_count',
return_value=(1234, 5678))
@mock.patch('cinder.volume.utils.check_for_odirect_support',
return_value=False)
@mock.patch('cinder.utils.execute')
@mock.patch('cinder.volume.utils.CONF')
def test_copy_volume_dd_no_cgroup(self, mock_conf, mock_exec, mock_support,
mock_count, mock_cg):
mock_conf.volume_copy_bps_limit = 10
def test_copy_volume_dd_no_throttle(self, mock_exec, mock_support,
mock_count):
output = volume_utils.copy_volume('/dev/zero', '/dev/null', 1024, 1,
sync=True, execute=utils.execute,
ionice=None)
@ -510,17 +507,13 @@ class CopyVolumeTestCase(test.TestCase):
'count=5678', 'bs=1234',
'conv=fdatasync', run_as_root=True)
@mock.patch('cinder.volume.utils.setup_blkio_cgroup',
return_value=None)
@mock.patch('cinder.volume.utils._calculate_count',
return_value=(1234, 5678))
@mock.patch('cinder.volume.utils.check_for_odirect_support',
return_value=False)
@mock.patch('cinder.utils.execute')
@mock.patch('cinder.volume.utils.CONF')
def test_copy_volume_dd_with_ionice(self, mock_conf, mock_exec,
mock_support, mock_count, mock_cg):
mock_conf.volume_copy_bps_limit = 10
def test_copy_volume_dd_with_ionice(self, mock_exec,
mock_support, mock_count):
output = volume_utils.copy_volume('/dev/zero', '/dev/null', 1024, 1,
sync=True, execute=utils.execute,
ionice='-c3')
@ -531,77 +524,6 @@ class CopyVolumeTestCase(test.TestCase):
'conv=fdatasync', run_as_root=True)
class BlkioCgroupTestCase(test.TestCase):
def test_bps_limit_zero(self):
mock_exec = mock.Mock()
output = volume_utils.setup_blkio_cgroup('src', 'dst', 0,
execute=mock_exec)
self.assertIsNone(output)
self.assertFalse(mock_exec.called)
@mock.patch('cinder.utils.get_blkdev_major_minor',
side_effect=exception.Error)
def test_get_blkdev_error(self, mock_get_blkdev):
mock_exec = mock.Mock()
output = volume_utils.setup_blkio_cgroup('src', 'dst', 1,
execute=mock_exec)
self.assertIsNone(output)
mock_get_blkdev.assert_has_calls([mock.call('src'), mock.call('dst')])
self.assertFalse(mock_exec.called)
@mock.patch('cinder.utils.get_blkdev_major_minor',
side_effect=lambda x: x)
@mock.patch('cinder.volume.utils.CONF')
def test_cgcreate_fail(self, mock_conf, mock_get_blkdev):
mock_conf.volume_copy_blkio_cgroup_name = 'test_group'
mock_exec = mock.Mock()
mock_exec.side_effect = processutils.ProcessExecutionError
output = volume_utils.setup_blkio_cgroup('src', 'dst', 1,
execute=mock_exec)
self.assertIsNone(output)
mock_get_blkdev.assert_has_calls([mock.call('src'), mock.call('dst')])
mock_exec.assert_called_once_with('cgcreate', '-g', 'blkio:test_group',
run_as_root=True)
@mock.patch('cinder.utils.get_blkdev_major_minor',
side_effect=lambda x: x)
@mock.patch('cinder.volume.utils.CONF')
def test_cgset_fail(self, mock_conf, mock_get_blkdev):
mock_conf.volume_copy_blkio_cgroup_name = 'test_group'
mock_exec = mock.Mock()
def cgset_exception(*args, **kwargs):
if 'cgset' in args:
raise processutils.ProcessExecutionError
mock_exec.side_effect = cgset_exception
output = volume_utils.setup_blkio_cgroup('src', 'dst', 1,
execute=mock_exec)
self.assertIsNone(output)
mock_get_blkdev.assert_has_calls([mock.call('src'), mock.call('dst')])
mock_exec.assert_has_calls([
mock.call('cgcreate', '-g', 'blkio:test_group', run_as_root=True),
mock.call('cgset', '-r', 'blkio.throttle.read_bps_device=src 1',
'test_group', run_as_root=True)])
@mock.patch('cinder.utils.get_blkdev_major_minor',
side_effect=lambda x: x)
@mock.patch('cinder.volume.utils.CONF')
def test_setup_blkio_cgroup(self, mock_conf, mock_get_blkdev):
mock_conf.volume_copy_blkio_cgroup_name = 'test_group'
mock_exec = mock.Mock()
output = volume_utils.setup_blkio_cgroup('src', 'dst', 1,
execute=mock_exec)
self.assertEqual(['cgexec', '-g', 'blkio:test_group'], output)
mock_get_blkdev.assert_has_calls([mock.call('src'), mock.call('dst')])
mock_exec.assert_has_calls([
mock.call('cgcreate', '-g', 'blkio:test_group', run_as_root=True),
mock.call('cgset', '-r', 'blkio.throttle.read_bps_device=src 1',
'test_group', run_as_root=True),
mock.call('cgset', '-r', 'blkio.throttle.write_bps_device=dst 1',
'test_group', run_as_root=True)])
class VolumeUtilsTestCase(test.TestCase):
def test_null_safe_str(self):
self.assertEqual('', volume_utils.null_safe_str(None))

View File

@ -32,6 +32,7 @@ from cinder.openstack.common import fileutils
from cinder.openstack.common import log as logging
from cinder import utils
from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import throttling
from cinder.volume import utils as volume_utils
LOG = logging.getLogger(__name__)
@ -355,6 +356,24 @@ class BaseVD(object):
def initialized(self):
return self._initialized
def set_throttle(self):
bps_limit = ((self.configuration and
self.configuration.safe_get('volume_copy_bps_limit')) or
CONF.volume_copy_bps_limit)
cgroup_name = ((self.configuration and
self.configuration.safe_get(
'volume_copy_blkio_cgroup_name')) or
CONF.volume_copy_blkio_cgroup_name)
self._throttle = None
if bps_limit:
try:
self._throttle = throttling.BlkioCgroup(int(bps_limit),
cgroup_name)
except processutils.ProcessExecutionError as err:
LOG.warning(_LW('Failed to activate volume copy throttling: '
'%(err)s'), {'err': six.text_type(err)})
throttling.Throttle.set_default(self._throttle)
def get_version(self):
"""Get the current version of this driver."""
return self.VERSION
@ -452,7 +471,8 @@ class BaseVD(object):
src_attach_info['device']['path'],
dest_attach_info['device']['path'],
size_in_mb,
self.configuration.volume_dd_blocksize)
self.configuration.volume_dd_blocksize,
throttle=self._throttle)
copy_error = False
except Exception:
with excutils.save_and_reraise_exception():

View File

@ -327,6 +327,8 @@ class VolumeManager(manager.SchedulerDependentManager):
LOG.exception(ex)
return
self.driver.set_throttle()
# at this point the driver is considered initialized.
self.driver.set_initialized()

129
cinder/volume/throttling.py Normal file
View File

@ -0,0 +1,129 @@
# Copyright (c) 2015 Hitachi Data Systems, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Volume copy throttling helpers."""
import contextlib
from oslo_concurrency import processutils
from cinder import exception
from cinder.i18n import _LW, _LE
from cinder.openstack.common import log as logging
from cinder import utils
LOG = logging.getLogger(__name__)
class Throttle(object):
"""Base class for throttling disk I/O bandwidth"""
DEFAULT = None
@staticmethod
def set_default(throttle):
Throttle.DEFAULT = throttle
@staticmethod
def get_default():
return Throttle.DEFAULT or Throttle()
def __init__(self, prefix=None):
self.prefix = prefix or []
@contextlib.contextmanager
def subcommand(self, srcpath, dstpath):
"""Throttle disk I/O bandwidth used by a sub-command, such as 'dd',
that reads from srcpath and writes to dstpath. The sub-command
must be executed with the generated prefix command.
"""
yield {'prefix': self.prefix}
class BlkioCgroup(Throttle):
"""Throttle disk I/O bandwidth using blkio cgroups."""
def __init__(self, bps_limit, cgroup_name):
self.bps_limit = bps_limit
self.cgroup = cgroup_name
self.srcdevs = {}
self.dstdevs = {}
try:
utils.execute('cgcreate', '-g', 'blkio:%s' % self.cgroup,
run_as_root=True)
except processutils.ProcessExecutionError:
LOG.error(_LE('Failed to create blkio cgroup \'%(name)s\'.'),
{'name': cgroup_name})
raise
def _get_device_number(self, path):
try:
return utils.get_blkdev_major_minor(path)
except exception.Error as e:
LOG.error(_LE('Failed to get device number for throttling: '
'%(error)s'), {'error': e})
def _limit_bps(self, rw, dev, bps):
try:
utils.execute('cgset', '-r', 'blkio.throttle.%s_bps_device=%s %d'
% (rw, dev, bps), self.cgroup, run_as_root=True)
except processutils.ProcessExecutionError:
LOG.warn(_LW('Failed to setup blkio cgroup to throttle the '
'device \'%(device)s\'.'), {'device': dev})
def _set_limits(self, rw, devs):
total = sum(devs.itervalues())
for dev in devs:
self._limit_bps(rw, dev, self.bps_limit * devs[dev] / total)
@utils.synchronized('BlkioCgroup')
def _inc_device(self, srcdev, dstdev):
if srcdev:
self.srcdevs[srcdev] = self.srcdevs.get(srcdev, 0) + 1
self._set_limits('read', self.srcdevs)
if dstdev:
self.dstdevs[dstdev] = self.dstdevs.get(dstdev, 0) + 1
self._set_limits('write', self.dstdevs)
@utils.synchronized('BlkioCgroup')
def _dec_device(self, srcdev, dstdev):
if srcdev:
self.srcdevs[srcdev] -= 1
if self.srcdevs[srcdev] == 0:
del self.srcdevs[srcdev]
self._set_limits('read', self.srcdevs)
if dstdev:
self.dstdevs[dstdev] -= 1
if self.dstdevs[dstdev] == 0:
del self.dstdevs[dstdev]
self._set_limits('write', self.dstdevs)
@contextlib.contextmanager
def subcommand(self, srcpath, dstpath):
srcdev = self._get_device_number(srcpath)
dstdev = self._get_device_number(dstpath)
if srcdev is None and dstdev is None:
yield {'prefix': []}
return
self._inc_device(srcdev, dstdev)
try:
yield {'prefix': ['cgexec', '-g', 'blkio:%s' % self.cgroup]}
finally:
self._dec_device(srcdev, dstdev)

View File

@ -26,10 +26,11 @@ from oslo_utils import units
from cinder.brick.local_dev import lvm as brick_lvm
from cinder import exception
from cinder.i18n import _, _LI, _LW
from cinder.i18n import _, _LI
from cinder.openstack.common import log as logging
from cinder import rpc
from cinder import utils
from cinder.volume import throttling
CONF = cfg.CONF
@ -246,56 +247,6 @@ def notify_about_cgsnapshot_usage(context, cgsnapshot, event_suffix,
usage_info)
def setup_blkio_cgroup(srcpath, dstpath, bps_limit, execute=utils.execute):
if not bps_limit:
LOG.debug('Not using bps rate limiting on volume copy')
return None
try:
srcdev = utils.get_blkdev_major_minor(srcpath)
except exception.Error as e:
msg = (_('Failed to get device number for read throttling: %(error)s')
% {'error': e})
LOG.error(msg)
srcdev = None
try:
dstdev = utils.get_blkdev_major_minor(dstpath)
except exception.Error as e:
msg = (_('Failed to get device number for write throttling: %(error)s')
% {'error': e})
LOG.error(msg)
dstdev = None
if not srcdev and not dstdev:
return None
group_name = CONF.volume_copy_blkio_cgroup_name
LOG.debug('Setting rate limit to %s bps for blkio '
'group: %s' % (bps_limit, group_name))
try:
execute('cgcreate', '-g', 'blkio:%s' % group_name, run_as_root=True)
except processutils.ProcessExecutionError:
LOG.warn(_LW('Failed to create blkio cgroup'))
return None
try:
if srcdev:
execute('cgset', '-r', 'blkio.throttle.read_bps_device=%s %d'
% (srcdev, bps_limit), group_name, run_as_root=True)
if dstdev:
execute('cgset', '-r', 'blkio.throttle.write_bps_device=%s %d'
% (dstdev, bps_limit), group_name, run_as_root=True)
except processutils.ProcessExecutionError:
msg = (_('Failed to setup blkio cgroup to throttle the devices: '
'\'%(src)s\',\'%(dst)s\'')
% {'src': srcdev, 'dst': dstdev})
LOG.warn(msg)
return None
return ['cgexec', '-g', 'blkio:%s' % group_name]
def _calculate_count(size_in_m, blocksize):
# Check if volume_dd_blocksize is valid
@ -332,8 +283,8 @@ def check_for_odirect_support(src, dest, flag='oflag=direct'):
return False
def copy_volume(srcstr, deststr, size_in_m, blocksize, sync=False,
execute=utils.execute, ionice=None):
def _copy_volume(prefix, srcstr, deststr, size_in_m, blocksize, sync=False,
execute=utils.execute, ionice=None):
# Use O_DIRECT to avoid thrashing the system buffer cache
extra_flags = []
if check_for_odirect_support(srcstr, deststr, 'iflag=direct'):
@ -357,9 +308,7 @@ def copy_volume(srcstr, deststr, size_in_m, blocksize, sync=False,
if ionice is not None:
cmd = ['ionice', ionice] + cmd
cgcmd = setup_blkio_cgroup(srcstr, deststr, CONF.volume_copy_bps_limit)
if cgcmd:
cmd = cgcmd + cmd
cmd = prefix + cmd
# Perform the copy
start_time = timeutils.utcnow()
@ -381,8 +330,19 @@ def copy_volume(srcstr, deststr, size_in_m, blocksize, sync=False,
LOG.info(mesg % {'size_in_m': size_in_m, 'mbps': mbps})
def copy_volume(srcstr, deststr, size_in_m, blocksize, sync=False,
execute=utils.execute, ionice=None, throttle=None):
if not throttle:
throttle = throttling.Throttle.get_default()
with throttle.subcommand(srcstr, deststr) as throttle_cmd:
_copy_volume(throttle_cmd['prefix'], srcstr, deststr,
size_in_m, blocksize, sync=sync,
execute=execute, ionice=ionice)
def clear_volume(volume_size, volume_path, volume_clear=None,
volume_clear_size=None, volume_clear_ionice=None):
volume_clear_size=None, volume_clear_ionice=None,
throttle=None):
"""Unprovision old volumes to prevent data leaking between users."""
if volume_clear is None:
volume_clear = CONF.volume_clear
@ -402,7 +362,8 @@ def clear_volume(volume_size, volume_path, volume_clear=None,
return copy_volume('/dev/zero', volume_path, volume_clear_size,
CONF.volume_dd_blocksize,
sync=True, execute=utils.execute,
ionice=volume_clear_ionice)
ionice=volume_clear_ionice,
throttle=throttle)
elif volume_clear == 'shred':
clear_cmd = ['shred', '-n3']
if volume_clear_size: