Support replication in K2

Add replication feature for Kaminario K2 iSCSI
and FC cinder drivers.

Change-Id: I01426a7db234655e4af6ba53985ac8e40a33ef00
Implements: blueprint k2-replication
Co-Authored-By: Nikesh Mahalka<Nikesh.Mahalka.ctr@kaminario.com>
Co-Authored-By: Sreedhar Varma<Sreedhar.Varma.ctr@kaminario.com>
Co-Authored-By: Lakshman<Lakshmi.Narayana.ctr@kaminario.com>
This commit is contained in:
VenkataKrishna Reddy 2016-07-29 11:30:01 -04:00
parent 832ec71e28
commit f7e715f9ef
5 changed files with 328 additions and 13 deletions

View File

@ -18,6 +18,7 @@ from oslo_utils import units
from cinder import context
from cinder import exception
from cinder.objects import fields
from cinder import test
from cinder.tests.unit import fake_snapshot
from cinder.tests.unit import fake_volume
@ -50,6 +51,8 @@ class FakeSaveObject(FakeK2Obj):
self.volume_group = self
self.is_dedup = True
self.size = units.Mi
self.replication_status = None
self.state = 'in_sync'
def save(self):
return FakeSaveObject()
@ -96,6 +99,13 @@ class FakeKrestException(object):
return FakeSaveObjectExp()
class Replication(object):
backend_id = '10.0.0.1'
login = 'login'
password = 'password'
rpo = 500
class TestKaminarioISCSI(test.TestCase):
driver = None
conf = None
@ -261,7 +271,7 @@ class TestKaminarioISCSI(test.TestCase):
def test_get_target_info(self):
"""Test get_target_info."""
iscsi_portal, target_iqn = self.driver.get_target_info()
iscsi_portal, target_iqn = self.driver.get_target_info(self.vol)
self.assertEqual('10.0.0.1:3260', iscsi_portal)
self.assertEqual('xyztlnxyz', target_iqn)
@ -306,6 +316,93 @@ class TestKaminarioISCSI(test.TestCase):
result = self.driver._get_replica_status(self.vol)
self.assertTrue(result)
def test_create_volume_replica(self):
"""Test _create_volume_replica."""
vg = FakeSaveObject()
rep = Replication()
self.driver.replica = rep
session_name = self.driver.get_session_name('1234567890987654321')
self.assertEqual('ssn-1234567890987654321', session_name)
rsession_name = self.driver.get_rep_name(session_name)
self.assertEqual('rssn-1234567890987654321', rsession_name)
src_ssn = self.driver.client.new("replication/sessions").save()
self.assertEqual('in_sync', src_ssn.state)
result = self.driver._create_volume_replica(self.vol, vg, vg, rep.rpo)
self.assertIsNone(result)
def test_create_volume_replica_exp(self):
"""Test _create_volume_replica_exp."""
vg = FakeSaveObject()
rep = Replication()
self.driver.replica = rep
self.driver.client = FakeKrestException()
self.assertRaises(exception.KaminarioCinderDriverException,
self.driver._create_volume_replica, self.vol,
vg, vg, rep.rpo)
def test_delete_by_ref(self):
"""Test _delete_by_ref."""
result = self.driver._delete_by_ref(self.driver.client, 'volume',
'name', 'message')
self.assertIsNone(result)
def test_failover_volume(self):
"""Test _failover_volume."""
self.driver.target = FakeKrest()
session_name = self.driver.get_session_name('1234567890987654321')
self.assertEqual('ssn-1234567890987654321', session_name)
rsession_name = self.driver.get_rep_name(session_name)
self.assertEqual('rssn-1234567890987654321', rsession_name)
result = self.driver._failover_volume(self.vol)
self.assertIsNone(result)
def test_failover_host(self):
"""Test failover_host."""
volumes = [self.vol, self.vol]
self.driver.replica = Replication()
self.driver.target = FakeKrest()
backend_ip, res_volumes = self.driver.failover_host(None, volumes)
self.assertEqual('10.0.0.1', backend_ip)
status = res_volumes[0]['updates']['replication_status']
self.assertEqual(fields.ReplicationStatus.FAILED_OVER, status)
def test_delete_volume_replica(self):
"""Test _delete_volume_replica."""
self.driver.replica = Replication()
self.driver.target = FakeKrest()
session_name = self.driver.get_session_name('1234567890987654321')
self.assertEqual('ssn-1234567890987654321', session_name)
rsession_name = self.driver.get_rep_name(session_name)
self.assertEqual('rssn-1234567890987654321', rsession_name)
res = self.driver._delete_by_ref(self.driver.client, 'volumes',
'test', 'test')
self.assertIsNone(res)
result = self.driver._delete_volume_replica(self.vol, 'test', 'test')
self.assertIsNone(result)
src_ssn = self.driver.client.search("replication/sessions").hits[0]
self.assertEqual('idle', src_ssn.state)
def test_delete_volume_replica_exp(self):
"""Test _delete_volume_replica_exp."""
self.driver.replica = Replication()
self.driver.target = FakeKrestException()
self.driver._check_for_status = mock.Mock()
self.assertRaises(exception.KaminarioCinderDriverException,
self.driver._delete_volume_replica, self.vol,
'test', 'test')
def test_get_is_replica(self):
"""Test get_is_replica."""
result = self.driver._get_is_replica(self.vol.volume_type)
self.assertFalse(result)
def test_get_is_replica_true(self):
"""Test get_is_replica_true."""
self.driver.replica = Replication()
self.vol.volume_type.extra_specs = {'kaminario:replication': 'enabled'}
result = self.driver._get_is_replica(self.vol.volume_type)
self.assertTrue(result)
class TestKaminarioFC(TestKaminarioISCSI):
@ -325,7 +422,7 @@ class TestKaminarioFC(TestKaminarioISCSI):
def test_get_target_info(self):
"""Test get_target_info."""
target_wwpn = self.driver.get_target_info()
target_wwpn = self.driver.get_target_info(self.vol)
self.assertEqual(['50024f4053300300'], target_wwpn)
def test_terminate_connection(self):

