Merge "Add Cassandra datastore upgrade"
This commit is contained in:
commit
2c6feef958
@ -281,11 +281,7 @@ class CassandraClusterTasks(task_models.ClusterTasks):
|
|||||||
# remaining ones.
|
# remaining ones.
|
||||||
try:
|
try:
|
||||||
|
|
||||||
# All nodes should have the same seeds.
|
current_seeds = self._get_current_seeds(context, cluster_id)
|
||||||
# We retrieve current seeds from the first node.
|
|
||||||
test_node = self.load_cluster_nodes(
|
|
||||||
context, cluster_node_ids[:1])[0]
|
|
||||||
current_seeds = test_node['guest'].get_seeds()
|
|
||||||
# The seeds will have to be updated on all remaining instances
|
# The seeds will have to be updated on all remaining instances
|
||||||
# if any of the seed nodes is going to be removed.
|
# if any of the seed nodes is going to be removed.
|
||||||
update_seeds = any(node['ip'] in current_seeds
|
update_seeds = any(node['ip'] in current_seeds
|
||||||
@ -351,7 +347,24 @@ class CassandraClusterTasks(task_models.ClusterTasks):
|
|||||||
context, cluster_id, delay_sec=CONF.cassandra.node_sync_time)
|
context, cluster_id, delay_sec=CONF.cassandra.node_sync_time)
|
||||||
|
|
||||||
def upgrade_cluster(self, context, cluster_id, datastore_version):
|
def upgrade_cluster(self, context, cluster_id, datastore_version):
|
||||||
self.rolling_upgrade_cluster(context, cluster_id, datastore_version)
|
current_seeds = self._get_current_seeds(context, cluster_id)
|
||||||
|
|
||||||
|
def ordering_function(instance):
|
||||||
|
|
||||||
|
if self.get_ip(instance) in current_seeds:
|
||||||
|
return -1
|
||||||
|
return 0
|
||||||
|
|
||||||
|
self.rolling_upgrade_cluster(context, cluster_id,
|
||||||
|
datastore_version, ordering_function)
|
||||||
|
|
||||||
|
def _get_current_seeds(self, context, cluster_id):
|
||||||
|
# All nodes should have the same seeds.
|
||||||
|
# We retrieve current seeds from the first node.
|
||||||
|
cluster_node_ids = self.find_cluster_node_ids(cluster_id)
|
||||||
|
test_node = self.load_cluster_nodes(context,
|
||||||
|
cluster_node_ids[:1])[0]
|
||||||
|
return test_node['guest'].get_seeds()
|
||||||
|
|
||||||
|
|
||||||
class CassandraTaskManagerAPI(task_api.API):
|
class CassandraTaskManagerAPI(task_api.API):
|
||||||
|
@ -22,6 +22,7 @@ from trove.common import cfg
|
|||||||
from trove.common import instance as trove_instance
|
from trove.common import instance as trove_instance
|
||||||
from trove.common.notification import EndNotification
|
from trove.common.notification import EndNotification
|
||||||
from trove.guestagent import backup
|
from trove.guestagent import backup
|
||||||
|
from trove.guestagent.common import operating_system
|
||||||
from trove.guestagent.datastore.experimental.cassandra import service
|
from trove.guestagent.datastore.experimental.cassandra import service
|
||||||
from trove.guestagent.datastore import manager
|
from trove.guestagent.datastore import manager
|
||||||
from trove.guestagent import guest_log
|
from trove.guestagent import guest_log
|
||||||
@ -179,6 +180,52 @@ class Manager(manager.Manager):
|
|||||||
if not cluster_config and self.is_root_enabled(context):
|
if not cluster_config and self.is_root_enabled(context):
|
||||||
self.status.report_root(context)
|
self.status.report_root(context)
|
||||||
|
|
||||||
|
def pre_upgrade(self, context):
|
||||||
|
data_dir = self.app.cassandra_data_dir
|
||||||
|
mount_point, _data = os.path.split(data_dir)
|
||||||
|
save_etc_dir = "%s/etc" % mount_point
|
||||||
|
home_save = "%s/trove_user" % mount_point
|
||||||
|
|
||||||
|
self.app.status.begin_restart()
|
||||||
|
self.app.drain()
|
||||||
|
self.app.stop_db()
|
||||||
|
|
||||||
|
operating_system.copy("%s/." % self.app.cassandra_conf_dir,
|
||||||
|
save_etc_dir,
|
||||||
|
preserve=True, as_root=True)
|
||||||
|
operating_system.copy("%s/." % os.path.expanduser('~'), home_save,
|
||||||
|
preserve=True, as_root=True)
|
||||||
|
|
||||||
|
self.unmount_volume(context, mount_point=mount_point)
|
||||||
|
|
||||||
|
return {
|
||||||
|
'mount_point': mount_point,
|
||||||
|
'save_etc_dir': save_etc_dir,
|
||||||
|
'home_save': home_save
|
||||||
|
}
|
||||||
|
|
||||||
|
def post_upgrade(self, context, upgrade_info):
|
||||||
|
self.app.stop_db()
|
||||||
|
|
||||||
|
if 'device' in upgrade_info:
|
||||||
|
self.mount_volume(context, mount_point=upgrade_info['mount_point'],
|
||||||
|
device_path=upgrade_info['device'],
|
||||||
|
write_to_fstab=True)
|
||||||
|
operating_system.chown(path=upgrade_info['mount_point'],
|
||||||
|
user=self.app.cassandra_owner,
|
||||||
|
group=self.app.cassandra_owner,
|
||||||
|
recursive=True,
|
||||||
|
as_root=True)
|
||||||
|
|
||||||
|
self._restore_home_directory(upgrade_info['home_save'])
|
||||||
|
self._restore_directory(upgrade_info['save_etc_dir'],
|
||||||
|
self.app.cassandra_conf_dir)
|
||||||
|
|
||||||
|
self._reset_app()
|
||||||
|
self.app.start_db()
|
||||||
|
self.app.upgrade_sstables()
|
||||||
|
self.app.status.end_restart()
|
||||||
|
|
||||||
def change_passwords(self, context, users):
|
def change_passwords(self, context, users):
|
||||||
with EndNotification(context):
|
with EndNotification(context):
|
||||||
self.admin.change_passwords(context, users)
|
self.admin.change_passwords(context, users)
|
||||||
@ -310,3 +357,13 @@ class Manager(manager.Manager):
|
|||||||
def store_admin_credentials(self, context, admin_credentials):
|
def store_admin_credentials(self, context, admin_credentials):
|
||||||
self.app.store_admin_credentials(admin_credentials)
|
self.app.store_admin_credentials(admin_credentials)
|
||||||
self._admin = self.app.build_admin()
|
self._admin = self.app.build_admin()
|
||||||
|
|
||||||
|
def _reset_app(self):
|
||||||
|
"""
|
||||||
|
A function for reseting app and admin properties.
|
||||||
|
It is useful when we want to force reload application.
|
||||||
|
Possible usages: loading new configuration files, loading new
|
||||||
|
datastore password
|
||||||
|
"""
|
||||||
|
self._app = None
|
||||||
|
self._admin = None
|
||||||
|
@ -717,6 +717,18 @@ class CassandraApp(object):
|
|||||||
self.logback_conf_manager.apply_system_override(
|
self.logback_conf_manager.apply_system_override(
|
||||||
{'configuration': {'root': {'@level': log_level}}})
|
{'configuration': {'root': {'@level': log_level}}})
|
||||||
|
|
||||||
|
def drain(self):
|
||||||
|
"""Drains Cassandra node so that it can upgraded safely.
|
||||||
|
"""
|
||||||
|
LOG.debug("Draining node.")
|
||||||
|
self._run_nodetool_command('drain')
|
||||||
|
|
||||||
|
def upgrade_sstables(self):
|
||||||
|
"""Upgrades sstables to match new datastore version.
|
||||||
|
"""
|
||||||
|
LOG.debug("Upgrading sstables.")
|
||||||
|
self._run_nodetool_command('upgradesstables')
|
||||||
|
|
||||||
def _run_nodetool_command(self, cmd, *args, **kwargs):
|
def _run_nodetool_command(self, cmd, *args, **kwargs):
|
||||||
"""Execute a nodetool command on this node.
|
"""Execute a nodetool command on this node.
|
||||||
"""
|
"""
|
||||||
|
@ -363,7 +363,8 @@ class ClusterTasks(Cluster):
|
|||||||
|
|
||||||
LOG.debug("End rolling restart for id: %s.", cluster_id)
|
LOG.debug("End rolling restart for id: %s.", cluster_id)
|
||||||
|
|
||||||
def rolling_upgrade_cluster(self, context, cluster_id, datastore_version):
|
def rolling_upgrade_cluster(self, context, cluster_id,
|
||||||
|
datastore_version, ordering_function=None):
|
||||||
LOG.debug("Begin rolling cluster upgrade for id: %s.", cluster_id)
|
LOG.debug("Begin rolling cluster upgrade for id: %s.", cluster_id)
|
||||||
|
|
||||||
def _upgrade_cluster_instance(instance):
|
def _upgrade_cluster_instance(instance):
|
||||||
@ -383,10 +384,17 @@ class ClusterTasks(Cluster):
|
|||||||
cluster_notification = context.notification
|
cluster_notification = context.notification
|
||||||
request_info = cluster_notification.serialize(context)
|
request_info = cluster_notification.serialize(context)
|
||||||
try:
|
try:
|
||||||
|
instances = []
|
||||||
for db_inst in DBInstance.find_all(cluster_id=cluster_id,
|
for db_inst in DBInstance.find_all(cluster_id=cluster_id,
|
||||||
deleted=False).all():
|
deleted=False).all():
|
||||||
instance = BuiltInstanceTasks.load(
|
instance = BuiltInstanceTasks.load(
|
||||||
context, db_inst.id)
|
context, db_inst.id)
|
||||||
|
instances.append(instance)
|
||||||
|
|
||||||
|
if ordering_function is not None:
|
||||||
|
instances.sort(key=ordering_function)
|
||||||
|
|
||||||
|
for instance in instances:
|
||||||
_upgrade_cluster_instance(instance)
|
_upgrade_cluster_instance(instance)
|
||||||
|
|
||||||
self.reset_task()
|
self.reset_task()
|
||||||
|
@ -736,6 +736,18 @@ class GuestAgentCassandraDBManagerTest(DatastoreManagerTest):
|
|||||||
self.assertIsNone(
|
self.assertIsNone(
|
||||||
self.manager.apply_overrides(Mock(), NonCallableMagicMock()))
|
self.manager.apply_overrides(Mock(), NonCallableMagicMock()))
|
||||||
|
|
||||||
|
@patch('trove.guestagent.datastore.experimental.cassandra.service.LOG')
|
||||||
|
@patch.object(cass_service.CassandraApp, '_run_nodetool_command')
|
||||||
|
def test_drain(self, command_runner_mock, _):
|
||||||
|
self.manager._app.drain()
|
||||||
|
command_runner_mock.assert_called_once_with('drain')
|
||||||
|
|
||||||
|
@patch('trove.guestagent.datastore.experimental.cassandra.service.LOG')
|
||||||
|
@patch.object(cass_service.CassandraApp, '_run_nodetool_command')
|
||||||
|
def test_upgrade_sstables(self, command_runner_mock, _):
|
||||||
|
self.manager._app.upgrade_sstables()
|
||||||
|
command_runner_mock.assert_called_once_with('upgradesstables')
|
||||||
|
|
||||||
@patch('trove.guestagent.datastore.experimental.cassandra.service.LOG')
|
@patch('trove.guestagent.datastore.experimental.cassandra.service.LOG')
|
||||||
def test_enable_root(self, _):
|
def test_enable_root(self, _):
|
||||||
with patch.object(self.manager._app, 'is_root_enabled',
|
with patch.object(self.manager._app, 'is_root_enabled',
|
||||||
|
@ -15,6 +15,7 @@
|
|||||||
|
|
||||||
import datetime
|
import datetime
|
||||||
|
|
||||||
|
from mock import MagicMock
|
||||||
from mock import Mock
|
from mock import Mock
|
||||||
from mock import patch
|
from mock import patch
|
||||||
|
|
||||||
@ -29,6 +30,7 @@ from trove.instance.models import DBInstance
|
|||||||
from trove.instance.models import Instance
|
from trove.instance.models import Instance
|
||||||
from trove.instance.models import InstanceServiceStatus
|
from trove.instance.models import InstanceServiceStatus
|
||||||
from trove.instance.models import InstanceTasks
|
from trove.instance.models import InstanceTasks
|
||||||
|
# from trove.taskmanager.models import BuiltInstanceTasks
|
||||||
from trove.taskmanager.models import ServiceStatuses
|
from trove.taskmanager.models import ServiceStatuses
|
||||||
from trove.tests.unittests import trove_testtools
|
from trove.tests.unittests import trove_testtools
|
||||||
|
|
||||||
@ -292,6 +294,78 @@ class MongoDbClusterTasksTest(trove_testtools.TestCase):
|
|||||||
self.assertEqual(ClusterTaskStatus.NONE, self.db_cluster.task_status)
|
self.assertEqual(ClusterTaskStatus.NONE, self.db_cluster.task_status)
|
||||||
mock_save.assert_called_with()
|
mock_save.assert_called_with()
|
||||||
|
|
||||||
|
def test_rolling_upgrade_cluster_without_order_specified(self):
|
||||||
|
self._assert_rolling_upgrade_cluster(None, None)
|
||||||
|
|
||||||
|
def test_rolling_upgrade_cluster_with_order_specified(self):
|
||||||
|
ordering = {
|
||||||
|
1: 1,
|
||||||
|
2: 2,
|
||||||
|
3: 3,
|
||||||
|
4: 4,
|
||||||
|
5: 5
|
||||||
|
}
|
||||||
|
|
||||||
|
def ordering_function(instance):
|
||||||
|
return ordering[instance.id]
|
||||||
|
|
||||||
|
self._assert_rolling_upgrade_cluster(ordering_function, ordering)
|
||||||
|
|
||||||
|
@patch('trove.taskmanager.models.DBaaSInstanceUpgrade')
|
||||||
|
@patch('trove.taskmanager.models.BuiltInstanceTasks')
|
||||||
|
@patch('trove.taskmanager.models.EndNotification')
|
||||||
|
@patch('trove.taskmanager.models.StartNotification')
|
||||||
|
@patch('trove.taskmanager.models.Timeout')
|
||||||
|
@patch.object(ClusterTasks, 'reset_task')
|
||||||
|
@patch.object(DBInstance, 'find_all')
|
||||||
|
def _assert_rolling_upgrade_cluster(self,
|
||||||
|
ordering_function,
|
||||||
|
ordering,
|
||||||
|
mock_find_all,
|
||||||
|
mock_reset_task,
|
||||||
|
mock_timeout,
|
||||||
|
mock_start,
|
||||||
|
mock_end,
|
||||||
|
mock_instance_task,
|
||||||
|
mock_upgrade):
|
||||||
|
class MockInstance(Mock):
|
||||||
|
upgrade_counter = 0
|
||||||
|
|
||||||
|
def upgrade(self, _):
|
||||||
|
MockInstance.upgrade_counter += 1
|
||||||
|
self.upgrade_number = MockInstance.upgrade_counter
|
||||||
|
|
||||||
|
db_instances = [Mock() for _ in range(5)]
|
||||||
|
for i in range(5):
|
||||||
|
db_instances[i].id = i + 1
|
||||||
|
|
||||||
|
mock_find_all.return_value.all.return_value = db_instances
|
||||||
|
instances = []
|
||||||
|
|
||||||
|
def load_side_effect(_, instance_id):
|
||||||
|
return_value = MockInstance()
|
||||||
|
return_value.id = instance_id
|
||||||
|
instances.append(return_value)
|
||||||
|
return return_value
|
||||||
|
|
||||||
|
mock_instance_task.load.side_effect = load_side_effect
|
||||||
|
if ordering is None:
|
||||||
|
ordering = {
|
||||||
|
1: 1,
|
||||||
|
2: 2,
|
||||||
|
3: 3,
|
||||||
|
4: 4,
|
||||||
|
5: 5
|
||||||
|
}
|
||||||
|
self.clustertasks.rolling_upgrade_cluster(MagicMock(),
|
||||||
|
Mock(),
|
||||||
|
Mock(),
|
||||||
|
ordering_function)
|
||||||
|
order_result = {inst.id: inst.upgrade_number for inst in instances}
|
||||||
|
|
||||||
|
self.assertEqual(ClusterTaskStatus.NONE, self.db_cluster.task_status)
|
||||||
|
self.assertDictEqual(ordering, order_result)
|
||||||
|
|
||||||
@patch.object(ClusterTasks, 'reset_task')
|
@patch.object(ClusterTasks, 'reset_task')
|
||||||
@patch.object(ClusterTasks, '_create_shard')
|
@patch.object(ClusterTasks, '_create_shard')
|
||||||
@patch.object(ClusterTasks, 'get_guest')
|
@patch.object(ClusterTasks, 'get_guest')
|
||||||
|
Loading…
Reference in New Issue
Block a user