Add Cassandra datastore upgrade

This patch adds support for upgrading Cassandra datastore - both single
instance cluster and multiple-node cluster.

It is achievied in a similar way to MySQL/Mariadb. The key difference
is draining nodes before upgrade and upgrading sstables after succesful
volume mounting operation.

Cassandra upgrade steps have been taken from
https://docs.datastax.com/en/upgrade/doc/upgrade/ddac/upgdDDACMinor51x.html

In case of trove it looks as follows:
For each node do:
    + enter restart mode
    + drain and stop node
    + preserve configuration files and home directory
    + unmount volume
    + create instance with new datastore version
    + mount volume
    + restore configuration and home directory
    + start Cassandra
    + upgrade sstables

Because we need to upgrade seeds first there is an additional parameter
in rolling_upgrade_cluster function - a function which is used to so
sort nodes in proper order.

Story: #2005410
Task: #30426
Co-Authored-By: Przemysław Godek <p.godek@partner.samsung.com>
Change-Id: I9f25606d51b35e264a8c36f9524ae57b0a5780d0
Signed-off-by: Kasper Hasior <k.hasior@samsung.com>
This commit is contained in:
Kasper Hasior 2019-04-03 13:09:51 +02:00
parent d5d84653cf
commit 0969d53c6c
6 changed files with 183 additions and 7 deletions

View File

@ -281,11 +281,7 @@ class CassandraClusterTasks(task_models.ClusterTasks):
# remaining ones.
try:
# All nodes should have the same seeds.
# We retrieve current seeds from the first node.
test_node = self.load_cluster_nodes(
context, cluster_node_ids[:1])[0]
current_seeds = test_node['guest'].get_seeds()
current_seeds = self._get_current_seeds(context, cluster_id)
# The seeds will have to be updated on all remaining instances
# if any of the seed nodes is going to be removed.
update_seeds = any(node['ip'] in current_seeds
@ -351,7 +347,24 @@ class CassandraClusterTasks(task_models.ClusterTasks):
context, cluster_id, delay_sec=CONF.cassandra.node_sync_time)
def upgrade_cluster(self, context, cluster_id, datastore_version):
self.rolling_upgrade_cluster(context, cluster_id, datastore_version)
current_seeds = self._get_current_seeds(context, cluster_id)
def ordering_function(instance):
if self.get_ip(instance) in current_seeds:
return -1
return 0
self.rolling_upgrade_cluster(context, cluster_id,
datastore_version, ordering_function)
def _get_current_seeds(self, context, cluster_id):
# All nodes should have the same seeds.
# We retrieve current seeds from the first node.
cluster_node_ids = self.find_cluster_node_ids(cluster_id)
test_node = self.load_cluster_nodes(context,
cluster_node_ids[:1])[0]
return test_node['guest'].get_seeds()
class CassandraTaskManagerAPI(task_api.API):

View File

@ -22,6 +22,7 @@ from trove.common import cfg
from trove.common import instance as trove_instance
from trove.common.notification import EndNotification
from trove.guestagent import backup
from trove.guestagent.common import operating_system
from trove.guestagent.datastore.experimental.cassandra import service
from trove.guestagent.datastore import manager
from trove.guestagent import guest_log
@ -179,6 +180,52 @@ class Manager(manager.Manager):
if not cluster_config and self.is_root_enabled(context):
self.status.report_root(context)
def pre_upgrade(self, context):
data_dir = self.app.cassandra_data_dir
mount_point, _data = os.path.split(data_dir)
save_etc_dir = "%s/etc" % mount_point
home_save = "%s/trove_user" % mount_point
self.app.status.begin_restart()
self.app.drain()
self.app.stop_db()
operating_system.copy("%s/." % self.app.cassandra_conf_dir,
save_etc_dir,
preserve=True, as_root=True)
operating_system.copy("%s/." % os.path.expanduser('~'), home_save,
preserve=True, as_root=True)
self.unmount_volume(context, mount_point=mount_point)
return {
'mount_point': mount_point,
'save_etc_dir': save_etc_dir,
'home_save': home_save
}
def post_upgrade(self, context, upgrade_info):
self.app.stop_db()
if 'device' in upgrade_info:
self.mount_volume(context, mount_point=upgrade_info['mount_point'],
device_path=upgrade_info['device'],
write_to_fstab=True)
operating_system.chown(path=upgrade_info['mount_point'],
user=self.app.cassandra_owner,
group=self.app.cassandra_owner,
recursive=True,
as_root=True)
self._restore_home_directory(upgrade_info['home_save'])
self._restore_directory(upgrade_info['save_etc_dir'],
self.app.cassandra_conf_dir)
self._reset_app()
self.app.start_db()
self.app.upgrade_sstables()
self.app.status.end_restart()
def change_passwords(self, context, users):
with EndNotification(context):
self.admin.change_passwords(context, users)
@ -310,3 +357,13 @@ class Manager(manager.Manager):
def store_admin_credentials(self, context, admin_credentials):
self.app.store_admin_credentials(admin_credentials)
self._admin = self.app.build_admin()
def _reset_app(self):
"""
A function for reseting app and admin properties.
It is useful when we want to force reload application.
Possible usages: loading new configuration files, loading new
datastore password
"""
self._app = None
self._admin = None

View File

