From a3261e3fbd7f0230eba2a205ec99906505df1c7c Mon Sep 17 00:00:00 2001 From: Thelo Gaultier Date: Fri, 4 Dec 2015 15:14:31 +0800 Subject: [PATCH] Add connector for ITRI DISCO cinder driver This commit adds the os-brick connector for the ITRI DISCO cinder driver This commit also includes the ITRI DISCO connector unit test. The cinder driver itself has been commited in cinder but not merged yet. ( ref : https://review.openstack.org/#/c/253356/ ) I first commit this connector, then the part in nova ( ref : https://review.openstack.org/#/c/253353/ ) finally I committed the connector in cinder Note that the patch for nova requires this commit to be merged to pass the unit test. Change-Id: I81036a58ab334a7e047f7fa5486c11fd19d24b8f Implements: blueprint disco-driver-cinder --- os_brick/initiator/connector.py | 178 +++++++++++++++++++++ os_brick/tests/initiator/test_connector.py | 132 +++++++++++++++ 2 files changed, 310 insertions(+) diff --git a/os_brick/initiator/connector.py b/os_brick/initiator/connector.py index ae58d3e9d..eaa2e2667 100644 --- a/os_brick/initiator/connector.py +++ b/os_brick/initiator/connector.py @@ -29,6 +29,7 @@ import platform import re import requests import socket +import struct import sys import tempfile import time @@ -77,6 +78,7 @@ RBD = "RBD" SCALEIO = "SCALEIO" SCALITY = "SCALITY" QUOBYTE = "QUOBYTE" +DISCO = "DISCO" def _check_multipathd_running(root_helper, enforce_multipath): @@ -248,6 +250,13 @@ class InitiatorConnector(executor.Executor): *args, **kwargs) elif protocol == SCALEIO: return ScaleIOConnector( + root_helper=root_helper, + driver=driver, + execute=execute, + device_scan_attempts=device_scan_attempts, + *args, **kwargs) + elif protocol == DISCO: + return DISCOConnector( root_helper=root_helper, driver=driver, execute=execute, @@ -2797,3 +2806,172 @@ class ScaleIOConnector(InitiatorConnector): def extend_volume(self, connection_properties): # TODO(walter-boring): is this possible? raise NotImplementedError + + +class DISCOConnector(InitiatorConnector): + """Class implements the connector driver for DISCO.""" + + DISCO_PREFIX = 'dms' + + def __init__(self, root_helper, driver=None, execute=putils.execute, + device_scan_attempts=DEVICE_SCAN_ATTEMPTS_DEFAULT, + *args, **kwargs): + """Init DISCO connector.""" + super(DISCOConnector, self).__init__( + root_helper, + driver=driver, + execute=execute, + device_scan_attempts=device_scan_attempts, + *args, **kwargs + ) + LOG.info(_LI("Init DISCO connector")) + + self.server_port = None + self.server_ip = None + + def get_search_path(self): + """Get directory path where to get DISCO volumes.""" + return "/dev" + + def get_volume_paths(self, connection_properties): + """Get config for DISCO volume driver.""" + self.get_config(connection_properties) + volume_paths = [] + disco_id = connection_properties['disco_id'] + disco_dev = '/dev/dms%s' % (disco_id) + device_paths = [disco_dev] + for path in device_paths: + if os.path.exists(path): + volume_paths.append(path) + return volume_paths + + def get_all_available_volumes(self, connection_properties=None): + """Return all DISCO volumes that exist in the search directory.""" + path = self.get_search_path() + + if os.path.isdir(path): + path_items = [path, '/', self.DISCO_PREFIX, '*'] + file_filter = ''.join(path_items) + return glob.glob(file_filter) + else: + return [] + + def get_config(self, connection_properties): + """Get config for DISCO volume driver.""" + self.server_port = ( + six.text_type(connection_properties['conf']['server_port'])) + self.server_ip = ( + six.text_type(connection_properties['conf']['server_ip'])) + + disco_id = connection_properties['disco_id'] + disco_dev = '/dev/dms%s' % (disco_id) + device_info = {'type': 'block', + 'path': disco_dev} + return device_info + + @synchronized('connect_volume') + def connect_volume(self, connection_properties): + """Connect the volume. Returns xml for libvirt.""" + LOG.debug("Enter in DISCO connect_volume") + device_info = self.get_config(connection_properties) + LOG.debug("Device info : %s.", device_info) + disco_id = connection_properties['disco_id'] + disco_dev = '/dev/dms%s' % (disco_id) + LOG.debug("Attaching %s", disco_dev) + + self._mount_disco_volume(disco_dev, disco_id) + return device_info + + @synchronized('connect_volume') + def disconnect_volume(self, connection_properties, device_info): + """Detach the volume from instance.""" + disco_id = connection_properties['disco_id'] + disco_dev = '/dev/dms%s' % (disco_id) + LOG.debug("detaching %s", disco_dev) + + if os.path.exists(disco_dev): + ret = self._send_disco_vol_cmd(self.server_ip, + self.server_port, + 2, + disco_id) + if ret is not None: + msg = _("Detach volume failed") + raise exception.BrickException(message=msg) + else: + LOG.info(_LI("Volume already detached from host")) + + def _mount_disco_volume(self, path, volume_id): + """Send request to mount volume on physical host.""" + LOG.debug("Enter in mount disco volume %(port)s " + "and %(ip)s." % + {'port': self.server_port, + 'ip': self.server_ip}) + + if not os.path.exists(path): + ret = self._send_disco_vol_cmd(self.server_ip, + self.server_port, + 1, + volume_id) + if ret is not None: + msg = _("Attach volume failed") + raise exception.BrickException(message=msg) + else: + LOG.info(_LI("Volume already attached to host")) + + def _connect_tcp_socket(self, client_ip, client_port): + """Connect to TCP socket.""" + sock = None + + for res in socket.getaddrinfo(client_ip, + client_port, + socket.AF_UNSPEC, + socket.SOCK_STREAM): + aff, socktype, proto, canonname, saa = res + try: + sock = socket.socket(aff, socktype, proto) + except socket.error: + sock = None + continue + try: + sock.connect(saa) + except socket.error: + sock.close() + sock = None + continue + break + + if sock is None: + LOG.error(_LE("Cannot connect TCP socket")) + return sock + + def _send_disco_vol_cmd(self, client_ip, client_port, op_code, vol_id): + """Send DISCO client socket command.""" + s = self._connect_tcp_socket(client_ip, int(client_port)) + + if s is not None: + inst_id = 'DEFAULT-INSTID' + pktlen = 2 + 8 + len(inst_id) + LOG.debug("pktlen=%(plen)s op=%(op)s " + "vol_id=%(vol_id)s, inst_id=%(inst_id)s", + {'plen': pktlen, 'op': op_code, + 'vol_id': vol_id, 'inst_id': inst_id}) + data = struct.pack("!HHQ14s", + pktlen, + op_code, + int(vol_id), + inst_id) + s.sendall(data) + ret = s.recv(4) + s.close() + + LOG.debug("Received ret len=%(lenR)d, ret=%(ret)s", + {'lenR': len(repr(ret)), 'ret': repr(ret)}) + + ret_val = "".join("%02x" % ord(c) for c in ret) + + if ret_val != '00000000': + return 'ERROR' + return None + + def extend_volume(self, connection_properties): + raise NotImplementedError diff --git a/os_brick/tests/initiator/test_connector.py b/os_brick/tests/initiator/test_connector.py index 0418f6f78..56c28642f 100644 --- a/os_brick/tests/initiator/test_connector.py +++ b/os_brick/tests/initiator/test_connector.py @@ -175,6 +175,9 @@ class ConnectorTestCase(base.TestCase): 'quobyte', None, quobyte_mount_point_base='/mnt/test') self.assertEqual(obj.__class__.__name__, "RemoteFsConnector") + obj = connector.InitiatorConnector.factory("disco", None) + self.assertEqual("DISCOConnector", obj.__class__.__name__) + self.assertRaises(ValueError, connector.InitiatorConnector.factory, "bogus", None) @@ -2430,3 +2433,132 @@ class ScaleIOConnectorTestCase(ConnectorTestCase): self.assertRaises(NotImplementedError, self.connector.extend_volume, self.fake_connection_properties) + + +class DISCOConnectorTestCase(ConnectorTestCase): + """Test cases for DISCO connector.""" + + # Fake volume information + volume = { + 'name': 'a-disco-volume', + 'disco_id': '1234567' + } + + # Conf for test + conf = { + 'ip': MY_IP, + 'port': 9898 + } + + def setUp(self): + super(DISCOConnectorTestCase, self).setUp() + + self.fake_connection_properties = { + 'name': self.volume['name'], + 'disco_id': self.volume['disco_id'], + 'conf': { + 'server_ip': self.conf['ip'], + 'server_port': self.conf['port']} + } + + self.fake_volume_status = {'attached': True, + 'detached': False} + self.fake_request_status = {'success': None, + 'fail': 'ERROR'} + self.volume_status = 'detached' + self.request_status = 'success' + + # Patch the request and os calls to fake versions + mock.patch.object(connector.DISCOConnector, + '_send_disco_vol_cmd', + self.perform_disco_request).start() + mock.patch.object(os.path, + 'exists', self.is_volume_attached).start() + mock.patch.object(glob, + 'glob', self.list_disco_volume).start() + self.addCleanup(mock.patch.stopall) + + # The actual DISCO connector + self.connector = connector.DISCOConnector( + 'sudo', execute=self.fake_execute) + + def perform_disco_request(self, *cmd, **kwargs): + """Fake the socket call.""" + return self.fake_request_status[self.request_status] + + def is_volume_attached(self, *cmd, **kwargs): + """Fake volume detection check.""" + return self.fake_volume_status[self.volume_status] + + def list_disco_volume(self, *cmd, **kwargs): + """Fake the glob call.""" + path_dir = self.connector.get_search_path() + volume_id = self.volume['disco_id'] + volume_items = [path_dir, '/', self.connector.DISCO_PREFIX, volume_id] + volume_path = ''.join(volume_items) + return [volume_path] + + def test_get_search_path(self): + """DISCO volumes should be under /dev.""" + expected = "/dev" + actual = self.connector.get_search_path() + self.assertEqual(expected, actual) + + def test_get_volume_paths(self): + """Test to get all the path for a specific volume.""" + expected = ['/dev/dms1234567'] + self.volume_status = 'attached' + actual = self.connector.get_volume_paths( + self.fake_connection_properties) + self.assertEqual(expected, actual) + + def test_connect_volume(self): + """Attach a volume.""" + self.connector.connect_volume(self.fake_connection_properties) + + def test_connect_volume_already_attached(self): + """Make sure that we don't issue the request.""" + self.request_status = 'fail' + self.volume_status = 'attached' + self.test_connect_volume() + + def test_connect_volume_request_fail(self): + """Fail the attach request.""" + self.volume_status = 'detached' + self.request_status = 'fail' + self.assertRaises(exception.BrickException, + self.test_connect_volume) + + def test_disconnect_volume(self): + """Detach a volume.""" + self.connector.disconnect_volume(self.fake_connection_properties, None) + + def test_disconnect_volume_attached(self): + """Detach a volume attached.""" + self.request_status = 'success' + self.volume_status = 'attached' + self.test_disconnect_volume() + + def test_disconnect_volume_already_detached(self): + """Ensure that we don't issue the request.""" + self.request_status = 'fail' + self.volume_status = 'detached' + self.test_disconnect_volume() + + def test_disconnect_volume_request_fail(self): + """Fail the detach request.""" + self.volume_status = 'attached' + self.request_status = 'fail' + self.assertRaises(exception.BrickException, + self.test_disconnect_volume) + + def test_get_all_available_volumes(self): + """Test to get all the available DISCO volumes.""" + expected = ['/dev/dms1234567'] + actual = self.connector.get_all_available_volumes(None) + self.assertItemsEqual(expected, actual) + + def test_extend_volume(self): + self.assertRaises(NotImplementedError, + self.connector.extend_volume, + self.fake_connection_properties)