From f7e9c240dcc25bdf17e3ad0e4591a7368fe8032a Mon Sep 17 00:00:00 2001 From: Bardia Keyoumarsi Date: Tue, 17 Nov 2015 16:07:41 -0800 Subject: [PATCH] Volume driver for Coho Data storage solutions This patch introduces Coho Data volume driver along with unit tests. Implements: blueprint coho-cinder-driver DocImpact Documentation for setting up the Coho driver and enabling it in Cinder will be provided in a separate patch. Change-Id: I06a66d10add9132d0f3afca054d68094ddfb4da0 Signed-off-by: Bardia Keyoumarsi --- cinder/exception.py | 6 + cinder/opts.py | 2 + cinder/tests/unit/test_coho.py | 376 +++++++++++++++++ cinder/volume/drivers/coho.py | 397 ++++++++++++++++++ .../add-coho-driver-b4472bff3f64aa41.yaml | 3 + tests-py3.txt | 1 + 6 files changed, 785 insertions(+) create mode 100644 cinder/tests/unit/test_coho.py create mode 100644 cinder/volume/drivers/coho.py create mode 100644 releasenotes/notes/add-coho-driver-b4472bff3f64aa41.yaml diff --git a/cinder/exception.py b/cinder/exception.py index ef50bd67bc3..b5344537591 100644 --- a/cinder/exception.py +++ b/cinder/exception.py @@ -48,6 +48,7 @@ CONF.register_opts(exc_log_opts) class ConvertedException(webob.exc.WSGIHTTPException): + def __init__(self, code=500, title="", explanation=""): self.code = code # There is a strict rule about constructing status line for HTTP: @@ -1017,3 +1018,8 @@ class NotSupportedOperation(Invalid): # Hitachi HNAS drivers class HNASConnError(CinderException): message = _("%(message)s") + + +# Coho drivers +class CohoException(VolumeDriverException): + message = _("Coho Data Cinder driver failure: %(message)s") diff --git a/cinder/opts.py b/cinder/opts.py index 8d65f80314b..a4864e8f733 100644 --- a/cinder/opts.py +++ b/cinder/opts.py @@ -64,6 +64,7 @@ from cinder.volume.drivers import blockbridge as \ cinder_volume_drivers_blockbridge from cinder.volume.drivers.cloudbyte import options as \ cinder_volume_drivers_cloudbyte_options +from cinder.volume.drivers import coho as cinder_volume_drivers_coho from cinder.volume.drivers import datera as cinder_volume_drivers_datera from cinder.volume.drivers.dell import dell_storagecenter_common as \ cinder_volume_drivers_dell_dellstoragecentercommon @@ -238,6 +239,7 @@ def list_opts(): cinder_db_api.db_opts, cinder_scheduler_weights_volumenumber. volume_number_weight_opts, + cinder_volume_drivers_coho.coho_opts, cinder_volume_drivers_xio.XIO_OPTS, cinder_volume_drivers_zfssa_zfssaiscsi.ZFSSA_OPTS, cinder_volume_driver.volume_opts, diff --git a/cinder/tests/unit/test_coho.py b/cinder/tests/unit/test_coho.py new file mode 100644 index 00000000000..a0c724dd324 --- /dev/null +++ b/cinder/tests/unit/test_coho.py @@ -0,0 +1,376 @@ +# Copyright (c) 2015 Coho Data, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import binascii +import errno +import mock +import os +import six +import socket +import xdrlib + +from cinder import exception +from cinder import test +from cinder.volume import configuration as conf +from cinder.volume.drivers import coho +from cinder.volume.drivers import nfs + +ADDR = 'coho-datastream-addr' +PATH = '/test/path' +RPC_PORT = 2049 +LOCAL_PATH = '/opt/cinder/mnt/test/path' + +VOLUME = { + 'name': 'volume-bcc48c61-9691-4e5f-897c-793686093190', + 'volume_id': 'bcc48c61-9691-4e5f-897c-793686093190', + 'size': 128, + 'volume_type': 'silver', + 'volume_type_id': 'test', + 'metadata': [{'key': 'type', + 'service_label': 'silver'}], + 'provider_location': None, + 'id': 'bcc48c61-9691-4e5f-897c-793686093190', + 'status': 'available', +} + +CLONE_VOL = VOLUME.copy() +CLONE_VOL['size'] = 256 + +SNAPSHOT = { + 'name': 'snapshot-51dd4-8d8a-4aa9-9176-086c9d89e7fc', + 'id': '51dd4-8d8a-4aa9-9176-086c9d89e7fc', + 'size': 128, + 'volume_type': None, + 'provider_location': None, + 'volume_size': 128, + 'volume_name': 'volume-bcc48c61-9691-4e5f-897c-793686093190', + 'volume_id': 'bcc48c61-9691-4e5f-897c-793686093191', +} + +INVALID_SNAPSHOT = SNAPSHOT.copy() +INVALID_SNAPSHOT['name'] = '' + +INVALID_HEADER_BIN = binascii.unhexlify('800000') +NO_REPLY_BIN = binascii.unhexlify( + 'aaaaa01000000010000000000000000000000003') +MSG_DENIED_BIN = binascii.unhexlify( + '00000a010000000110000000000000000000000000000003') +PROC_UNAVAIL_BIN = binascii.unhexlify( + '00000a010000000100000000000000000000000000000003') +PROG_UNAVAIL_BIN = binascii.unhexlify( + '000003c70000000100000000000000000000000000000001') +PROG_MISMATCH_BIN = binascii.unhexlify( + '00000f7700000001000000000000000000000000000000020000000100000001') +GARBAGE_ARGS_BIN = binascii.unhexlify( + '00000d6e0000000100000000000000000000000000000004') + + +class CohoDriverTest(test.TestCase): + """Test Coho Data's NFS volume driver.""" + + def __init__(self, *args, **kwargs): + super(CohoDriverTest, self).__init__(*args, **kwargs) + + def setUp(self): + super(CohoDriverTest, self).setUp() + + self.context = mock.Mock() + self.configuration = mock.Mock(spec=conf.Configuration) + self.configuration.max_over_subscription_ratio = 20.0 + self.configuration.reserved_percentage = 0 + self.configuration.volume_backend_name = 'coho-1' + self.configuration.coho_rpc_port = 2049 + self.configuration.nfs_shares_config = '/etc/cinder/coho_shares' + self.configuration.nfs_sparsed_volumes = True + self.configuration.nfs_mount_point_base = '/opt/stack/cinder/mnt' + self.configuration.nfs_mount_options = None + self.configuration.nas_ip = None + self.configuration.nas_share_path = None + self.configuration.nas_mount_options = None + self.configuration.nfs_used_ratio = .95 + self.configuration.nfs_oversub_ratio = 1.0 + + def test_setup_failure_when_rpc_port_unconfigured(self): + self.configuration.coho_rpc_port = None + drv = coho.CohoDriver(configuration=self.configuration) + + self.mock_object(coho, 'LOG') + self.mock_object(nfs.NfsDriver, 'do_setup') + + with self.assertRaisesRegex(exception.CohoException, + ".*Coho rpc port is not configured.*"): + drv.do_setup(self.context) + + self.assertTrue(coho.LOG.warning.called) + self.assertTrue(nfs.NfsDriver.do_setup.called) + + def test_setup_failure_when_coho_rpc_port_is_invalid(self): + self.configuration.coho_rpc_port = 99999 + drv = coho.CohoDriver(configuration=self.configuration) + + self.mock_object(coho, 'LOG') + self.mock_object(nfs.NfsDriver, 'do_setup') + + with self.assertRaisesRegex(exception.CohoException, + "Invalid port number.*"): + drv.do_setup(self.context) + + self.assertTrue(coho.LOG.warning.called) + self.assertTrue(nfs.NfsDriver.do_setup.called) + + def test_create_snapshot(self): + drv = coho.CohoDriver(configuration=self.configuration) + + mock_rpc_client = self.mock_object(coho, 'CohoRPCClient') + mock_get_volume_location = self.mock_object(coho.CohoDriver, + '_get_volume_location') + mock_get_volume_location.return_value = ADDR, PATH + + drv.create_snapshot(SNAPSHOT) + + mock_get_volume_location.assert_has_calls( + [mock.call(SNAPSHOT['volume_id'])]) + mock_rpc_client.assert_has_calls( + [mock.call(ADDR, self.configuration.coho_rpc_port), + mock.call().create_snapshot( + os.path.join(PATH, SNAPSHOT['volume_name']), + SNAPSHOT['name'], 0)]) + + def test_delete_snapshot(self): + drv = coho.CohoDriver(configuration=self.configuration) + + mock_rpc_client = self.mock_object(coho, 'CohoRPCClient') + mock_get_volume_location = self.mock_object(coho.CohoDriver, + '_get_volume_location') + mock_get_volume_location.return_value = ADDR, PATH + + drv.delete_snapshot(SNAPSHOT) + + mock_get_volume_location.assert_has_calls( + [mock.call(SNAPSHOT['volume_id'])]) + mock_rpc_client.assert_has_calls( + [mock.call(ADDR, self.configuration.coho_rpc_port), + mock.call().delete_snapshot(SNAPSHOT['name'])]) + + def test_create_volume_from_snapshot(self): + drv = coho.CohoDriver(configuration=self.configuration) + + mock_rpc_client = self.mock_object(coho, 'CohoRPCClient') + mock_find_share = self.mock_object(drv, '_find_share') + mock_find_share.return_value = ADDR + ':' + PATH + + drv.create_volume_from_snapshot(VOLUME, SNAPSHOT) + + mock_find_share.assert_has_calls( + [mock.call(VOLUME['size'])]) + mock_rpc_client.assert_has_calls( + [mock.call(ADDR, self.configuration.coho_rpc_port), + mock.call().create_volume_from_snapshot( + SNAPSHOT['name'], os.path.join(PATH, VOLUME['name']))]) + + def test_create_cloned_volume(self): + drv = coho.CohoDriver(configuration=self.configuration) + + mock_find_share = self.mock_object(drv, '_find_share') + mock_find_share.return_value = ADDR + ':' + PATH + mock_execute = self.mock_object(drv, '_execute') + mock_local_path = self.mock_object(drv, 'local_path') + mock_local_path.return_value = LOCAL_PATH + + drv.create_cloned_volume(VOLUME, CLONE_VOL) + + mock_find_share.assert_has_calls( + [mock.call(VOLUME['size'])]) + mock_local_path.assert_has_calls( + [mock.call(VOLUME), mock.call(CLONE_VOL)]) + mock_execute.assert_has_calls( + [mock.call('cp', LOCAL_PATH, LOCAL_PATH, run_as_root=True)]) + + def test_extend_volume(self): + drv = coho.CohoDriver(configuration=self.configuration) + + mock_execute = self.mock_object(drv, '_execute') + mock_local_path = self.mock_object(drv, 'local_path') + mock_local_path.return_value = LOCAL_PATH + + drv.extend_volume(VOLUME, 512) + + mock_local_path.assert_has_calls( + [mock.call(VOLUME)]) + mock_execute.assert_has_calls( + [mock.call('truncate', '-s', '512G', + LOCAL_PATH, run_as_root=True)]) + + def test_snapshot_failure_when_source_does_not_exist(self): + drv = coho.CohoDriver(configuration=self.configuration) + + self.mock_object(coho.Client, '_make_call') + mock_init_socket = self.mock_object(coho.Client, 'init_socket') + mock_unpack_uint = self.mock_object(xdrlib.Unpacker, 'unpack_uint') + mock_unpack_uint.return_value = errno.ENOENT + mock_get_volume_location = self.mock_object(coho.CohoDriver, + '_get_volume_location') + mock_get_volume_location.return_value = ADDR, PATH + + with self.assertRaisesRegex(exception.CohoException, + "No such file or directory.*"): + drv.create_snapshot(SNAPSHOT) + + self.assertTrue(mock_init_socket.called) + self.assertTrue(mock_unpack_uint.called) + mock_get_volume_location.assert_has_calls( + [mock.call(SNAPSHOT['volume_id'])]) + + def test_snapshot_failure_with_invalid_input(self): + drv = coho.CohoDriver(configuration=self.configuration) + + self.mock_object(coho.Client, '_make_call') + mock_init_socket = self.mock_object(coho.Client, 'init_socket') + mock_unpack_uint = self.mock_object(xdrlib.Unpacker, 'unpack_uint') + mock_unpack_uint.return_value = errno.EINVAL + mock_get_volume_location = self.mock_object(coho.CohoDriver, + '_get_volume_location') + mock_get_volume_location.return_value = ADDR, PATH + + with self.assertRaisesRegex(exception.CohoException, + "Invalid argument"): + drv.delete_snapshot(INVALID_SNAPSHOT) + + self.assertTrue(mock_init_socket.called) + self.assertTrue(mock_unpack_uint.called) + mock_get_volume_location.assert_has_calls( + [mock.call(INVALID_SNAPSHOT['volume_id'])]) + + def test_snapshot_failure_when_remote_is_unreachable(self): + drv = coho.CohoDriver(configuration=self.configuration) + + mock_get_volume_location = self.mock_object(coho.CohoDriver, + '_get_volume_location') + mock_get_volume_location.return_value = 'uknown-address', PATH + + with self.assertRaisesRegex(exception.CohoException, + "Failed to establish connection.*"): + drv.create_snapshot(SNAPSHOT) + + mock_get_volume_location.assert_has_calls( + [mock.call(INVALID_SNAPSHOT['volume_id'])]) + + def test_rpc_client_make_call_proper_order(self): + """This test ensures that the RPC client logic is correct. + + When the RPC client's make_call function is called it creates + a packet and sends it to the Coho cluster RPC server. This test + ensures that the functions needed to complete the process are + called in the proper order with valid arguments. + """ + + mock_packer = self.mock_object(xdrlib, 'Packer') + mock_unpacker = self.mock_object(xdrlib, 'Unpacker') + mock_unpacker.return_value.unpack_uint.return_value = 0 + mock_socket = self.mock_object(socket, 'socket') + mock_init_call = self.mock_object(coho.Client, 'init_call') + mock_init_call.return_value = (1, 2) + mock_sendrecord = self.mock_object(coho.Client, '_sendrecord') + mock_recvrecord = self.mock_object(coho.Client, '_recvrecord') + mock_recvrecord.return_value = 'test_reply' + mock_unpack_replyheader = self.mock_object(coho.Client, + 'unpack_replyheader') + mock_unpack_replyheader.return_value = (123, 1) + + rpc_client = coho.CohoRPCClient(ADDR, RPC_PORT) + rpc_client.create_volume_from_snapshot('src', 'dest') + + self.assertTrue(mock_sendrecord.called) + self.assertTrue(mock_unpack_replyheader.called) + mock_packer.assert_has_calls([mock.call().reset()]) + mock_unpacker.assert_has_calls( + [mock.call().reset('test_reply'), + mock.call().unpack_uint()]) + mock_socket.assert_has_calls( + [mock.call(socket.AF_INET, socket.SOCK_STREAM), + mock.call().bind(('', 0)), + mock.call().connect((ADDR, RPC_PORT))]) + mock_init_call.assert_has_calls( + [mock.call(coho.COHO1_CREATE_VOLUME_FROM_SNAPSHOT, + [(six.b('src'), mock_packer().pack_string), + (six.b('dest'), mock_packer().pack_string)])]) + + def test_rpc_client_error_in_reply_header(self): + """Ensure excpetions in reply header are raised by the RPC client. + + Coho cluster's RPC server packs errors into the reply header. + This test ensures that the RPC client parses the reply header + correctly and raises exceptions on various errors that can be + included in the reply header. + """ + mock_socket = self.mock_object(socket, 'socket') + mock_recvrecord = self.mock_object(coho.Client, '_recvrecord') + rpc_client = coho.CohoRPCClient(ADDR, RPC_PORT) + + mock_recvrecord.return_value = NO_REPLY_BIN + with self.assertRaisesRegex(exception.CohoException, + "no REPLY.*"): + rpc_client.create_snapshot('src', 'dest', 0) + + mock_recvrecord.return_value = MSG_DENIED_BIN + with self.assertRaisesRegex(exception.CohoException, + ".*MSG_DENIED.*"): + rpc_client.delete_snapshot('snapshot') + + mock_recvrecord.return_value = PROG_UNAVAIL_BIN + with self.assertRaisesRegex(exception.CohoException, + ".*PROG_UNAVAIL"): + rpc_client.delete_snapshot('snapshot') + + mock_recvrecord.return_value = PROG_MISMATCH_BIN + with self.assertRaisesRegex(exception.CohoException, + ".*PROG_MISMATCH.*"): + rpc_client.delete_snapshot('snapshot') + + mock_recvrecord.return_value = GARBAGE_ARGS_BIN + with self.assertRaisesRegex(exception.CohoException, + ".*GARBAGE_ARGS"): + rpc_client.delete_snapshot('snapshot') + + mock_recvrecord.return_value = PROC_UNAVAIL_BIN + with self.assertRaisesRegex(exception.CohoException, + ".*PROC_UNAVAIL"): + rpc_client.delete_snapshot('snapshot') + + self.assertTrue(mock_recvrecord.called) + mock_socket.assert_has_calls( + [mock.call(socket.AF_INET, socket.SOCK_STREAM), + mock.call().bind(('', 0)), + mock.call().connect((ADDR, RPC_PORT))]) + + def test_rpc_client_error_in_receive_fragment(self): + """Ensure exception is raised when malformed packet is recieved.""" + + mock_sendrcd = self.mock_object(coho.Client, '_sendrecord') + mock_socket = self.mock_object(socket, 'socket') + mock_socket.return_value.recv.return_value = INVALID_HEADER_BIN + rpc_client = coho.CohoRPCClient(ADDR, RPC_PORT) + + with self.assertRaisesRegex(exception.CohoException, + "Invalid response header.*"): + rpc_client.create_snapshot('src', 'dest', 0) + + self.assertTrue(mock_sendrcd.called) + mock_socket.assert_has_calls( + [mock.call(socket.AF_INET, socket.SOCK_STREAM), + mock.call().bind(('', 0)), + mock.call().connect((ADDR, RPC_PORT)), + mock.call().recv(4)]) diff --git a/cinder/volume/drivers/coho.py b/cinder/volume/drivers/coho.py new file mode 100644 index 00000000000..0b2c3f8fbd4 --- /dev/null +++ b/cinder/volume/drivers/coho.py @@ -0,0 +1,397 @@ +# Copyright (c) 2015 Coho Data, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import six +import socket +import xdrlib + +from oslo_config import cfg +from oslo_log import log as logging +from random import randint + +from cinder import exception +from cinder.i18n import _ +from cinder.volume.drivers import nfs + +# +# RPC Definition +# + +RPCVERSION = 2 + +CALL = 0 +REPLY = 1 + +AUTH_NULL = 0 + +MSG_ACCEPTED = 0 +MSG_DENIED = 1 + +SUCCESS = 0 +PROG_UNAVAIL = 1 +PROG_MISMATCH = 2 +PROC_UNAVAIL = 3 +GARBAGE_ARGS = 4 + +RPC_MISMATCH = 0 +AUTH_ERROR = 1 + +COHO_PROGRAM = 400115 +COHO_V1 = 1 +COHO1_CREATE_SNAPSHOT = 1 +COHO1_DELETE_SNAPSHOT = 2 +COHO1_CREATE_VOLUME_FROM_SNAPSHOT = 3 + +# +# Simple RPC Client +# + + +def make_auth_null(): + + return six.b('') + + +class Client(object): + + def __init__(self, address, prog, vers, port): + self.packer = xdrlib.Packer() + self.unpacker = xdrlib.Unpacker('') + self.address = address + self.prog = prog + self.vers = vers + self.port = port + self.cred = None + self.verf = None + + self.init_socket() + self.init_xid() + + def init_socket(self): + try: + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.bind(('', 0)) + self.sock.connect((self.address, self.port)) + except socket.error: + msg = _('Failed to establish connection with Coho cluster') + raise exception.CohoException(msg) + + def init_xid(self): + self.xid = randint(0, 4096) + + def make_xid(self): + self.xid += 1 + + def make_cred(self): + if self.cred is None: + self.cred = (AUTH_NULL, make_auth_null()) + return self.cred + + def make_verf(self): + if self.verf is None: + self.verf = (AUTH_NULL, make_auth_null()) + return self.verf + + def pack_auth(self, auth): + flavor, stuff = auth + self.packer.pack_enum(flavor) + self.packer.pack_opaque(stuff) + + def pack_callheader(self, xid, prog, vers, proc, cred, verf): + self.packer.pack_uint(xid) + self.packer.pack_enum(CALL) + self.packer.pack_uint(RPCVERSION) + self.packer.pack_uint(prog) + self.packer.pack_uint(vers) + self.packer.pack_uint(proc) + self.pack_auth(cred) + self.pack_auth(verf) + + def unpack_auth(self): + flavor = self.unpacker.unpack_enum() + stuff = self.unpacker.unpack_opaque() + return (flavor, stuff) + + def unpack_replyheader(self): + xid = self.unpacker.unpack_uint() + mtype = self.unpacker.unpack_enum() + if mtype != REPLY: + raise exception.CohoException( + _('no REPLY but %r') % (mtype,)) + stat = self.unpacker.unpack_enum() + if stat == MSG_DENIED: + stat = self.unpacker.unpack_enum() + if stat == RPC_MISMATCH: + low = self.unpacker.unpack_uint() + high = self.unpacker.unpack_uint() + raise exception.CohoException( + _('MSG_DENIED: RPC_MISMATCH: %r') % ((low, high),)) + if stat == AUTH_ERROR: + stat = self.unpacker.unpack_uint() + raise exception.CohoException( + _('MSG_DENIED: AUTH_ERROR: %r') % (stat,)) + raise exception.CohoException(_('MSG_DENIED: %r') % (stat,)) + if stat != MSG_ACCEPTED: + raise exception.CohoException( + _('Neither MSG_DENIED nor MSG_ACCEPTED: %r') % (stat,)) + verf = self.unpack_auth() + stat = self.unpacker.unpack_enum() + if stat == PROG_UNAVAIL: + raise exception.CohoException(_('call failed: PROG_UNAVAIL')) + if stat == PROG_MISMATCH: + low = self.unpacker.unpack_uint() + high = self.unpacker.unpack_uint() + raise exception.CohoException( + _('call failed: PROG_MISMATCH: %r') % ((low, high),)) + if stat == PROC_UNAVAIL: + raise exception.CohoException(_('call failed: PROC_UNAVAIL')) + if stat == GARBAGE_ARGS: + raise exception.CohoException(_('call failed: GARBAGE_ARGS')) + if stat != SUCCESS: + raise exception.CohoException(_('call failed: %r') % (stat,)) + return xid, verf + + def init_call(self, proc, args): + self.make_xid() + self.packer.reset() + cred = self.make_cred() + verf = self.make_verf() + self.pack_callheader(self.xid, self.prog, self.vers, proc, cred, verf) + + for arg, func in args: + func(arg) + + return self.xid, self.packer.get_buf() + + def _sendfrag(self, last, frag): + x = len(frag) + if last: + x = x | 0x80000000 + header = (six.int2byte(int(x >> 24 & 0xff)) + + six.int2byte(int(x >> 16 & 0xff)) + + six.int2byte(int(x >> 8 & 0xff)) + + six.int2byte(int(x & 0xff))) + self.sock.send(header + frag) + + def _sendrecord(self, record): + self._sendfrag(1, record) + + def _recvfrag(self): + header = self.sock.recv(4) + if len(header) < 4: + raise exception.CohoException( + _('Invalid response header from RPC server')) + x = (six.indexbytes(header, 0) << 24 | + six.indexbytes(header, 1) << 16 | + six.indexbytes(header, 2) << 8 | + six.indexbytes(header, 3)) + last = ((x & 0x80000000) != 0) + n = int(x & 0x7fffffff) + frag = six.b('') + while n > 0: + buf = self.sock.recv(n) + if not buf: + raise exception.CohoException( + _('RPC server response is incomplete')) + n = n - len(buf) + frag = frag + buf + return last, frag + + def _recvrecord(self): + record = six.b('') + last = 0 + while not last: + last, frag = self._recvfrag() + record = record + frag + return record + + def _make_call(self, proc, args): + self.packer.reset() + xid, call = self.init_call(proc, args) + self._sendrecord(call) + reply = self._recvrecord() + self.unpacker.reset(reply) + xid, verf = self.unpack_replyheader() + + def _call(self, proc, args): + self._make_call(proc, args) + res = self.unpacker.unpack_uint() + if res != SUCCESS: + raise exception.CohoException(os.strerror(res)) + + +class CohoRPCClient(Client): + + def __init__(self, address, port): + Client.__init__(self, address, COHO_PROGRAM, 1, port) + + def create_snapshot(self, src, dst, flags): + self._call(COHO1_CREATE_SNAPSHOT, + [(six.b(src), self.packer.pack_string), + (six.b(dst), self.packer.pack_string), + (flags, self.packer.pack_uint)]) + + def delete_snapshot(self, name): + self._call(COHO1_DELETE_SNAPSHOT, + [(six.b(name), self.packer.pack_string)]) + + def create_volume_from_snapshot(self, src, dst): + self._call(COHO1_CREATE_VOLUME_FROM_SNAPSHOT, + [(six.b(src), self.packer.pack_string), + (six.b(dst), self.packer.pack_string)]) + + +# +# Coho Data Volume Driver +# + +VERSION = '1.0.0' + +LOG = logging.getLogger(__name__) + +coho_opts = [ + cfg.IntOpt('coho_rpc_port', + default=2049, + help='RPC port to connect to Coha Data MicroArray') +] + +CONF = cfg.CONF +CONF.register_opts(coho_opts) + + +class CohoDriver(nfs.NfsDriver): + """Coho Data NFS based cinder driver. + + Creates file on NFS share for using it as block device on hypervisor. + Version history: + 1.0.0 - Initial driver + """ + + # We have to overload this attribute of RemoteFSDriver because + # unfortunately the base method doesn't accept exports of the form: + #
:/ + # It expects a non blank export name following the /. + # We are more permissive. + SHARE_FORMAT_REGEX = r'.+:/.*' + + def __init__(self, *args, **kwargs): + super(CohoDriver, self).__init__(*args, **kwargs) + self.configuration.append_config_values(coho_opts) + self._execute_as_root = True + self._rpcclients = dict() + self._backend_name = (self.configuration.volume_backend_name or + self.__class__.__name__) + + def _init_rpcclient(self, addr, port): + client = CohoRPCClient(addr, port) + self._rpcclients[(addr, port)] = client + return client + + def _get_rpcclient(self, addr, port): + if (addr, port) in self._rpcclients: + return self._rpcclients[(addr, port)] + return self._init_rpcclient(addr, port) + + def do_setup(self, context): + """Any initialization the volume driver does while starting.""" + super(CohoDriver, self).do_setup(context) + self._context = context + + config = self.configuration.coho_rpc_port + if not config: + msg = _("Coho rpc port is not configured") + LOG.warning(msg) + raise exception.CohoException(msg) + if config < 1 or config > 65535: + msg = (_("Invalid port number %(config)s for Coho rpc port") % + {'config': config}) + LOG.warning(msg) + raise exception.CohoException(msg) + + def _do_clone_volume(self, volume, src): + """Clone volume to source. + + Create a volume on given remote share with the same contents + as the specified source. + """ + volume_path = self.local_path(volume) + source_path = self.local_path(src) + + self._execute('cp', source_path, volume_path, + run_as_root=self._execute_as_root) + + def _get_volume_location(self, volume_id): + """Returns provider location for given volume.""" + + # The driver should not directly access db, but since volume is not + # passed in create_snapshot and delete_snapshot we are forced to read + # the volume info from the database + volume = self.db.volume_get(self._context, volume_id) + addr, path = volume.provider_location.split(":") + return addr, path + + def create_snapshot(self, snapshot): + """Create a volume snapshot.""" + addr, path = self._get_volume_location(snapshot['volume_id']) + volume_path = os.path.join(path, snapshot['volume_name']) + snapshot_name = snapshot['name'] + flags = 0 # unused at this time + client = self._get_rpcclient(addr, self.configuration.coho_rpc_port) + client.create_snapshot(volume_path, snapshot_name, flags) + + def delete_snapshot(self, snapshot): + """Delete a volume snapshot.""" + addr, path = self._get_volume_location(snapshot['volume_id']) + snapshot_name = snapshot['name'] + client = self._get_rpcclient(addr, self.configuration.coho_rpc_port) + client.delete_snapshot(snapshot_name) + + def create_volume_from_snapshot(self, volume, snapshot): + """Create a volume from a snapshot.""" + volume['provider_location'] = self._find_share(volume['size']) + addr, path = volume['provider_location'].split(":") + volume_path = os.path.join(path, volume['name']) + snapshot_name = snapshot['name'] + client = self._get_rpcclient(addr, self.configuration.coho_rpc_port) + client.create_volume_from_snapshot(snapshot_name, volume_path) + + return {'provider_location': volume['provider_location']} + + def _extend_file_sparse(self, path, size): + """Extend the size of a file (with no additional disk usage).""" + self._execute('truncate', '-s', '%sG' % size, + path, run_as_root=self._execute_as_root) + + def create_cloned_volume(self, volume, src_vref): + volume['provider_location'] = self._find_share(volume['size']) + + self._do_clone_volume(volume, src_vref) + + def extend_volume(self, volume, new_size): + """Extend the specified file to the new_size (sparsely).""" + volume_path = self.local_path(volume) + + self._extend_file_sparse(volume_path, new_size) + + def get_volume_stats(self, refresh): + """Pass in Coho Data information in volume stats.""" + _stats = super(CohoDriver, self).get_volume_stats(refresh) + _stats["vendor_name"] = 'Coho Data' + _stats["driver_version"] = VERSION + _stats["storage_protocol"] = 'NFS' + _stats["volume_backend_name"] = self._backend_name + + return _stats diff --git a/releasenotes/notes/add-coho-driver-b4472bff3f64aa41.yaml b/releasenotes/notes/add-coho-driver-b4472bff3f64aa41.yaml new file mode 100644 index 00000000000..4a3586158e4 --- /dev/null +++ b/releasenotes/notes/add-coho-driver-b4472bff3f64aa41.yaml @@ -0,0 +1,3 @@ +--- +features: + - Added backend driver for Coho Data storage. diff --git a/tests-py3.txt b/tests-py3.txt index 3305ce77f45..1ebf728626d 100644 --- a/tests-py3.txt +++ b/tests-py3.txt @@ -48,6 +48,7 @@ cinder.tests.unit.test_block_device cinder.tests.unit.test_blockbridge cinder.tests.unit.test_cloudbyte cinder.tests.unit.test_cmd +cinder.tests.unit.test_coho cinder.tests.unit.test_conf cinder.tests.unit.test_context cinder.tests.unit.test_db_api