Adding SPDK volume driver

Adding Storage Performance Development Kit volume
driver. SPDK NVMe-oF Target driver depends on this
driver. It provides a new type of a backend.

Signed-off-by: Maciej Szwed <maciej.szwed@intel.com>
Change-Id: I3964acadfd6e8396fc52adcd8c4933790a94ec50
This commit is contained in:
Maciej Szwed 2018-08-16 13:17:20 +02:00
parent cb96d2da3a
commit de0dc85971
3 changed files with 1237 additions and 0 deletions

View File

@ -0,0 +1,821 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import json
import mock
from os_brick import initiator
from os_brick.initiator import connector
from oslo_utils import units
from cinder import context
from cinder import objects
from cinder import test
from cinder.tests.unit import fake_volume
from cinder import utils
from cinder.volume import configuration as conf
from cinder.volume.drivers import spdk as spdk_driver
BDEVS = [{
"num_blocks": 4096000,
"name": "Nvme0n1",
"driver_specific": {
"nvme": {
"trid": {
"trtype": "PCIe",
"traddr": "0000:00:04.0"
},
"ns_data": {
"id": 1
},
"pci_address": "0000:00:04.0",
"vs": {
"nvme_version": "1.1"
},
"ctrlr_data": {
"firmware_revision": "1.0",
"serial_number": "deadbeef",
"oacs": {
"ns_manage": 0,
"security": 0,
"firmware": 0,
"format": 0
},
"vendor_id": "0x8086",
"model_number": "QEMU NVMe Ctrl"
},
"csts": {
"rdy": 1,
"cfs": 0
}
}
},
"supported_io_types": {
"reset": True,
"nvme_admin": True,
"unmap": False,
"read": True,
"write_zeroes": False,
"write": True,
"flush": True,
"nvme_io": True
},
"claimed": False,
"block_size": 512,
"product_name": "NVMe disk",
"aliases": ["Nvme0n1"]
}, {
"num_blocks": 8192,
"uuid": "70efd305-4e66-49bd-99ff-faeda5c3052d",
"aliases": [
"Nvme0n1p0"
],
"driver_specific": {
"lvol": {
"base_bdev": "Nvme0n1",
"lvol_store_uuid": "58b17014-d4a1-4f85-9761-093643ed18f1",
"thin_provision": False
}
},
"supported_io_types": {
"reset": True,
"nvme_admin": False,
"unmap": True,
"read": True,
"write_zeroes": True,
"write": True,
"flush": False,
"nvme_io": False
},
"claimed": False,
"block_size": 4096,
"product_name": "Split Disk",
"name": "Nvme0n1p0"
}, {
"num_blocks": 8192,
"uuid": "70efd305-4e66-49bd-99ff-faeda5c3052d",
"aliases": [
"Nvme0n1p1"
],
"driver_specific": {
"lvol": {
"base_bdev": "Nvme0n1",
"lvol_store_uuid": "58b17014-d4a1-4f85-9761-093643ed18f1",
"thin_provision": False
}
},
"supported_io_types": {
"reset": True,
"nvme_admin": False,
"unmap": True,
"read": True,
"write_zeroes": True,
"write": True,
"flush": False,
"nvme_io": False
},
"claimed": False,
"block_size": 4096,
"product_name": "Split Disk",
"name": "Nvme0n1p1"
}, {
"num_blocks": 8192,
"uuid": "70efd305-4e66-49bd-99ff-faeda5c3052d",
"aliases": [
"lvs_test/lvol0"
],
"driver_specific": {
"lvol": {
"base_bdev": "Malloc0",
"lvol_store_uuid": "58b17014-d4a1-4f85-9761-093643ed18f1",
"thin_provision": False
}
},
"supported_io_types": {
"reset": True,
"nvme_admin": False,
"unmap": True,
"read": True,
"write_zeroes": True,
"write": True,
"flush": False,
"nvme_io": False
},
"claimed": False,
"block_size": 4096,
"product_name": "Logical Volume",
"name": "58b17014-d4a1-4f85-9761-093643ed18f1_4294967297"
}, {
"num_blocks": 8192,
"uuid": "8dec1964-d533-41df-bea7-40520efdb416",
"aliases": [
"lvs_test/lvol1"
],
"driver_specific": {
"lvol": {
"base_bdev": "Malloc0",
"lvol_store_uuid": "58b17014-d4a1-4f85-9761-093643ed18f1",
"thin_provision": True
}
},
"supported_io_types": {
"reset": True,
"nvme_admin": False,
"unmap": True,
"read": True,
"write_zeroes": True,
"write": True,
"flush": False,
"nvme_io": False
},
"claimed": False,
"block_size": 4096,
"product_name": "Logical Volume",
"name": "58b17014-d4a1-4f85-9761-093643ed18f1_4294967298"
}]
LVOL_STORES = [{
"uuid": "58b17014-d4a1-4f85-9761-093643ed18f1",
"base_bdev": "Nvme0n1",
"free_clusters": 5976,
"cluster_size": 1048576,
"total_data_clusters": 5976,
"block_size": 4096,
"name": "lvs_test"
}]
NVMF_SUBSYSTEMS = [{
"listen_addresses": [],
"subtype": "Discovery",
"nqn": "nqn.2014-08.org.nvmexpress.discovery",
"hosts": [],
"allow_any_host": True
}, {
"listen_addresses": [],
"subtype": "NVMe",
"hosts": [{
"nqn": "nqn.2016-06.io.spdk:init"
}],
"namespaces": [{
"bdev_name": "Nvme0n1p0",
"nsid": 1,
"name": "Nvme0n1p0"
}],
"allow_any_host": False,
"serial_number": "SPDK00000000000001",
"nqn": "nqn.2016-06.io.spdk:cnode1"
}, {
"listen_addresses": [],
"subtype": "NVMe",
"hosts": [],
"namespaces": [{
"bdev_name": "Nvme1n1p0",
"nsid": 1,
"name": "Nvme1n1p0"
}],
"allow_any_host": True,
"serial_number": "SPDK00000000000002",
"nqn": "nqn.2016-06.io.spdk:cnode2"
}]
class Volume(object):
def __init__(self):
self.size = 1
self.name = "lvol2"
class Snapshot(object):
def __init__(self):
self.name = "snapshot0"
self.volume_size = 1
class JSONRPCException(Exception):
def __init__(self, message):
self.message = message
class JSONRPCClient(object):
def __init__(self, addr=None, port=None):
self.methods = {"get_bdevs": self.get_bdevs,
"get_lvol_stores": self.get_lvol_stores,
"destroy_lvol_bdev": self.destroy_lvol_bdev,
"snapshot_lvol_bdev": self.snapshot_lvol_bdev,
"clone_lvol_bdev": self.clone_lvol_bdev,
"construct_lvol_bdev": self.construct_lvol_bdev,
"resize_lvol_bdev": self.resize_lvol_bdev,
"get_nvmf_subsystems": self.get_nvmf_subsystems,
"construct_nvmf_subsystem":
self.construct_nvmf_subsystem,
"nvmf_subsystem_create":
self.nvmf_subsystem_create,
"nvmf_subsystem_add_listener":
self.nvmf_subsystem_add_listener,
"nvmf_subsystem_add_ns":
self.nvmf_subsystem_add_ns,
"inflate_lvol_bdev": self.inflate_lvol_bdev}
self.bdevs = copy.deepcopy(BDEVS)
self.nvmf_subsystems = copy.deepcopy(NVMF_SUBSYSTEMS)
self.lvol_stores = copy.deepcopy(LVOL_STORES)
def get_bdevs(self, params=None):
if params and 'name' in params:
for bdev in self.bdevs:
for alias in bdev['aliases']:
if params['name'] in alias:
return json.dumps({"result": [bdev]})
if bdev['name'] == params['name']:
return json.dumps({"result": [bdev]})
return json.dumps({"error": "Not found"})
return json.dumps({"result": self.bdevs})
def destroy_lvol_bdev(self, params=None):
if 'name' not in params:
return json.dumps({})
i = 0
found_bdev = -1
for bdev in self.bdevs:
if bdev['name'] == params['name']:
found_bdev = i
break
i += 1
if found_bdev != -1:
del self.bdevs[found_bdev]
return json.dumps({"result": {}})
def get_lvol_stores(self, params=None):
return json.dumps({"result": self.lvol_stores})
def snapshot_lvol_bdev(self, params=None):
snapshot = {
'num_blocks': 5376,
'name': '58b17014-d4a1-4f85-9761-093643ed18f2',
'aliases': ['lvs_test/%s' % params['snapshot_name']],
'driver_specific': {
'lvol': {
'base_bdev': u'Malloc0',
'lvol_store_uuid': u'58b17014-d4a1-4f85-9761-093643ed18f1',
'thin_provision': False,
'clones': ['clone0', 'clone1']
}
},
'claimed': False,
'block_size': 4096,
'product_name': 'Logical Volume',
'supported_io_types': {
'reset': True,
'nvme_admin': False,
'unmap': True,
'read': True,
'write_zeroes': True,
'write': True,
'flush': False,
'nvme_io': False
}
}
self.bdevs.append(snapshot)
return json.dumps({"result": [snapshot]})
def clone_lvol_bdev(self, params=None):
clone = {
'num_blocks': 7936,
'supported_io_types': {
'reset': True,
'nvme_admin': False,
'unmap': True,
'read': True,
'write_zeroes': True,
'write': True,
'flush': False,
'nvme_io': False
},
'name': '3735a554-0dce-4d13-ba67-597d41186104',
'driver_specific': {
'lvol': {
'base_bdev': 'Malloc0',
'lvol_store_uuid': '58b17014-d4a1-4f85-9761-093643ed18f1',
'thin_provision': False
}
},
'block_size': 4096,
'claimed': False,
'aliases': [u'lvs_test/%s' % params['clone_name']],
'product_name': 'Logical Volume',
'uuid': '3735a554-0dce-4d13-ba67-597d41186104'
}
self.bdevs.append(clone)
return json.dumps({"result": [clone]})
def construct_lvol_bdev(self, params=None):
lvol_bdev = {
"num_blocks": 8192,
"uuid": "8dec1964-d533-41df-bea7-40520efdb416",
"aliases": [
"lvs_test/%s" % params['lvol_name']
],
"driver_specific": {
"lvol": {
"base_bdev": "Malloc0",
"lvol_store_uuid": "58b17014-d4a1-4f85-9761-093643ed18f1",
"thin_provision": True
}
},
"supported_io_types": {
"reset": True,
"nvme_admin": False,
"unmap": True,
"read": True,
"write_zeroes": True,
"write": True,
"flush": False,
"nvme_io": False
},
"claimed": False,
"block_size": 4096,
"product_name": "Logical Volume",
"name": "58b17014-d4a1-4f85-9761-093643ed18f1_4294967299"
}
self.bdevs.append(lvol_bdev)
return json.dumps({"result": [{}]})
def get_nvmf_subsystems(self, params=None):
return json.dumps({"result": self.nvmf_subsystems})
def resize_lvol_bdev(self, params=None):
if params:
if "name" in params:
tmp_bdev = json.loads(
self.get_bdevs(params={"name": params['name']}))['result']
if "size" in params:
for bdev in self.bdevs:
if bdev['name'] == tmp_bdev[0]['name']:
bdev['num_blocks'] = params['size'] \
/ bdev['block_size']
return json.dumps({"result": {}})
return json.dumps({"error": {}})
def inflate_lvol_bdev(self, params=None):
return json.dumps({'result': {}})
def construct_nvmf_subsystem(self, params=None):
nvmf_subsystem = {
"listen_addresses": [],
"subtype": "NVMe",
"hosts": [],
"namespaces": [{
"bdev_name": "Nvme1n1p0",
"nsid": 1,
"name": "Nvme1n1p0"
}],
"allow_any_host": True,
"serial_number": params['serial_number'],
"nqn": params['nqn']
}
self.nvmf_subsystems.append(nvmf_subsystem)
return json.dumps({"result": nvmf_subsystem})
def nvmf_subsystem_create(self, params=None):
nvmf_subsystem = {
"namespaces": [],
"nqn": params['nqn'],
"serial_number": "S0000000000000000001",
"allow_any_host": False,
"subtype": "NVMe",
"hosts": [],
"listen_addresses": []
}
self.nvmf_subsystems.append(nvmf_subsystem)
return json.dumps({"result": nvmf_subsystem})
def nvmf_subsystem_add_listener(self, params=None):
for nvmf_subsystem in self.nvmf_subsystems:
if nvmf_subsystem['nqn'] == params['nqn']:
nvmf_subsystem['listen_addresses'].append(
params['listen_address']
)
return json.dumps({"result": ""})
def nvmf_subsystem_add_ns(self, params=None):
for nvmf_subsystem in self.nvmf_subsystems:
if nvmf_subsystem['nqn'] == params['nqn']:
nvmf_subsystem['namespaces'].append(
params['namespace']
)
return json.dumps({"result": ""})
def call(self, method, params=None):
req = {}
req['jsonrpc'] = '2.0'
req['method'] = method
req['id'] = 1
if (params):
req['params'] = params
response = json.loads(self.methods[method](params))
if not response:
if method == "kill_instance":
return {}
msg = "Timeout while waiting for response:"
raise JSONRPCException(msg)
if 'error' in response:
msg = "\n".join(["Got JSON-RPC error response",
"request:",
json.dumps(req, indent=2),
"response:",
json.dumps(response['error'], indent=2)])
raise JSONRPCException(msg)
return response['result']
class SpdkDriverTestCase(test.TestCase):
def setUp(self):
super(SpdkDriverTestCase, self).setUp()
self.configuration = mock.Mock(conf.Configuration)
self.configuration.target_helper = ""
self.configuration.target_ip_address = "192.168.0.1"
self.configuration.target_port = 4420
self.configuration.target_prefix = "nqn.2014-08.io.spdk"
self.configuration.nvmet_port_id = "1"
self.configuration.nvmet_ns_id = "fake_id"
self.configuration.nvmet_subsystem_name = "2014-08.io.spdk"
self.configuration.target_protocol = "nvmet_rdma"
self.configuration.spdk_rpc_ip = "127.0.0.1"
self.configuration.spdk_rpc_port = 8000
mock_safe_get = mock.Mock()
mock_safe_get.return_value = 'spdk-nvmeof'
self.configuration.safe_get = mock_safe_get
self.jsonrpcclient = JSONRPCClient()
self.driver = spdk_driver.SPDKDriver(configuration=
self.configuration)
def test__update_volume_stats(self):
with mock.patch.object(self.driver, "_rpc_call",
self.jsonrpcclient.call):
self.driver._update_volume_stats()
self.assertEqual(1, len(self.driver._stats['pools']))
self.assertEqual("lvs_test",
self.driver._stats['pools'][0]['pool_name'])
def test__get_spdk_volume_name(self):
with mock.patch.object(self.driver, "_rpc_call",
self.jsonrpcclient.call):
bdev = self.driver._get_spdk_volume_name("lvs_test/lvol0")
self.assertEqual('58b17014-d4a1-4f85-9761'
'-093643ed18f1_4294967297',
bdev)
bdev = self.driver._get_spdk_volume_name("Nvme1n1")
self.assertIsNone(bdev)
def test__get_spdk_lvs_uuid(self):
with mock.patch.object(self.driver, "_rpc_call",
self.jsonrpcclient.call):
bdev = self.driver._rpc_call(
"get_bdevs", params={"name": "lvs_test/lvol0"})
self.assertEqual(
bdev[0]['driver_specific']['lvol']['lvol_store_uuid'],
self.driver._get_spdk_lvs_uuid(
"58b17014-d4a1-4f85-9761-093643ed18f1_4294967297"))
self.assertIsNone(
self.driver._get_spdk_lvs_uuid("lvs_test/fake"))
def test__get_spdk_lvs_free_space(self):
with mock.patch.object(self.driver, "_rpc_call",
self.jsonrpcclient.call):
lvs = self.driver._rpc_call("get_lvol_stores")
lvol_store = None
for lvol in lvs:
if lvol['name'] == "lvs_test":
lvol_store = lvol
self.assertIsNotNone(lvol_store)
free_size = (lvol_store['free_clusters']
* lvol_store['cluster_size']
/ units.Gi)
self.assertEqual(free_size,
self.driver._get_spdk_lvs_free_space(
"58b17014-d4a1-4f85-9761-093643ed18f1"))
self.assertEqual(0,
self.driver._get_spdk_lvs_free_space("fake"))
def test__delete_bdev(self):
with mock.patch.object(self.driver, "_rpc_call",
self.jsonrpcclient.call):
self.driver._delete_bdev("lvs_test/lvol1")
bdev = self.driver._get_spdk_volume_name("lvs_test/lvol1")
self.assertIsNone(bdev)
self.driver._delete_bdev("lvs_test/lvol1")
bdev = self.driver._get_spdk_volume_name("lvs_test/lvol1")
self.assertIsNone(bdev)
def test__create_volume(self):
with mock.patch.object(self.driver, "_rpc_call",
self.jsonrpcclient.call):
self.driver._create_volume(Volume())
bdev = self.driver._get_spdk_volume_name("lvs_test/lvol2")
self.assertEqual("58b17014-d4a1-4f85-9761"
"-093643ed18f1_4294967299",
bdev)
volume_clone = Volume()
volume_clone.name = "clone0"
self.driver._rpc_call("snapshot_lvol_bdev",
params={'snapshot_name': "snapshot0",
'lvol_name': "lvs_test/lvol2"})
bdev = self.driver._get_spdk_volume_name("lvs_test/snapshot0")
self.assertEqual("58b17014-d4a1-4f85-9761-093643ed18f2", bdev)
snapshot = Snapshot()
self.driver._create_volume(volume_clone, snapshot)
bdev = self.driver._get_spdk_volume_name("lvs_test/clone0")
self.assertEqual("3735a554-0dce-4d13-ba67-597d41186104", bdev)
def test_check_for_setup_error(self):
with mock.patch.object(self.driver, "_rpc_call",
self.jsonrpcclient.call):
self.driver.check_for_setup_error()
@mock.patch('cinder.db.sqlalchemy.api.volume_get')
def test_create_volume(self, volume_get):
with mock.patch.object(self.driver, "_rpc_call",
self.jsonrpcclient.call):
db_volume = fake_volume.fake_db_volume()
ctxt = context.get_admin_context()
db_volume = objects.Volume._from_db_object(ctxt, objects.Volume(),
db_volume)
volume_get.return_value = db_volume
self.driver.create_volume(db_volume)
bdev = self.driver._get_spdk_volume_name("lvs_test/%s"
% db_volume.name)
self.assertEqual("58b17014-d4a1-4f85-9761"
"-093643ed18f1_4294967299",
bdev)
@mock.patch('cinder.db.sqlalchemy.api.volume_get')
def test_delete_volume(self, volume_get):
with mock.patch.object(self.driver, "_rpc_call",
self.jsonrpcclient.call):
db_volume = fake_volume.fake_db_volume()
with mock.patch.object(self.driver.target_driver, "_rpc_call",
self.jsonrpcclient.call):
nqn = "nqn.2016-06.io.spdk:cnode%s" \
% self.driver.target_driver._get_first_free_node()
db_volume['provider_id'] = nqn
ctxt = context.get_admin_context()
db_volume = objects.Volume._from_db_object(ctxt, objects.Volume(),
db_volume)
volume_get.return_value = db_volume
start_bdevs_len = len(self.driver._rpc_call('get_bdevs'))
self.driver.create_volume(db_volume)
tmp_bdevs = self.driver._rpc_call('get_bdevs')
self.assertEqual(start_bdevs_len + 1, len(tmp_bdevs))
volume = Volume()
volume.name = "lvs_test/%s" % db_volume.name
volume_name = self.driver._get_spdk_volume_name(volume.name)
self.driver._rpc_call('destroy_lvol_bdev', {"name": volume_name})
self.driver.delete_volume(volume)
bdev = self.driver._get_spdk_volume_name("lvs_test/%s"
% db_volume.name)
self.assertIsNone(bdev)
tmp_bdevs = self.driver._rpc_call('get_bdevs')
self.assertEqual(start_bdevs_len, len(tmp_bdevs))
def get_volume_stats(self):
with mock.patch.object(self.driver, "_rpc_call",
self.jsonrpcclient.call):
self.driver.get_volume_stats(True)
self.driver.get_volume_stats(False)
def test_create_volume_from_snapshot(self):
with mock.patch.object(self.driver, "_rpc_call",
self.jsonrpcclient.call):
volume_clone = Volume()
volume_clone.name = "clone0"
self.driver._rpc_call("snapshot_lvol_bdev",
params={'snapshot_name': "snapshot0",
'lvol_name': "lvs_test/lvol2"})
snapshot = Snapshot()
self.driver.create_volume_from_snapshot(volume_clone, snapshot)
bdev = self.driver._get_spdk_volume_name("lvs_test/clone0")
self.assertEqual("3735a554-0dce-4d13-ba67-597d41186104", bdev)
@mock.patch('cinder.db.sqlalchemy.api.volume_get')
def test_create_snapshot(self, volume_get):
with mock.patch.object(self.driver, "_rpc_call",
self.jsonrpcclient.call):
db_volume = fake_volume.fake_db_volume()
db_volume['name'] = "lvs_test/lvol0"
ctxt = context.get_admin_context()
db_volume = objects.Volume._from_db_object(ctxt, objects.Volume(),
db_volume)
volume_get.return_value = db_volume
snapshot = {}
snapshot['volume_id'] = db_volume['id']
snapshot['name'] = "snapshot0"
snapshot['volume'] = db_volume
for bdev in self.jsonrpcclient.bdevs:
if bdev['aliases'][-1] == "lvs_test/lvol0":
bdev['aliases'].append(db_volume.name)
self.driver.create_snapshot(snapshot)
bdev = self.driver._get_spdk_volume_name("lvs_test/snapshot0")
self.assertEqual("58b17014-d4a1-4f85-9761-093643ed18f2", bdev)
def test_delete_snapshot(self):
with mock.patch.object(self.driver, "_rpc_call",
self.jsonrpcclient.call):
snapshot = Snapshot()
snapshot.name = "snapshot0"
self.driver._rpc_call("snapshot_lvol_bdev",
params = {'snapshot_name': snapshot.name})
self.driver.delete_snapshot(snapshot)
snapshot = self.driver._get_spdk_volume_name("lvs_test/" +
snapshot.name)
self.assertIsNone(snapshot)
@mock.patch('cinder.db.sqlalchemy.api.volume_get')
def test_create_cloned_volume(self, volume_get):
with mock.patch.object(self.driver, "_rpc_call",
self.jsonrpcclient.call):
db_volume = fake_volume.fake_db_volume()
db_volume['name'] = "lvs_test/lvol0"
db_volume['size'] = 1
ctxt = context.get_admin_context()
db_volume = objects.Volume._from_db_object(ctxt, objects.Volume(),
db_volume)
cloned_volume = Volume()
cloned_volume.name = 'lvs_test/cloned_volume'
for bdev in self.jsonrpcclient.bdevs:
if bdev['aliases'][-1] == "lvs_test/lvol0":
bdev['aliases'].append(db_volume.name)
self.driver.create_cloned_volume(cloned_volume, db_volume)
bdev = self.driver._get_spdk_volume_name("lvs_test/cloned_volume")
self.assertEqual("3735a554-0dce-4d13-ba67-597d41186104", bdev)
@mock.patch('cinder.db.sqlalchemy.api.volume_get')
def test_copy_image_to_volume(self, volume_get):
with mock.patch.object(self.driver, "_rpc_call",
self.jsonrpcclient.call):
db_volume = fake_volume.fake_db_volume()
db_volume['provider_location'] = "127.0.0.1:3262 RDMA " \
"2016-06.io.spdk:cnode2"
ctxt = context.get_admin_context()
db_volume = objects.Volume._from_db_object(ctxt, objects.Volume(),
db_volume)
volume_get.return_value = db_volume
with mock.patch.object(self.driver.target_driver, "_rpc_call",
self.jsonrpcclient.call):
self.driver.copy_image_to_volume(ctxt, db_volume, None, None)
@mock.patch('cinder.db.sqlalchemy.api.volume_get')
def test_copy_volume_to_image(self, volume_get):
with mock.patch.object(self.driver, "_rpc_call",
self.jsonrpcclient.call):
db_volume = fake_volume.fake_db_volume()
db_volume['provider_location'] = "127.0.0.1:3262 RDMA " \
"2016-06.io.spdk:cnode2"
ctxt = context.get_admin_context()
db_volume = objects.Volume._from_db_object(ctxt, objects.Volume(),
db_volume)
volume_get.return_value = db_volume
with mock.patch.object(self.driver.target_driver, "_rpc_call",
self.jsonrpcclient.call):
self.driver.copy_volume_to_image(ctxt, db_volume, None, None)
def test_extend_volume(self):
with mock.patch.object(self.driver, "_rpc_call",
self.jsonrpcclient.call):
volume = Volume()
volume.name = "lvs_test/lvol0"
self.driver.extend_volume(volume, 2)
bdev = self.driver._rpc_call("get_bdevs",
params={"name": "lvs_test/lvol0"})
self.assertEqual(2 * units.Gi,
bdev[0]['num_blocks'] * bdev[0]['block_size'])
def test_ensure_export(self):
pass
def test_create_export(self):
with mock.patch.object(self.driver, "_rpc_call",
self.jsonrpcclient.call):
db_volume = fake_volume.fake_db_volume()
db_volume['provider_location'] = "192.168.0.1:4420 rdma " \
"2014-08.io.spdk:cnode2"
ctxt = context.get_admin_context()
db_volume = objects.Volume._from_db_object(ctxt, objects.Volume(),
db_volume)
with mock.patch.object(self.driver.target_driver, "_rpc_call",
self.jsonrpcclient.call):
expected_return = {
'provider_location':
self.driver.target_driver.get_nvmeof_location(
"nqn.%s:cnode%s" % (
self.configuration.nvmet_subsystem_name,
self.driver.target_driver._get_first_free_node()
),
self.configuration.target_ip_address,
self.configuration.target_port, "rdma",
self.configuration.nvmet_ns_id
),
'provider_auth': ''
}
export = self.driver.create_export(ctxt, db_volume, None)
self.assertEqual(expected_return, export)
def test_remove_export(self):
with mock.patch.object(self.driver, "_rpc_call",
self.jsonrpcclient.call):
db_volume = fake_volume.fake_db_volume()
db_volume['provider_location'] = "127.0.0.1:4420 rdma " \
"2016-06.io.spdk:cnode2"
ctxt = context.get_admin_context()
db_volume = objects.Volume._from_db_object(ctxt, objects.Volume(),
db_volume)
with mock.patch.object(self.driver.target_driver, "_rpc_call",
self.jsonrpcclient.call):
self.driver.create_export(ctxt, db_volume, None)
self.assertIsNone(self.driver.remove_export(ctxt, db_volume))
def test_initialize_connection(self):
with mock.patch.object(self.driver, "_rpc_call",
self.jsonrpcclient.call):
db_volume = fake_volume.fake_db_volume()
db_volume['provider_location'] = "127.0.0.1:3262 RDMA " \
"2016-06.io.spdk:cnode2 1"
ctxt = context.get_admin_context()
db_volume = objects.Volume._from_db_object(ctxt, objects.Volume(),
db_volume)
target_connector = \
connector.InitiatorConnector.factory(initiator.NVME,
utils.get_root_helper())
self.driver.initialize_connection(db_volume, target_connector)
def test_validate_connector(self):
mock_connector = {'initiator': 'fake_init'}
self.assertTrue(self.driver.validate_connector(mock_connector))
def test_terminate_connection(self):
pass

