Add sheepdog support

This patch is to add sheepdog connection in os-brick.

Most of codes are referred from sheepdog in volume drivers.

Change-Id: I358e66e11ba1c44741fdaed41eb7eb66706f92fb
This commit is contained in:
lisali 2016-02-16 10:08:55 +00:00
parent 71e5745fef
commit a168bd08c3
5 changed files with 420 additions and 0 deletions

View File

@ -132,3 +132,12 @@ class VolumeGroupCreationFailed(BrickException):
class CommandExecutionFailed(BrickException):
message = _("Failed to execute command %(cmd)s")
class VolumeDriverException(BrickException):
message = _('An error occurred while IO to volume %(name)s.')
class InvalidIOHandleObject(BrickException):
message = _('IO handle of %(protocol)s has wrong object '
'type %(actual_type)s.')

View File

@ -53,6 +53,7 @@ from os_brick.initiator import host_driver
from os_brick.initiator import linuxfc
from os_brick.initiator import linuxrbd
from os_brick.initiator import linuxscsi
from os_brick.initiator import linuxsheepdog
from os_brick.remotefs import remotefs
from os_brick.i18n import _, _LE, _LI, _LW
@ -80,6 +81,7 @@ SCALITY = "SCALITY"
QUOBYTE = "QUOBYTE"
DISCO = "DISCO"
VZSTORAGE = "VZSTORAGE"
SHEEPDOG = "SHEEPDOG"
def _check_multipathd_running(root_helper, enforce_multipath):
@ -265,6 +267,12 @@ class InitiatorConnector(executor.Executor):
device_scan_attempts=device_scan_attempts,
*args, **kwargs
)
elif protocol == SHEEPDOG:
return SheepdogConnector(root_helper=root_helper,
driver=driver,
execute=execute,
device_scan_attempts=device_scan_attempts,
*args, **kwargs)
else:
msg = (_("Invalid InitiatorConnector protocol "
"specified %(protocol)s") %
@ -466,6 +474,13 @@ class InitiatorConnector(executor.Executor):
else:
return []
def check_IO_handle_valid(self, handle, data_type, protocol):
"""Check IO handle has correct data type."""
if (handle and not isinstance(handle, data_type)):
raise exception.InvalidIOHandleObject(
protocol=protocol,
actual_type=type(handle))
class FakeConnector(InitiatorConnector):
@ -2989,3 +3004,99 @@ class DISCOConnector(InitiatorConnector):
def extend_volume(self, connection_properties):
raise NotImplementedError
class SheepdogConnector(InitiatorConnector):
""""Connector class to attach/detach sheepdog volumes."""
def __init__(self, root_helper, driver=None,
execute=putils.execute, use_multipath=False,
device_scan_attempts=DEVICE_SCAN_ATTEMPTS_DEFAULT,
*args, **kwargs):
super(SheepdogConnector, self).__init__(root_helper, driver=driver,
execute=execute,
device_scan_attempts=
device_scan_attempts,
*args, **kwargs)
def get_volume_paths(self, connection_properties):
# TODO(lixiaoy1): don't know where the connector
# looks for sheepdog volumes.
return []
def get_search_path(self):
# TODO(lixiaoy1): don't know where the connector
# looks for sheepdog volumes.
return None
def get_all_available_volumes(self, connection_properties=None):
# TODO(lixiaoy1): not sure what to return here for sheepdog
return []
def _get_sheepdog_handle(self, connection_properties):
try:
host = connection_properties['hosts'][0]
name = connection_properties['name']
port = connection_properties['ports'][0]
except IndexError:
msg = _("Connect volume failed, malformed connection properties")
raise exception.BrickException(msg=msg)
sheepdog_handle = linuxsheepdog.SheepdogVolumeIOWrapper(
host, port, name)
return sheepdog_handle
def connect_volume(self, connection_properties):
"""Connect to a volume.
:param connection_properties: The dictionary that describes all
of the target volume attributes.
:type connection_properties: dict
:returns: dict
"""
sheepdog_handle = self._get_sheepdog_handle(connection_properties)
return {'path': sheepdog_handle}
def disconnect_volume(self, connection_properties, device_info):
"""Disconnect a volume.
:param connection_properties: The dictionary that describes all
of the target volume attributes.
:type connection_properties: dict
:param device_info: historical difference, but same as connection_props
:type device_info: dict
"""
if device_info:
sheepdog_handle = device_info.get('path', None)
self.check_IO_handle_valid(sheepdog_handle,
linuxsheepdog.SheepdogVolumeIOWrapper,
'Sheepdog')
if sheepdog_handle is not None:
sheepdog_handle.close()
def check_valid_device(self, path, run_as_root=True):
"""Verify an existing sheepdog handle is connected and valid."""
sheepdog_handle = path
if sheepdog_handle is None:
return False
original_offset = sheepdog_handle.tell()
try:
sheepdog_handle.read(4096)
except Exception as e:
LOG.error(_LE("Failed to access sheepdog device "
"handle: %(error)s"),
{"error": e})
return False
finally:
sheepdog_handle.seek(original_offset, 0)
return True
def extend_volume(self, connection_properties):
# TODO(lixiaoy1): is this possible?
raise NotImplementedError

View File

@ -0,0 +1,118 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Generic SheepDog Connection Utilities.
"""
import eventlet
import io
from oslo_concurrency import processutils
from oslo_log import log as logging
from os_brick import exception
from os_brick.i18n import _
LOG = logging.getLogger(__name__)
class SheepdogVolumeIOWrapper(io.RawIOBase):
"""File-like object with Sheepdog backend."""
def __init__(self, addr, port, volume, snapshot_name=None):
self._addr = addr
self._port = port
self._vdiname = volume
self._snapshot_name = snapshot_name
self._offset = 0
# SheepdogVolumeIOWrapper instance becomes invalid
# if a write error occurs.
self._valid = True
def _execute(self, cmd, data=None):
try:
# NOTE(yamada-h): processutils.execute causes busy waiting
# under eventlet.
# To avoid wasting CPU resources, it should not be used for
# the command which takes long time to execute.
# For workaround, we replace a subprocess module with
# the original one while only executing a read/write command.
_processutils_subprocess = processutils.subprocess
processutils.subprocess = eventlet.patcher.original('subprocess')
return processutils.execute(*cmd, process_input=data)[0]
except (processutils.ProcessExecutionError, OSError):
self._valid = False
raise exception.VolumeDriverException(name=self._vdiname)
finally:
processutils.subprocess = _processutils_subprocess
def read(self, length=None):
if not self._valid:
raise exception.VolumeDriverException(name=self._vdiname)
cmd = ['dog', 'vdi', 'read', '-a', self._addr, '-p', self._port]
if self._snapshot_name:
cmd.extend(('-s', self._snapshot_name))
cmd.extend((self._vdiname, self._offset))
if length:
cmd.append(length)
data = self._execute(cmd)
self._offset += len(data)
return data
def write(self, data):
if not self._valid:
raise exception.VolumeDriverException(name=self._vdiname)
length = len(data)
cmd = ('dog', 'vdi', 'write', '-a', self._addr, '-p', self._port,
self._vdiname, self._offset, length)
self._execute(cmd, data)
self._offset += length
return length
def seek(self, offset, whence=0):
if not self._valid:
raise exception.VolumeDriverException(name=self._vdiname)
if whence == 0:
# SEEK_SET or 0 - start of the stream (the default);
# offset should be zero or positive
new_offset = offset
elif whence == 1:
# SEEK_CUR or 1 - current stream position; offset may be negative
new_offset = self._offset + offset
else:
# SEEK_END or 2 - end of the stream; offset is usually negative
# TODO(yamada-h): Support SEEK_END
raise IOError(_("Invalid argument - whence=%s not supported.") %
whence)
if new_offset < 0:
raise IOError(_("Invalid argument - negative seek offset."))
self._offset = new_offset
def tell(self):
return self._offset
def flush(self):
pass
def fileno(self):
"""Sheepdog does not have support for fileno so we raise IOError.
Raising IOError is recommended way to notify caller that interface is
not supported - see http://docs.python.org/2/library/io.html#io.IOBase
"""
raise IOError(_("fileno is not supported by SheepdogVolumeIOWrapper"))

View File

@ -35,6 +35,7 @@ from os_brick.initiator import host_driver
from os_brick.initiator import linuxfc
from os_brick.initiator import linuxrbd
from os_brick.initiator import linuxscsi
from os_brick.initiator import linuxsheepdog
from os_brick.remotefs import remotefs
from os_brick.tests import base
@ -2600,3 +2601,63 @@ class DISCOConnectorTestCase(ConnectorTestCase):
self.assertRaises(NotImplementedError,
self.connector.extend_volume,
self.fake_connection_properties)
class SheepdogConnectorTestCase(ConnectorTestCase):
def setUp(self):
super(SheepdogConnectorTestCase, self).setUp()
self.hosts = ['fake_hosts']
self.ports = ['fake_ports']
self.volume = 'fake_volume'
self.connection_properties = {
'hosts': self.hosts,
'name': self.volume,
'ports': self.ports,
}
def test_get_search_path(self):
sheepdog = connector.SheepdogConnector(None)
path = sheepdog.get_search_path()
self.assertIsNone(path)
def test_get_volume_paths(self):
sheepdog = connector.SheepdogConnector(None)
expected = []
actual = sheepdog.get_volume_paths(self.connection_properties)
self.assertEqual(expected, actual)
def test_connect_volume(self):
"""Test the connect volume case."""
sheepdog = connector.SheepdogConnector(None)
device_info = sheepdog.connect_volume(self.connection_properties)
# Ensure expected object is returned correctly
self.assertTrue(isinstance(device_info['path'],
linuxsheepdog.SheepdogVolumeIOWrapper))
@mock.patch.object(linuxsheepdog.SheepdogVolumeIOWrapper, 'close')
def test_disconnect_volume(self, volume_close):
"""Test the disconnect volume case."""
sheepdog = connector.SheepdogConnector(None)
device_info = sheepdog.connect_volume(self.connection_properties)
sheepdog.disconnect_volume(self.connection_properties, device_info)
self.assertEqual(1, volume_close.call_count)
def test_disconnect_volume_with_invalid_handle(self):
"""Test the disconnect volume case with invalid handle."""
sheepdog = connector.SheepdogConnector(None)
device_info = {'path': 'fake_handle'}
self.assertRaises(exception.InvalidIOHandleObject,
sheepdog.disconnect_volume,
self.connection_properties,
device_info)
def test_extend_volume(self):
sheepdog = connector.SheepdogConnector(None)
self.assertRaises(NotImplementedError,
sheepdog.extend_volume,
self.connection_properties)

View File

@ -0,0 +1,121 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from os_brick import exception
from os_brick.initiator import linuxsheepdog
from os_brick.tests import base
from oslo_concurrency import processutils
SHEEP_ADDR = '127.0.0.1'
SHEEP_PORT = 7000
class SheepdogVolumeIOWrapperTestCase(base.TestCase):
def setUp(self):
super(SheepdogVolumeIOWrapperTestCase, self).setUp()
self.volume = 'volume-2f9b2ff5-987b-4412-a91c-23caaf0d5aff'
self.snapshot_name = 'snapshot-bf452d80-068a-43d7-ba9f-196cf47bd0be'
self.vdi_wrapper = linuxsheepdog.SheepdogVolumeIOWrapper(
SHEEP_ADDR, SHEEP_PORT, self.volume)
self.snapshot_wrapper = linuxsheepdog.SheepdogVolumeIOWrapper(
SHEEP_ADDR, SHEEP_PORT, self.volume, self.snapshot_name)
self.execute = mock.MagicMock()
self.mock_object(processutils, 'execute', self.execute)
def test_init(self):
self.assertEqual(self.volume, self.vdi_wrapper._vdiname)
self.assertIsNone(self.vdi_wrapper._snapshot_name)
self.assertEqual(0, self.vdi_wrapper._offset)
self.assertEqual(self.snapshot_name,
self.snapshot_wrapper._snapshot_name)
def test_execute(self):
cmd = ('cmd1', 'arg1')
data = 'data1'
self.vdi_wrapper._execute(cmd, data)
self.execute.assert_called_once_with(*cmd, process_input=data)
def test_execute_error(self):
cmd = ('cmd1', 'arg1')
data = 'data1'
self.mock_object(processutils, 'execute',
mock.MagicMock(side_effect=OSError))
args = (cmd, data)
self.assertRaises(exception.VolumeDriverException,
self.vdi_wrapper._execute,
*args)
def test_read_vdi(self):
self.vdi_wrapper.read()
self.execute.assert_called_once_with(
'dog', 'vdi', 'read', '-a', SHEEP_ADDR, '-p', SHEEP_PORT,
self.volume, 0, process_input=None)
def test_read_vdi_invalid(self):
self.vdi_wrapper._valid = False
self.assertRaises(exception.VolumeDriverException,
self.vdi_wrapper.read)
def test_write_vdi(self):
data = 'data1'
self.vdi_wrapper.write(data)
self.execute.assert_called_once_with(
'dog', 'vdi', 'write', '-a', SHEEP_ADDR, '-p', SHEEP_PORT,
self.volume, 0, len(data),
process_input=data)
self.assertEqual(len(data), self.vdi_wrapper.tell())
def test_write_vdi_invalid(self):
self.vdi_wrapper._valid = False
self.assertRaises(exception.VolumeDriverException,
self.vdi_wrapper.write, 'dummy_data')
def test_read_snapshot(self):
self.snapshot_wrapper.read()
self.execute.assert_called_once_with(
'dog', 'vdi', 'read', '-a', SHEEP_ADDR, '-p', SHEEP_PORT,
'-s', self.snapshot_name, self.volume, 0,
process_input=None)
def test_seek(self):
self.vdi_wrapper.seek(12345)
self.assertEqual(12345, self.vdi_wrapper.tell())
self.vdi_wrapper.seek(-2345, whence=1)
self.assertEqual(10000, self.vdi_wrapper.tell())
# This results in negative offset.
self.assertRaises(IOError, self.vdi_wrapper.seek, -20000, whence=1)
def test_seek_invalid(self):
seek_num = 12345
self.vdi_wrapper._valid = False
self.assertRaises(exception.VolumeDriverException,
self.vdi_wrapper.seek, seek_num)
def test_flush(self):
# flush does nothing.
self.vdi_wrapper.flush()
self.assertFalse(self.execute.called)
def test_fileno(self):
self.assertRaises(IOError, self.vdi_wrapper.fileno)