View File

@ -18,6 +18,7 @@ import math
import re
import six
import eventlet
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import importutils
@ -27,6 +28,7 @@ from oslo_utils import versionutils
import cinder
from cinder import exception
from cinder.i18n import _, _LE, _LW, _LI
from cinder.objects import fields
from cinder import utils
from cinder.volume.drivers.san import san
from cinder.volume import utils as vol_utils
@ -74,6 +76,14 @@ def kaminario_logger(func):
return func_wrapper
class Replication(object):
def __init__(self, config, *args, **kwargs):
self.backend_id = config.get('backend_id')
self.login = config.get('login')
self.password = config.get('password')
self.rpo = config.get('rpo')
class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver):
VENDOR = "Kaminario"
stats = {}
@ -82,6 +92,7 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver):
super(KaminarioCinderDriver, self).__init__(*args, **kwargs)
self.configuration.append_config_values(san.san_opts)
self.configuration.append_config_values(kaminario2_opts)
self.replica = None
self._protocol = None
def check_for_setup_error(self):
@ -95,6 +106,11 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver):
conf.san_login,
conf.san_password,
ssl_validate=False)
if self.replica:
self.target = self.krest.EndPoint(self.replica.backend_id,
self.replica.login,
self.replica.password,
ssl_validate=False)
v_rs = self.client.search("system/state")
if hasattr(v_rs, 'hits') and v_rs.total != 0:
ver = v_rs.hits[0].rest_api_version
@ -119,6 +135,15 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver):
if not getattr(self.configuration, attr, None):
raise exception.InvalidInput(reason=_('%s is not set.') % attr)
replica = self.configuration.safe_get('replication_device')
if replica and isinstance(replica, list):
replica_ops = ['backend_id', 'login', 'password', 'rpo']
for attr in replica_ops:
if attr not in replica[0]:
msg = _('replication_device %s is not set.') % attr
raise exception.InvalidInput(reason=msg)
self.replica = Replication(replica[0])
@kaminario_logger
def do_setup(self, context):
super(KaminarioCinderDriver, self).do_setup(context)
@ -145,9 +170,9 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver):
LOG.debug("Creating volume with name: %(name)s, size: %(size)s "
"GB, volume_group: %(vg)s",
{'name': vol_name, 'size': volume.size, 'vg': vg_name})
self.client.new("volumes", name=vol_name,
size=volume.size * units.Mi,
volume_group=vg).save()
vol = self.client.new("volumes", name=vol_name,
size=volume.size * units.Mi,
volume_group=vg).save()
except Exception as ex:
vg_rs = self.client.search("volume_groups", name=vg_name)
if vg_rs.total != 0:
@ -157,6 +182,113 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver):
raise exception.KaminarioCinderDriverException(
reason=six.text_type(ex.message))
if self._get_is_replica(volume.volume_type) and self.replica:
self._create_volume_replica(volume, vg, vol, self.replica.rpo)
@kaminario_logger
def _create_volume_replica(self, volume, vg, vol, rpo):
"""Volume replica creation in K2 needs session and remote volume.
- create a session
- create a volume in the volume group
"""
session_name = self.get_session_name(volume.id)
rsession_name = self.get_rep_name(session_name)
rvg_name = self.get_rep_name(vg.name)
rvol_name = self.get_rep_name(vol.name)
k2peer_rs = self.client.search("replication/peer_k2arrays",
mgmt_host=self.replica.backend_id)
if hasattr(k2peer_rs, 'hits') and k2peer_rs.total != 0:
k2peer = k2peer_rs.hits[0]
else:
msg = _("Unable to find K2peer in source K2:")
LOG.error(msg)
raise exception.KaminarioCinderDriverException(reason=msg)
try:
LOG.debug("Creating source session with name: %(sname)s and "
" target session name: %(tname)s",
{'sname': session_name, 'tname': rsession_name})
src_ssn = self.client.new("replication/sessions")
src_ssn.replication_peer_k2array = k2peer
src_ssn.auto_configure_peer_volumes = "False"
src_ssn.local_volume_group = vg
src_ssn.replication_peer_volume_group_name = rvg_name
src_ssn.remote_replication_session_name = rsession_name
src_ssn.name = session_name
src_ssn.rpo = rpo
src_ssn.save()
LOG.debug("Creating remote volume with name: %s",
rvol_name)
self.client.new("replication/peer_volumes",
local_volume=vol,
name=rvol_name,
replication_session=src_ssn).save()
src_ssn.state = "in_sync"
src_ssn.save()
except Exception as ex:
LOG.exception(_LE("Replication for the volume %s has "
"failed."), vol.name)
self._delete_by_ref(self.client, "replication/sessions",
session_name, 'session')
self._delete_by_ref(self.target, "replication/sessions",
rsession_name, 'remote session')
self._delete_by_ref(self.target, "volumes",
rvol_name, 'remote volume')
self._delete_by_ref(self.client, "volumes", vol.name, "volume")
self._delete_by_ref(self.target, "volume_groups",
rvg_name, "remote vg")
self._delete_by_ref(self.client, "volume_groups", vg.name, "vg")
raise exception.KaminarioCinderDriverException(
reason=six.text_type(ex.message))
def _delete_by_ref(self, device, url, name, msg):
rs = device.search(url, name=name)
for result in rs.hits:
result.delete()
LOG.debug("Deleting %(msg)s: %(name)s", {'msg': msg, 'name': name})
@kaminario_logger
def _failover_volume(self, volume):
"""Promoting a secondary volume to primary volume."""
session_name = self.get_session_name(volume.id)
rsession_name = self.get_rep_name(session_name)
tgt_ssn = self.target.search("replication/sessions",
name=rsession_name).hits[0]
if tgt_ssn.state == 'in_sync':
tgt_ssn.state = 'failed_over'
tgt_ssn.save()
LOG.debug("The target session: %s state is "
"changed to failed_over ", rsession_name)
@kaminario_logger
def failover_host(self, context, volumes, secondary_id=None):
"""Failover to replication target."""
volume_updates = []
if secondary_id and secondary_id != self.replica.backend_id:
LOG.error(_LE("Kaminario driver received failover_host "
"request, But backend is non replicated device"))
raise exception.UnableToFailOver(reason=_("Failover requested "
"on non replicated "
"backend."))
for v in volumes:
vol_name = self.get_volume_name(v['id'])
rv = self.get_rep_name(vol_name)
if self.target.search("volumes", name=rv).total:
self._failover_volume(v)
volume_updates.append(
{'volume_id': v['id'],
'updates':
{'replication_status':
fields.ReplicationStatus.FAILED_OVER}})
else:
volume_updates.append({'volume_id': v['id'],
'updates': {'status': 'error', }})
return self.replica.backend_id, volume_updates
@kaminario_logger
def create_volume_from_snapshot(self, volume, snapshot):
"""Create volume from snapshot.
@ -271,6 +403,9 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver):
vg_name = self.get_volume_group_name(volume.id)
vol_name = self.get_volume_name(volume.id)
try:
if self._get_is_replica(volume.volume_type) and self.replica:
self._delete_volume_replica(volume, vg_name, vol_name)
LOG.debug("Searching and deleting volume: %s in K2.", vol_name)
vol_rs = self.client.search("volumes", name=vol_name)
if vol_rs.total != 0:
@ -284,6 +419,46 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver):
raise exception.KaminarioCinderDriverException(
reason=six.text_type(ex.message))
@kaminario_logger
def _delete_volume_replica(self, volume, vg_name, vol_name):
rvg_name = self.get_rep_name(vg_name)
rvol_name = self.get_rep_name(vol_name)
session_name = self.get_session_name(volume.id)
rsession_name = self.get_rep_name(session_name)
src_ssn = self.client.search('replication/sessions',
name=session_name).hits[0]
tgt_ssn = self.target.search('replication/sessions',
name=rsession_name).hits[0]
src_ssn.state = 'suspended'
src_ssn.save()
self._check_for_status(tgt_ssn, 'suspended')
src_ssn.state = 'idle'
src_ssn.save()
self._check_for_status(tgt_ssn, 'idle')
tgt_ssn.delete()
src_ssn.delete()
LOG.debug("Searching and deleting snapshots for volume groups:"
"%(vg1)s, %(vg2)s in K2.", {'vg1': vg_name, 'vg2': rvg_name})
vg = self.target.search('volume_groups', name=vg_name).hits
rvg = self.target.search('volume_groups', name=rvg_name).hits
snaps = self.client.search('snapshots', volume_group=vg).hits
for s in snaps:
s.delete()
rsnaps = self.target.search('snapshots', volume_group=rvg).hits
for s in rsnaps:
s.delete()
self._delete_by_ref(self.target, "volumes", rvol_name, 'remote volume')
self._delete_by_ref(self.target, "volume_groups",
rvg_name, "remote vg")
@kaminario_logger
def _check_for_status(self, obj, status):
while obj.state != status:
obj.refresh()
eventlet.sleep(1)
@kaminario_logger
def get_volume_stats(self, refresh=False):
if refresh:
@ -375,7 +550,9 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver):
'thick_provisioning_support': False,
'provisioned_capacity_gb': provisioned_vol / units.Mi,
'max_oversubscription_ratio': ratio,
'kaminario:thin_prov_type': 'dedup/nodedup'}
'kaminario:thin_prov_type': 'dedup/nodedup',
'replication_enabled': True,
'kaminario:replication': True}
@kaminario_logger
def get_initiator_host_name(self, connector):
@ -397,6 +574,11 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver):
"""Return the volume name."""
return "cv-{0}".format(vid)
@kaminario_logger
def get_session_name(self, vid):
"""Return the volume name."""
return "ssn-{0}".format(vid)
@kaminario_logger
def get_snap_name(self, sid):
"""Return the snapshot name."""
@ -407,6 +589,11 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver):
"""Return the view name."""
return "cview-{0}".format(vid)
@kaminario_logger
def get_rep_name(self, sname):
"""Return the replication session name."""
return "r{0}".format(sname)
@kaminario_logger
def _delete_host_by_name(self, name):
"""Deleting host by name."""
@ -430,6 +617,9 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver):
@kaminario_logger
def _get_volume_object(self, volume):
vol_name = self.get_volume_name(volume.id)
if volume.replication_status == 'failed-over':
vol_name = self.get_rep_name(vol_name)
self.client = self.target
LOG.debug("Searching volume : %s in K2.", vol_name)
vol_rs = self.client.search("volumes", name=vol_name)
if not hasattr(vol_rs, 'hits') or vol_rs.total == 0:
@ -459,6 +649,9 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver):
# Get volume object
if type(volume).__name__ != 'RestObject':
vol_name = self.get_volume_name(volume.id)
if volume.replication_status == 'failed-over':
vol_name = self.get_rep_name(vol_name)
self.client = self.target
LOG.debug("Searching volume: %s in K2.", vol_name)
volume_rs = self.client.search("volumes", name=vol_name)
if hasattr(volume_rs, "hits") and volume_rs.total != 0:
@ -529,6 +722,15 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver):
else:
return True
def _get_is_replica(self, vol_type):
replica = False
if vol_type and vol_type.get('extra_specs'):
specs = vol_type.get('extra_specs')
if (specs.get('kaminario:replication') == 'enabled' and
self.replica):
replica = True
return replica
def _get_replica_status(self, vg_name):
status = False
rvg = self.client.search("replication/peer_volume_groups",

View File

@ -19,6 +19,7 @@ from oslo_log import log as logging
from cinder import exception
from cinder.i18n import _, _LE
from cinder.objects import fields
from cinder.volume.drivers.kaminario import kaminario_common as common
from cinder.zonemanager import utils as fczm_utils
@ -32,9 +33,10 @@ class KaminarioFCDriver(common.KaminarioCinderDriver):
Version history:
1.0 - Initial driver
1.1 - Added manage/unmanage and extra-specs support for nodedup
1.2 - Added replication support
"""
VERSION = '1.1'
VERSION = '1.2'
@kaminario_logger
def __init__(self, *args, **kwargs):
@ -52,7 +54,7 @@ class KaminarioFCDriver(common.KaminarioCinderDriver):
LOG.error(msg)
raise exception.KaminarioCinderDriverException(reason=msg)
# Get target wwpns.
target_wwpns = self.get_target_info()
target_wwpns = self.get_target_info(volume)
# Map volume.
lun = self.k2_initialize_connection(volume, connector)
# Create initiator-target mapping.
@ -75,7 +77,7 @@ class KaminarioFCDriver(common.KaminarioCinderDriver):
# is not attached to any volume
if host_rs.total == 0:
# Get target wwpns.
target_wwpns = self.get_target_info()
target_wwpns = self.get_target_info(volume)
target_wwpns, init_target_map = self._build_initiator_target_map(
connector, target_wwpns)
properties["data"] = {"target_wwn": target_wwpns,
@ -83,7 +85,11 @@ class KaminarioFCDriver(common.KaminarioCinderDriver):
return properties
@kaminario_logger
def get_target_info(self):
def get_target_info(self, volume):
rep_status = fields.ReplicationStatus.FAILED_OVER
if (hasattr(volume, 'replication_status') and
volume.replication_status == rep_status):
self.client = self.target
LOG.debug("Searching target wwpns in K2.")
fc_ports_rs = self.client.search("system/fc_ports")
target_wwpns = []

View File

@ -20,6 +20,7 @@ from oslo_log import log as logging
from cinder import exception
from cinder.i18n import _, _LE
from cinder import interface
from cinder.objects import fields
from cinder.volume.drivers.kaminario import kaminario_common as common
ISCSI_TCP_PORT = "3260"
@ -34,9 +35,10 @@ class KaminarioISCSIDriver(common.KaminarioCinderDriver):
Version history:
1.0 - Initial driver
1.1 - Added manage/unmanage and extra-specs support for nodedup
1.2 - Added replication support
"""
VERSION = '1.1'
VERSION = '1.2'
@kaminario_logger
def __init__(self, *args, **kwargs):
@ -47,7 +49,7 @@ class KaminarioISCSIDriver(common.KaminarioCinderDriver):
def initialize_connection(self, volume, connector):
"""Attach K2 volume to host."""
# Get target_portal and target iqn.
iscsi_portal, target_iqn = self.get_target_info()
iscsi_portal, target_iqn = self.get_target_info(volume)
# Map volume.
lun = self.k2_initialize_connection(volume, connector)
# Return target volume information.
@ -58,7 +60,11 @@ class KaminarioISCSIDriver(common.KaminarioCinderDriver):
"target_discovered": True}}
@kaminario_logger
def get_target_info(self):
def get_target_info(self, volume):
rep_status = fields.ReplicationStatus.FAILED_OVER
if (hasattr(volume, 'replication_status') and
volume.replication_status == rep_status):
self.client = self.target
LOG.debug("Searching first iscsi port ip without wan in K2.")
iscsi_ip_rs = self.client.search("system/net_ips", wan_port="")
iscsi_ip = target_iqn = None

View File

@ -0,0 +1,4 @@
---
features:
- Add replication feature in Kaminario iSCSI and FC Cinder drivers.