Merge "Add volume local cache support to os-brick"

This commit is contained in:
Zuul 2020-08-31 19:49:31 +00:00 committed by Gerrit Code Review
commit 8da92bfb3b
6 changed files with 523 additions and 0 deletions

103
os_brick/caches/__init__.py Normal file
View File

@ -0,0 +1,103 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import abc
from oslo_log import log as logging
from oslo_utils import importutils
from os_brick import exception
from os_brick.i18n import _
LOG = logging.getLogger(__name__)
CACHE_ENGINE_TO_CACHE_CLASS_MAP = {
"opencas": 'os_brick.caches.opencas.OpenCASEngine',
}
class CacheEngineBase(object, metaclass=abc.ABCMeta):
def __init__(self, **kwargs):
self._root_helper = kwargs.get('root_helper')
@abc.abstractmethod
def is_engine_ready(self, **kwargs):
return
@abc.abstractmethod
def attach_volume(self, **kwargs):
return
@abc.abstractmethod
def detach_volume(self, **kwargs):
return
class CacheManager():
"""Cache manager for volumes.
This CacheManager uses cache engines to do volume cache.
"""
def __init__(self, root_helper, connection_info,
*args, **kwargs):
data = connection_info['data']
if not data.get('device_path'):
volume_id = data.get('volume_id') or connection_info.get('serial')
raise exception.VolumeLocalCacheNotSupported(
volume_id=volume_id,
volume_type=connection_info.get('driver_volume_type'))
self.ori_device_path = data.get('device_path')
if not data.get('cacheable'):
self.cacheable = False
return
self.cacheable = True
self.root_helper = root_helper
self.engine_name = kwargs.get('cache_name')
self.args = args
self.kwargs = kwargs
self.kwargs["root_helper"] = root_helper
self.kwargs["dev_path"] = data.get('device_path')
self.engine = self._get_engine(self.engine_name, **self.kwargs)
def _get_engine(self, engine_name, **kwargs):
eng_cls_path = CACHE_ENGINE_TO_CACHE_CLASS_MAP.get(engine_name)
if eng_cls_path:
engine_cls = importutils.import_class(eng_cls_path)
eng = engine_cls(**kwargs)
if eng.is_engine_ready():
return eng
raise exception.Invalid(_("No valid cache engine"))
def attach_volume(self):
"""setup the cache when attaching volume."""
if not self.cacheable:
return self.ori_device_path
LOG.debug("volume before cached: %s", self.kwargs.get('dev_path'))
emulated_disk = self.engine.attach_volume(**self.kwargs)
LOG.debug("volume after cached: %s", emulated_disk)
return emulated_disk
def detach_volume(self):
"""Release the cache on detaching volume."""
if not self.cacheable:
return self.ori_device_path
LOG.debug("volume before detach: %s", self.kwargs.get('dev_path'))
ori_disk = self.engine.detach_volume(**self.kwargs)
LOG.debug("volume after detach: %s", ori_disk)
return ori_disk

119
os_brick/caches/opencas.py Normal file
View File