@ -717,6 +717,18 @@ class CassandraApp(object):
self.logback_conf_manager.apply_system_override(
{'configuration': {'root': {'@level': log_level}}})
def drain(self):
"""Drains Cassandra node so that it can upgraded safely.
"""
LOG.debug("Draining node.")
self._run_nodetool_command('drain')
def upgrade_sstables(self):
"""Upgrades sstables to match new datastore version.
"""
LOG.debug("Upgrading sstables.")
self._run_nodetool_command('upgradesstables')
def _run_nodetool_command(self, cmd, *args, **kwargs):
"""Execute a nodetool command on this node.
"""

View File

@ -363,7 +363,8 @@ class ClusterTasks(Cluster):
LOG.debug("End rolling restart for id: %s.", cluster_id)
def rolling_upgrade_cluster(self, context, cluster_id, datastore_version):
def rolling_upgrade_cluster(self, context, cluster_id,
datastore_version, ordering_function=None):
LOG.debug("Begin rolling cluster upgrade for id: %s.", cluster_id)
def _upgrade_cluster_instance(instance):
@ -383,10 +384,17 @@ class ClusterTasks(Cluster):
cluster_notification = context.notification
request_info = cluster_notification.serialize(context)
try:
instances = []
for db_inst in DBInstance.find_all(cluster_id=cluster_id,
deleted=False).all():
instance = BuiltInstanceTasks.load(
context, db_inst.id)
instances.append(instance)
if ordering_function is not None:
instances.sort(key=ordering_function)
for instance in instances:
_upgrade_cluster_instance(instance)
self.reset_task()

View File

@ -736,6 +736,18 @@ class GuestAgentCassandraDBManagerTest(DatastoreManagerTest):
self.assertIsNone(
self.manager.apply_overrides(Mock(), NonCallableMagicMock()))
@patch('trove.guestagent.datastore.experimental.cassandra.service.LOG')
@patch.object(cass_service.CassandraApp, '_run_nodetool_command')
def test_drain(self, command_runner_mock, _):
self.manager._app.drain()
command_runner_mock.assert_called_once_with('drain')
@patch('trove.guestagent.datastore.experimental.cassandra.service.LOG')
@patch.object(cass_service.CassandraApp, '_run_nodetool_command')
def test_upgrade_sstables(self, command_runner_mock, _):
self.manager._app.upgrade_sstables()
command_runner_mock.assert_called_once_with('upgradesstables')
@patch('trove.guestagent.datastore.experimental.cassandra.service.LOG')
def test_enable_root(self, _):
with patch.object(self.manager._app, 'is_root_enabled',

View File

@ -15,6 +15,7 @@
import datetime
from mock import MagicMock
from mock import Mock
from mock import patch
@ -29,6 +30,7 @@ from trove.instance.models import DBInstance
from trove.instance.models import Instance
from trove.instance.models import InstanceServiceStatus
from trove.instance.models import InstanceTasks
# from trove.taskmanager.models import BuiltInstanceTasks
from trove.taskmanager.models import ServiceStatuses
from trove.tests.unittests import trove_testtools
@ -292,6 +294,78 @@ class MongoDbClusterTasksTest(trove_testtools.TestCase):
self.assertEqual(ClusterTaskStatus.NONE, self.db_cluster.task_status)
mock_save.assert_called_with()
def test_rolling_upgrade_cluster_without_order_specified(self):
self._assert_rolling_upgrade_cluster(None, None)
def test_rolling_upgrade_cluster_with_order_specified(self):
ordering = {
1: 1,
2: 2,
3: 3,
4: 4,
5: 5
}
def ordering_function(instance):
return ordering[instance.id]
self._assert_rolling_upgrade_cluster(ordering_function, ordering)
@patch('trove.taskmanager.models.DBaaSInstanceUpgrade')
@patch('trove.taskmanager.models.BuiltInstanceTasks')
@patch('trove.taskmanager.models.EndNotification')
@patch('trove.taskmanager.models.StartNotification')
@patch('trove.taskmanager.models.Timeout')
@patch.object(ClusterTasks, 'reset_task')
@patch.object(DBInstance, 'find_all')
def _assert_rolling_upgrade_cluster(self,
ordering_function,
ordering,
mock_find_all,
mock_reset_task,
mock_timeout,
mock_start,
mock_end,
mock_instance_task,
mock_upgrade):
class MockInstance(Mock):
upgrade_counter = 0
def upgrade(self, _):
MockInstance.upgrade_counter += 1
self.upgrade_number = MockInstance.upgrade_counter
db_instances = [Mock() for _ in range(5)]
for i in range(5):
db_instances[i].id = i + 1
mock_find_all.return_value.all.return_value = db_instances
instances = []
def load_side_effect(_, instance_id):
return_value = MockInstance()
return_value.id = instance_id
instances.append(return_value)
return return_value
mock_instance_task.load.side_effect = load_side_effect
if ordering is None:
ordering = {
1: 1,
2: 2,
3: 3,
4: 4,
5: 5
}
self.clustertasks.rolling_upgrade_cluster(MagicMock(),
Mock(),
Mock(),
ordering_function)
order_result = {inst.id: inst.upgrade_number for inst in instances}
self.assertEqual(ClusterTaskStatus.NONE, self.db_cluster.task_status)
self.assertDictEqual(ordering, order_result)
@patch.object(ClusterTasks, 'reset_task')
@patch.object(ClusterTasks, '_create_shard')
@patch.object(ClusterTasks, 'get_guest')