diff --git a/trove/common/strategies/cluster/experimental/cassandra/taskmanager.py b/trove/common/strategies/cluster/experimental/cassandra/taskmanager.py index 29bb9e0fd8..f74c74a935 100644 --- a/trove/common/strategies/cluster/experimental/cassandra/taskmanager.py +++ b/trove/common/strategies/cluster/experimental/cassandra/taskmanager.py @@ -281,11 +281,7 @@ class CassandraClusterTasks(task_models.ClusterTasks): # remaining ones. try: - # All nodes should have the same seeds. - # We retrieve current seeds from the first node. - test_node = self.load_cluster_nodes( - context, cluster_node_ids[:1])[0] - current_seeds = test_node['guest'].get_seeds() + current_seeds = self._get_current_seeds(context, cluster_id) # The seeds will have to be updated on all remaining instances # if any of the seed nodes is going to be removed. update_seeds = any(node['ip'] in current_seeds @@ -351,7 +347,24 @@ class CassandraClusterTasks(task_models.ClusterTasks): context, cluster_id, delay_sec=CONF.cassandra.node_sync_time) def upgrade_cluster(self, context, cluster_id, datastore_version): - self.rolling_upgrade_cluster(context, cluster_id, datastore_version) + current_seeds = self._get_current_seeds(context, cluster_id) + + def ordering_function(instance): + + if self.get_ip(instance) in current_seeds: + return -1 + return 0 + + self.rolling_upgrade_cluster(context, cluster_id, + datastore_version, ordering_function) + + def _get_current_seeds(self, context, cluster_id): + # All nodes should have the same seeds. + # We retrieve current seeds from the first node. 
+ cluster_node_ids = self.find_cluster_node_ids(cluster_id) + test_node = self.load_cluster_nodes(context, + cluster_node_ids[:1])[0] + return test_node['guest'].get_seeds() class CassandraTaskManagerAPI(task_api.API): diff --git a/trove/guestagent/datastore/experimental/cassandra/manager.py b/trove/guestagent/datastore/experimental/cassandra/manager.py index eb9a9bf767..40a6498f5a 100644 --- a/trove/guestagent/datastore/experimental/cassandra/manager.py +++ b/trove/guestagent/datastore/experimental/cassandra/manager.py @@ -22,6 +22,7 @@ from trove.common import cfg from trove.common import instance as trove_instance from trove.common.notification import EndNotification from trove.guestagent import backup +from trove.guestagent.common import operating_system from trove.guestagent.datastore.experimental.cassandra import service from trove.guestagent.datastore import manager from trove.guestagent import guest_log @@ -179,6 +180,52 @@ class Manager(manager.Manager): if not cluster_config and self.is_root_enabled(context): self.status.report_root(context) + def pre_upgrade(self, context): + data_dir = self.app.cassandra_data_dir + mount_point, _data = os.path.split(data_dir) + save_etc_dir = "%s/etc" % mount_point + home_save = "%s/trove_user" % mount_point + + self.app.status.begin_restart() + self.app.drain() + self.app.stop_db() + + operating_system.copy("%s/." % self.app.cassandra_conf_dir, + save_etc_dir, + preserve=True, as_root=True) + operating_system.copy("%s/." 
% os.path.expanduser('~'), home_save,
+                              preserve=True, as_root=True)
+
+        self.unmount_volume(context, mount_point=mount_point)
+
+        return {
+            'mount_point': mount_point,
+            'save_etc_dir': save_etc_dir,
+            'home_save': home_save
+        }
+
+    def post_upgrade(self, context, upgrade_info):
+        self.app.stop_db()
+
+        if 'device' in upgrade_info:
+            self.mount_volume(context, mount_point=upgrade_info['mount_point'],
+                              device_path=upgrade_info['device'],
+                              write_to_fstab=True)
+            operating_system.chown(path=upgrade_info['mount_point'],
+                                   user=self.app.cassandra_owner,
+                                   group=self.app.cassandra_owner,
+                                   recursive=True,
+                                   as_root=True)
+
+        self._restore_home_directory(upgrade_info['home_save'])
+        self._restore_directory(upgrade_info['save_etc_dir'],
+                                self.app.cassandra_conf_dir)
+
+        self._reset_app()
+        self.app.start_db()
+        self.app.upgrade_sstables()
+        self.app.status.end_restart()
+
     def change_passwords(self, context, users):
         with EndNotification(context):
             self.admin.change_passwords(context, users)
@@ -310,3 +357,13 @@ class Manager(manager.Manager):
     def store_admin_credentials(self, context, admin_credentials):
         self.app.store_admin_credentials(admin_credentials)
         self._admin = self.app.build_admin()
+
+    def _reset_app(self):
+        """
+        A function for resetting app and admin properties.
+        It is useful when we want to force a reload of the application. 
+        Possible usages: loading new configuration files, loading new
+        datastore password
+        """
+        self._app = None
+        self._admin = None
diff --git a/trove/guestagent/datastore/experimental/cassandra/service.py b/trove/guestagent/datastore/experimental/cassandra/service.py
index 721b46b690..3ec2a38361 100644
--- a/trove/guestagent/datastore/experimental/cassandra/service.py
+++ b/trove/guestagent/datastore/experimental/cassandra/service.py
@@ -717,6 +717,18 @@ class CassandraApp(object):
         self.logback_conf_manager.apply_system_override(
             {'configuration': {'root': {'@level': log_level}}})
 
