From f7e715f9ef9e298a3cc24513517ee277b9fe49bb Mon Sep 17 00:00:00 2001 From: VenkataKrishna Reddy Date: Fri, 29 Jul 2016 11:30:01 -0400 Subject: [PATCH] Support replication in K2 Add replication feature for Kaminario K2 iSCSI and FC cinder drivers. Change-Id: I01426a7db234655e4af6ba53985ac8e40a33ef00 Implements: blueprint k2-replication Co-Authored-By: Nikesh Mahalka Co-Authored-By: Sreedhar Varma Co-Authored-By: Lakshman --- .../unit/volume/drivers/test_kaminario.py | 101 ++++++++- .../drivers/kaminario/kaminario_common.py | 210 +++++++++++++++++- .../volume/drivers/kaminario/kaminario_fc.py | 14 +- .../drivers/kaminario/kaminario_iscsi.py | 12 +- ...inario-cinder-driver-435faa025b2f6df4.yaml | 4 + 5 files changed, 328 insertions(+), 13 deletions(-) create mode 100644 releasenotes/notes/kaminario-cinder-driver-435faa025b2f6df4.yaml diff --git a/cinder/tests/unit/volume/drivers/test_kaminario.py b/cinder/tests/unit/volume/drivers/test_kaminario.py index 2acc3e789c3..581684e5633 100644 --- a/cinder/tests/unit/volume/drivers/test_kaminario.py +++ b/cinder/tests/unit/volume/drivers/test_kaminario.py @@ -18,6 +18,7 @@ from oslo_utils import units from cinder import context from cinder import exception +from cinder.objects import fields from cinder import test from cinder.tests.unit import fake_snapshot from cinder.tests.unit import fake_volume @@ -50,6 +51,8 @@ class FakeSaveObject(FakeK2Obj): self.volume_group = self self.is_dedup = True self.size = units.Mi + self.replication_status = None + self.state = 'in_sync' def save(self): return FakeSaveObject() @@ -96,6 +99,13 @@ class FakeKrestException(object): return FakeSaveObjectExp() +class Replication(object): + backend_id = '10.0.0.1' + login = 'login' + password = 'password' + rpo = 500 + + class TestKaminarioISCSI(test.TestCase): driver = None conf = None @@ -261,7 +271,7 @@ class TestKaminarioISCSI(test.TestCase): def test_get_target_info(self): """Test get_target_info.""" - iscsi_portal, target_iqn = self.driver.get_target_info() + iscsi_portal, target_iqn = self.driver.get_target_info(self.vol) self.assertEqual('10.0.0.1:3260', iscsi_portal) self.assertEqual('xyztlnxyz', target_iqn) @@ -306,6 +316,93 @@ class TestKaminarioISCSI(test.TestCase): result = self.driver._get_replica_status(self.vol) self.assertTrue(result) + def test_create_volume_replica(self): + """Test _create_volume_replica.""" + vg = FakeSaveObject() + rep = Replication() + self.driver.replica = rep + session_name = self.driver.get_session_name('1234567890987654321') + self.assertEqual('ssn-1234567890987654321', session_name) + rsession_name = self.driver.get_rep_name(session_name) + self.assertEqual('rssn-1234567890987654321', rsession_name) + src_ssn = self.driver.client.new("replication/sessions").save() + self.assertEqual('in_sync', src_ssn.state) + result = self.driver._create_volume_replica(self.vol, vg, vg, rep.rpo) + self.assertIsNone(result) + + def test_create_volume_replica_exp(self): + """Test _create_volume_replica_exp.""" + vg = FakeSaveObject() + rep = Replication() + self.driver.replica = rep + self.driver.client = FakeKrestException() + self.assertRaises(exception.KaminarioCinderDriverException, + self.driver._create_volume_replica, self.vol, + vg, vg, rep.rpo) + + def test_delete_by_ref(self): + """Test _delete_by_ref.""" + result = self.driver._delete_by_ref(self.driver.client, 'volume', + 'name', 'message') + self.assertIsNone(result) + + def test_failover_volume(self): + """Test _failover_volume.""" + self.driver.target = FakeKrest() + session_name = self.driver.get_session_name('1234567890987654321') + self.assertEqual('ssn-1234567890987654321', session_name) + rsession_name = self.driver.get_rep_name(session_name) + self.assertEqual('rssn-1234567890987654321', rsession_name) + result = self.driver._failover_volume(self.vol) + self.assertIsNone(result) + + def test_failover_host(self): + """Test failover_host.""" + volumes = [self.vol, self.vol] + self.driver.replica = Replication() + self.driver.target = FakeKrest() + backend_ip, res_volumes = self.driver.failover_host(None, volumes) + self.assertEqual('10.0.0.1', backend_ip) + status = res_volumes[0]['updates']['replication_status'] + self.assertEqual(fields.ReplicationStatus.FAILED_OVER, status) + + def test_delete_volume_replica(self): + """Test _delete_volume_replica.""" + self.driver.replica = Replication() + self.driver.target = FakeKrest() + session_name = self.driver.get_session_name('1234567890987654321') + self.assertEqual('ssn-1234567890987654321', session_name) + rsession_name = self.driver.get_rep_name(session_name) + self.assertEqual('rssn-1234567890987654321', rsession_name) + res = self.driver._delete_by_ref(self.driver.client, 'volumes', + 'test', 'test') + self.assertIsNone(res) + result = self.driver._delete_volume_replica(self.vol, 'test', 'test') + self.assertIsNone(result) + src_ssn = self.driver.client.search("replication/sessions").hits[0] + self.assertEqual('idle', src_ssn.state) + + def test_delete_volume_replica_exp(self): + """Test _delete_volume_replica_exp.""" + self.driver.replica = Replication() + self.driver.target = FakeKrestException() + self.driver._check_for_status = mock.Mock() + self.assertRaises(exception.KaminarioCinderDriverException, + self.driver._delete_volume_replica, self.vol, + 'test', 'test') + + def test_get_is_replica(self): + """Test get_is_replica.""" + result = self.driver._get_is_replica(self.vol.volume_type) + self.assertFalse(result) + + def test_get_is_replica_true(self): + """Test get_is_replica_true.""" + self.driver.replica = Replication() + self.vol.volume_type.extra_specs = {'kaminario:replication': 'enabled'} + result = self.driver._get_is_replica(self.vol.volume_type) + self.assertTrue(result) + class TestKaminarioFC(TestKaminarioISCSI): @@ -325,7 +422,7 @@ class TestKaminarioFC(TestKaminarioISCSI): def test_get_target_info(self): """Test get_target_info.""" - target_wwpn = self.driver.get_target_info() + target_wwpn = self.driver.get_target_info(self.vol) self.assertEqual(['50024f4053300300'], target_wwpn) def test_terminate_connection(self): diff --git a/cinder/volume/drivers/kaminario/kaminario_common.py b/cinder/volume/drivers/kaminario/kaminario_common.py index bc34ec3a2ae..87290188e11 100644 --- a/cinder/volume/drivers/kaminario/kaminario_common.py +++ b/cinder/volume/drivers/kaminario/kaminario_common.py @@ -18,6 +18,7 @@ import math import re import six +import eventlet from oslo_config import cfg from oslo_log import log as logging from oslo_utils import importutils @@ -27,6 +28,7 @@ from oslo_utils import versionutils import cinder from cinder import exception from cinder.i18n import _, _LE, _LW, _LI +from cinder.objects import fields from cinder import utils from cinder.volume.drivers.san import san from cinder.volume import utils as vol_utils @@ -74,6 +76,14 @@ def kaminario_logger(func): return func_wrapper +class Replication(object): + def __init__(self, config, *args, **kwargs): + self.backend_id = config.get('backend_id') + self.login = config.get('login') + self.password = config.get('password') + self.rpo = config.get('rpo') + + class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver): VENDOR = "Kaminario" stats = {} @@ -82,6 +92,7 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver): super(KaminarioCinderDriver, self).__init__(*args, **kwargs) self.configuration.append_config_values(san.san_opts) self.configuration.append_config_values(kaminario2_opts) + self.replica = None self._protocol = None def check_for_setup_error(self): @@ -95,6 +106,11 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver): conf.san_login, conf.san_password, ssl_validate=False) + if self.replica: + self.target = self.krest.EndPoint(self.replica.backend_id, + self.replica.login, + self.replica.password, + ssl_validate=False) v_rs = self.client.search("system/state") if hasattr(v_rs, 'hits') and v_rs.total != 0: ver = v_rs.hits[0].rest_api_version @@ -119,6 +135,15 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver): if not getattr(self.configuration, attr, None): raise exception.InvalidInput(reason=_('%s is not set.') % attr) + replica = self.configuration.safe_get('replication_device') + if replica and isinstance(replica, list): + replica_ops = ['backend_id', 'login', 'password', 'rpo'] + for attr in replica_ops: + if attr not in replica[0]: + msg = _('replication_device %s is not set.') % attr + raise exception.InvalidInput(reason=msg) + self.replica = Replication(replica[0]) + @kaminario_logger def do_setup(self, context): super(KaminarioCinderDriver, self).do_setup(context) @@ -145,9 +170,9 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver): LOG.debug("Creating volume with name: %(name)s, size: %(size)s " "GB, volume_group: %(vg)s", {'name': vol_name, 'size': volume.size, 'vg': vg_name}) - self.client.new("volumes", name=vol_name, - size=volume.size * units.Mi, - volume_group=vg).save() + vol = self.client.new("volumes", name=vol_name, + size=volume.size * units.Mi, + volume_group=vg).save() except Exception as ex: vg_rs = self.client.search("volume_groups", name=vg_name) if vg_rs.total != 0: @@ -157,6 +182,113 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver): raise exception.KaminarioCinderDriverException( reason=six.text_type(ex.message)) + if self._get_is_replica(volume.volume_type) and self.replica: + self._create_volume_replica(volume, vg, vol, self.replica.rpo) + + @kaminario_logger + def _create_volume_replica(self, volume, vg, vol, rpo): + """Volume replica creation in K2 needs session and remote volume. + + - create a session + - create a volume in the volume group + + """ + session_name = self.get_session_name(volume.id) + rsession_name = self.get_rep_name(session_name) + + rvg_name = self.get_rep_name(vg.name) + rvol_name = self.get_rep_name(vol.name) + + k2peer_rs = self.client.search("replication/peer_k2arrays", + mgmt_host=self.replica.backend_id) + if hasattr(k2peer_rs, 'hits') and k2peer_rs.total != 0: + k2peer = k2peer_rs.hits[0] + else: + msg = _("Unable to find K2peer in source K2:") + LOG.error(msg) + raise exception.KaminarioCinderDriverException(reason=msg) + try: + LOG.debug("Creating source session with name: %(sname)s and " + " target session name: %(tname)s", + {'sname': session_name, 'tname': rsession_name}) + src_ssn = self.client.new("replication/sessions") + src_ssn.replication_peer_k2array = k2peer + src_ssn.auto_configure_peer_volumes = "False" + src_ssn.local_volume_group = vg + src_ssn.replication_peer_volume_group_name = rvg_name + src_ssn.remote_replication_session_name = rsession_name + src_ssn.name = session_name + src_ssn.rpo = rpo + src_ssn.save() + LOG.debug("Creating remote volume with name: %s", + rvol_name) + self.client.new("replication/peer_volumes", + local_volume=vol, + name=rvol_name, + replication_session=src_ssn).save() + src_ssn.state = "in_sync" + src_ssn.save() + except Exception as ex: + LOG.exception(_LE("Replication for the volume %s has " + "failed."), vol.name) + self._delete_by_ref(self.client, "replication/sessions", + session_name, 'session') + self._delete_by_ref(self.target, "replication/sessions", + rsession_name, 'remote session') + self._delete_by_ref(self.target, "volumes", + rvol_name, 'remote volume') + self._delete_by_ref(self.client, "volumes", vol.name, "volume") + self._delete_by_ref(self.target, "volume_groups", + rvg_name, "remote vg") + self._delete_by_ref(self.client, "volume_groups", vg.name, "vg") + raise exception.KaminarioCinderDriverException( + reason=six.text_type(ex.message)) + + def _delete_by_ref(self, device, url, name, msg): + rs = device.search(url, name=name) + for result in rs.hits: + result.delete() + LOG.debug("Deleting %(msg)s: %(name)s", {'msg': msg, 'name': name}) + + @kaminario_logger + def _failover_volume(self, volume): + """Promoting a secondary volume to primary volume.""" + session_name = self.get_session_name(volume.id) + rsession_name = self.get_rep_name(session_name) + tgt_ssn = self.target.search("replication/sessions", + name=rsession_name).hits[0] + if tgt_ssn.state == 'in_sync': + tgt_ssn.state = 'failed_over' + tgt_ssn.save() + LOG.debug("The target session: %s state is " + "changed to failed_over ", rsession_name) + + @kaminario_logger + def failover_host(self, context, volumes, secondary_id=None): + """Failover to replication target.""" + volume_updates = [] + if secondary_id and secondary_id != self.replica.backend_id: + LOG.error(_LE("Kaminario driver received failover_host " + "request, But backend is non replicated device")) + raise exception.UnableToFailOver(reason=_("Failover requested " + "on non replicated " + "backend.")) + for v in volumes: + vol_name = self.get_volume_name(v['id']) + rv = self.get_rep_name(vol_name) + if self.target.search("volumes", name=rv).total: + self._failover_volume(v) + volume_updates.append( + {'volume_id': v['id'], + 'updates': + {'replication_status': + fields.ReplicationStatus.FAILED_OVER}}) + else: + volume_updates.append({'volume_id': v['id'], + 'updates': {'status': 'error', }}) + + return self.replica.backend_id, volume_updates + @kaminario_logger def create_volume_from_snapshot(self, volume, snapshot): """Create volume from snapshot. @@ -271,6 +403,9 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver): vg_name = self.get_volume_group_name(volume.id) vol_name = self.get_volume_name(volume.id) try: + if self._get_is_replica(volume.volume_type) and self.replica: + self._delete_volume_replica(volume, vg_name, vol_name) + LOG.debug("Searching and deleting volume: %s in K2.", vol_name) vol_rs = self.client.search("volumes", name=vol_name) if vol_rs.total != 0: @@ -284,6 +419,46 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver): raise exception.KaminarioCinderDriverException( reason=six.text_type(ex.message)) + @kaminario_logger + def _delete_volume_replica(self, volume, vg_name, vol_name): + rvg_name = self.get_rep_name(vg_name) + rvol_name = self.get_rep_name(vol_name) + session_name = self.get_session_name(volume.id) + rsession_name = self.get_rep_name(session_name) + src_ssn = self.client.search('replication/sessions', + name=session_name).hits[0] + tgt_ssn = self.target.search('replication/sessions', + name=rsession_name).hits[0] + src_ssn.state = 'suspended' + src_ssn.save() + self._check_for_status(tgt_ssn, 'suspended') + src_ssn.state = 'idle' + src_ssn.save() + self._check_for_status(tgt_ssn, 'idle') + tgt_ssn.delete() + src_ssn.delete() + + LOG.debug("Searching and deleting snapshots for volume groups:" + "%(vg1)s, %(vg2)s in K2.", {'vg1': vg_name, 'vg2': rvg_name}) + vg = self.target.search('volume_groups', name=vg_name).hits + rvg = self.target.search('volume_groups', name=rvg_name).hits + snaps = self.client.search('snapshots', volume_group=vg).hits + for s in snaps: + s.delete() + rsnaps = self.target.search('snapshots', volume_group=rvg).hits + for s in rsnaps: + s.delete() + + self._delete_by_ref(self.target, "volumes", rvol_name, 'remote volume') + self._delete_by_ref(self.target, "volume_groups", + rvg_name, "remote vg") + + @kaminario_logger + def _check_for_status(self, obj, status): + while obj.state != status: + obj.refresh() + eventlet.sleep(1) + @kaminario_logger def get_volume_stats(self, refresh=False): if refresh: @@ -375,7 +550,9 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver): 'thick_provisioning_support': False, 'provisioned_capacity_gb': provisioned_vol / units.Mi, 'max_oversubscription_ratio': ratio, - 'kaminario:thin_prov_type': 'dedup/nodedup'} + 'kaminario:thin_prov_type': 'dedup/nodedup', + 'replication_enabled': True, + 'kaminario:replication': True} @kaminario_logger def get_initiator_host_name(self, connector): @@ -397,6 +574,11 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver): """Return the volume name.""" return "cv-{0}".format(vid) + @kaminario_logger + def get_session_name(self, vid): + """Return the volume name.""" + return "ssn-{0}".format(vid) + @kaminario_logger def get_snap_name(self, sid): """Return the snapshot name.""" @@ -407,6 +589,11 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver): """Return the view name.""" return "cview-{0}".format(vid) + @kaminario_logger + def get_rep_name(self, sname): + """Return the replication session name.""" + return "r{0}".format(sname) + @kaminario_logger def _delete_host_by_name(self, name): """Deleting host by name.""" @@ -430,6 +617,9 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver): @kaminario_logger def _get_volume_object(self, volume): vol_name = self.get_volume_name(volume.id) + if volume.replication_status == 'failed-over': + vol_name = self.get_rep_name(vol_name) + self.client = self.target LOG.debug("Searching volume : %s in K2.", vol_name) vol_rs = self.client.search("volumes", name=vol_name) if not hasattr(vol_rs, 'hits') or vol_rs.total == 0: @@ -459,6 +649,9 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver): # Get volume object if type(volume).__name__ != 'RestObject': vol_name = self.get_volume_name(volume.id) + if volume.replication_status == 'failed-over': + vol_name = self.get_rep_name(vol_name) + self.client = self.target LOG.debug("Searching volume: %s in K2.", vol_name) volume_rs = self.client.search("volumes", name=vol_name) if hasattr(volume_rs, "hits") and volume_rs.total != 0: @@ -529,6 +722,15 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver): else: return True + def _get_is_replica(self, vol_type): + replica = False + if vol_type and vol_type.get('extra_specs'): + specs = vol_type.get('extra_specs') + if (specs.get('kaminario:replication') == 'enabled' and + self.replica): + replica = True + return replica + def _get_replica_status(self, vg_name): status = False rvg = self.client.search("replication/peer_volume_groups", diff --git a/cinder/volume/drivers/kaminario/kaminario_fc.py b/cinder/volume/drivers/kaminario/kaminario_fc.py index f569a32fee3..15a45bc7901 100644 --- a/cinder/volume/drivers/kaminario/kaminario_fc.py +++ b/cinder/volume/drivers/kaminario/kaminario_fc.py @@ -19,6 +19,7 @@ from oslo_log import log as logging from cinder import exception from cinder.i18n import _, _LE +from cinder.objects import fields from cinder.volume.drivers.kaminario import kaminario_common as common from cinder.zonemanager import utils as fczm_utils @@ -32,9 +33,10 @@ class KaminarioFCDriver(common.KaminarioCinderDriver): Version history: 1.0 - Initial driver 1.1 - Added manage/unmanage and extra-specs support for nodedup + 1.2 - Added replication support """ - VERSION = '1.1' + VERSION = '1.2' @kaminario_logger def __init__(self, *args, **kwargs): @@ -52,7 +54,7 @@ class KaminarioFCDriver(common.KaminarioCinderDriver): LOG.error(msg) raise exception.KaminarioCinderDriverException(reason=msg) # Get target wwpns. - target_wwpns = self.get_target_info() + target_wwpns = self.get_target_info(volume) # Map volume. lun = self.k2_initialize_connection(volume, connector) # Create initiator-target mapping. @@ -75,7 +77,7 @@ class KaminarioFCDriver(common.KaminarioCinderDriver): # is not attached to any volume if host_rs.total == 0: # Get target wwpns. - target_wwpns = self.get_target_info() + target_wwpns = self.get_target_info(volume) target_wwpns, init_target_map = self._build_initiator_target_map( connector, target_wwpns) properties["data"] = {"target_wwn": target_wwpns, @@ -83,7 +85,11 @@ class KaminarioFCDriver(common.KaminarioCinderDriver): return properties @kaminario_logger - def get_target_info(self): + def get_target_info(self, volume): + rep_status = fields.ReplicationStatus.FAILED_OVER + if (hasattr(volume, 'replication_status') and + volume.replication_status == rep_status): + self.client = self.target LOG.debug("Searching target wwpns in K2.") fc_ports_rs = self.client.search("system/fc_ports") target_wwpns = [] diff --git a/cinder/volume/drivers/kaminario/kaminario_iscsi.py b/cinder/volume/drivers/kaminario/kaminario_iscsi.py index 6b21fb8e12d..c0527771c8c 100644 --- a/cinder/volume/drivers/kaminario/kaminario_iscsi.py +++ b/cinder/volume/drivers/kaminario/kaminario_iscsi.py @@ -20,6 +20,7 @@ from oslo_log import log as logging from cinder import exception from cinder.i18n import _, _LE from cinder import interface +from cinder.objects import fields from cinder.volume.drivers.kaminario import kaminario_common as common ISCSI_TCP_PORT = "3260" @@ -34,9 +35,10 @@ class KaminarioISCSIDriver(common.KaminarioCinderDriver): Version history: 1.0 - Initial driver 1.1 - Added manage/unmanage and extra-specs support for nodedup + 1.2 - Added replication support """ - VERSION = '1.1' + VERSION = '1.2' @kaminario_logger def __init__(self, *args, **kwargs): @@ -47,7 +49,7 @@ class KaminarioISCSIDriver(common.KaminarioCinderDriver): def initialize_connection(self, volume, connector): """Attach K2 volume to host.""" # Get target_portal and target iqn. - iscsi_portal, target_iqn = self.get_target_info() + iscsi_portal, target_iqn = self.get_target_info(volume) # Map volume. lun = self.k2_initialize_connection(volume, connector) # Return target volume information. @@ -58,7 +60,11 @@ class KaminarioISCSIDriver(common.KaminarioCinderDriver): "target_discovered": True}} @kaminario_logger - def get_target_info(self): + def get_target_info(self, volume): + rep_status = fields.ReplicationStatus.FAILED_OVER + if (hasattr(volume, 'replication_status') and + volume.replication_status == rep_status): + self.client = self.target LOG.debug("Searching first iscsi port ip without wan in K2.") iscsi_ip_rs = self.client.search("system/net_ips", wan_port="") iscsi_ip = target_iqn = None diff --git a/releasenotes/notes/kaminario-cinder-driver-435faa025b2f6df4.yaml b/releasenotes/notes/kaminario-cinder-driver-435faa025b2f6df4.yaml new file mode 100644 index 00000000000..ca97ae8c694 --- /dev/null +++ b/releasenotes/notes/kaminario-cinder-driver-435faa025b2f6df4.yaml @@ -0,0 +1,4 @@ +--- +features: + - Add replication feature in Kaminario iSCSI and FC Cinder drivers. +