@ -0,0 +1,119 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from oslo_concurrency import processutils as putils
from oslo_log import log as logging
from os_brick import caches
from os_brick import exception
from os_brick import executor
LOG = logging.getLogger(__name__)
class OpenCASEngine(executor.Executor, caches.CacheEngineBase):
def __init__(self, **kwargs):
super(OpenCASEngine, self).__init__(**kwargs)
self.cache_id = kwargs.get('opencas_cache_id')
def os_execute(self, *cmd, **kwargs):
LOG.debug('os_execute: cmd: %s, args: %s', cmd, kwargs)
try:
out, err = self._execute(*cmd, **kwargs)
except putils.ProcessExecutionError as err:
LOG.exception('os_execute error')
LOG.error('Cmd :%s', err.cmd)
LOG.error('StdOut :%s', err.stdout)
LOG.error('StdErr :%s', err.stderr)
raise
return out, err
def is_engine_ready(self, **kwargs):
"""'casadm -L' will print like:
type id disk status write policy device
cache 1 /dev/nvme0n1 Running wt -
"""
cmd = ['casadm', '-L']
kwargs = dict(run_as_root=True,
root_helper=self._root_helper)
out, err = self.os_execute(*cmd, **kwargs)
for line in out.splitlines():
fields = line.split()
if str(self.cache_id) == fields[1] and 'Running' == fields[3]:
return True
return False
def attach_volume(self, **kwargs):
core = kwargs.get('dev_path')
if core is None:
LOG.error('dev_path is not specified')
raise exception.VolumePathsNotFound()
core = os.path.realpath(core)
return self._map_casdisk(core)
def detach_volume(self, **kwargs):
casdev = kwargs.get('dev_path')
if casdev is None:
LOG.error('dev_path is not specified')
raise exception.VolumePathsNotFound()
coreid, coredev = self._get_mapped_coredev(casdev)
LOG.info("opencas: coreid=%s,coredev=%s", coreid, coredev)
self._unmap_casdisk(coreid)
return coredev
def _get_mapped_casdev(self, core):
cmd = ['casadm', '-L']
kwargs = dict(run_as_root=True,
root_helper=self._root_helper)
out, err = self.os_execute(*cmd, **kwargs)
for line in out.splitlines():
if line.find(core) < 0:
continue
fields = line.split()
return fields[5]
raise Exception('Cannot find emulated device.')
def _get_mapped_coredev(self, casdev):
cmd = ['casadm', '-L']
kwargs = dict(run_as_root=True,
root_helper=self._root_helper)
out, err = self.os_execute(*cmd, **kwargs)
for line in out.splitlines():
if line.find(casdev) < 0:
continue
fields = line.split()
return (fields[1], fields[2])
raise Exception('Cannot find core device.')
def _map_casdisk(self, core):
cmd = ['casadm', '-A', '-i', self.cache_id, '-d', core]
kwargs = dict(run_as_root=True,
root_helper=self._root_helper)
out, err = self.os_execute(*cmd, **kwargs)
return self._get_mapped_casdev(core)
def _unmap_casdisk(self, coreid):
cmd = ['casadm', '-R', '-f', '-i', self.cache_id, '-j', coreid]
kwargs = dict(run_as_root=True,
root_helper=self._root_helper)
out, err = self.os_execute(*cmd, **kwargs)

View File

@ -158,6 +158,11 @@ class VolumeEncryptionNotSupported(Invalid):
"volume %(volume_id)s.")
class VolumeLocalCacheNotSupported(Invalid):
message = _("Volume local cache is not supported for %(volume_type)s "
"volume %(volume_id)s.")
# NOTE(mriedem): This extends ValueError to maintain backward compatibility.
class InvalidConnectorProtocol(ValueError):
pass

View File

View File

