Merge "Add replication failback in Kaminario K2 drivers"
commit 112a25c82d
@@ -15,15 +15,18 @@
"""Unit tests for kaminario driver."""
import mock
from oslo_utils import units
import time

from cinder import context
from cinder import exception
from cinder import objects
from cinder.objects import fields
from cinder import test
from cinder.tests.unit import fake_snapshot
from cinder.tests.unit import fake_volume
from cinder import utils
from cinder.volume import configuration
from cinder.volume.drivers.kaminario import kaminario_common
from cinder.volume.drivers.kaminario import kaminario_fc
from cinder.volume.drivers.kaminario import kaminario_iscsi
from cinder.volume import utils as vol_utils
@@ -53,6 +56,13 @@ class FakeSaveObject(FakeK2Obj):
self.size = units.Mi
self.replication_status = None
self.state = 'in_sync'
self.generation_number = 548
self.current_role = 'target'
self.current_snapshot_progress = 100
self.current_snapshot_id = None

def refresh(self):
return

def save(self):
return FakeSaveObject()
@@ -364,15 +374,33 @@ class TestKaminarioISCSI(test.TestCase):
result = self.driver._failover_volume(self.vol)
self.assertIsNone(result)

def test_failover_host(self):
@mock.patch.object(kaminario_common.KaminarioCinderDriver,
'_check_for_status')
@mock.patch.object(objects.service.Service, 'get_by_args')
def test_failover_host(self, get_by_args, check_status):
"""Test failover_host."""
|
||||
mock_args = mock.Mock()
|
||||
mock_args.active_backend_id = '10.0.0.1'
|
||||
self.vol.replication_status = 'failed-over'
|
||||
self.driver.configuration.san_ip = '10.0.0.1'
|
||||
get_by_args.side_effect = [mock_args, mock_args]
|
||||
self.driver.host = 'host'
|
||||
volumes = [self.vol, self.vol]
|
||||
self.driver.replica = Replication()
|
||||
self.driver.target = FakeKrest()
|
||||
self.driver.target.search().total = 1
|
||||
self.driver.client.search().total = 1
|
||||
backend_ip, res_volumes = self.driver.failover_host(None, volumes)
|
||||
self.assertEqual('10.0.0.1', backend_ip)
|
||||
status = res_volumes[0]['updates']['replication_status']
|
||||
self.assertEqual(fields.ReplicationStatus.FAILED_OVER, status)
|
||||
# different backend ip
|
||||
self.driver.configuration.san_ip = '10.0.0.2'
|
||||
self.driver.client.search().hits[0].state = 'in_sync'
|
||||
backend_ip, res_volumes = self.driver.failover_host(None, volumes)
|
||||
self.assertEqual('10.0.0.2', backend_ip)
|
||||
status = res_volumes[0]['updates']['replication_status']
|
||||
self.assertEqual(fields.ReplicationStatus.ENABLED, status)
|
||||
|
||||
def test_delete_volume_replica(self):
|
||||
"""Test _delete_volume_replica."""
|
||||
@@ -454,6 +482,41 @@ class TestKaminarioISCSI(test.TestCase):
result = self.driver._delete_replication(self.vol)
self.assertIsNone(result)

def test_create_failover_volume_replica(self):
"""Test _create_failover_volume_replica."""
self.driver.replica = Replication()
self.driver.target = FakeKrest()
self.driver.configuration.san_ip = '10.0.0.1'
result = self.driver._create_failover_volume_replica(self.vol,
'test', 'test')
self.assertIsNone(result)

def test_create_volume_replica_user_snap(self):
"""Test create_volume_replica_user_snap."""
result = self.driver._create_volume_replica_user_snap(FakeKrest(),
'sess')
self.assertEqual(548, result)

def test_is_user_snap_sync_finished(self):
"""Test _is_user_snap_sync_finished."""
sess_mock = mock.Mock()
sess_mock.refresh = mock.Mock()
sess_mock.generation_number = 548
sess_mock.current_snapshot_id = None
sess_mock.current_snapshot_progress = 100
sess_mock.current_snapshot_id = None
self.driver.snap_updates = [{'tgt_ssn': sess_mock, 'gno': 548,
'stime': time.time()}]
result = self.driver._is_user_snap_sync_finished()
self.assertIsNone(result)

def test_delete_failover_volume_replica(self):
"""Test _delete_failover_volume_replica."""
self.driver.target = FakeKrest()
result = self.driver._delete_failover_volume_replica(self.vol, 'test',
'test')
self.assertIsNone(result)


class TestKaminarioFC(TestKaminarioISCSI):
@@ -17,6 +17,7 @@
import math
import re
import threading
import time

import eventlet
from oslo_config import cfg
@@ -30,6 +31,7 @@ import six
import cinder
from cinder import exception
from cinder.i18n import _, _LE, _LW, _LI
from cinder import objects
from cinder.objects import fields
from cinder import utils
from cinder.volume.drivers.san import san
@@ -40,6 +42,7 @@ krest = importutils.try_import("krest")
K2_MIN_VERSION = '2.2.0'
K2_LOCK_PREFIX = 'Kaminario'
MAX_K2_RETRY = 5
K2_REP_FAILED_OVER = fields.ReplicationStatus.FAILED_OVER
LOG = logging.getLogger(__name__)

kaminario1_opts = [
@@ -290,6 +293,62 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver):
raise exception.KaminarioCinderDriverException(
reason=six.text_type(ex.message))

@kaminario_logger
def _create_failover_volume_replica(self, volume, vg_name, vol_name):
"""Volume replica creation in K2 needs session and remote volume.

- create a session
- create a volume in the volume group

"""
session_name = self.get_session_name(volume.id)
rsession_name = self.get_rep_name(session_name)

rvg_name = self.get_rep_name(vg_name)
rvol_name = self.get_rep_name(vol_name)
rvg = self.target.search("volume_groups", name=rvg_name).hits[0]
rvol = self.target.search("volumes", name=rvol_name).hits[0]
k2peer_rs = self.target.search("replication/peer_k2arrays",
mgmt_host=self.configuration.san_ip)
if hasattr(k2peer_rs, 'hits') and k2peer_rs.total != 0:
k2peer = k2peer_rs.hits[0]
else:
msg = _("Unable to find K2peer in source K2:")
LOG.error(msg)
raise exception.KaminarioCinderDriverException(reason=msg)
try:
LOG.debug("Creating source session with name: %(sname)s and "
" target session name: %(tname)s",
{'sname': rsession_name, 'tname': session_name})
tgt_ssn = self.target.new("replication/sessions")
tgt_ssn.replication_peer_k2array = k2peer
tgt_ssn.auto_configure_peer_volumes = "False"
tgt_ssn.local_volume_group = rvg
tgt_ssn.replication_peer_volume_group_name = vg_name
tgt_ssn.remote_replication_session_name = session_name
tgt_ssn.name = rsession_name
tgt_ssn.rpo = self.replica.rpo
tgt_ssn.save()
LOG.debug("Creating remote volume with name: %s",
rvol_name)
self.target.new("replication/peer_volumes",
local_volume=rvol,
name=vol_name,
replication_session=tgt_ssn).save()
tgt_ssn.state = "in_sync"
tgt_ssn.save()
except Exception as ex:
LOG.exception(_LE("Replication for the volume %s has "
"failed."), rvol_name)
self._delete_by_ref(self.target, "replication/sessions",
rsession_name, 'session')
self._delete_by_ref(self.client, "replication/sessions",
session_name, 'remote session')
self._delete_by_ref(self.client, "volumes", vol_name, "volume")
self._delete_by_ref(self.client, "volume_groups", vg_name, "vg")
raise exception.KaminarioCinderDriverException(
reason=six.text_type(ex.message))

def _delete_by_ref(self, device, url, name, msg):
rs = device.search(url, name=name)
for result in rs.hits:
@@ -313,27 +372,183 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver):
def failover_host(self, context, volumes, secondary_id=None):
"""Failover to replication target."""
volume_updates = []
back_end_ip = None
svc_host = vol_utils.extract_host(self.host, 'backend')
service = objects.Service.get_by_args(context, svc_host,
'cinder-volume')

if secondary_id and secondary_id != self.replica.backend_id:
LOG.error(_LE("Kaminario driver received failover_host "
"request, But backend is non replicated device"))
raise exception.UnableToFailOver(reason=_("Failover requested "
"on non replicated "
"backend."))
for v in volumes:
vol_name = self.get_volume_name(v['id'])
rv = self.get_rep_name(vol_name)
if self.target.search("volumes", name=rv).total:
self._failover_volume(v)
volume_updates.append(
{'volume_id': v['id'],
'updates':
{'replication_status':
fields.ReplicationStatus.FAILED_OVER}})
else:
volume_updates.append({'volume_id': v['id'],
'updates': {'status': 'error', }})

return self.replica.backend_id, volume_updates
if (service.active_backend_id and
service.active_backend_id != self.configuration.san_ip):
self.snap_updates = []
rep_volumes = []
# update status for non-replicated primary volumes
for v in volumes:
vol_name = self.get_volume_name(v['id'])
vol = self.client.search("volumes", name=vol_name)
if v.replication_status != K2_REP_FAILED_OVER and vol.total:
status = 'available'
if v.volume_attachment:
map_rs = self.client.search("mappings",
volume=vol.hits[0])
status = 'in-use'
if map_rs.total:
map_rs.hits[0].delete()
volume_updates.append({'volume_id': v['id'],
'updates':
{'status': status}})
else:
rep_volumes.append(v)
# In-sync from secondary array to primary array
for v in rep_volumes:
vol_name = self.get_volume_name(v['id'])
vol = self.client.search("volumes", name=vol_name)
rvol_name = self.get_rep_name(vol_name)
rvol = self.target.search("volumes", name=rvol_name)
session_name = self.get_session_name(v['id'])
rsession_name = self.get_rep_name(session_name)
ssn = self.target.search("replication/sessions",
name=rsession_name)
if ssn.total:
tgt_ssn = ssn.hits[0]
ssn = self.client.search("replication/sessions",
name=session_name)
if ssn.total:
src_ssn = ssn.hits[0]

if (tgt_ssn.state == 'failed_over' and
tgt_ssn.current_role == 'target' and vol.total and src_ssn):
map_rs = self.client.search("mappings", volume=vol.hits[0])
if map_rs.total:
map_rs.hits[0].delete()
tgt_ssn.state = 'in_sync'
tgt_ssn.save()
self._check_for_status(src_ssn, 'in_sync')
if (rvol.total and src_ssn.state == 'in_sync' and
src_ssn.current_role == 'target'):
gen_no = self._create_volume_replica_user_snap(self.target,
tgt_ssn)
self.snap_updates.append({'tgt_ssn': tgt_ssn,
'gno': gen_no,
'stime': time.time()})
LOG.debug("The target session: %s state is "
"changed to in sync", rsession_name)

self._is_user_snap_sync_finished()

# Delete secondary volume mappings and create snapshot
for v in rep_volumes:
vol_name = self.get_volume_name(v['id'])
vol = self.client.search("volumes", name=vol_name)
rvol_name = self.get_rep_name(vol_name)
rvol = self.target.search("volumes", name=rvol_name)
session_name = self.get_session_name(v['id'])
rsession_name = self.get_rep_name(session_name)
ssn = self.target.search("replication/sessions",
name=rsession_name)
if ssn.total:
tgt_ssn = ssn.hits[0]
ssn = self.client.search("replication/sessions",
name=session_name)
if ssn.total:
src_ssn = ssn.hits[0]
if (rvol.total and src_ssn.state == 'in_sync' and
src_ssn.current_role == 'target'):
map_rs = self.target.search("mappings",
volume=rvol.hits[0])
if map_rs.total:
map_rs.hits[0].delete()
gen_no = self._create_volume_replica_user_snap(self.target,
tgt_ssn)
self.snap_updates.append({'tgt_ssn': tgt_ssn,
'gno': gen_no,
'stime': time.time()})
self._is_user_snap_sync_finished()
# changing source sessions to failed-over
for v in rep_volumes:
vol_name = self.get_volume_name(v['id'])
vol = self.client.search("volumes", name=vol_name)
rvol_name = self.get_rep_name(vol_name)
rvol = self.target.search("volumes", name=rvol_name)
session_name = self.get_session_name(v['id'])
rsession_name = self.get_rep_name(session_name)
ssn = self.target.search("replication/sessions",
name=rsession_name)
if ssn.total:
tgt_ssn = ssn.hits[0]
ssn = self.client.search("replication/sessions",
name=session_name)
if ssn.total:
src_ssn = ssn.hits[0]
if (rvol.total and src_ssn.state == 'in_sync' and
src_ssn.current_role == 'target'):
src_ssn.state = 'failed_over'
src_ssn.save()
self._check_for_status(tgt_ssn, 'suspended')
LOG.debug("The target session: %s state is "
"changed to failed over", session_name)

src_ssn.state = 'in_sync'
src_ssn.save()
LOG.debug("The target session: %s state is "
"changed to in sync", session_name)
rep_status = fields.ReplicationStatus.ENABLED
volume_updates.append({'volume_id': v['id'],
'updates':
{'replication_status': rep_status}})
back_end_ip = self.configuration.san_ip
else:
"""Failover to replication target."""
for v in volumes:
vol_name = self.get_volume_name(v['id'])
rv = self.get_rep_name(vol_name)
if self.target.search("volumes", name=rv).total:
self._failover_volume(v)
volume_updates.append(
{'volume_id': v['id'],
'updates':
{'replication_status': K2_REP_FAILED_OVER}})
else:
volume_updates.append({'volume_id': v['id'],
'updates': {'status': 'error', }})
back_end_ip = self.replica.backend_id
return back_end_ip, volume_updates

def _create_volume_replica_user_snap(self, k2, sess):
snap = k2.new("snapshots")
snap.is_application_consistent = "False"
snap.replication_session = sess
snap.save()
return snap.generation_number

def _is_user_snap_sync_finished(self):
# waiting for user snapshot to be synced
while len(self.snap_updates) > 0:
for l in self.snap_updates:
sess = l.get('tgt_ssn')
gno = l.get('gno')
stime = l.get('stime')
sess.refresh()
if (sess.generation_number == gno and
sess.current_snapshot_progress == 100
and sess.current_snapshot_id is None):
if time.time() - stime > 300:
gen_no = self._create_volume_replica_user_snap(
self.target,
sess)
self.snap_updates.append({'tgt_ssn': sess,
'gno': gen_no,
'stime': time.time()})
self.snap_updates.remove(l)
eventlet.sleep(1)

@kaminario_logger
def create_volume_from_snapshot(self, volume, snapshot):
@@ -499,6 +714,26 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver):
self._delete_by_ref(self.target, "volume_groups",
rvg_name, "remote vg")

@kaminario_logger
def _delete_failover_volume_replica(self, volume, vg_name, vol_name):
rvg_name = self.get_rep_name(vg_name)
rvol_name = self.get_rep_name(vol_name)
session_name = self.get_session_name(volume.id)
rsession_name = self.get_rep_name(session_name)
tgt_ssn = self.target.search('replication/sessions',
name=rsession_name).hits[0]
tgt_ssn.state = 'idle'
tgt_ssn.save()
tgt_ssn.delete()

LOG.debug("Searching and deleting snapshots for target volume group "
"and target volume: %(vol)s, %(vg)s in K2.",
{'vol': rvol_name, 'vg': rvg_name})
rvg = self.target.search('volume_groups', name=rvg_name).hits
rsnaps = self.target.search('snapshots', volume_group=rvg).hits
for s in rsnaps:
s.delete()

@kaminario_logger
def _check_for_status(self, obj, status):
while obj.state != status:
@@ -664,9 +899,8 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver):
@kaminario_logger
def _get_volume_object(self, volume):
vol_name = self.get_volume_name(volume.id)
if volume.replication_status == 'failed-over':
if volume.replication_status == K2_REP_FAILED_OVER:
vol_name = self.get_rep_name(vol_name)
self.client = self.target
LOG.debug("Searching volume : %s in K2.", vol_name)
vol_rs = self.client.search("volumes", name=vol_name)
if not hasattr(vol_rs, 'hits') or vol_rs.total == 0:
@@ -696,9 +930,8 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver):
# Get volume object
if type(volume).__name__ != 'RestObject':
vol_name = self.get_volume_name(volume.id)
if volume.replication_status == 'failed-over':
if volume.replication_status == K2_REP_FAILED_OVER:
vol_name = self.get_rep_name(vol_name)
self.client = self.target
LOG.debug("Searching volume: %s in K2.", vol_name)
volume_rs = self.client.search("volumes", name=vol_name)
if hasattr(volume_rs, "hits") and volume_rs.total != 0:
@@ -779,12 +1012,13 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver):
return replica

def _get_replica_status(self, vg_name):
vg = self.client.search("volume_groups", name=vg_name).hits[0]
if self.client.search("replication/sessions",
local_volume_group=vg).total != 0:
return True
else:
return False
vg_rs = self.client.search("volume_groups", name=vg_name)
if vg_rs.total:
vg = vg_rs.hits[0]
if self.client.search("replication/sessions",
local_volume_group=vg).total:
return True
return False

def manage_existing(self, volume, existing_ref):
vol_name = existing_ref['source-name']
@@ -853,6 +1087,11 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver):
def retype(self, ctxt, volume, new_type, diff, host):
old_type = volume.get('volume_type')
vg_name = self.get_volume_group_name(volume.id)
vol_name = self.get_volume_name(volume.id)
vol_rs = self.client.search("volumes", name=vol_name)
if vol_rs.total:
vol = vol_rs.hits[0]
vmap = self.client.search("mappings", volume=vol).total
old_rep_type = self._get_replica_status(vg_name)
new_rep_type = self._get_is_replica(new_type)
new_prov_type = self._get_is_dedup(new_type)
@@ -867,8 +1106,11 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver):
self._delete_replication(volume)
return True
elif not new_rep_type and not old_rep_type:
LOG.debug("Use '--migration-policy on-demand' to change 'dedup "
"without replication'<->'nodedup without replication'.")
msg = ("Use '--migration-policy on-demand' to change 'dedup "
"without replication'<->'nodedup without replication'.")
if vol_rs.total and vmap:
msg = "Unattach volume and {0}".format(msg)
LOG.debug(msg)
return False
else:
LOG.error(_LE('Change from type1: %(type1)s to type2: %(type2)s '
@@ -879,15 +1121,21 @@ class KaminarioCinderDriver(cinder.volume.driver.ISCSIDriver):
def _add_replication(self, volume):
vg_name = self.get_volume_group_name(volume.id)
vol_name = self.get_volume_name(volume.id)
LOG.debug("Searching volume group with name: %(name)s",
{'name': vg_name})
vg = self.client.search("volume_groups", name=vg_name).hits[0]
LOG.debug("Searching volume with name: %(name)s",
{'name': vol_name})
vol = self.client.search("volumes", name=vol_name).hits[0]
self._create_volume_replica(volume, vg, vol, self.replica.rpo)
if volume.replication_status == K2_REP_FAILED_OVER:
self._create_failover_volume_replica(volume, vg_name, vol_name)
else:
LOG.debug("Searching volume group with name: %(name)s",
{'name': vg_name})
vg = self.client.search("volume_groups", name=vg_name).hits[0]
LOG.debug("Searching volume with name: %(name)s",
{'name': vol_name})
vol = self.client.search("volumes", name=vol_name).hits[0]
self._create_volume_replica(volume, vg, vol, self.replica.rpo)

def _delete_replication(self, volume):
vg_name = self.get_volume_group_name(volume.id)
vol_name = self.get_volume_name(volume.id)
self._delete_volume_replica(volume, vg_name, vol_name)
if volume.replication_status == K2_REP_FAILED_OVER:
self._delete_failover_volume_replica(volume, vg_name, vol_name)
else:
self._delete_volume_replica(volume, vg_name, vol_name)
@@ -24,6 +24,7 @@ from cinder.objects import fields
from cinder.volume.drivers.kaminario import kaminario_common as common
from cinder.zonemanager import utils as fczm_utils

K2_REP_FAILED_OVER = fields.ReplicationStatus.FAILED_OVER
LOG = logging.getLogger(__name__)
kaminario_logger = common.kaminario_logger
@@ -36,9 +37,10 @@ class KaminarioFCDriver(common.KaminarioCinderDriver):
1.1 - Added manage/unmanage and extra-specs support for nodedup
1.2 - Added replication support
1.3 - Added retype support
1.4 - Added replication failback support
"""

VERSION = '1.3'
VERSION = '1.4'

# ThirdPartySystems wiki page name
CI_WIKI_NAME = "Kaminario_K2_CI"
@@ -59,6 +61,12 @@ class KaminarioFCDriver(common.KaminarioCinderDriver):
msg = _("No wwpns found in host connector.")
LOG.error(msg)
raise exception.KaminarioCinderDriverException(reason=msg)
# To support replication failback
temp_client = None
if (hasattr(volume, 'replication_status') and
volume.replication_status == K2_REP_FAILED_OVER):
temp_client = self.client
self.client = self.target
# Get target wwpns.
target_wwpns = self.get_target_info(volume)
# Map volume.
@@ -66,6 +74,9 @@ class KaminarioFCDriver(common.KaminarioCinderDriver):
# Create initiator-target mapping.
target_wwpns, init_target_map = self._build_initiator_target_map(
connector, target_wwpns)
# To support replication failback
if temp_client:
self.client = temp_client
# Return target volume information.
return {'driver_volume_type': 'fibre_channel',
'data': {"target_discovered": True,
@@ -77,6 +88,12 @@ class KaminarioFCDriver(common.KaminarioCinderDriver):
@kaminario_logger
@coordination.synchronized('{self.k2_lock_name}')
def terminate_connection(self, volume, connector, **kwargs):
# To support replication failback
temp_client = None
if (hasattr(volume, 'replication_status') and
volume.replication_status == K2_REP_FAILED_OVER):
temp_client = self.client
self.client = self.target
super(KaminarioFCDriver, self).terminate_connection(volume, connector)
properties = {"driver_volume_type": "fibre_channel", "data": {}}
host_name = self.get_initiator_host_name(connector)
@@ -90,14 +107,13 @@ class KaminarioFCDriver(common.KaminarioCinderDriver):
connector, target_wwpns)
properties["data"] = {"target_wwn": target_wwpns,
"initiator_target_map": init_target_map}
# To support replication failback
if temp_client:
self.client = temp_client
return properties

@kaminario_logger
def get_target_info(self, volume):
rep_status = fields.ReplicationStatus.FAILED_OVER
if (hasattr(volume, 'replication_status') and
volume.replication_status == rep_status):
self.client = self.target
LOG.debug("Searching target wwpns in K2.")
fc_ports_rs = self.client.search("system/fc_ports")
target_wwpns = []
@@ -25,6 +25,7 @@ from cinder.objects import fields
from cinder.volume.drivers.kaminario import kaminario_common as common

ISCSI_TCP_PORT = "3260"
K2_REP_FAILED_OVER = fields.ReplicationStatus.FAILED_OVER
LOG = logging.getLogger(__name__)
kaminario_logger = common.kaminario_logger
@@ -38,9 +39,10 @@ class KaminarioISCSIDriver(common.KaminarioCinderDriver):
1.1 - Added manage/unmanage and extra-specs support for nodedup
1.2 - Added replication support
1.3 - Added retype support
1.4 - Added replication failback support
"""

VERSION = '1.3'
VERSION = '1.4'

# ThirdPartySystems wiki page name
CI_WIKI_NAME = "Kaminario_K2_CI"
@@ -54,10 +56,19 @@ class KaminarioISCSIDriver(common.KaminarioCinderDriver):
@coordination.synchronized('{self.k2_lock_name}')
def initialize_connection(self, volume, connector):
"""Attach K2 volume to host."""
# To support replication failback
temp_client = None
if (hasattr(volume, 'replication_status') and
volume.replication_status == K2_REP_FAILED_OVER):
temp_client = self.client
self.client = self.target
# Get target_portal and target iqn.
iscsi_portal, target_iqn = self.get_target_info(volume)
# Map volume.
lun = self.k2_initialize_connection(volume, connector)
# To support replication failback
if temp_client:
self.client = temp_client
# Return target volume information.
return {"driver_volume_type": "iscsi",
"data": {"target_iqn": target_iqn,
@@ -68,15 +79,20 @@ class KaminarioISCSIDriver(common.KaminarioCinderDriver):
@kaminario_logger
@coordination.synchronized('{self.k2_lock_name}')
def terminate_connection(self, volume, connector, **kwargs):
# To support replication failback
temp_client = None
if (hasattr(volume, 'replication_status') and
volume.replication_status == K2_REP_FAILED_OVER):
temp_client = self.client
self.client = self.target
super(KaminarioISCSIDriver, self).terminate_connection(volume,
connector)
# To support replication failback
if temp_client:
self.client = temp_client

@kaminario_logger
def get_target_info(self, volume):
rep_status = fields.ReplicationStatus.FAILED_OVER
if (hasattr(volume, 'replication_status') and
volume.replication_status == rep_status):
self.client = self.target
LOG.debug("Searching first iscsi port ip without wan in K2.")
iscsi_ip_rs = self.client.search("system/net_ips", wan_port="")
iscsi_ip = target_iqn = None
@@ -4,6 +4,3 @@ fixes:
due to possible race conditions between attach and detach
volumes and due to limitation from Kaminario K2 iSCSI and
FC arrays on concurrent operations.
To overcome array limitation, use locks and retry mechanism
on each K2 requests. To overcome race conditions, use locks
on initialize_connection and terminate_connection.
@@ -0,0 +1,4 @@
---
fixes:
  - Added missing replication failback support in Kaminario iSCSI and
    FC Cinder drivers.
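
The failover_host() hunks above add a failback path alongside the existing failover path. A minimal sketch of the decision the driver now makes, using names from the diff (service is the cinder-volume Service record); this is an illustrative condensation, not code from the commit:

    # Failback is taken when an earlier failover left an active_backend_id
    # that no longer matches the primary array's san_ip: the driver then
    # re-syncs the replication sessions back to the primary and returns
    # san_ip. Otherwise it fails over and returns replica.backend_id.
    def is_failback(service, configuration):
        return bool(service.active_backend_id and
                    service.active_backend_id != configuration.san_ip)

test_failover_host exercises both branches by calling failover_host() twice with different san_ip values and asserting FAILED_OVER, then ENABLED, as the resulting replication_status.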