Add support for cluster restart

Implement a cluster rolling restart strategy.
Add support for Cassandra and PXC.

Add some missing cluster upgrade infrastructure.

Implements: blueprint cluster-restart
Co-Authored-By: Petr Malik <pmalik@tesora.com>
Co-Authored-By: Peter Stachowski <peter@tesora.com>
Change-Id: I21e654a8dd2dc6a74aa095604f78db4e96c70d64
Petr Malik 2016-11-01 17:44:55 -04:00
parent 6a917bab58
commit 6e7fa196dc
17 changed files with 298 additions and 8 deletions

View File

@@ -0,0 +1,4 @@
---
features:
  - |
    Add support for cluster restart.

View File

@@ -21,8 +21,9 @@ from trove.cluster.tasks import ClusterTasks
from trove.common import cfg
from trove.common import exception
from trove.common.i18n import _
from trove.common.notification import (DBaaSClusterGrow, DBaaSClusterShrink,
                                       DBaaSClusterResetStatus,
                                       DBaaSClusterRestart)
from trove.common.notification import DBaaSClusterUpgrade
from trove.common.notification import StartNotification
from trove.common import remote
@@ -316,6 +317,11 @@ class Cluster(object):
            with StartNotification(context, cluster_id=self.id):
                return self.reset_status()
        elif action == 'restart':
            context.notification = DBaaSClusterRestart(context, request=req)
            with StartNotification(context, cluster_id=self.id):
                return self.restart()
        elif action == 'upgrade':
            context.notification = DBaaSClusterUpgrade(context, request=req)
            dv_id = param['datastore_version']
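For context, the new 'restart' branch is reached through the existing cluster action endpoint. A minimal sketch of triggering it from python-troveclient follows; the credentials and session setup are hypothetical, but the restart call itself is the same one the scenario tests below use:

    from troveclient.v1 import client

    # Hypothetical session setup; any authenticated Trove client works.
    trove = client.Client(USERNAME, PASSWORD, project_id=TENANT_ID,
                          auth_url=AUTH_URL)
    trove.clusters.restart(cluster_id)
    # The API replies 202 and the cluster task moves to RESTARTING_CLUSTER.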
@@ -332,8 +338,43 @@ class Cluster(object):
    def shrink(self, instance_ids):
        raise exception.BadRequest(_("Action 'shrink' not supported"))

    def rolling_restart(self):
        self.validate_cluster_available()
        self.db_info.update(task_status=ClusterTasks.RESTARTING_CLUSTER)
        try:
            cluster_id = self.db_info.id
            task_api.load(self.context, self.ds_version.manager
                          ).restart_cluster(cluster_id)
        except Exception:
            self.db_info.update(task_status=ClusterTasks.NONE)
            raise

        return self.__class__(self.context, self.db_info,
                              self.ds, self.ds_version)

    def rolling_upgrade(self, datastore_version):
        """Upgrades a cluster to a new datastore version."""
        LOG.debug("Upgrading cluster %s." % self.id)

        self.validate_cluster_available()
        self.db_info.update(task_status=ClusterTasks.UPGRADING_CLUSTER)
        try:
            cluster_id = self.db_info.id
            ds_ver_id = datastore_version.id
            task_api.load(self.context, self.ds_version.manager
                          ).upgrade_cluster(cluster_id, ds_ver_id)
        except Exception:
            self.db_info.update(task_status=ClusterTasks.NONE)
            raise

        return self.__class__(self.context, self.db_info,
                              self.ds, self.ds_version)

    def restart(self):
        raise exception.BadRequest(_("Action 'restart' not supported"))

    def upgrade(self, datastore_version):
        raise exception.BadRequest(_("Action 'upgrade' not supported"))

    @staticmethod
    def load_instance(context, cluster_id, instance_id):
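Taken together, the handoff added above is: the API dispatch sets a notification and calls restart(); a per-datastore override invokes rolling_restart(), which marks the cluster busy and casts to the taskmanager. A compact sketch, assuming the existing Cluster.load() classmethod:

    cluster = Cluster.load(context, cluster_id)
    cluster.action(context, req, 'restart', {})
    # -> self.restart()          (datastore override; see CassandraCluster below)
    # -> self.rolling_restart()  (sets ClusterTasks.RESTARTING_CLUSTER)
    # -> task_api.load(context, manager).restart_cluster(cluster_id)  (async cast)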

View File

@@ -69,6 +69,10 @@ class ClusterTasks(object):
        0x05, 'GROWING_CLUSTER', 'Increasing the size of the cluster.')
    SHRINKING_CLUSTER = ClusterTask(
        0x06, 'SHRINKING_CLUSTER', 'Decreasing the size of the cluster.')
    UPGRADING_CLUSTER = ClusterTask(
        0x07, 'UPGRADING_CLUSTER', 'Upgrading the cluster to a new version.')
    RESTARTING_CLUSTER = ClusterTask(
        0x08, 'RESTARTING_CLUSTER', 'Restarting the cluster.')

    # Dissuade further additions at run-time.
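These task objects are what rolling_restart() and rolling_upgrade() store on the cluster record; validate_cluster_available() refuses a new action until the task drains back to NONE. An abridged sketch of the gating, under the existing ClusterTask semantics:

    # Abridged; the real check lives in Cluster.validate_cluster_available().
    if cluster.db_info.task_status != ClusterTasks.NONE:
        raise exception.UnprocessableEntity("Cluster is busy.")  # reject the action
    cluster.db_info.update(task_status=ClusterTasks.RESTARTING_CLUSTER)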

View File

@@ -938,6 +938,26 @@ cassandra_opts = [
                help='Character length of generated passwords.',
                deprecated_name='default_password_length',
                deprecated_group='DEFAULT'),
    cfg.BoolOpt('enable_cluster_instance_backup',
                default=False,
                help='Allow backup of a single instance in the cluster.'),
    cfg.BoolOpt('enable_saslauthd', default=False,
                help='Enable the saslauth daemon.'),
    cfg.StrOpt('user_controller',
               default='trove.extensions.cassandra.service.'
                       'CassandraUserController',
               help='User controller implementation.'),
    cfg.StrOpt('database_controller',
               default='trove.extensions.cassandra.service.'
                       'CassandraDatabaseController',
               help='Database controller implementation.'),
    cfg.StrOpt('user_access_controller',
               default='trove.extensions.cassandra.service.'
                       'CassandraUserAccessController',
               help='User access controller implementation.'),
    cfg.IntOpt('node_sync_time', default=60,
               help='Time (in seconds) given to a node after a state change '
                    'to finish rejoining the cluster.'),
]
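Of these, only node_sync_time feeds the restart feature; the remaining options belong to the "missing cluster upgrade infrastructure" noted in the commit message. A minimal sketch of consuming the new option, following the oslo.config pattern Trove already uses (it assumes the cassandra_opts group has been registered, as cfg.py does at import time):

    from oslo_config import cfg

    CONF = cfg.CONF
    # Available under the 'cassandra' group once registered:
    delay_sec = CONF.cassandra.node_sync_time  # defaults to 60 seconds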
# Couchbase

View File

@@ -368,7 +368,7 @@ class DBaaSAPINotification(object):
            })
        elif 'request_id' not in kwargs:
            raise TroveError(_("Notification %s must include 'request'"
                               " property") % self.__class__.__name__)

        self.payload.update(kwargs)

@@ -385,7 +385,7 @@ class DBaaSAPINotification(object):
                'keys': list(required_keys - provided_keys)})

        if 'server_type' not in self.payload:
            raise TroveError(_("Notification %s must include a"
                               " 'server_type' for correct routing")
                             % self.__class__.__name__)

    def _notify(self, event_qualifier, required_traits, optional_traits,

@@ -564,6 +564,15 @@ class DBaaSClusterCreate(DBaaSAPINotification):
        return ['cluster_id']


class DBaaSClusterRestart(DBaaSAPINotification):

    def event_type(self):
        return 'cluster_restart'

    def required_start_traits(self):
        return ['cluster_id']


class DBaaSClusterUpgrade(DBaaSAPINotification):

    @abc.abstractmethod

View File

@@ -206,6 +206,12 @@ class CassandraCluster(models.Cluster):
        return CassandraCluster(context, db_info, datastore, datastore_version)

    def restart(self):
        self.rolling_restart()

    def upgrade(self, datastore_version):
        self.rolling_upgrade(datastore_version)


class CassandraClusterView(ClusterView):

View File

@@ -341,6 +341,13 @@ class CassandraClusterTasks(task_models.ClusterTasks):
        LOG.debug("End shrink_cluster for id: %s." % cluster_id)

    def restart_cluster(self, context, cluster_id):
        self.rolling_restart_cluster(
            context, cluster_id, delay_sec=CONF.cassandra.node_sync_time)

    def upgrade_cluster(self, context, cluster_id, datastore_version):
        self.rolling_upgrade_cluster(context, cluster_id, datastore_version)


class CassandraTaskManagerAPI(task_api.API):
    pass

View File

@@ -197,6 +197,12 @@ class GaleraCommonCluster(cluster_models.Cluster):
        return self.__class__(self.context, self.db_info,
                              self.ds, self.ds_version)

    def restart(self):
        self.rolling_restart()

    def upgrade(self, datastore_version):
        self.rolling_upgrade(datastore_version)


class GaleraCommonClusterView(ClusterView):

View File

@@ -325,3 +325,9 @@ class GaleraCommonClusterTasks(task_models.ClusterTasks):
            timeout.cancel()

        LOG.debug("End shrink_cluster for id: %s." % cluster_id)

    def restart_cluster(self, context, cluster_id):
        self.rolling_restart_cluster(context, cluster_id)

    def upgrade_cluster(self, context, cluster_id, datastore_version):
        self.rolling_upgrade_cluster(context, cluster_id, datastore_version)

View File

@@ -114,6 +114,9 @@ class InstanceTasks(object):
    SHRINKING_ERROR = InstanceTask(0x58, 'SHRINKING',
                                   'Shrinking Cluster Error.',
                                   is_error=True)
    UPGRADING_ERROR = InstanceTask(0x59, 'UPGRADING',
                                   'Upgrading Cluster Error.',
                                   is_error=True)
    UPGRADING = InstanceTask(0x59, 'UPGRADING', 'Upgrading the instance.')

    # Dissuade further additions at run-time.

View File

@@ -251,6 +251,22 @@ class API(object):
        cctxt.cast(self.context, "upgrade", instance_id=instance_id,
                   datastore_version_id=datastore_version_id)

    def restart_cluster(self, cluster_id):
        LOG.debug("Making async call to restart cluster %s" % cluster_id)
        version = self.API_BASE_VERSION

        cctxt = self.client.prepare(version=version)
        cctxt.cast(self.context, "restart_cluster", cluster_id=cluster_id)

    def upgrade_cluster(self, cluster_id, datastore_version_id):
        LOG.debug("Making async call to upgrade guest to datastore "
                  "version %s" % datastore_version_id)
        version = self.API_BASE_VERSION

        cctxt = self.client.prepare(version=version)
        cctxt.cast(self.context, "upgrade_cluster", cluster_id=cluster_id,
                   datastore_version_id=datastore_version_id)
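Both methods are fire-and-forget: cast() enqueues the message and returns immediately, and the taskmanager Manager methods of the same names (added below) do the actual work. This is the standard oslo.messaging cast/call distinction; a blocking call() would hold the API thread for the entire rolling operation. A minimal contrast, for reference:

    # cast: enqueue and return immediately (what the code above does).
    cctxt.cast(ctx, "restart_cluster", cluster_id=cluster_id)
    # call: send and block until the server returns a result (not used here).
    result = cctxt.call(ctx, "some_synchronous_method")  # hypothetical method name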
def load(context, manager=None):
    if manager:

View File

@@ -371,7 +371,7 @@ class Manager(periodic_task.PeriodicTasks):
                        cluster_config, volume_type, modules, locality):
        with EndNotification(context,
                             instance_id=(instance_id[0]
                                          if isinstance(instance_id, list)
                                          else instance_id)):
            self._create_instance(context, instance_id, name, flavor,
                                  image_id, databases, users,

@@ -409,6 +409,15 @@ class Manager(periodic_task.PeriodicTasks):
        cluster_tasks = models.load_cluster_tasks(context, cluster_id)
        cluster_tasks.shrink_cluster(context, cluster_id, instance_ids)

    def restart_cluster(self, context, cluster_id):
        cluster_tasks = models.load_cluster_tasks(context, cluster_id)
        cluster_tasks.restart_cluster(context, cluster_id)

    def upgrade_cluster(self, context, cluster_id, datastore_version_id):
        datastore_version = DatastoreVersion.load_by_uuid(datastore_version_id)
        cluster_tasks = models.load_cluster_tasks(context, cluster_id)
        cluster_tasks.upgrade_cluster(context, cluster_id, datastore_version)

    def delete_cluster(self, context, cluster_id):
        with EndNotification(context):
            cluster_tasks = models.load_cluster_tasks(context, cluster_id)

View File

@@ -13,10 +13,12 @@
# under the License.

import os.path
import time
import traceback

from cinderclient import exceptions as cinder_exceptions
from eventlet import greenthread
from eventlet.timeout import Timeout
from heatclient import exc as heat_exceptions
from novaclient import exceptions as nova_exceptions
from oslo_log import log as logging

@@ -45,6 +47,10 @@ from trove.common.i18n import _
from trove.common import instance as rd_instance
from trove.common.instance import ServiceStatuses
from trove.common.notification import (
    DBaaSInstanceRestart,
    DBaaSInstanceUpgrade,
    EndNotification,
    StartNotification,
    TroveInstanceCreate,
    TroveInstanceModifyVolume,
    TroveInstanceModifyFlavor,

@@ -316,6 +322,88 @@ class ClusterTasks(Cluster):
        cluster.save()
        LOG.debug("end delete_cluster for id: %s" % cluster_id)

    def rolling_restart_cluster(self, context, cluster_id, delay_sec=0):
        LOG.debug("Begin rolling cluster restart for id: %s" % cluster_id)

        def _restart_cluster_instance(instance):
            LOG.debug("Restarting instance with id: %s" % instance.id)
            context.notification = (
                DBaaSInstanceRestart(context, **request_info))
            with StartNotification(context, instance_id=instance.id):
                with EndNotification(context):
                    instance.update_db(task_status=InstanceTasks.REBOOTING)
                    instance.restart()

        timeout = Timeout(CONF.cluster_usage_timeout)
        cluster_notification = context.notification
        request_info = cluster_notification.serialize(context)
        try:
            node_db_inst = DBInstance.find_all(cluster_id=cluster_id).all()
            for index, db_inst in enumerate(node_db_inst):
                if index > 0:
                    LOG.debug(
                        "Waiting (%ds) for restarted nodes to rejoin the "
                        "cluster before proceeding." % delay_sec)
                    time.sleep(delay_sec)
                instance = BuiltInstanceTasks.load(context, db_inst.id)
                _restart_cluster_instance(instance)
        except Timeout as t:
            if t is not timeout:
                raise  # not my timeout
            LOG.exception(_("Timeout for restarting cluster."))
            raise
        except Exception:
            LOG.exception(_("Error restarting cluster %s.") % cluster_id)
            raise
        finally:
            context.notification = cluster_notification
            timeout.cancel()

        self.reset_task()
        LOG.debug("End rolling restart for id: %s." % cluster_id)

    def rolling_upgrade_cluster(self, context, cluster_id, datastore_version):
        LOG.debug("Begin rolling cluster upgrade for id: %s." % cluster_id)

        def _upgrade_cluster_instance(instance):
            LOG.debug("Upgrading instance with id: %s." % instance.id)
            context.notification = (
                DBaaSInstanceUpgrade(context, **request_info))
            with StartNotification(
                    context, instance_id=instance.id,
                    datastore_version_id=datastore_version.id):
                with EndNotification(context):
                    instance.update_db(
                        datastore_version_id=datastore_version.id,
                        task_status=InstanceTasks.UPGRADING)
                    instance.upgrade(datastore_version)

        timeout = Timeout(CONF.cluster_usage_timeout)
        cluster_notification = context.notification
        request_info = cluster_notification.serialize(context)
        try:
            for db_inst in DBInstance.find_all(cluster_id=cluster_id).all():
                instance = BuiltInstanceTasks.load(
                    context, db_inst.id)
                _upgrade_cluster_instance(instance)

            self.reset_task()
        except Timeout as t:
            if t is not timeout:
                raise  # not my timeout
            LOG.exception(_("Timeout for upgrading cluster."))
            self.update_statuses_on_failure(
                cluster_id, status=InstanceTasks.UPGRADING_ERROR)
        except Exception:
            LOG.exception(_("Error upgrading cluster %s.") % cluster_id)
            self.update_statuses_on_failure(
                cluster_id, status=InstanceTasks.UPGRADING_ERROR)
        finally:
            context.notification = cluster_notification
            timeout.cancel()

        LOG.debug("End upgrade_cluster for id: %s." % cluster_id)
class FreshInstanceTasks(FreshInstance, NotifyMixin, ConfigurationMixin):

View File

@@ -162,6 +162,9 @@ cluster_root_groups.extend([groups.CLUSTER_ACTIONS_ROOT_ENABLE])
cluster_root_actions_groups = list(cluster_actions_groups)
cluster_root_actions_groups.extend([groups.CLUSTER_ACTIONS_ROOT_ACTIONS])

cluster_restart_groups = list(cluster_create_groups)
cluster_restart_groups.extend([groups.CLUSTER_ACTIONS_RESTART_WAIT])

cluster_upgrade_groups = list(cluster_create_groups)
cluster_upgrade_groups.extend([groups.CLUSTER_UPGRADE_WAIT])

@@ -247,6 +250,7 @@ register(["cluster"], cluster_actions_groups)
register(["cluster_actions"], cluster_actions_groups)
register(["cluster_create"], cluster_create_groups)
register(["cluster_negative_actions"], cluster_negative_actions_groups)
register(["cluster_restart"], cluster_restart_groups)
register(["cluster_root"], cluster_root_groups)
register(["cluster_root_actions"], cluster_root_actions_groups)
register(["cluster_upgrade"], cluster_upgrade_groups)

View File

@@ -61,6 +61,8 @@ CLUSTER_ACTIONS_GROW = "scenario.cluster_actions_grow_grp"
CLUSTER_ACTIONS_GROW_WAIT = "scenario.cluster_actions_grow_wait_grp"
CLUSTER_ACTIONS_SHRINK = "scenario.cluster_actions_shrink_grp"
CLUSTER_ACTIONS_SHRINK_WAIT = "scenario.cluster_actions_shrink_wait_grp"
CLUSTER_ACTIONS_RESTART = "scenario.cluster_actions_restart_grp"
CLUSTER_ACTIONS_RESTART_WAIT = "scenario.cluster_actions_restart_wait_grp"

# Cluster Create Group (in cluster_actions file)

View File

@@ -92,8 +92,44 @@ class ClusterCreateWaitGroup(TestGroup):


@test(groups=[GROUP, groups.CLUSTER_ACTIONS,
              groups.CLUSTER_ACTIONS_RESTART],
      depends_on_groups=[groups.CLUSTER_CREATE_WAIT])
class ClusterRestartGroup(TestGroup):

    def __init__(self):
        super(ClusterRestartGroup, self).__init__(
            ClusterRunnerFactory.instance())

    @test
    def cluster_restart(self):
        """Restart the cluster."""
        self.test_runner.run_cluster_restart()


@test(groups=[GROUP, groups.CLUSTER_ACTIONS,
              groups.CLUSTER_ACTIONS_RESTART_WAIT],
      depends_on_groups=[groups.CLUSTER_ACTIONS_RESTART])
class ClusterRestartWaitGroup(TestGroup):

    def __init__(self):
        super(ClusterRestartWaitGroup, self).__init__(
            ClusterRunnerFactory.instance())

    @test
    def cluster_restart_wait(self):
        """Wait for cluster restart to complete."""
        self.test_runner.run_cluster_restart_wait()

    @test(depends_on=[cluster_restart_wait])
    def verify_initial_cluster_data(self):
        """Verify the initial data still exists after cluster restart."""
        self.test_runner.run_verify_initial_cluster_data()


@test(groups=[GROUP, groups.CLUSTER_ACTIONS,
              groups.CLUSTER_ACTIONS_ROOT_ENABLE],
      depends_on_groups=[groups.CLUSTER_CREATE_WAIT],
      runs_after_groups=[groups.CLUSTER_ACTIONS_RESTART_WAIT])
class ClusterRootEnableGroup(TestGroup):

    def __init__(self):

@@ -308,7 +344,8 @@ class ClusterRootEnableShrinkGroup(TestGroup):
                         groups.CLUSTER_ACTIONS_ROOT_SHRINK,
                         groups.CLUSTER_ACTIONS_GROW_WAIT,
                         groups.CLUSTER_ACTIONS_SHRINK_WAIT,
                         groups.CLUSTER_UPGRADE_WAIT,
                         groups.CLUSTER_ACTIONS_RESTART_WAIT])
class ClusterDeleteGroup(TestGroup):

    def __init__(self):

View File

@@ -160,6 +160,34 @@ class ClusterRunner(TestRunner):
        self.assert_cluster_show(
            self.cluster_id, expected_task_name, expected_http_code)

    def run_cluster_restart(self, expected_http_code=202,
                            expected_task_name='RESTARTING_CLUSTER'):
        self.assert_cluster_restart(
            self.cluster_id, expected_task_name, expected_http_code)

    def assert_cluster_restart(
            self, cluster_id, expected_task_name, expected_http_code):
        client = self.auth_client
        client.clusters.restart(cluster_id)
        self.assert_client_code(client, expected_http_code)
        self._assert_cluster_response(
            client, cluster_id, expected_task_name)

    def run_cluster_restart_wait(self):
        self.assert_cluster_restart_wait(self.cluster_id)

    def assert_cluster_restart_wait(self, cluster_id):
        client = self.auth_client
        cluster_instances = self._get_cluster_instances(
            client, cluster_id)
        self.assert_all_instance_states(
            cluster_instances, ['REBOOT', 'ACTIVE'])

        self._assert_cluster_states(
            client, cluster_id, ['NONE'])
        self._assert_cluster_response(
            client, cluster_id, 'NONE')
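The wait runner above encodes the expected lifecycle compactly: each node passes through REBOOT back to ACTIVE, and the cluster task drains from RESTARTING_CLUSTER to NONE. As a reference, the sequences the assertions accept:

    # State sequences asserted above (not an exhaustive state machine).
    NODE_STATES = ['REBOOT', 'ACTIVE']               # per instance, in order
    CLUSTER_TASKS = ['RESTARTING_CLUSTER', 'NONE']   # accepted at 202, then reset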
    def assert_cluster_show(self, cluster_id, expected_task_name,
                            expected_http_code):
        self._assert_cluster_response(self.auth_client,