@ -0,0 +1,134 @@
# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from os_brick import caches
from os_brick import exception
from os_brick.tests import base
class CacheManagerTestCase(base.TestCase):
def setUp(self):
super(CacheManagerTestCase, self).setUp()
self.connection_info = {
"data": {
"device_path": "/dev/disk/by-path/"
"ip-192.0.2.0:3260-iscsi-iqn.2010-10.org.openstack"
":volume-fake_uuid-lun-1",
},
}
self.root_helper = None
@mock.patch('os_brick.executor.Executor._execute')
def test_init_invalid_device_path(self, moc_exec):
conn_info_invalid = {
'data': {
}
}
self.assertRaises(
exception.VolumeLocalCacheNotSupported,
caches.CacheManager,
root_helper=None,
connection_info=conn_info_invalid
)
@mock.patch('os_brick.caches.CacheManager._get_engine')
def test_init_cacheable(self, moc_get_engine):
moc_get_engine.return_value = None
conn_info_cacheable = {
'data': {
'device_path': '/dev/sdd',
'cacheable': True
}
}
conn_info_non_cacheable = {
'data': {
'device_path': '/dev/sdd',
}
}
mgr_cacheable = caches.CacheManager(
root_helper=None,
connection_info=conn_info_cacheable)
mgr_non_cacheable = caches.CacheManager(
root_helper=None,
connection_info=conn_info_non_cacheable)
self.assertTrue(mgr_cacheable.cacheable)
self.assertFalse(mgr_non_cacheable.cacheable)
@mock.patch('os_brick.caches.opencas.OpenCASEngine.is_engine_ready')
def test_get_engine(self, moc_get_engine):
conn_info = {
'data': {
'device_path': '/dev/sdd',
'cacheable': True
}
}
mgr = caches.CacheManager(root_helper=None,
cache_name='opencas',
connection_info=conn_info)
self.assertIsNotNone(mgr.engine)
self.assertRaises(
exception.Invalid,
caches.CacheManager,
root_helper=None,
connection_info=conn_info
)
@mock.patch('os_brick.caches.opencas.OpenCASEngine.is_engine_ready')
@mock.patch('os_brick.caches.opencas.OpenCASEngine.attach_volume')
def test_attach_volume(self, moc_attach, moc_eng_ready):
conn_info = {
'data': {
'device_path': '/dev/sdd',
}
}
moc_attach.return_value = '/dev/cas1-1'
moc_eng_ready.return_value = True
mgr = caches.CacheManager(root_helper=None,
cache_name='opencas',
connection_info=conn_info)
self.assertEqual('/dev/sdd', mgr.attach_volume())
conn_info['data']['cacheable'] = True
mgr = caches.CacheManager(root_helper=None,
cache_name='opencas',
connection_info=conn_info)
self.assertEqual('/dev/cas1-1', mgr.attach_volume())
@mock.patch('os_brick.caches.opencas.OpenCASEngine.is_engine_ready')
@mock.patch('os_brick.caches.opencas.OpenCASEngine.detach_volume')
def test_detach_volume(self, moc_detach, moc_eng_ready):
conn_info = {
'data': {
'device_path': '/dev/sdd',
}
}
moc_detach.return_value = '/dev/sdd'
moc_eng_ready.return_value = True
# cacheable == False
mgr = caches.CacheManager(root_helper=None,
cache_name='opencas',
connection_info=conn_info)
self.assertEqual('/dev/sdd', mgr.attach_volume())
# cacheable == True
conn_info['data']['cacheable'] = True
mgr = caches.CacheManager(root_helper=None,
cache_name='opencas',
connection_info=conn_info)
self.assertEqual('/dev/sdd', mgr.detach_volume())

View File

