Volume driver for Coho Data storage solutions
This patch introduces Coho Data volume driver along with unit tests. Implements: blueprint coho-cinder-driver DocImpact Documentation for setting up the Coho driver and enabling it in Cinder will be provided in a separate patch. Change-Id: I06a66d10add9132d0f3afca054d68094ddfb4da0 Signed-off-by: Bardia Keyoumarsi <bardia.keyoumarsi@cohodata.com>
This commit is contained in:
parent
2c3b6e9912
commit
f7e9c240dc
@ -48,6 +48,7 @@ CONF.register_opts(exc_log_opts)
|
||||
|
||||
|
||||
class ConvertedException(webob.exc.WSGIHTTPException):
|
||||
|
||||
def __init__(self, code=500, title="", explanation=""):
|
||||
self.code = code
|
||||
# There is a strict rule about constructing status line for HTTP:
|
||||
@ -1017,3 +1018,8 @@ class NotSupportedOperation(Invalid):
|
||||
# Hitachi HNAS drivers
|
||||
class HNASConnError(CinderException):
|
||||
message = _("%(message)s")
|
||||
|
||||
|
||||
# Coho drivers
|
||||
class CohoException(VolumeDriverException):
|
||||
message = _("Coho Data Cinder driver failure: %(message)s")
|
||||
|
@ -64,6 +64,7 @@ from cinder.volume.drivers import blockbridge as \
|
||||
cinder_volume_drivers_blockbridge
|
||||
from cinder.volume.drivers.cloudbyte import options as \
|
||||
cinder_volume_drivers_cloudbyte_options
|
||||
from cinder.volume.drivers import coho as cinder_volume_drivers_coho
|
||||
from cinder.volume.drivers import datera as cinder_volume_drivers_datera
|
||||
from cinder.volume.drivers.dell import dell_storagecenter_common as \
|
||||
cinder_volume_drivers_dell_dellstoragecentercommon
|
||||
@ -238,6 +239,7 @@ def list_opts():
|
||||
cinder_db_api.db_opts,
|
||||
cinder_scheduler_weights_volumenumber.
|
||||
volume_number_weight_opts,
|
||||
cinder_volume_drivers_coho.coho_opts,
|
||||
cinder_volume_drivers_xio.XIO_OPTS,
|
||||
cinder_volume_drivers_zfssa_zfssaiscsi.ZFSSA_OPTS,
|
||||
cinder_volume_driver.volume_opts,
|
||||
|
376
cinder/tests/unit/test_coho.py
Normal file
376
cinder/tests/unit/test_coho.py
Normal file
@ -0,0 +1,376 @@
|
||||
# Copyright (c) 2015 Coho Data, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
import binascii
|
||||
import errno
|
||||
import mock
|
||||
import os
|
||||
import six
|
||||
import socket
|
||||
import xdrlib
|
||||
|
||||
from cinder import exception
|
||||
from cinder import test
|
||||
from cinder.volume import configuration as conf
|
||||
from cinder.volume.drivers import coho
|
||||
from cinder.volume.drivers import nfs
|
||||
|
||||
ADDR = 'coho-datastream-addr'
|
||||
PATH = '/test/path'
|
||||
RPC_PORT = 2049
|
||||
LOCAL_PATH = '/opt/cinder/mnt/test/path'
|
||||
|
||||
VOLUME = {
|
||||
'name': 'volume-bcc48c61-9691-4e5f-897c-793686093190',
|
||||
'volume_id': 'bcc48c61-9691-4e5f-897c-793686093190',
|
||||
'size': 128,
|
||||
'volume_type': 'silver',
|
||||
'volume_type_id': 'test',
|
||||
'metadata': [{'key': 'type',
|
||||
'service_label': 'silver'}],
|
||||
'provider_location': None,
|
||||
'id': 'bcc48c61-9691-4e5f-897c-793686093190',
|
||||
'status': 'available',
|
||||
}
|
||||
|
||||
CLONE_VOL = VOLUME.copy()
|
||||
CLONE_VOL['size'] = 256
|
||||
|
||||
SNAPSHOT = {
|
||||
'name': 'snapshot-51dd4-8d8a-4aa9-9176-086c9d89e7fc',
|
||||
'id': '51dd4-8d8a-4aa9-9176-086c9d89e7fc',
|
||||
'size': 128,
|
||||
'volume_type': None,
|
||||
'provider_location': None,
|
||||
'volume_size': 128,
|
||||
'volume_name': 'volume-bcc48c61-9691-4e5f-897c-793686093190',
|
||||
'volume_id': 'bcc48c61-9691-4e5f-897c-793686093191',
|
||||
}
|
||||
|
||||
INVALID_SNAPSHOT = SNAPSHOT.copy()
|
||||
INVALID_SNAPSHOT['name'] = ''
|
||||
|
||||
INVALID_HEADER_BIN = binascii.unhexlify('800000')
|
||||
NO_REPLY_BIN = binascii.unhexlify(
|
||||
'aaaaa01000000010000000000000000000000003')
|
||||
MSG_DENIED_BIN = binascii.unhexlify(
|
||||
'00000a010000000110000000000000000000000000000003')
|
||||
PROC_UNAVAIL_BIN = binascii.unhexlify(
|
||||
'00000a010000000100000000000000000000000000000003')
|
||||
PROG_UNAVAIL_BIN = binascii.unhexlify(
|
||||
'000003c70000000100000000000000000000000000000001')
|
||||
PROG_MISMATCH_BIN = binascii.unhexlify(
|
||||
'00000f7700000001000000000000000000000000000000020000000100000001')
|
||||
GARBAGE_ARGS_BIN = binascii.unhexlify(
|
||||
'00000d6e0000000100000000000000000000000000000004')
|
||||
|
||||
|
||||
class CohoDriverTest(test.TestCase):
|
||||
"""Test Coho Data's NFS volume driver."""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(CohoDriverTest, self).__init__(*args, **kwargs)
|
||||
|
||||
def setUp(self):
|
||||
super(CohoDriverTest, self).setUp()
|
||||
|
||||
self.context = mock.Mock()
|
||||
self.configuration = mock.Mock(spec=conf.Configuration)
|
||||
self.configuration.max_over_subscription_ratio = 20.0
|
||||
self.configuration.reserved_percentage = 0
|
||||
self.configuration.volume_backend_name = 'coho-1'
|
||||
self.configuration.coho_rpc_port = 2049
|
||||
self.configuration.nfs_shares_config = '/etc/cinder/coho_shares'
|
||||
self.configuration.nfs_sparsed_volumes = True
|
||||
self.configuration.nfs_mount_point_base = '/opt/stack/cinder/mnt'
|
||||
self.configuration.nfs_mount_options = None
|
||||
self.configuration.nas_ip = None
|
||||
self.configuration.nas_share_path = None
|
||||
self.configuration.nas_mount_options = None
|
||||
self.configuration.nfs_used_ratio = .95
|
||||
self.configuration.nfs_oversub_ratio = 1.0
|
||||
|
||||
def test_setup_failure_when_rpc_port_unconfigured(self):
|
||||
self.configuration.coho_rpc_port = None
|
||||
drv = coho.CohoDriver(configuration=self.configuration)
|
||||
|
||||
self.mock_object(coho, 'LOG')
|
||||
self.mock_object(nfs.NfsDriver, 'do_setup')
|
||||
|
||||
with self.assertRaisesRegex(exception.CohoException,
|
||||
".*Coho rpc port is not configured.*"):
|
||||
drv.do_setup(self.context)
|
||||
|
||||
self.assertTrue(coho.LOG.warning.called)
|
||||
self.assertTrue(nfs.NfsDriver.do_setup.called)
|
||||
|
||||
def test_setup_failure_when_coho_rpc_port_is_invalid(self):
|
||||
self.configuration.coho_rpc_port = 99999
|
||||
drv = coho.CohoDriver(configuration=self.configuration)
|
||||
|
||||
self.mock_object(coho, 'LOG')
|
||||
self.mock_object(nfs.NfsDriver, 'do_setup')
|
||||
|
||||
with self.assertRaisesRegex(exception.CohoException,
|
||||
"Invalid port number.*"):
|
||||
drv.do_setup(self.context)
|
||||
|
||||
self.assertTrue(coho.LOG.warning.called)
|
||||
self.assertTrue(nfs.NfsDriver.do_setup.called)
|
||||
|
||||
def test_create_snapshot(self):
|
||||
drv = coho.CohoDriver(configuration=self.configuration)
|
||||
|
||||
mock_rpc_client = self.mock_object(coho, 'CohoRPCClient')
|
||||
mock_get_volume_location = self.mock_object(coho.CohoDriver,
|
||||
'_get_volume_location')
|
||||
mock_get_volume_location.return_value = ADDR, PATH
|
||||
|
||||
drv.create_snapshot(SNAPSHOT)
|
||||
|
||||
mock_get_volume_location.assert_has_calls(
|
||||
[mock.call(SNAPSHOT['volume_id'])])
|
||||
mock_rpc_client.assert_has_calls(
|
||||
[mock.call(ADDR, self.configuration.coho_rpc_port),
|
||||
mock.call().create_snapshot(
|
||||
os.path.join(PATH, SNAPSHOT['volume_name']),
|
||||
SNAPSHOT['name'], 0)])
|
||||
|
||||
def test_delete_snapshot(self):
|
||||
drv = coho.CohoDriver(configuration=self.configuration)
|
||||
|
||||
mock_rpc_client = self.mock_object(coho, 'CohoRPCClient')
|
||||
mock_get_volume_location = self.mock_object(coho.CohoDriver,
|
||||
'_get_volume_location')
|
||||
mock_get_volume_location.return_value = ADDR, PATH
|
||||
|
||||
drv.delete_snapshot(SNAPSHOT)
|
||||
|
||||
mock_get_volume_location.assert_has_calls(
|
||||
[mock.call(SNAPSHOT['volume_id'])])
|
||||
mock_rpc_client.assert_has_calls(
|
||||
[mock.call(ADDR, self.configuration.coho_rpc_port),
|
||||
mock.call().delete_snapshot(SNAPSHOT['name'])])
|
||||
|
||||
def test_create_volume_from_snapshot(self):
|
||||
drv = coho.CohoDriver(configuration=self.configuration)
|
||||
|
||||
mock_rpc_client = self.mock_object(coho, 'CohoRPCClient')
|
||||
mock_find_share = self.mock_object(drv, '_find_share')
|
||||
mock_find_share.return_value = ADDR + ':' + PATH
|
||||
|
||||
drv.create_volume_from_snapshot(VOLUME, SNAPSHOT)
|
||||
|
||||
mock_find_share.assert_has_calls(
|
||||
[mock.call(VOLUME['size'])])
|
||||
mock_rpc_client.assert_has_calls(
|
||||
[mock.call(ADDR, self.configuration.coho_rpc_port),
|
||||
mock.call().create_volume_from_snapshot(
|
||||
SNAPSHOT['name'], os.path.join(PATH, VOLUME['name']))])
|
||||
|
||||
def test_create_cloned_volume(self):
|
||||
drv = coho.CohoDriver(configuration=self.configuration)
|
||||
|
||||
mock_find_share = self.mock_object(drv, '_find_share')
|
||||
mock_find_share.return_value = ADDR + ':' + PATH
|
||||
mock_execute = self.mock_object(drv, '_execute')
|
||||
mock_local_path = self.mock_object(drv, 'local_path')
|
||||
mock_local_path.return_value = LOCAL_PATH
|
||||
|
||||
drv.create_cloned_volume(VOLUME, CLONE_VOL)
|
||||
|
||||
mock_find_share.assert_has_calls(
|
||||
[mock.call(VOLUME['size'])])
|
||||
mock_local_path.assert_has_calls(
|
||||
[mock.call(VOLUME), mock.call(CLONE_VOL)])
|
||||
mock_execute.assert_has_calls(
|
||||
[mock.call('cp', LOCAL_PATH, LOCAL_PATH, run_as_root=True)])
|
||||
|
||||
def test_extend_volume(self):
|
||||
drv = coho.CohoDriver(configuration=self.configuration)
|
||||
|
||||
mock_execute = self.mock_object(drv, '_execute')
|
||||
mock_local_path = self.mock_object(drv, 'local_path')
|
||||
mock_local_path.return_value = LOCAL_PATH
|
||||
|
||||
drv.extend_volume(VOLUME, 512)
|
||||
|
||||
mock_local_path.assert_has_calls(
|
||||
[mock.call(VOLUME)])
|
||||
mock_execute.assert_has_calls(
|
||||
[mock.call('truncate', '-s', '512G',
|
||||
LOCAL_PATH, run_as_root=True)])
|
||||
|
||||
def test_snapshot_failure_when_source_does_not_exist(self):
|
||||
drv = coho.CohoDriver(configuration=self.configuration)
|
||||
|
||||
self.mock_object(coho.Client, '_make_call')
|
||||
mock_init_socket = self.mock_object(coho.Client, 'init_socket')
|
||||
mock_unpack_uint = self.mock_object(xdrlib.Unpacker, 'unpack_uint')
|
||||
mock_unpack_uint.return_value = errno.ENOENT
|
||||
mock_get_volume_location = self.mock_object(coho.CohoDriver,
|
||||
'_get_volume_location')
|
||||
mock_get_volume_location.return_value = ADDR, PATH
|
||||
|
||||
with self.assertRaisesRegex(exception.CohoException,
|
||||
"No such file or directory.*"):
|
||||
drv.create_snapshot(SNAPSHOT)
|
||||
|
||||
self.assertTrue(mock_init_socket.called)
|
||||
self.assertTrue(mock_unpack_uint.called)
|
||||
mock_get_volume_location.assert_has_calls(
|
||||
[mock.call(SNAPSHOT['volume_id'])])
|
||||
|
||||
def test_snapshot_failure_with_invalid_input(self):
|
||||
drv = coho.CohoDriver(configuration=self.configuration)
|
||||
|
||||
self.mock_object(coho.Client, '_make_call')
|
||||
mock_init_socket = self.mock_object(coho.Client, 'init_socket')
|
||||
mock_unpack_uint = self.mock_object(xdrlib.Unpacker, 'unpack_uint')
|
||||
mock_unpack_uint.return_value = errno.EINVAL
|
||||
mock_get_volume_location = self.mock_object(coho.CohoDriver,
|
||||
'_get_volume_location')
|
||||
mock_get_volume_location.return_value = ADDR, PATH
|
||||
|
||||
with self.assertRaisesRegex(exception.CohoException,
|
||||
"Invalid argument"):
|
||||
drv.delete_snapshot(INVALID_SNAPSHOT)
|
||||
|
||||
self.assertTrue(mock_init_socket.called)
|
||||
self.assertTrue(mock_unpack_uint.called)
|
||||
mock_get_volume_location.assert_has_calls(
|
||||
[mock.call(INVALID_SNAPSHOT['volume_id'])])
|
||||
|
||||
def test_snapshot_failure_when_remote_is_unreachable(self):
|
||||
drv = coho.CohoDriver(configuration=self.configuration)
|
||||
|
||||
mock_get_volume_location = self.mock_object(coho.CohoDriver,
|
||||
'_get_volume_location')
|
||||
mock_get_volume_location.return_value = 'uknown-address', PATH
|
||||
|
||||
with self.assertRaisesRegex(exception.CohoException,
|
||||
"Failed to establish connection.*"):
|
||||
drv.create_snapshot(SNAPSHOT)
|
||||
|
||||
mock_get_volume_location.assert_has_calls(
|
||||
[mock.call(INVALID_SNAPSHOT['volume_id'])])
|
||||
|
||||
def test_rpc_client_make_call_proper_order(self):
|
||||
"""This test ensures that the RPC client logic is correct.
|
||||
|
||||
When the RPC client's make_call function is called it creates
|
||||
a packet and sends it to the Coho cluster RPC server. This test
|
||||
ensures that the functions needed to complete the process are
|
||||
called in the proper order with valid arguments.
|
||||
"""
|
||||
|
||||
mock_packer = self.mock_object(xdrlib, 'Packer')
|
||||
mock_unpacker = self.mock_object(xdrlib, 'Unpacker')
|
||||
mock_unpacker.return_value.unpack_uint.return_value = 0
|
||||
mock_socket = self.mock_object(socket, 'socket')
|
||||
mock_init_call = self.mock_object(coho.Client, 'init_call')
|
||||
mock_init_call.return_value = (1, 2)
|
||||
mock_sendrecord = self.mock_object(coho.Client, '_sendrecord')
|
||||
mock_recvrecord = self.mock_object(coho.Client, '_recvrecord')
|
||||
mock_recvrecord.return_value = 'test_reply'
|
||||
mock_unpack_replyheader = self.mock_object(coho.Client,
|
||||
'unpack_replyheader')
|
||||
mock_unpack_replyheader.return_value = (123, 1)
|
||||
|
||||
rpc_client = coho.CohoRPCClient(ADDR, RPC_PORT)
|
||||
rpc_client.create_volume_from_snapshot('src', 'dest')
|
||||
|
||||
self.assertTrue(mock_sendrecord.called)
|
||||
self.assertTrue(mock_unpack_replyheader.called)
|
||||
mock_packer.assert_has_calls([mock.call().reset()])
|
||||
mock_unpacker.assert_has_calls(
|
||||
[mock.call().reset('test_reply'),
|
||||
mock.call().unpack_uint()])
|
||||
mock_socket.assert_has_calls(
|
||||
[mock.call(socket.AF_INET, socket.SOCK_STREAM),
|
||||
mock.call().bind(('', 0)),
|
||||
mock.call().connect((ADDR, RPC_PORT))])
|
||||
mock_init_call.assert_has_calls(
|
||||
[mock.call(coho.COHO1_CREATE_VOLUME_FROM_SNAPSHOT,
|
||||
[(six.b('src'), mock_packer().pack_string),
|
||||
(six.b('dest'), mock_packer().pack_string)])])
|
||||
|
||||
def test_rpc_client_error_in_reply_header(self):
|
||||
"""Ensure excpetions in reply header are raised by the RPC client.
|
||||
|
||||
Coho cluster's RPC server packs errors into the reply header.
|
||||
This test ensures that the RPC client parses the reply header
|
||||
correctly and raises exceptions on various errors that can be
|
||||
included in the reply header.
|
||||
"""
|
||||
mock_socket = self.mock_object(socket, 'socket')
|
||||
mock_recvrecord = self.mock_object(coho.Client, '_recvrecord')
|
||||
rpc_client = coho.CohoRPCClient(ADDR, RPC_PORT)
|
||||
|
||||
mock_recvrecord.return_value = NO_REPLY_BIN
|
||||
with self.assertRaisesRegex(exception.CohoException,
|
||||
"no REPLY.*"):
|
||||
rpc_client.create_snapshot('src', 'dest', 0)
|
||||
|
||||
mock_recvrecord.return_value = MSG_DENIED_BIN
|
||||
with self.assertRaisesRegex(exception.CohoException,
|
||||
".*MSG_DENIED.*"):
|
||||
rpc_client.delete_snapshot('snapshot')
|
||||
|
||||
mock_recvrecord.return_value = PROG_UNAVAIL_BIN
|
||||
with self.assertRaisesRegex(exception.CohoException,
|
||||
".*PROG_UNAVAIL"):
|
||||
rpc_client.delete_snapshot('snapshot')
|
||||
|
||||
mock_recvrecord.return_value = PROG_MISMATCH_BIN
|
||||
with self.assertRaisesRegex(exception.CohoException,
|
||||
".*PROG_MISMATCH.*"):
|
||||
rpc_client.delete_snapshot('snapshot')
|
||||
|
||||
mock_recvrecord.return_value = GARBAGE_ARGS_BIN
|
||||
with self.assertRaisesRegex(exception.CohoException,
|
||||
".*GARBAGE_ARGS"):
|
||||
rpc_client.delete_snapshot('snapshot')
|
||||
|
||||
mock_recvrecord.return_value = PROC_UNAVAIL_BIN
|
||||
with self.assertRaisesRegex(exception.CohoException,
|
||||
".*PROC_UNAVAIL"):
|
||||
rpc_client.delete_snapshot('snapshot')
|
||||
|
||||
self.assertTrue(mock_recvrecord.called)
|
||||
mock_socket.assert_has_calls(
|
||||
[mock.call(socket.AF_INET, socket.SOCK_STREAM),
|
||||
mock.call().bind(('', 0)),
|
||||
mock.call().connect((ADDR, RPC_PORT))])
|
||||
|
||||
def test_rpc_client_error_in_receive_fragment(self):
|
||||
"""Ensure exception is raised when malformed packet is recieved."""
|
||||
|
||||
mock_sendrcd = self.mock_object(coho.Client, '_sendrecord')
|
||||
mock_socket = self.mock_object(socket, 'socket')
|
||||
mock_socket.return_value.recv.return_value = INVALID_HEADER_BIN
|
||||
rpc_client = coho.CohoRPCClient(ADDR, RPC_PORT)
|
||||
|
||||
with self.assertRaisesRegex(exception.CohoException,
|
||||
"Invalid response header.*"):
|
||||
rpc_client.create_snapshot('src', 'dest', 0)
|
||||
|
||||
self.assertTrue(mock_sendrcd.called)
|
||||
mock_socket.assert_has_calls(
|
||||
[mock.call(socket.AF_INET, socket.SOCK_STREAM),
|
||||
mock.call().bind(('', 0)),
|
||||
mock.call().connect((ADDR, RPC_PORT)),
|
||||
mock.call().recv(4)])
|
397
cinder/volume/drivers/coho.py
Normal file
397
cinder/volume/drivers/coho.py
Normal file
@ -0,0 +1,397 @@
|
||||
# Copyright (c) 2015 Coho Data, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
import six
|
||||
import socket
|
||||
import xdrlib
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from random import randint
|
||||
|
||||
from cinder import exception
|
||||
from cinder.i18n import _
|
||||
from cinder.volume.drivers import nfs
|
||||
|
||||
#
|
||||
# RPC Definition
|
||||
#
|
||||
|
||||
RPCVERSION = 2
|
||||
|
||||
CALL = 0
|
||||
REPLY = 1
|
||||
|
||||
AUTH_NULL = 0
|
||||
|
||||
MSG_ACCEPTED = 0
|
||||
MSG_DENIED = 1
|
||||
|
||||
SUCCESS = 0
|
||||
PROG_UNAVAIL = 1
|
||||
PROG_MISMATCH = 2
|
||||
PROC_UNAVAIL = 3
|
||||
GARBAGE_ARGS = 4
|
||||
|
||||
RPC_MISMATCH = 0
|
||||
AUTH_ERROR = 1
|
||||
|
||||
COHO_PROGRAM = 400115
|
||||
COHO_V1 = 1
|
||||
COHO1_CREATE_SNAPSHOT = 1
|
||||
COHO1_DELETE_SNAPSHOT = 2
|
||||
COHO1_CREATE_VOLUME_FROM_SNAPSHOT = 3
|
||||
|
||||
#
|
||||
# Simple RPC Client
|
||||
#
|
||||
|
||||
|
||||
def make_auth_null():
|
||||
|
||||
return six.b('')
|
||||
|
||||
|
||||
class Client(object):
|
||||
|
||||
def __init__(self, address, prog, vers, port):
|
||||
self.packer = xdrlib.Packer()
|
||||
self.unpacker = xdrlib.Unpacker('')
|
||||
self.address = address
|
||||
self.prog = prog
|
||||
self.vers = vers
|
||||
self.port = port
|
||||
self.cred = None
|
||||
self.verf = None
|
||||
|
||||
self.init_socket()
|
||||
self.init_xid()
|
||||
|
||||
def init_socket(self):
|
||||
try:
|
||||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.sock.bind(('', 0))
|
||||
self.sock.connect((self.address, self.port))
|
||||
except socket.error:
|
||||
msg = _('Failed to establish connection with Coho cluster')
|
||||
raise exception.CohoException(msg)
|
||||
|
||||
def init_xid(self):
|
||||
self.xid = randint(0, 4096)
|
||||
|
||||
def make_xid(self):
|
||||
self.xid += 1
|
||||
|
||||
def make_cred(self):
|
||||
if self.cred is None:
|
||||
self.cred = (AUTH_NULL, make_auth_null())
|
||||
return self.cred
|
||||
|
||||
def make_verf(self):
|
||||
if self.verf is None:
|
||||
self.verf = (AUTH_NULL, make_auth_null())
|
||||
return self.verf
|
||||
|
||||
def pack_auth(self, auth):
|
||||
flavor, stuff = auth
|
||||
self.packer.pack_enum(flavor)
|
||||
self.packer.pack_opaque(stuff)
|
||||
|
||||
def pack_callheader(self, xid, prog, vers, proc, cred, verf):
|
||||
self.packer.pack_uint(xid)
|
||||
self.packer.pack_enum(CALL)
|
||||
self.packer.pack_uint(RPCVERSION)
|
||||
self.packer.pack_uint(prog)
|
||||
self.packer.pack_uint(vers)
|
||||
self.packer.pack_uint(proc)
|
||||
self.pack_auth(cred)
|
||||
self.pack_auth(verf)
|
||||
|
||||
def unpack_auth(self):
|
||||
flavor = self.unpacker.unpack_enum()
|
||||
stuff = self.unpacker.unpack_opaque()
|
||||
return (flavor, stuff)
|
||||
|
||||
def unpack_replyheader(self):
|
||||
xid = self.unpacker.unpack_uint()
|
||||
mtype = self.unpacker.unpack_enum()
|
||||
if mtype != REPLY:
|
||||
raise exception.CohoException(
|
||||
_('no REPLY but %r') % (mtype,))
|
||||
stat = self.unpacker.unpack_enum()
|
||||
if stat == MSG_DENIED:
|
||||
stat = self.unpacker.unpack_enum()
|
||||
if stat == RPC_MISMATCH:
|
||||
low = self.unpacker.unpack_uint()
|
||||
high = self.unpacker.unpack_uint()
|
||||
raise exception.CohoException(
|
||||
_('MSG_DENIED: RPC_MISMATCH: %r') % ((low, high),))
|
||||
if stat == AUTH_ERROR:
|
||||
stat = self.unpacker.unpack_uint()
|
||||
raise exception.CohoException(
|
||||
_('MSG_DENIED: AUTH_ERROR: %r') % (stat,))
|
||||
raise exception.CohoException(_('MSG_DENIED: %r') % (stat,))
|
||||
if stat != MSG_ACCEPTED:
|
||||
raise exception.CohoException(
|
||||
_('Neither MSG_DENIED nor MSG_ACCEPTED: %r') % (stat,))
|
||||
verf = self.unpack_auth()
|
||||
stat = self.unpacker.unpack_enum()
|
||||
if stat == PROG_UNAVAIL:
|
||||
raise exception.CohoException(_('call failed: PROG_UNAVAIL'))
|
||||
if stat == PROG_MISMATCH:
|
||||
low = self.unpacker.unpack_uint()
|
||||
high = self.unpacker.unpack_uint()
|
||||
raise exception.CohoException(
|
||||
_('call failed: PROG_MISMATCH: %r') % ((low, high),))
|
||||
if stat == PROC_UNAVAIL:
|
||||
raise exception.CohoException(_('call failed: PROC_UNAVAIL'))
|
||||
if stat == GARBAGE_ARGS:
|
||||
raise exception.CohoException(_('call failed: GARBAGE_ARGS'))
|
||||
if stat != SUCCESS:
|
||||
raise exception.CohoException(_('call failed: %r') % (stat,))
|
||||
return xid, verf
|
||||
|
||||
def init_call(self, proc, args):
|
||||
self.make_xid()
|
||||
self.packer.reset()
|
||||
cred = self.make_cred()
|
||||
verf = self.make_verf()
|
||||
self.pack_callheader(self.xid, self.prog, self.vers, proc, cred, verf)
|
||||
|
||||
for arg, func in args:
|
||||
func(arg)
|
||||
|
||||
return self.xid, self.packer.get_buf()
|
||||
|
||||
def _sendfrag(self, last, frag):
|
||||
x = len(frag)
|
||||
if last:
|
||||
x = x | 0x80000000
|
||||
header = (six.int2byte(int(x >> 24 & 0xff)) +
|
||||
six.int2byte(int(x >> 16 & 0xff)) +
|
||||
six.int2byte(int(x >> 8 & 0xff)) +
|
||||
six.int2byte(int(x & 0xff)))
|
||||
self.sock.send(header + frag)
|
||||
|
||||
def _sendrecord(self, record):
|
||||
self._sendfrag(1, record)
|
||||
|
||||
def _recvfrag(self):
|
||||
header = self.sock.recv(4)
|
||||
if len(header) < 4:
|
||||
raise exception.CohoException(
|
||||
_('Invalid response header from RPC server'))
|
||||
x = (six.indexbytes(header, 0) << 24 |
|
||||
six.indexbytes(header, 1) << 16 |
|
||||
six.indexbytes(header, 2) << 8 |
|
||||
six.indexbytes(header, 3))
|
||||
last = ((x & 0x80000000) != 0)
|
||||
n = int(x & 0x7fffffff)
|
||||
frag = six.b('')
|
||||
while n > 0:
|
||||
buf = self.sock.recv(n)
|
||||
if not buf:
|
||||
raise exception.CohoException(
|
||||
_('RPC server response is incomplete'))
|
||||
n = n - len(buf)
|
||||
frag = frag + buf
|
||||
return last, frag
|
||||
|
||||
def _recvrecord(self):
|
||||
record = six.b('')
|
||||
last = 0
|
||||
while not last:
|
||||
last, frag = self._recvfrag()
|
||||
record = record + frag
|
||||
return record
|
||||
|
||||
def _make_call(self, proc, args):
|
||||
self.packer.reset()
|
||||
xid, call = self.init_call(proc, args)
|
||||
self._sendrecord(call)
|
||||
reply = self._recvrecord()
|
||||
self.unpacker.reset(reply)
|
||||
xid, verf = self.unpack_replyheader()
|
||||
|
||||
def _call(self, proc, args):
|
||||
self._make_call(proc, args)
|
||||
res = self.unpacker.unpack_uint()
|
||||
if res != SUCCESS:
|
||||
raise exception.CohoException(os.strerror(res))
|
||||
|
||||
|
||||
class CohoRPCClient(Client):
|
||||
|
||||
def __init__(self, address, port):
|
||||
Client.__init__(self, address, COHO_PROGRAM, 1, port)
|
||||
|
||||
def create_snapshot(self, src, dst, flags):
|
||||
self._call(COHO1_CREATE_SNAPSHOT,
|
||||
[(six.b(src), self.packer.pack_string),
|
||||
(six.b(dst), self.packer.pack_string),
|
||||
(flags, self.packer.pack_uint)])
|
||||
|
||||
def delete_snapshot(self, name):
|
||||
self._call(COHO1_DELETE_SNAPSHOT,
|
||||
[(six.b(name), self.packer.pack_string)])
|
||||
|
||||
def create_volume_from_snapshot(self, src, dst):
|
||||
self._call(COHO1_CREATE_VOLUME_FROM_SNAPSHOT,
|
||||
[(six.b(src), self.packer.pack_string),
|
||||
(six.b(dst), self.packer.pack_string)])
|
||||
|
||||
|
||||
#
|
||||
# Coho Data Volume Driver
|
||||
#
|
||||
|
||||
VERSION = '1.0.0'
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
coho_opts = [
|
||||
cfg.IntOpt('coho_rpc_port',
|
||||
default=2049,
|
||||
help='RPC port to connect to Coha Data MicroArray')
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(coho_opts)
|
||||
|
||||
|
||||
class CohoDriver(nfs.NfsDriver):
|
||||
"""Coho Data NFS based cinder driver.
|
||||
|
||||
Creates file on NFS share for using it as block device on hypervisor.
|
||||
Version history:
|
||||
1.0.0 - Initial driver
|
||||
"""
|
||||
|
||||
# We have to overload this attribute of RemoteFSDriver because
|
||||
# unfortunately the base method doesn't accept exports of the form:
|
||||
# <address>:/
|
||||
# It expects a non blank export name following the /.
|
||||
# We are more permissive.
|
||||
SHARE_FORMAT_REGEX = r'.+:/.*'
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(CohoDriver, self).__init__(*args, **kwargs)
|
||||
self.configuration.append_config_values(coho_opts)
|
||||
self._execute_as_root = True
|
||||
self._rpcclients = dict()
|
||||
self._backend_name = (self.configuration.volume_backend_name or
|
||||
self.__class__.__name__)
|
||||
|
||||
def _init_rpcclient(self, addr, port):
|
||||
client = CohoRPCClient(addr, port)
|
||||
self._rpcclients[(addr, port)] = client
|
||||
return client
|
||||
|
||||
def _get_rpcclient(self, addr, port):
|
||||
if (addr, port) in self._rpcclients:
|
||||
return self._rpcclients[(addr, port)]
|
||||
return self._init_rpcclient(addr, port)
|
||||
|
||||
def do_setup(self, context):
|
||||
"""Any initialization the volume driver does while starting."""
|
||||
super(CohoDriver, self).do_setup(context)
|
||||
self._context = context
|
||||
|
||||
config = self.configuration.coho_rpc_port
|
||||
if not config:
|
||||
msg = _("Coho rpc port is not configured")
|
||||
LOG.warning(msg)
|
||||
raise exception.CohoException(msg)
|
||||
if config < 1 or config > 65535:
|
||||
msg = (_("Invalid port number %(config)s for Coho rpc port") %
|
||||
{'config': config})
|
||||
LOG.warning(msg)
|
||||
raise exception.CohoException(msg)
|
||||
|
||||
def _do_clone_volume(self, volume, src):
|
||||
"""Clone volume to source.
|
||||
|
||||
Create a volume on given remote share with the same contents
|
||||
as the specified source.
|
||||
"""
|
||||
volume_path = self.local_path(volume)
|
||||
source_path = self.local_path(src)
|
||||
|
||||
self._execute('cp', source_path, volume_path,
|
||||
run_as_root=self._execute_as_root)
|
||||
|
||||
def _get_volume_location(self, volume_id):
|
||||
"""Returns provider location for given volume."""
|
||||
|
||||
# The driver should not directly access db, but since volume is not
|
||||
# passed in create_snapshot and delete_snapshot we are forced to read
|
||||
# the volume info from the database
|
||||
volume = self.db.volume_get(self._context, volume_id)
|
||||
addr, path = volume.provider_location.split(":")
|
||||
return addr, path
|
||||
|
||||
def create_snapshot(self, snapshot):
|
||||
"""Create a volume snapshot."""
|
||||
addr, path = self._get_volume_location(snapshot['volume_id'])
|
||||
volume_path = os.path.join(path, snapshot['volume_name'])
|
||||
snapshot_name = snapshot['name']
|
||||
flags = 0 # unused at this time
|
||||
client = self._get_rpcclient(addr, self.configuration.coho_rpc_port)
|
||||
client.create_snapshot(volume_path, snapshot_name, flags)
|
||||
|
||||
def delete_snapshot(self, snapshot):
|
||||
"""Delete a volume snapshot."""
|
||||
addr, path = self._get_volume_location(snapshot['volume_id'])
|
||||
snapshot_name = snapshot['name']
|
||||
client = self._get_rpcclient(addr, self.configuration.coho_rpc_port)
|
||||
client.delete_snapshot(snapshot_name)
|
||||
|
||||
def create_volume_from_snapshot(self, volume, snapshot):
|
||||
"""Create a volume from a snapshot."""
|
||||
volume['provider_location'] = self._find_share(volume['size'])
|
||||
addr, path = volume['provider_location'].split(":")
|
||||
volume_path = os.path.join(path, volume['name'])
|
||||
snapshot_name = snapshot['name']
|
||||
client = self._get_rpcclient(addr, self.configuration.coho_rpc_port)
|
||||
client.create_volume_from_snapshot(snapshot_name, volume_path)
|
||||
|
||||
return {'provider_location': volume['provider_location']}
|
||||
|
||||
def _extend_file_sparse(self, path, size):
|
||||
"""Extend the size of a file (with no additional disk usage)."""
|
||||
self._execute('truncate', '-s', '%sG' % size,
|
||||
path, run_as_root=self._execute_as_root)
|
||||
|
||||
def create_cloned_volume(self, volume, src_vref):
|
||||
volume['provider_location'] = self._find_share(volume['size'])
|
||||
|
||||
self._do_clone_volume(volume, src_vref)
|
||||
|
||||
def extend_volume(self, volume, new_size):
|
||||
"""Extend the specified file to the new_size (sparsely)."""
|
||||
volume_path = self.local_path(volume)
|
||||
|
||||
self._extend_file_sparse(volume_path, new_size)
|
||||
|
||||
def get_volume_stats(self, refresh):
|
||||
"""Pass in Coho Data information in volume stats."""
|
||||
_stats = super(CohoDriver, self).get_volume_stats(refresh)
|
||||
_stats["vendor_name"] = 'Coho Data'
|
||||
_stats["driver_version"] = VERSION
|
||||
_stats["storage_protocol"] = 'NFS'
|
||||
_stats["volume_backend_name"] = self._backend_name
|
||||
|
||||
return _stats
|
3
releasenotes/notes/add-coho-driver-b4472bff3f64aa41.yaml
Normal file
3
releasenotes/notes/add-coho-driver-b4472bff3f64aa41.yaml
Normal file
@ -0,0 +1,3 @@
|
||||
---
|
||||
features:
|
||||
- Added backend driver for Coho Data storage.
|
@ -48,6 +48,7 @@ cinder.tests.unit.test_block_device
|
||||
cinder.tests.unit.test_blockbridge
|
||||
cinder.tests.unit.test_cloudbyte
|
||||
cinder.tests.unit.test_cmd
|
||||
cinder.tests.unit.test_coho
|
||||
cinder.tests.unit.test_conf
|
||||
cinder.tests.unit.test_context
|
||||
cinder.tests.unit.test_db_api
|
||||
|
Loading…
Reference in New Issue
Block a user