+    def drain(self):
+        """Drains Cassandra node so that it can be upgraded safely.
+        """
+        LOG.debug("Draining node.")
+        self._run_nodetool_command('drain')
+
+    def upgrade_sstables(self):
+        """Upgrades sstables to match new datastore version.
+        """
+        LOG.debug("Upgrading sstables.")
+        self._run_nodetool_command('upgradesstables')
+
     def _run_nodetool_command(self, cmd, *args, **kwargs):
         """Execute a nodetool command on this node. 
""" diff --git a/trove/taskmanager/models.py b/trove/taskmanager/models.py index 9f86ecb7ad..91988d5d06 100755 --- a/trove/taskmanager/models.py +++ b/trove/taskmanager/models.py @@ -363,7 +363,8 @@ class ClusterTasks(Cluster): LOG.debug("End rolling restart for id: %s.", cluster_id) - def rolling_upgrade_cluster(self, context, cluster_id, datastore_version): + def rolling_upgrade_cluster(self, context, cluster_id, + datastore_version, ordering_function=None): LOG.debug("Begin rolling cluster upgrade for id: %s.", cluster_id) def _upgrade_cluster_instance(instance): @@ -383,10 +384,17 @@ class ClusterTasks(Cluster): cluster_notification = context.notification request_info = cluster_notification.serialize(context) try: + instances = [] for db_inst in DBInstance.find_all(cluster_id=cluster_id, deleted=False).all(): instance = BuiltInstanceTasks.load( context, db_inst.id) + instances.append(instance) + + if ordering_function is not None: + instances.sort(key=ordering_function) + + for instance in instances: _upgrade_cluster_instance(instance) self.reset_task() diff --git a/trove/tests/unittests/guestagent/test_cassandra_manager.py b/trove/tests/unittests/guestagent/test_cassandra_manager.py index 1e9c423452..b2d7abc80e 100644 --- a/trove/tests/unittests/guestagent/test_cassandra_manager.py +++ b/trove/tests/unittests/guestagent/test_cassandra_manager.py @@ -736,6 +736,18 @@ class GuestAgentCassandraDBManagerTest(DatastoreManagerTest): self.assertIsNone( self.manager.apply_overrides(Mock(), NonCallableMagicMock())) + @patch('trove.guestagent.datastore.experimental.cassandra.service.LOG') + @patch.object(cass_service.CassandraApp, '_run_nodetool_command') + def test_drain(self, command_runner_mock, _): + self.manager._app.drain() + command_runner_mock.assert_called_once_with('drain') + + @patch('trove.guestagent.datastore.experimental.cassandra.service.LOG') + @patch.object(cass_service.CassandraApp, '_run_nodetool_command') + def test_upgrade_sstables(self, 
command_runner_mock, _):
+        self.manager._app.upgrade_sstables()
+        command_runner_mock.assert_called_once_with('upgradesstables')
+
     @patch('trove.guestagent.datastore.experimental.cassandra.service.LOG')
     def test_enable_root(self, _):
         with patch.object(self.manager._app, 'is_root_enabled',
diff --git a/trove/tests/unittests/taskmanager/test_clusters.py b/trove/tests/unittests/taskmanager/test_clusters.py
index e18d828580..1b6b7b52a9 100644
--- a/trove/tests/unittests/taskmanager/test_clusters.py
+++ b/trove/tests/unittests/taskmanager/test_clusters.py
@@ -15,6 +15,7 @@
 import datetime
 
+from mock import MagicMock
 from mock import Mock
 from mock import patch
 
@@ -292,6 +293,78 @@ class MongoDbClusterTasksTest(trove_testtools.TestCase):
         self.assertEqual(ClusterTaskStatus.NONE, self.db_cluster.task_status)
         mock_save.assert_called_with()
 
+    def test_rolling_upgrade_cluster_without_order_specified(self):
+        self._assert_rolling_upgrade_cluster(None, None)
+
+    def test_rolling_upgrade_cluster_with_order_specified(self):
+        ordering = {
+            1: 1,
+            2: 2,
+            3: 3,
+            4: 4,
+            5: 5
+        }
+
+        def ordering_function(instance):
+            return ordering[instance.id]
+
+        self._assert_rolling_upgrade_cluster(ordering_function, ordering)
+
+    @patch('trove.taskmanager.models.DBaaSInstanceUpgrade')
+    @patch('trove.taskmanager.models.BuiltInstanceTasks')
+    @patch('trove.taskmanager.models.EndNotification')
+    @patch('trove.taskmanager.models.StartNotification')
+    @patch('trove.taskmanager.models.Timeout')
+    @patch.object(ClusterTasks, 'reset_task')
+    @patch.object(DBInstance, 'find_all')
+    def _assert_rolling_upgrade_cluster(self,
+                                        
ordering_function, + ordering, + mock_find_all, + mock_reset_task, + mock_timeout, + mock_start, + mock_end, + mock_instance_task, + mock_upgrade): + class MockInstance(Mock): + upgrade_counter = 0 + + def upgrade(self, _): + MockInstance.upgrade_counter += 1 + self.upgrade_number = MockInstance.upgrade_counter + + db_instances = [Mock() for _ in range(5)] + for i in range(5): + db_instances[i].id = i + 1 + + mock_find_all.return_value.all.return_value = db_instances + instances = [] + + def load_side_effect(_, instance_id): + return_value = MockInstance() + return_value.id = instance_id + instances.append(return_value) + return return_value + + mock_instance_task.load.side_effect = load_side_effect + if ordering is None: + ordering = { + 1: 1, + 2: 2, + 3: 3, + 4: 4, + 5: 5 + } + self.clustertasks.rolling_upgrade_cluster(MagicMock(), + Mock(), + Mock(), + ordering_function) + order_result = {inst.id: inst.upgrade_number for inst in instances} + + self.assertEqual(ClusterTaskStatus.NONE, self.db_cluster.task_status) + self.assertDictEqual(ordering, order_result) + @patch.object(ClusterTasks, 'reset_task') @patch.object(ClusterTasks, '_create_shard') @patch.object(ClusterTasks, 'get_guest')