View File

@ -0,0 +1,410 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
from os_brick import initiator
from os_brick.initiator import connector
from oslo_log import log as logging
from oslo_utils import importutils
from oslo_utils import units
import requests
from cinder import context
from cinder import exception
from cinder.i18n import _
from cinder.image import image_utils
from cinder import interface
from cinder import utils
from cinder.volume import driver
LOG = logging.getLogger(__name__)
@interface.volumedriver
class SPDKDriver(driver.VolumeDriver):
"""Executes commands relating to Volumes."""
VERSION = '1.0.0'
# ThirdPartySystems wiki page
CI_WIKI_NAME = "SPDK"
def __init__(self, *args, **kwargs):
# Parent sets db, host, _execute and base config
super(SPDKDriver, self).__init__(*args, **kwargs)
self.lvs = []
self.ctxt = context.get_admin_context()
target_driver = (
self.target_mapping[self.configuration.safe_get('target_helper')])
LOG.debug('SPDK attempting to initialize LVM driver with the '
'following target_driver: %s',
target_driver)
self.target_driver = importutils.import_object(
target_driver,
configuration=self.configuration,
db=self.db,
executor=self._execute)
def _rpc_call(self, method, params=None):
payload = {}
payload['jsonrpc'] = '2.0'
payload['id'] = 1
payload['method'] = method
if params is not None:
payload['params'] = params
req = requests.post(self.url,
data=json.dumps(payload),
auth=(self.configuration.spdk_rpc_username,
self.configuration.spdk_rpc_password),
verify=self.configuration.driver_ssl_cert_verify,
timeout=30)
if not req.ok:
raise exception.VolumeBackendAPIException(
data=_('SPDK target responded with error: %s') % req.text)
return req.json()['result']
def _update_volume_stats(self):
"""Retrieve stats info from volume group."""
LOG.debug('SPDK Updating volume stats')
status = {}
pools_status = []
self.lvs = []
output = self._rpc_call('get_lvol_stores')
if output:
for lvs in output:
pool = {}
lvs_entry = {}
free_size = (lvs['free_clusters']
* lvs['cluster_size']
/ units.Gi)
total_size = (lvs['total_data_clusters']
* lvs['cluster_size']
/ units.Gi)
pool["volume_backend_name"] = 'SPDK'
pool["vendor_name"] = 'Open Source'
pool["driver_version"] = self.VERSION
pool["storage_protocol"] = 'NVMe-oF'
pool["total_capacity_gb"] = total_size
pool["free_capacity_gb"] = free_size
pool["pool_name"] = lvs['name']
pools_status.append(pool)
lvs_entry['name'] = lvs['name']
lvs_entry['uuid'] = lvs['uuid']
lvs_entry['free_size'] = free_size
lvs_entry['total_size'] = total_size
self.lvs.append(lvs_entry)
status['pools'] = pools_status
self._stats = status
for lvs in self.lvs:
LOG.debug('SPDK lvs name: %s, total space: %s, free space: %s',
lvs['name'],
lvs['total_size'],
lvs['free_size'])
def _get_spdk_volume_name(self, name):
output = self._rpc_call('get_bdevs')
for bdev in output:
for alias in bdev['aliases']:
if name in alias:
return bdev['name']
def _get_spdk_lvs_uuid(self, spdk_name):
output = self._rpc_call('get_bdevs')
for bdev in output:
if spdk_name in bdev['name']:
return bdev['driver_specific']['lvol']['lvol_store_uuid']
def _get_spdk_lvs_free_space(self, lvs_uuid):
self._update_volume_stats()
for lvs in self.lvs:
if lvs_uuid in lvs['uuid']:
return lvs['free_size']
return 0
def _delete_bdev(self, name):
spdk_name = self._get_spdk_volume_name(name)
if spdk_name is not None:
params = {'name': spdk_name}
self._rpc_call('destroy_lvol_bdev', params)
LOG.debug('SPDK bdev %s deleted', spdk_name)
else:
LOG.debug('Could not find volume %s using SPDK driver', name)
def _create_volume(self, volume, snapshot=None):
output = self._rpc_call('get_lvol_stores')
for lvs in output:
free_size = (lvs['free_clusters'] * lvs['cluster_size'])
if free_size / units.Gi >= volume.size:
if snapshot is None:
params = {
'lvol_name': volume.name,
'size': volume.size * units.Gi,
'uuid': lvs['uuid']}
output2 = self._rpc_call('construct_lvol_bdev', params)
else:
snapshot_spdk_name = (
self._get_spdk_volume_name(snapshot.name))
params = {
'clone_name': volume.name,
'snapshot_name': snapshot_spdk_name}
output2 = self._rpc_call('clone_lvol_bdev', params)
spdk_name = self._get_spdk_volume_name(volume.name)
params = {'name': spdk_name}
self._rpc_call('inflate_lvol_bdev', params)
if volume.size > snapshot.volume_size:
params = {'name': spdk_name,
'size': volume.size * units.Gi}
self._rpc_call('resize_lvol_bdev', params)
LOG.debug('SPDK created lvol: %s', output2)
return
LOG.error('Unable to create volume using SPDK - no resources found')
raise exception.VolumeBackendAPIException(
data=_('Unable to create volume using SPDK'
' - no resources found'))
def do_setup(self, context):
try:
payload = {'method': 'get_bdevs', 'jsonrpc': '2.0', 'id': 1}
self.url = ('http://%(ip)s:%(port)s/' %
{'ip': self.configuration.spdk_rpc_ip,
'port': self.configuration.spdk_rpc_port})
requests.post(self.url,
data=json.dumps(payload),
auth=(self.configuration.spdk_rpc_username,
self.configuration.spdk_rpc_password),
verify=self.configuration.driver_ssl_cert_verify,
timeout=30)
except Exception as err:
err_msg = (
_('Could not connect to SPDK target: %(err)s')
% {'err': err})
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
def check_for_setup_error(self):
"""Verify that requirements are in place to use LVM driver."""
# If configuration is incorrect we will get exception here
self._rpc_call('get_bdevs')
def create_volume(self, volume):
"""Creates a logical volume."""
LOG.debug('SPDK create volume')
return self._create_volume(volume)
def delete_volume(self, volume):
"""Deletes a logical volume."""
LOG.debug('SPDK deleting volume %s', volume.name)
self._delete_bdev(volume.name)
def get_volume_stats(self, refresh=False):
if refresh:
self._update_volume_stats()
return self._stats
def create_volume_from_snapshot(self, volume, snapshot):
"""Creates a volume from a snapshot."""
free_size = self._get_spdk_lvs_free_space(
self._get_spdk_lvs_uuid(
self._get_spdk_volume_name(snapshot.name)))
if free_size < volume.size:
raise exception.VolumeBackendAPIException(
data=_('Not enough space to create snapshot with SPDK'))
return self._create_volume(volume, snapshot)
def create_snapshot(self, snapshot):
"""Creates a snapshot."""
volume = snapshot['volume']
spdk_name = self._get_spdk_volume_name(volume.name)
if spdk_name is None:
raise exception.VolumeBackendAPIException(
data=_('Could not create snapshot with SPDK driver'))
free_size = self._get_spdk_lvs_free_space(
self._get_spdk_lvs_uuid(spdk_name))
if free_size < volume.size:
raise exception.VolumeBackendAPIException(
data=_('Not enough space to create snapshot with SPDK'))
params = {
'lvol_name': spdk_name,
'snapshot_name': snapshot['name']}
self._rpc_call('snapshot_lvol_bdev', params)
params = {'name': spdk_name}
self._rpc_call('inflate_lvol_bdev', params)
def delete_snapshot(self, snapshot):
"""Deletes a snapshot."""
spdk_name = self._get_spdk_volume_name(snapshot.name)
if spdk_name is None:
return
params = {'name': spdk_name}
bdev = self._rpc_call('get_bdevs', params)
if 'clones' in bdev[0]['driver_specific']['lvol']:
for clone in bdev[0]['driver_specific']['lvol']['clones']:
spdk_name = self._get_spdk_volume_name(clone)
params = {'name': spdk_name}
self._rpc_call('inflate_lvol_bdev', params)
self._delete_bdev(snapshot.name)
def create_cloned_volume(self, volume, src_volume):
spdk_name = self._get_spdk_volume_name(src_volume.name)
free_size = self._get_spdk_lvs_free_space(
self._get_spdk_lvs_uuid(spdk_name))
# We need additional space for snapshot that will be used here
if free_size < 2 * src_volume.size + volume.size:
raise exception.VolumeBackendAPIException(
data=_('Not enough space to clone volume with SPDK'))
snapshot_name = 'snp-' + src_volume.name
params = {
'lvol_name': spdk_name,
'snapshot_name': snapshot_name}
self._rpc_call('snapshot_lvol_bdev', params)
params = {'name': spdk_name}
self._rpc_call('inflate_lvol_bdev', params)
snapshot_spdk_name = self._get_spdk_volume_name(snapshot_name)
params = {
'clone_name': volume.name,
'snapshot_name': snapshot_spdk_name}
self._rpc_call('clone_lvol_bdev', params)
spdk_name = self._get_spdk_volume_name(volume.name)
params = {'name': spdk_name}
self._rpc_call('inflate_lvol_bdev', params)
self._delete_bdev(snapshot_name)
if volume.size > src_volume.size:
self.extend_volume(volume, volume.size)
def copy_image_to_volume(self, context, volume, image_service, image_id):
"""Fetch the image from image_service and write it to the volume."""
volume['provider_location'] = (
self.create_export(context, volume, None)['provider_location'])
connection_data = self.initialize_connection(volume, None)['data']
target_connector = (
connector.InitiatorConnector.factory(initiator.NVME,
utils.get_root_helper()))
try:
device_info = target_connector.connect_volume(connection_data)
except Exception:
LOG.info('Could not connect SPDK target device')
return
connection_data['device_path'] = device_info['path']
try:
image_utils.fetch_to_raw(context,
image_service,
image_id,
device_info['path'],
self.configuration.volume_dd_blocksize,
size=volume['size'])
finally:
target_connector.disconnect_volume(connection_data, volume)
def copy_volume_to_image(self, context, volume, image_service, image_meta):
"""Copy the volume to the specified image."""
volume['provider_location'] = (
self.create_export(context, volume, None)['provider_location'])
connection_data = self.initialize_connection(volume, None)['data']
target_connector = (
connector.InitiatorConnector.factory(initiator.NVME,
utils.get_root_helper()))
try:
device_info = target_connector.connect_volume(connection_data)
except Exception:
LOG.info('Could not connect SPDK target device')
return
connection_data['device_path'] = device_info['path']
try:
image_utils.upload_volume(context,
image_service,
image_meta,
device_info['path'])
finally:
target_connector.disconnect_volume(connection_data, volume)
def extend_volume(self, volume, new_size):
"""Extend an existing volume's size."""
spdk_name = self._get_spdk_volume_name(volume.name)
params = {'name': spdk_name, 'size': new_size * units.Gi}
self._rpc_call('resize_lvol_bdev', params)
# ####### Interface methods for DataPath (Target Driver) ########
def ensure_export(self, context, volume):
pass
def create_export(self, context, volume, connector, vg=None):
export_info = self.target_driver.create_export(
context,
volume,
None)
return {'provider_location': export_info['location'],
'provider_auth': export_info['auth'], }
def remove_export(self, context, volume):
self.target_driver.remove_export(context, volume)
def initialize_connection(self, volume, connector):
return self.target_driver.initialize_connection(volume, connector)
def validate_connector(self, connector):
return self.target_driver.validate_connector(connector)
def terminate_connection(self, volume, connector, **kwargs):
pass

View File

@ -0,0 +1,6 @@
---
features:
- |
A new volume driver, SPDK, is added for Storage Performance
Development Kit NVMe-oF target handling, that allows Cinder
to manage volumes in SPDK NVMe-oF driver.