Merge "Enhance the 'add-disk' action for disk replacement"

Zuul 2022-02-22 07:45:04 +00:00 committed by Gerrit Code Review
commit 99ef6cb306
8 changed files with 546 additions and 16 deletions

View File

@ -41,6 +41,24 @@ add-disk:
bucket:
type: string
description: The name of the bucket in Ceph to add these devices into
osd-ids:
type: string
description: |
A space-separated list of OSD ids to recycle. If specified, the number of
elements in this list must match the number of 'osd-devices'.
cache-devices:
type: string
description: |
A space-separated list of devices to act as caching devices for 'bcache',
using the 'osd-devices' as backing. If there are fewer elements in this
list than there are 'osd-devices', the cache devices will be shared among
the backing devices in a round-robin fashion.
partition-size:
type: integer
description: |
The size of the partitions to create for the caching devices. If left
unspecified, then the full size of the devices will be split evenly
across partitions.
required:
- osd-devices
blacklist-add-disk:
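Note: an invocation of the enhanced action might look like juju run-action ceph-osd/0 add-disk osd-devices=/dev/sdb osd-ids=osd.3 cache-devices=/dev/nvme0n1 partition-size=100 (hypothetical unit name and device paths). The round-robin distribution described for 'cache-devices' amounts to the sketch below (illustrative only; the charm implements it in the PartitionIter class added further down):

osd_devices = ['/dev/sdb', '/dev/sdc', '/dev/sdd']  # hypothetical backing devices
cache_devices = ['/dev/nvme0n1', '/dev/nvme1n1']    # hypothetical cache devices

for idx, backing in enumerate(osd_devices):
    cache = cache_devices[idx % len(cache_devices)]  # wrap around when exhausted
    print('{} cached by {}'.format(backing, cache))
# /dev/sdb cached by /dev/nvme0n1
# /dev/sdc cached by /dev/nvme1n1
# /dev/sdd cached by /dev/nvme0n1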

View File

@ -23,20 +23,51 @@ sys.path.append('hooks')
import charmhelpers.contrib.storage.linux.ceph as ch_ceph
import charmhelpers.core.hookenv as hookenv
from charmhelpers.core.hookenv import function_fail
from charmhelpers.core.unitdata import kv
from utils import (PartitionIter, device_size, DeviceError)
import ceph_hooks
import charms_ceph.utils
def add_device(request, device_path, bucket=None):
charms_ceph.utils.osdize(device_path, hookenv.config('osd-format'),
def add_device(request, device_path, bucket=None,
osd_id=None, part_iter=None):
"""Add a new device to be used by the OSD unit.
:param request: A broker request to notify monitors of changes.
:type request: CephBrokerRq
:param device_path: The absolute path to the device to be added.
:type device_path: str
:param bucket: The bucket name in Ceph to add the device into, or None.
:type bucket: Optional[str]
:param osd_id: The OSD id to recycle, or None.
:type osd_id: Optional[str]
:param part_iter: The partition iterator that will create partitions on
demand to service bcache creation, or None if no partitions
need to be created.
:type part_iter: Optional[PartitionIter]
"""
if part_iter is not None:
effective_dev = part_iter.create_bcache(device_path)
if not effective_dev:
raise DeviceError(
'Failed to create bcache for device {}'.format(device_path))
else:
effective_dev = device_path
charms_ceph.utils.osdize(effective_dev, hookenv.config('osd-format'),
ceph_hooks.get_journal_devices(),
hookenv.config('ignore-device-errors'),
hookenv.config('osd-encrypt'),
hookenv.config('bluestore'),
hookenv.config('osd-encrypt-keymanager'))
hookenv.config('osd-encrypt-keymanager'),
osd_id)
# Make it fast!
if hookenv.config('autotune'):
charms_ceph.utils.tune_dev(device_path)
@ -63,9 +94,10 @@ def add_device(request, device_path, bucket=None):
return request
def get_devices():
def get_devices(key):
    """Return a list of the absolute device paths passed to this action under 'key'."""
devices = []
for path in hookenv.action_get('osd-devices').split(' '):
for path in (hookenv.action_get(key) or '').split():
path = path.strip()
if os.path.isabs(path):
devices.append(path)
@ -73,10 +105,83 @@ def get_devices():
return devices
def cache_storage():
    """Return the locations of any Juju storage attached as cache devices."""
cache_ids = hookenv.storage_list('cache-devices')
return [hookenv.storage_get('location', cid) for cid in cache_ids]
def validate_osd_id(osd_id):
    """Check that an OSD id is valid, either as 'osd.<N>' or as a
    non-negative integer."""
if isinstance(osd_id, str):
if osd_id.startswith('osd.'):
osd_id = osd_id[4:]
try:
return int(osd_id) >= 0
except ValueError:
return False
elif isinstance(osd_id, int):
return osd_id >= 0
return False
def validate_partition_size(psize, devices, caches):
    """Check that the cache devices have enough room to hold the
    requested partitions for every backing device."""
sizes = [device_size(cache) for cache in caches]
n_caches = len(caches)
for idx in range(len(devices)):
cache_idx = idx % n_caches
prev = sizes[cache_idx] - psize
if prev < 0:
function_fail('Cache device {} does not have enough room to '
              'provide {} {}GB partitions'.format(
                  caches[cache_idx], idx // n_caches + 1, psize))
sys.exit(1)
sizes[cache_idx] = prev
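Note: a standalone sketch of the same capacity check, with the cache sizes passed in directly instead of read with device_size(); the helper name enough_room is ours, for illustration:

def enough_room(psize, n_devices, cache_sizes):
    # Walk the backing devices in round-robin order, charging psize GB
    # of each partition to its cache.
    sizes = list(cache_sizes)
    for idx in range(n_devices):
        cache_idx = idx % len(sizes)
        sizes[cache_idx] -= psize
        if sizes[cache_idx] < 0:
            return False
    return True

print(enough_room(60, 3, [100, 200]))  # False: the first cache would need 120GB
print(enough_room(60, 2, [100, 200]))  # True: one 60GB partition fits in each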
if __name__ == "__main__":
request = ch_ceph.CephBrokerRq()
for dev in get_devices():
request = add_device(request=request,
device_path=dev,
bucket=hookenv.action_get("bucket"))
devices = get_devices('osd-devices')
caches = get_devices('cache-devices') or cache_storage()
if caches:
psize = hookenv.action_get('partition-size')
if psize:
validate_partition_size(psize, devices, caches)
part_iter = PartitionIter(caches, psize, devices)
else:
part_iter = None
osd_ids = hookenv.action_get('osd-ids')
if osd_ids:
# Validate number and format for OSD ids.
osd_ids = osd_ids.split()
if len(osd_ids) != len(devices):
function_fail('The number of osd-ids and osd-devices must match')
sys.exit(1)
for osd_id in osd_ids:
if not validate_osd_id(osd_id):
function_fail('Invalid OSD ID passed: {}'.format(osd_id))
sys.exit(1)
else:
osd_ids = [None] * len(devices)
errors = []
for dev, osd_id in zip(devices, osd_ids):
try:
request = add_device(request=request,
device_path=dev,
bucket=hookenv.action_get("bucket"),
osd_id=osd_id, part_iter=part_iter)
except Exception:
errors.append(dev)
ch_ceph.send_request_if_needed(request, relation='mon')
if errors:
if part_iter is not None:
for error in errors:
part_iter.cleanup(error)
function_fail('Failed to add devices: {}'.format(','.join(errors)))
sys.exit(1)

View File

@ -12,11 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import re
import os
import socket
import subprocess
import sys
import time
sys.path.append('lib')
import charms_ceph.utils as ceph
@ -336,3 +338,229 @@ def parse_osds_arguments():
"explicitly defined OSD IDs", WARNING)
return args
class DeviceError(Exception):
    """Exception raised when an external command used to
    manipulate devices fails.
"""
pass
def _check_output(args):
try:
return subprocess.check_output(args).decode('UTF-8')
except subprocess.CalledProcessError as e:
raise DeviceError(str(e))
def _check_call(args):
try:
return subprocess.check_call(args)
except subprocess.CalledProcessError as e:
raise DeviceError(str(e))
def setup_bcache(backing, cache):
"""Create a bcache device out of the backing storage and caching device.
:param backing: The path to the backing device.
:type backing: str
:param cache: The path to the caching device.
:type cache: str
:returns: The full path of the newly created bcache device.
:rtype: str
"""
_check_call(['sudo', 'make-bcache', '-B', backing,
'-C', cache, '--writeback'])
def bcache_name(dev):
rv = _check_output(['lsblk', '-p', '-b', dev, '-J', '-o', 'NAME'])
for x in json.loads(rv)['blockdevices'][0].get('children', []):
if x['name'] != dev:
return x['name']
for _ in range(100):
rv = bcache_name(cache)
if rv is not None:
return rv
# Tell the kernel to refresh the partitions.
time.sleep(0.3)
_check_call(['sudo', 'partprobe'])
def get_partition_names(dev):
"""Given a raw device, return a set of the partitions it contains.
:param dev: The path to the device.
:type dev: str
:returns: A set with the partitions of the passed device.
:rtype: set[str]
"""
rv = _check_output(['lsblk', '-b', dev, '-J', '-p', '-o', 'NAME'])
rv = json.loads(rv)['blockdevices'][0].get('children', [])
return set(x['name'] for x in rv)
def create_partition(cache, size, n_iter):
    """Create a partition of a specific size on a device, first creating
    a GPT partition table on it if needed.
:param cache: The path to the caching device from which to create
the partition.
:type cache: str
:param size: The size (in GB) of the partition to create.
:type size: int
:param n_iter: The iteration number. If zero, this function will
also create the GPT on the caching device.
:type n_iter: int
:returns: The full path of the newly created partition.
:rtype: str
"""
if not n_iter:
# In our first iteration, make sure the device has a GPT.
_check_call(['sudo', 'parted', '-s', cache, 'mklabel', 'gpt'])
prev_partitions = get_partition_names(cache)
cmd = ['sudo', 'parted', '-s', cache, 'mkpart', 'primary',
str(n_iter * size) + 'GB', str((n_iter + 1) * size) + 'GB']
_check_call(cmd)
for _ in range(100):
ret = get_partition_names(cache) - prev_partitions
if ret:
return next(iter(ret))
time.sleep(0.3)
_check_call(['sudo', 'partprobe'])
raise DeviceError('Failed to create partition')
def device_size(dev):
"""Compute the size of a device, in GB.
:param dev: The full path to the device.
:type dev: str
:returns: The size in GB of the specified device.
:rtype: float
"""
ret = _check_output(['lsblk', '-b', '-d', dev, '-J', '-o', 'SIZE'])
ret = int(json.loads(ret)['blockdevices'][0]['size'])
return ret / (1024 * 1024 * 1024) # Return size in GB.
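Note: the divisor is 1024 ** 3, so strictly speaking the result is in GiB rather than decimal GB; for example, the 800166076416-byte device used in the unit test below:

print(800166076416 / (1024 ** 3))  # ~745.2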
def bcache_remove(bcache, cache_dev):
"""Remove a bcache kernel device, given its caching.
:param bcache: The path of the bcache device.
:type bcache: str
:param cache_dev: The caching device used by the bcache device.
:type cache_dev: str
"""
rv = _check_output(['sudo', 'bcache-super-show', cache_dev])
uuid = None
# Fetch the UUID for the caching device.
for line in rv.split('\n'):
idx = line.find('cset.uuid')
if idx >= 0:
uuid = line[idx + 9:].strip()
break
else:
return
bcache_name = bcache[bcache.rfind('/') + 1:]
with open('/sys/block/{}/bcache/stop'.format(bcache_name), 'wb') as f:
f.write(b'1')
with open('/sys/fs/bcache/{}/stop'.format(uuid), 'wb') as f:
f.write(b'1')
def wipe_disk(dev):
"""Destroy all data in a specific device, including partition tables."""
_check_call(['sudo', 'wipefs', '-a', dev])
class PartitionIter:
    """Class used to create partitions iteratively.

    Objects of this type create partitions on the specified cache devices,
    either with a fixed size, or with a size derived from the cache size
    and the number of backing devices."""
def __init__(self, caches, psize, devices):
"""Construct a partition iterator.
:param caches: The list of cache devices to use.
:type caches: iterable
:param psize: The size of the partitions (in GB), or None
:type psize: Optional[int]
:param devices: The backing devices. Only used to get their length.
:type devices: iterable
"""
self.caches = [[cache, 0] for cache in caches]
self.idx = 0
if not psize:
factor = min(1.0, len(caches) / len(devices))
self.psize = [factor * device_size(cache) for cache in caches]
else:
self.psize = psize
self.created = {}
def __iter__(self):
return self
def __next__(self):
"""Return a newly created partition.
The object keeps track of the currently used caching device,
so upon creating a new partition, it will move to the next one,
distributing the load among them in a round-robin fashion.
"""
cache, n_iter = self.caches[self.idx]
size = self.psize
if not isinstance(size, (int, float)):
size = self.psize[self.idx]
self.caches[self.idx][1] += 1
self.idx = (self.idx + 1) % len(self.caches)
log('Creating partition in device {} of size {}'.format(cache, size))
return create_partition(cache, size, n_iter)
def create_bcache(self, backing):
    """Create a bcache device, pairing the passed backing device with
    the next partition from the internal caching devices.
:param backing: The path to the backing device.
:type backing: str
:returns: The name for the newly created bcache device.
:rtype: str
"""
cache = next(self)
ret = setup_bcache(backing, cache)
if ret is not None:
self.created[backing] = (ret, cache)
log('Bcache device created: {}'.format(cache))
return ret
def cleanup(self, device):
    """Tear down the bcache device created for 'device', if any."""
    args = self.created.get(device)
if not args:
return
try:
bcache_remove(*args)
except DeviceError:
log('Failed to cleanup bcache device: {}'.format(args[0]))
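Note: a minimal usage sketch for PartitionIter, assuming a single 300GB cache device shared by three backing devices (hypothetical paths; this mirrors the proportional-sizing unit test at the end of this change):

piter = PartitionIter(caches=['/dev/nvme0n1'], psize=None,
                      devices=['/dev/sdb', '/dev/sdc', '/dev/sdd'])
# With no explicit psize, each partition gets 300GB * min(1.0, 1/3) = 100GB.
bcache_dev = piter.create_bcache('/dev/sdb')  # partition first, then bcache on top
if bcache_dev is None:
    piter.cleanup('/dev/sdb')  # tear down anything partially created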

View File

@ -1514,11 +1514,11 @@ def get_devices(name):
def osdize(dev, osd_format, osd_journal, ignore_errors=False, encrypt=False,
bluestore=False, key_manager=CEPH_KEY_MANAGER):
bluestore=False, key_manager=CEPH_KEY_MANAGER, osd_id=None):
if dev.startswith('/dev'):
osdize_dev(dev, osd_format, osd_journal,
ignore_errors, encrypt,
bluestore, key_manager)
bluestore, key_manager, osd_id)
else:
if cmp_pkgrevno('ceph', '14.0.0') >= 0:
log("Directory backed OSDs can not be created on Nautilus",
@ -1528,7 +1528,8 @@ def osdize(dev, osd_format, osd_journal, ignore_errors=False, encrypt=False,
def osdize_dev(dev, osd_format, osd_journal, ignore_errors=False,
encrypt=False, bluestore=False, key_manager=CEPH_KEY_MANAGER):
encrypt=False, bluestore=False, key_manager=CEPH_KEY_MANAGER,
osd_id=None):
"""
Prepare a block device for use as a Ceph OSD
@ -1593,7 +1594,8 @@ def osdize_dev(dev, osd_format, osd_journal, ignore_errors=False,
osd_journal,
encrypt,
bluestore,
key_manager)
key_manager,
osd_id)
else:
cmd = _ceph_disk(dev,
osd_format,
@ -1677,7 +1679,7 @@ def _ceph_disk(dev, osd_format, osd_journal, encrypt=False, bluestore=False):
def _ceph_volume(dev, osd_journal, encrypt=False, bluestore=False,
key_manager=CEPH_KEY_MANAGER):
key_manager=CEPH_KEY_MANAGER, osd_id=None):
"""
Prepare and activate a device for usage as a Ceph OSD using ceph-volume.
@ -1689,6 +1691,7 @@ def _ceph_volume(dev, osd_journal, encrypt=False, bluestore=False,
:param: encrypt: Use block device encryption
:param: bluestore: Use bluestore storage for OSD
:param: key_manager: dm-crypt Key Manager to use
:param: osd_id: The OSD-id to recycle, or None to create a new one
:raises subprocess.CalledProcessError: in the event that any supporting
LVM operation failed.
:returns: list. 'ceph-volume' command and required parameters for
@ -1710,6 +1713,9 @@ def _ceph_volume(dev, osd_journal, encrypt=False, bluestore=False,
if encrypt and key_manager == CEPH_KEY_MANAGER:
cmd.append('--dmcrypt')
if osd_id is not None:
cmd.extend(['--osd-id', str(osd_id)])
# On-disk journal volume creation
if not osd_journal and not bluestore:
journal_lv_type = 'journal'
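Note: the effect of the new parameter on the assembled command can be sketched as follows (abridged, hypothetical argument list; the real command is built above from the journal, encryption and store settings):

cmd = ['ceph-volume', 'lvm', 'create', '--data', '/dev/sdb']  # abridged
osd_id = 3  # id recycled from a previously removed OSD
if osd_id is not None:
    cmd.extend(['--osd-id', str(osd_id)])
# cmd now ends with: --osd-id 3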

View File

@ -45,3 +45,8 @@ storage:
type: block
multiple:
range: 0-
cache-devices:
type: block
multiple:
range: 0-
minimum-size: 10G

View File

@ -41,3 +41,5 @@ git+https://opendev.org/openstack/tempest.git#egg=tempest;python_version>='3.6'
tempest<24.0.0;python_version<'3.6'
croniter # needed for charm-rabbitmq-server unit tests
pyparsing<3.0.0 # aodhclient is pinned in zaza and needs pyparsing < 3.0.0, but cffi also needs it, so pin here.
cffi==1.14.6; python_version < '3.6' # cffi 1.15.0 drops support for py35.

View File

@ -54,4 +54,50 @@ class AddDiskActionTests(CharmTestCase):
self.hookenv.relation_set.assert_has_calls([call])
mock_osdize.assert_has_calls([mock.call('/dev/myosddev',
None, '', True, True, True,
True)])
True, None)])
piter = add_disk.PartitionIter(['/dev/cache'], 100, ['/dev/myosddev'])
mock_create_bcache = mock.MagicMock(side_effect=lambda b: b)
with mock.patch.object(add_disk.PartitionIter, 'create_bcache',
mock_create_bcache) as mock_call:
add_disk.add_device(request, '/dev/myosddev', part_iter=piter)
mock_call.assert_called()
mock_create_bcache.side_effect = lambda b: None
with mock.patch.object(add_disk.PartitionIter, 'create_bcache',
mock_create_bcache) as mock_call:
with self.assertRaises(add_disk.DeviceError):
add_disk.add_device(request, '/dev/myosddev', part_iter=piter)
def test_get_devices(self):
self.hookenv.action_get.return_value = '/dev/foo bar'
rv = add_disk.get_devices('')
self.assertEqual(rv, ['/dev/foo'])
self.hookenv.action_get.return_value = None
rv = add_disk.get_devices('')
self.assertEqual(rv, [])
@mock.patch.object(add_disk, 'device_size')
@mock.patch.object(add_disk, 'function_fail')
def test_validate_psize(self, function_fail, device_size):
caches = {'cache1': 100, 'cache2': 200}
device_size.side_effect = lambda c: caches[c]
function_fail.return_value = None
with self.assertRaises(SystemExit):
add_disk.validate_partition_size(
60, ['a', 'b', 'c'], list(caches.keys()))
self.assertIsNone(add_disk.validate_partition_size(
60, ['a', 'b'], list(caches.keys())))
def test_cache_storage(self):
self.hookenv.storage_list.return_value = [{'location': 'a', 'key': 1},
{'location': 'b'}]
self.hookenv.storage_get.side_effect = lambda k, elem: elem.get(k)
rv = add_disk.cache_storage()
self.assertEqual(['a', 'b'], rv)
def test_validate_osd_id(self):
for elem in ('osd.1', '1', 0, 113):
self.assertTrue(add_disk.validate_osd_id(elem))
for elem in ('osd.-1', '-3', '???', -100, 3.4, {}):
self.assertFalse(add_disk.validate_osd_id(elem))

View File

@ -15,7 +15,7 @@
import unittest
from unittest.mock import patch
from unittest.mock import patch, mock_open
with patch('charmhelpers.contrib.hardening.harden.harden') as mock_dec:
mock_dec.side_effect = (lambda *dargs, **dkwargs: lambda f:
@ -138,3 +138,123 @@ class CephUtilsTestCase(unittest.TestCase):
parsed = utils.parse_osds_arguments()
self.assertEqual(parsed, expected_id)
@patch('subprocess.check_call')
@patch('subprocess.check_output')
def test_setup_bcache(self, check_output, check_call):
check_output.return_value = b'''
{
"blockdevices": [
{"name":"/dev/nvme0n1",
"children": [
{"name":"/dev/bcache0"}
]
}
]
}
'''
self.assertEqual(utils.setup_bcache('', ''), '/dev/bcache0')
@patch('subprocess.check_output')
def test_get_partition_names(self, check_output):
check_output.return_value = b'''
{
"blockdevices": [
{"name":"/dev/sdd",
"children": [
{"name":"/dev/sdd1"}
]
}
]
}
'''
partitions = utils.get_partition_names('')
self.assertEqual(partitions, set(['/dev/sdd1']))
# Check for a raw device with no partitions.
check_output.return_value = b'''
{"blockdevices": [{"name":"/dev/sdd"}]}
'''
self.assertEqual(set(), utils.get_partition_names(''))
@patch.object(utils, 'get_partition_names')
@patch('subprocess.check_call')
def test_create_partition(self, check_call, get_partition_names):
first_call = True
def gpn(dev):
nonlocal first_call
if first_call:
first_call = False
return set()
return set(['/dev/nvm0n1p1'])
get_partition_names.side_effect = gpn
partition_name = utils.create_partition('/dev/nvm0n1', 101, 0)
self.assertEqual(partition_name, '/dev/nvm0n1p1')
args = check_call.call_args[0][0]
self.assertIn('/dev/nvm0n1', args)
self.assertIn('101GB', args)
@patch('subprocess.check_output')
def test_device_size(self, check_output):
check_output.return_value = b'''
{
"blockdevices": [{"size":800166076416}]
}
'''
self.assertEqual(745, int(utils.device_size('')))
@patch('subprocess.check_output')
def test_bcache_remove(self, check_output):
check_output.return_value = b'''
sb.magic ok
sb.first_sector 8 [match]
sb.csum 63F23B706BA0FE6A [match]
sb.version 3 [cache device]
dev.label (empty)
dev.uuid ca4ce5e1-4cf3-4330-b1c9-2c735b14cd0b
dev.sectors_per_block 1
dev.sectors_per_bucket 1024
dev.cache.first_sector 1024
dev.cache.cache_sectors 1562822656
dev.cache.total_sectors 1562823680
dev.cache.ordered yes
dev.cache.discard no
dev.cache.pos 0
dev.cache.replacement 0 [lru]
cset.uuid 424242
'''
mo = mock_open()
with patch('builtins.open', mo):
utils.bcache_remove('/dev/bcache0', '/dev/nvme0n1p1')
mo.assert_any_call('/sys/block/bcache0/bcache/stop', 'wb')
mo.assert_any_call('/sys/fs/bcache/424242/stop', 'wb')
@patch.object(utils, 'create_partition')
@patch.object(utils, 'setup_bcache')
def test_partition_iter(self, setup_bcache, create_partition):
create_partition.side_effect = \
lambda c, s, n: c + '|' + str(s) + '|' + str(n)
setup_bcache.side_effect = lambda *args: args
piter = utils.PartitionIter(['/dev/nvm0n1', '/dev/nvm0n2'],
200, ['dev1', 'dev2', 'dev3'])
piter.create_bcache('dev1')
setup_bcache.assert_called_with('dev1', '/dev/nvm0n1|200|0')
setup_bcache.reset_mock()
piter.create_bcache('dev2')
setup_bcache.assert_called_with('dev2', '/dev/nvm0n2|200|0')
piter.create_bcache('dev3')
setup_bcache.assert_called_with('dev3', '/dev/nvm0n1|200|1')
@patch.object(utils, 'device_size')
@patch.object(utils, 'create_partition')
@patch.object(utils, 'setup_bcache')
def test_partition_iter_no_size(self, setup_bcache, create_partition,
device_size):
device_size.return_value = 300
piter = utils.PartitionIter(['/dev/nvm0n1'], 0,
['dev1', 'dev2', 'dev3'])
create_partition.side_effect = lambda c, sz, g: sz
# 300GB across 3 devices, i.e: 100 for each.
self.assertEqual(100, next(piter))
self.assertEqual(100, next(piter))