@ -0,0 +1,162 @@
# Copyright (c) 2013 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_concurrency import processutils as putils
from unittest import mock
from os_brick.caches import opencas
from os_brick import exception
from os_brick.tests import base
class OpenCASEngineTestCase(base.TestCase):
def setUp(self):
super(OpenCASEngineTestCase, self).setUp()
self.connection_info = {
"data": {
"device_path": "/dev/disk/by-path/"
"ip-192.0.2.0:3260-iscsi-iqn.2010-10.org.openstack"
":volume-fake_uuid-lun-1",
},
}
self.root_helper = None
@mock.patch('os_brick.executor.Executor._execute')
def test_os_execute_exception(self, mock_execute):
raise_err = [
putils.ProcessExecutionError(exit_code=1),
mock.DEFAULT,
]
engine = opencas.OpenCASEngine(root_helper=None, opencas_cache_id=1)
mock_execute.side_effect = raise_err
self.assertRaises(putils.ProcessExecutionError,
engine.os_execute, 'cmd', 'param')
mock_execute.side_effect = raise_err
self.assertRaises(putils.ProcessExecutionError,
engine.is_engine_ready)
mock_execute.side_effect = raise_err
self.assertRaises(putils.ProcessExecutionError,
engine._get_mapped_casdev, 'path')
mock_execute.side_effect = raise_err
self.assertRaises(putils.ProcessExecutionError,
engine._get_mapped_coredev, 'path')
mock_execute.side_effect = raise_err
self.assertRaises(putils.ProcessExecutionError,
engine._map_casdisk, 'path')
mock_execute.side_effect = raise_err
self.assertRaises(putils.ProcessExecutionError,
engine._unmap_casdisk, 'path')
@mock.patch('os_brick.executor.Executor._execute')
def test_is_engine_ready(self, moc_exec):
out_ready = """type id disk status write policy device
cache 1 /dev/nvme0n1 Running wt -"""
out_not_ready = 'type id disk status write policy device'
err = ''
engine = opencas.OpenCASEngine(root_helper=None, opencas_cache_id=1)
moc_exec.return_value = (out_ready, err)
ret = engine.is_engine_ready()
self.assertTrue(ret)
moc_exec.return_value = (out_not_ready, err)
ret = engine.is_engine_ready()
self.assertFalse(ret)
moc_exec.assert_has_calls([
mock.call('casadm', '-L', run_as_root=True, root_helper=None)
])
@mock.patch('os_brick.executor.Executor._execute')
def test_get_mapped_casdev(self, moc_exec):
out_ready = """type id disk status write policy device
cache 1 /dev/nvme0n1 Running wt -
core 1 /dev/sdd Active - /dev/cas1-1"""
err = ''
engine = opencas.OpenCASEngine(root_helper=None, opencas_cache_id=1)
moc_exec.return_value = (out_ready, err)
ret1 = engine._get_mapped_casdev('/dev/sdd')
self.assertEqual('/dev/cas1-1', ret1)
@mock.patch('os_brick.executor.Executor._execute')
def test_get_mapped_coredev(self, moc_exec):
out_ready = """type id disk status write policy device
cache 1 /dev/nvme0n1 Running wt -
core 1 /dev/sdd Active - /dev/cas1-1"""
err = ''
engine = opencas.OpenCASEngine(root_helper=None, opencas_cache_id=1)
moc_exec.return_value = (out_ready, err)
ret1, ret2 = engine._get_mapped_coredev('/dev/cas1-1')
self.assertEqual('1', ret1)
self.assertEqual('/dev/sdd', ret2)
@mock.patch('os_brick.executor.Executor._execute')
@mock.patch('os_brick.caches.opencas.OpenCASEngine._get_mapped_casdev')
def test_map_casdisk(self, moc_get_mapped_casdev, moc_exec):
engine = opencas.OpenCASEngine(root_helper=None, opencas_cache_id=1)
moc_get_mapped_casdev.return_value = ''
moc_exec.return_value = ('', '')
engine._map_casdisk('/dev/sdd')
moc_exec.assert_has_calls([
mock.call('casadm', '-A', '-i', 1, '-d', '/dev/sdd',
run_as_root=True, root_helper=None)
])
@mock.patch('os_brick.executor.Executor._execute')
def test_unmap_casdisk(self, moc_exec):
engine = opencas.OpenCASEngine(root_helper=None, opencas_cache_id=1)
moc_exec.return_value = ('', '')
engine._unmap_casdisk('1')
moc_exec.assert_has_calls([
mock.call('casadm', '-R', '-f', '-i', 1, '-j', '1',
run_as_root=True, root_helper=None)
])
@mock.patch('os_brick.caches.opencas.OpenCASEngine._map_casdisk')
def test_attach_volume(self, moc_map):
engine = opencas.OpenCASEngine(root_helper=None, opencas_cache_id=1)
moc_map.return_value = ''
args = {'no_dev_path': 'path'}
self.assertRaises(exception.VolumePathsNotFound,
engine.attach_volume, **args)
self.assertRaises(exception.VolumePathsNotFound, engine.attach_volume)
# No exception if dev_path set correctly
args = {'dev_path': 'path'}
engine.attach_volume(**args)
@mock.patch('os_brick.executor.Executor._execute')
def test_detach_volume(self, moc_exec):
out_ready = """type id disk status write policy device
cache 1 /dev/nvme0n1 Running wt -
core 1 /dev/sdd Active - /dev/cas1-1"""
err = ''
engine = opencas.OpenCASEngine(root_helper=None, opencas_cache_id=1)
moc_exec.return_value = (out_ready, err)
args = {'no_dev_path': 'path'}
self.assertRaises(exception.VolumePathsNotFound,
engine.detach_volume, **args)
self.assertRaises(exception.VolumePathsNotFound, engine.detach_volume)
# No exception if dev_path set correctly
args = {'dev_path': '/dev/cas1-1'}
engine.detach_volume(**args)