Add backup & restore for Cassandra

Implement backup and restore functionality for the Cassandra datastore.

We implement a full backup strategy using the Nodetool
(http://goo.gl/QtXVsM) utility.

Snapshots:

Nodetool can take a snapshot of one or more keyspace(s).
Snapshot(s) will be stored in the data directory tree:
'<data dir>/<keyspace>/<table>/snapshots/<snapshot name>'

A snapshot can be restored by moving all *.db files from a snapshot
directory to the respective keyspace directory, overwriting any
existing files.
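The snapshot-to-keyspace mapping can be sketched with a small
hypothetical helper (the function name and example paths are
illustrative only, not part of this change):

```python
import os


def restored_path(snapshot_file, data_dir):
    """Map a snapshot file back to its live location in the keyspace.

    '<data dir>/<keyspace>/<table>/snapshots/<name>/f.db'
    becomes '<data dir>/<keyspace>/<table>/f.db'.
    """
    rel = os.path.relpath(snapshot_file, data_dir)
    parts = rel.split(os.sep)
    i = parts.index('snapshots')
    # Drop the 'snapshots/<snapshot name>' portion of the path.
    del parts[i:i + 2]
    return os.path.join(data_dir, *parts)
```

Moving every *.db file to its mapped location reproduces the keyspace
state as of the snapshot.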

NOTE: It is recommended to include the system keyspace in the backup.
      Keeping the system keyspace will reduce the restore time
      by avoiding the need to rebuild indexes.

The Backup Procedure:

1. Clear existing snapshots.

2. Take a snapshot of all keyspaces.

3. Collect all *.db files from the snapshot directories and package
them into a single TAR archive.

Transform the paths such that the backup can be restored simply by
extracting the archive right to an existing data directory
(i.e. place the root into the <data dir> and
remove the 'snapshots/<snapshot name>' portion of the path).
The data directory itself is not included in the backup archive
(i.e. the archive is rooted inside the data directory).
This is to make sure we can always restore an old backup
even if the standard guest agent data directory changes.

Attempt to preserve access modifiers on the archived files.

Assert the backup is not empty as there should always be
at least the system keyspace. Fail if there is nothing to backup.

4. Compress and/or encrypt the archive as required.

5. Stream the archive to the storage location.
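The path transform and the not-empty assertion in step 3 both hinge on
the tar invocation. A minimal sketch of building that packaging command
(mirroring the strategy's approach; the helper name is illustrative):

```python
def build_snapshot_package_cmd(data_dir, snapshot_name, snapshot_files):
    """Build a tar command archiving snapshot files for later restore.

    --transform strips the 'snapshots/<snapshot name>/' path segment,
    -p preserves access modifiers, and -C roots the archive inside the
    data directory so it can be extracted straight into any data dir.
    """
    if not snapshot_files:
        # There should always be at least the system keyspace snapshot.
        raise RuntimeError('No data found.')
    return ('sudo tar --transform="s#snapshots/%s/##" -cpPf - -C "%s" "%s"'
            % (snapshot_name, data_dir, '" "'.join(snapshot_files)))
```

The command writes the archive to stdout, which is what allows step 5
to stream it to storage without an intermediate file.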

The Restore Procedure:

1. Create a new data directory if it does not already exist.

2. Unpack the backup to that directory.

3. Update ownership of the restored files to the Cassandra user.
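The three restore steps map onto a short command sequence; a sketch
with illustrative paths (in practice the downloaded archive is piped
into tar rather than read from a file):

```python
def restore_commands(restore_location, owner):
    """Commands mirroring the restore procedure (illustrative only)."""
    return [
        # 1. Create the data directory if it does not exist.
        'sudo mkdir -p "%s"' % restore_location,
        # 2. Unpack the backup into it, preserving access modifiers.
        'sudo tar -xpPf - -C "%s"' % restore_location,
        # 3. Hand ownership of the restored files to the Cassandra user.
        'sudo chown -R %s:%s "%s"' % (owner, owner, restore_location),
    ]
```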

Notes on 'cluster_name' property:

Cassandra has a concept of clusters. Clusters are composed of
nodes (instances). All nodes belonging to one cluster must have the
same 'cluster_name' property. This prevents nodes from different logical
clusters from accidentally talking to each other.

The cluster name can be changed in the configuration file.
It is also stored in the system keyspace.
When the Cassandra service boots up it verifies that the cluster name
stored in the database matches the name in the configuration file and
fails to start if they do not. This is to prevent the operator from
accidentally launching a node with data from another cluster.
The operator then has to update the configuration file.

Similarly, a restored backup carries the original cluster name with
it, so we have to update the configuration file to use that old name.
A restored node will therefore still belong to the original cluster.
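The resulting post-restore sequence can be sketched as follows; `app`
stands in for the guestagent service object and the method names are
illustrative:

```python
def apply_post_restore_updates(app, backup_info, new_cluster_name):
    # The restored system keyspace still holds the parent instance's
    # 'cluster_name'; the configuration file must match it, or
    # Cassandra refuses to boot.
    app.update_cluster_name_property(backup_info['instance_id'])
    app.reset_admin_password()
    app.start_db()                   # boots against the old cluster name
    app.change_cluster_name(new_cluster_name)  # re-home to a new cluster
    app.stop_db()                    # restore the initial service state
```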

Notes on superuser password reset:

The database is no longer wide open and requires password authentication.
The 'root' password stored in the system keyspace
needs to be reset before we can start up with restored data.

A general password reset procedure is:
- disable user authentication and remote access
- restart the service
- update the password in the 'system_auth.credentials' table
- re-enable authentication and make the host reachable
- restart the service
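With authentication disabled, the stored credential can be overwritten
directly. A sketch of the CQL involved (table and column names as used
by Cassandra 2.x; the helper itself is illustrative):

```python
def reset_password_cql(username, salted_hash):
    # 'system_auth.credentials' holds one salted hash per user in
    # Cassandra 2.x; overwriting it resets that user's password.
    return ("UPDATE system_auth.credentials SET salted_hash = '%s' "
            "WHERE username = '%s';" % (salted_hash, username))
```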

Note: The superuser-password-reset and related methods that
      potentially expose the database contents are intentionally
      decorated with '_' and '__' to discourage a caller from
      using them unless absolutely necessary.

Additional changes:

- Adds backup/restore namespaces to the sample config
  file 'trove-guestagent.conf.sample'.
  We include the other datastores too
  for the sake of consistency.
  (Auston McReynolds, Jul 6, 2014)

Implements: blueprint cassandra-backup-restore
Co-Authored-By: Denis Makogon <dmakogon@mirantis.com>
Change-Id: I3671a737d3e71305982d8f4965215a73e785ea2d
Petr Malik 2015-03-10 20:49:30 -04:00
parent 21e0a5ca4f
commit e722342ce7
11 changed files with 879 additions and 43 deletions


@ -120,7 +120,6 @@ rabbit_password=f7999d1955c5014aa32c
# backup_strategy = InnoBackupEx
# backup_namespace = trove.guestagent.strategies.backup.mysql_impl
# restore_namespace = trove.guestagent.strategies.restore.mysql_impl
# Default configuration for mysql replication
# replication_strategy = MysqlBinlogReplication
# replication_namespace = trove.guestagent.strategies.replication.mysql_binlog
@ -144,3 +143,15 @@ rabbit_password=f7999d1955c5014aa32c
# backup_strategy = RedisBackup
# backup_namespace = trove.guestagent.strategies.backup.experimental.redis_impl
# restore_namespace = trove.guestagent.strategies.restore.experimental.redis_impl
[percona]
backup_namespace = trove.guestagent.strategies.backup.mysql_impl
restore_namespace = trove.guestagent.strategies.restore.mysql_impl
[couchbase]
backup_namespace = trove.guestagent.strategies.backup.experimental.couchbase_impl
restore_namespace = trove.guestagent.strategies.restore.experimental.couchbase_impl
[cassandra]
backup_namespace = trove.guestagent.strategies.backup.experimental.cassandra_impl
restore_namespace = trove.guestagent.strategies.restore.experimental.cassandra_impl


@ -784,16 +784,16 @@ cassandra_opts = [
help='List of UDP ports and/or port ranges to open '
'in the security group (only applicable '
'if trove_security_groups_support is True).'),
cfg.StrOpt('backup_strategy', default=None,
cfg.DictOpt('backup_incremental_strategy', default={},
help='Incremental strategy based on the default backup '
'strategy. For strategies that do not implement incremental '
'backups, the runner performs full backup instead.',
deprecated_name='backup_incremental_strategy',
deprecated_group='DEFAULT'),
cfg.StrOpt('backup_strategy', default="NodetoolSnapshot",
help='Default strategy to perform backups.',
deprecated_name='backup_strategy',
deprecated_group='DEFAULT'),
cfg.DictOpt('backup_incremental_strategy', default={},
help='Incremental Backup Runner based on the default '
'strategy. For strategies that do not implement an '
'incremental, the runner will use the default full backup.',
deprecated_name='backup_incremental_strategy',
deprecated_group='DEFAULT'),
cfg.StrOpt('replication_strategy', default=None,
help='Default strategy for replication.'),
cfg.StrOpt('mount_point', default='/var/lib/cassandra',
@ -803,11 +803,15 @@ cassandra_opts = [
help='Whether to provision a Cinder volume for datadir.'),
cfg.StrOpt('device_path', default='/dev/vdb',
help='Device path for volume if volume support is enabled.'),
cfg.StrOpt('backup_namespace', default=None,
cfg.StrOpt('backup_namespace',
default="trove.guestagent.strategies.backup.experimental."
"cassandra_impl",
help='Namespace to load backup strategies from.',
deprecated_name='backup_namespace',
deprecated_group='DEFAULT'),
cfg.StrOpt('restore_namespace', default=None,
cfg.StrOpt('restore_namespace',
default="trove.guestagent.strategies.restore.experimental."
"cassandra_impl",
help='Namespace to load restore strategies from.',
deprecated_name='restore_namespace',
deprecated_group='DEFAULT'),


@ -18,6 +18,10 @@ import os
from oslo_log import log as logging
from trove.common import cfg
from trove.common.i18n import _
from trove.common import instance as trove_instance
from trove.guestagent import backup
from trove.guestagent.datastore.experimental.cassandra import service
from trove.guestagent.datastore.experimental.cassandra.service import (
CassandraAdmin
@ -27,6 +31,7 @@ from trove.guestagent import volume
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class Manager(manager.Manager):
@ -72,14 +77,32 @@ class Manager(manager.Manager):
self.app.install_if_needed(packages)
self.app.init_storage_structure(mount_point)
if config_contents or device_path:
# Stop the db while we configure
# FIXME(amrith) Once the cassandra bug
if config_contents or device_path or backup_info:
# FIXME(pmalik) Once the cassandra bug
# https://issues.apache.org/jira/browse/CASSANDRA-2356
# is fixed, this code may have to be revisited.
LOG.debug("Stopping database prior to initial configuration.")
self.app.stop_db()
#
# Cassandra generates system keyspaces on the first start.
# The stored properties include the 'cluster_name', which once
# saved cannot be easily changed without removing the system
# tables. It is crucial that the service does not boot up in
# the middle of the configuration procedure.
# We wait here for the service to come up, stop it properly and
# remove the generated keyspaces before proceeding with
# configuration. If it does not start up within the time limit
# we assume it is not going to and proceed with configuration
# right away.
LOG.debug("Waiting for database first boot.")
if (self.app.status.wait_for_real_status_to_change_to(
trove_instance.ServiceStatuses.RUNNING,
CONF.state_change_wait_time,
False)):
LOG.debug("Stopping database prior to initial configuration.")
self.app.stop_db()
self.app._remove_system_tables()
LOG.debug("Starting initial configuration.")
if config_contents:
LOG.debug("Applying configuration.")
self.app.configuration_manager.save_configuration(
@ -100,6 +123,9 @@ class Manager(manager.Manager):
LOG.debug("Mounting new volume.")
device.mount(mount_point)
if backup_info:
self._perform_restore(backup_info, context, mount_point)
LOG.debug("Starting database with configuration changes.")
self.app.start_db(update_db=False)
@ -149,6 +175,30 @@ class Manager(manager.Manager):
include_marker=False):
return self.admin.list_users(context, limit, marker, include_marker)
def _perform_restore(self, backup_info, context, restore_location):
LOG.info(_("Restoring database from backup %s.") % backup_info['id'])
try:
backup.restore(context, backup_info, restore_location)
self.app._apply_post_restore_updates(backup_info)
except Exception as e:
LOG.error(e)
LOG.error(_("Error performing restore from backup %s.") %
backup_info['id'])
self.app.status.set_status(trove_instance.ServiceStatuses.FAILED)
raise
LOG.info(_("Restored database successfully."))
def create_backup(self, context, backup_info):
"""
Entry point for initiating a backup for this instance.
The call currently blocks guestagent until the backup is finished.
:param backup_info: a dictionary containing the db instance id of the
backup task, location, type, and other data.
"""
backup.backup(context, backup_info)
def update_overrides(self, context, overrides, remove=False):
LOG.debug("Updating overrides.")
if remove:


@ -62,7 +62,6 @@ class CassandraApp(object):
CASSANDRA_KILL_CMD = "sudo killall java || true"
def __init__(self):
"""By default login with root no password for initial setup."""
self.state_change_wait_time = CONF.state_change_wait_time
self.status = CassandraAppStatus(self.get_current_superuser())
@ -133,10 +132,10 @@ class CassandraApp(object):
except exception.ProcessExecutionError:
LOG.exception(_("Error while initiating storage structure."))
def start_db(self, update_db=False):
def start_db(self, update_db=False, enable_on_boot=True):
self.status.start_db_service(
self.service_candidates, self.state_change_wait_time,
enable_on_boot=True, update_db=update_db)
enable_on_boot=enable_on_boot, update_db=update_db)
def stop_db(self, update_db=False, do_not_start_on_reboot=False):
self.status.stop_db_service(
@ -153,6 +152,102 @@ class CassandraApp(object):
packager.pkg_install(packages, None, 10000)
LOG.debug("Finished installing Cassandra server")
def _remove_system_tables(self):
"""
Clean up the system keyspace.
System tables are initialized on the first boot.
They store certain properties, such as 'cluster_name',
that cannot be easily changed afterwards.
The system keyspace needs to be cleaned up first. The
tables will be regenerated on the next startup.
Make sure to also cleanup the commitlog and caches to avoid
startup errors due to inconsistencies.
The service should not be running at this point.
"""
if self.status.is_running:
raise RuntimeError(_("Cannot remove system tables. "
"The service is still running."))
LOG.info(_('Removing existing system tables.'))
system_keyspace_dir = guestagent_utils.build_file_path(
self.cassandra_data_dir, 'system')
commitlog_file = guestagent_utils.build_file_path(
self.cassandra_working_dir, 'commitlog')
chaches_dir = guestagent_utils.build_file_path(
self.cassandra_working_dir, 'saved_caches')
operating_system.remove(system_keyspace_dir,
force=True, recursive=True, as_root=True)
operating_system.remove(commitlog_file,
force=True, recursive=True, as_root=True)
operating_system.remove(chaches_dir,
force=True, recursive=True, as_root=True)
operating_system.create_directory(
system_keyspace_dir,
user=self.cassandra_owner, group=self.cassandra_owner,
force=True, as_root=True)
operating_system.create_directory(
commitlog_file,
user=self.cassandra_owner, group=self.cassandra_owner,
force=True, as_root=True)
operating_system.create_directory(
chaches_dir,
user=self.cassandra_owner, group=self.cassandra_owner,
force=True, as_root=True)
def _apply_post_restore_updates(self, backup_info):
"""The service should not be running at this point.
The restored database files carry some properties over from the
original instance that need to be updated with appropriate
values for the new instance.
These include:
- Reset the 'cluster_name' property to match the new unique
ID of this instance.
This is to ensure that the restored instance is part of a new
single-node cluster rather than forming one with the
original node.
- Reset the administrator's password.
The original password from the parent instance may be
compromised or long lost.
A general procedure is:
- update the configuration property with the current value
so that the service can start up
- reset the superuser password
- restart the service
- change the cluster name
- restart the service
:seealso: _reset_admin_password
:seealso: change_cluster_name
"""
if self.status.is_running:
raise RuntimeError(_("Cannot reset the cluster name. "
"The service is still running."))
LOG.debug("Applying post-restore updates to the database.")
try:
# Change the 'cluster_name' property to the current in-database
# value so that the database can start up.
self._update_cluster_name_property(backup_info['instance_id'])
# Reset the superuser password so that we can log-in.
self._reset_admin_password()
# Start the database and update the 'cluster_name' to the
# new value.
self.start_db(update_db=False)
self.change_cluster_name(CONF.guest_id)
finally:
self.stop_db() # Always restore the initial state of the service.
def secure(self, update_user=None):
"""Configure the Trove administrative user.
Update an existing user if given.
@ -184,6 +279,98 @@ class CassandraApp(object):
return os_admin
def _reset_admin_password(self):
"""
Reset the password of Trove's administrative superuser.
The service should not be running at this point.
A general password reset procedure is:
- disable user authentication and remote access
- restart the service
- update the password in the 'system_auth.credentials' table
- re-enable authentication and make the host reachable
- restart the service
"""
if self.status.is_running:
raise RuntimeError(_("Cannot reset the administrative password. "
"The service is still running."))
try:
# Disable automatic startup in case the node goes down before
# we have the superuser secured.
operating_system.disable_service_on_boot(self.service_candidates)
self.__disable_remote_access()
self.__disable_authentication()
# We now start up the service and immediately re-enable
# authentication in the configuration file (takes effect after
# restart).
# Then we reset the superuser password to its default value
# and restart the service to get user functions back.
self.start_db(update_db=False, enable_on_boot=False)
self.__enable_authentication()
os_admin = self.__reset_user_password_to_default(self._ADMIN_USER)
self.status = CassandraAppStatus(os_admin)
self.restart()
# Now change the administrative password to a new secret value.
self.secure(update_user=os_admin)
finally:
self.stop_db() # Always restore the initial state of the service.
# At this point, we should have a secured database with new Trove-only
# superuser password.
# Proceed to re-enable remote access and automatic startup.
self.__enable_remote_access()
operating_system.enable_service_on_boot(self.service_candidates)
def __reset_user_password_to_default(self, username):
LOG.debug("Resetting the password of user '%s' to '%s'."
% (username, self.default_superuser_password))
user = models.CassandraUser(username, self.default_superuser_password)
with CassandraLocalhostConnection(user) as client:
client.execute(
"UPDATE system_auth.credentials SET salted_hash=%s "
"WHERE username='{}';", (user.name,),
(self.default_superuser_pwd_hash,))
return user
def change_cluster_name(self, cluster_name):
"""Change the 'cluster_name' property of an exesting running instance.
Cluster name is stored in the database and is required to match the
configuration value. Cassandra fails to start otherwise.
"""
if not self.status.is_running:
raise RuntimeError(_("Cannot change the cluster name. "
"The service is not running."))
LOG.debug("Changing the cluster name to '%s'." % cluster_name)
# Update the in-database value.
self.__reset_cluster_name(cluster_name)
# Update the configuration property.
self._update_cluster_name_property(cluster_name)
self.restart()
def __reset_cluster_name(self, cluster_name):
# Reset the in-database value stored locally on this node.
current_superuser = self.get_current_superuser()
with CassandraLocalhostConnection(current_superuser) as client:
client.execute(
"UPDATE system.local SET cluster_name = '{}' "
"WHERE key='local';", (cluster_name,))
# Newer versions of Cassandra require a flush to ensure the changes
# to the local system keyspace persist.
self.flush_tables('system', 'local')
def __create_cqlsh_config(self, sections):
config_path = self._get_cqlsh_conf_path()
config_dir = os.path.dirname(config_path)
@ -224,15 +411,25 @@ class CassandraApp(object):
config[self._CONF_AUTH_SEC][self._CONF_PWD_KEY]
)
def apply_initial_guestagent_configuration(self):
def apply_initial_guestagent_configuration(self, cluster_name=None):
"""Update guestagent-controlled configuration properties.
These changes to the default template are necessary in order to make
the database service bootable and accessible in the guestagent context.
:param cluster_name: The 'cluster_name' configuration property.
Use the unique guest id by default.
:type cluster_name: string
"""
self.configuration_manager.apply_system_override(
{'data_file_directories': [self.cassandra_data_dir]})
self._make_host_reachable()
self._update_cluster_name_property(cluster_name or CONF.guest_id)
def _make_host_reachable(self):
"""
Some of these settings may be overridden by user-defined
configuration groups.
cluster_name
- Use the unique guest id by default.
- Prevents nodes from one logical cluster from talking
to another. All nodes in a cluster must have the same value.
authenticator and authorizer
- Necessary to enable users and permissions.
rpc_address - Enable remote connections on all interfaces.
@ -244,10 +441,11 @@ class CassandraApp(object):
other nodes. Can never be 0.0.0.0.
seed_provider - A list of discovery contact points.
"""
self.__enable_authentication()
self.__enable_remote_access()
def __enable_remote_access(self):
updates = {
'cluster_name': CONF.guest_id,
'authenticator': 'org.apache.cassandra.auth.PasswordAuthenticator',
'authorizer': 'org.apache.cassandra.auth.CassandraAuthorizer',
'rpc_address': "0.0.0.0",
'broadcast_rpc_address': netutils.get_my_ipv4(),
'listen_address': netutils.get_my_ipv4(),
@ -258,6 +456,41 @@ class CassandraApp(object):
self.configuration_manager.apply_system_override(updates)
def __disable_remote_access(self):
updates = {
'rpc_address': "127.0.0.1",
'listen_address': '127.0.0.1',
'seed_provider': {'parameters':
[{'seeds': '127.0.0.1'}]
}
}
self.configuration_manager.apply_system_override(updates)
def __enable_authentication(self):
updates = {
'authenticator': 'org.apache.cassandra.auth.PasswordAuthenticator',
'authorizer': 'org.apache.cassandra.auth.CassandraAuthorizer'
}
self.configuration_manager.apply_system_override(updates)
def __disable_authentication(self):
updates = {
'authenticator': 'org.apache.cassandra.auth.AllowAllAuthenticator',
'authorizer': 'org.apache.cassandra.auth.AllowAllAuthorizer'
}
self.configuration_manager.apply_system_override(updates)
def _update_cluster_name_property(self, name):
"""This 'cluster_name' property prevents nodes from one
logical cluster from talking to another.
All nodes in a cluster must have the same value.
"""
self.configuration_manager.apply_system_override({'cluster_name':
name})
def update_overrides(self, context, overrides, remove=False):
if overrides:
self.configuration_manager.apply_user_override(overrides)
@ -284,6 +517,23 @@ class CassandraApp(object):
def _get_cqlsh_conf_path(self):
return os.path.expanduser(self.cqlsh_conf_path)
def flush_tables(self, keyspace, *tables):
"""Flushes one or more tables from the memtable.
"""
LOG.debug("Flushing tables.")
# nodetool -h <HOST> -p <PORT> -u <USER> -pw <PASSWORD> flush --
# <keyspace> ( <table> ... )
self._run_nodetool_command('flush', keyspace, *tables)
def _run_nodetool_command(self, cmd, *args, **kwargs):
"""Execute a nodetool command on this node.
"""
cassandra = self.get_current_superuser()
return utils.execute('nodetool',
'-h', 'localhost',
'-u', cassandra.name,
'-pw', cassandra.password, cmd, *args, **kwargs)
class CassandraAppStatus(service.BaseDbStatus):
@ -296,9 +546,6 @@ class CassandraAppStatus(service.BaseDbStatus):
super(CassandraAppStatus, self).__init__()
self.__user = superuser
def set_superuser(self, user):
self.__user = user
def _get_actual_db_status(self):
try:
with CassandraLocalhostConnection(self.__user):


@ -0,0 +1,117 @@
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
# Copyright 2015 Tesora Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from trove.common import exception
from trove.common.i18n import _
from trove.common import utils
from trove.guestagent.common import operating_system
from trove.guestagent.datastore.experimental.cassandra import service
from trove.guestagent.strategies.backup import base
LOG = logging.getLogger(__name__)
class NodetoolSnapshot(base.BackupRunner):
"""Implementation of backup using the Nodetool (http://goo.gl/QtXVsM)
utility.
"""
# It is recommended to include the system keyspace in the backup.
# Keeping the system keyspace will reduce the restore time
# by avoiding the need to rebuild indexes.
__strategy_name__ = 'nodetoolsnapshot'
_SNAPSHOT_EXTENSION = 'db'
def __init__(self, filename, **kwargs):
self._app = service.CassandraApp()
super(NodetoolSnapshot, self).__init__(filename, **kwargs)
def _run_pre_backup(self):
"""Take snapshot(s) for all keyspaces.
Remove existing ones first if any.
Snapshot(s) will be stored in the data directory tree:
<data dir>/<keyspace>/<table>/snapshots/<snapshot name>
"""
self._remove_snapshot(self.filename)
self._snapshot_all_keyspaces(self.filename)
# Commonly 'self.command' gets resolved in the base constructor,
# but we can build the full command only after having taken the
# keyspace snapshot(s).
self.command = self._backup_cmd + self.command
def _run_post_backup(self):
"""Remove the created snapshot(s).
"""
self._remove_snapshot(self.filename)
def _remove_snapshot(self, snapshot_name):
LOG.debug('Clearing snapshot(s) for all keyspaces with snapshot name '
'"%s".' % snapshot_name)
utils.execute('nodetool', 'clearsnapshot', '-t %s' % snapshot_name)
def _snapshot_all_keyspaces(self, snapshot_name):
LOG.debug('Creating snapshot(s) for all keyspaces with snapshot name '
'"%s".' % snapshot_name)
utils.execute('nodetool', 'snapshot', '-t %s' % snapshot_name)
@property
def cmd(self):
return self.zip_cmd + self.encrypt_cmd
@property
def _backup_cmd(self):
"""Command to collect and package keyspace snapshot(s).
"""
return self._build_snapshot_package_cmd(self._app.cassandra_data_dir,
self.filename)
def _build_snapshot_package_cmd(self, data_dir, snapshot_name):
"""Collect all files for a given snapshot and build a package
command for them.
Transform the paths such that the backup can be restored simply by
extracting the archive right to an existing data directory
(i.e. place the root into the <data dir> and
remove the 'snapshots/<snapshot name>' portion of the path).
Attempt to preserve access modifiers on the archived files.
Assert the backup is not empty as there should always be
at least the system keyspace. Fail if there is nothing to backup.
"""
LOG.debug('Searching for all snapshot(s) with name "%s".'
% snapshot_name)
snapshot_files = operating_system.list_files_in_directory(
data_dir, recursive=True, include_dirs=False,
pattern='.*/snapshots/%s/.*\.%s' % (snapshot_name,
self._SNAPSHOT_EXTENSION),
as_root=True)
num_snapshot_files = len(snapshot_files)
LOG.debug('Found %d snapshot (*.%s) files.'
% (num_snapshot_files, self._SNAPSHOT_EXTENSION))
if num_snapshot_files > 0:
return ('sudo tar '
'--transform="s#snapshots/%s/##" -cpPf - -C "%s" "%s"'
% (snapshot_name, data_dir, '" "'.join(snapshot_files)))
# There should always be at least the system keyspace snapshot.
raise exception.BackupCreationError(_("No data found."))


@ -0,0 +1,69 @@
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
# Copyright 2015 Tesora Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from trove.guestagent.common import operating_system
from trove.guestagent.datastore.experimental.cassandra import service
from trove.guestagent.strategies.restore import base
LOG = logging.getLogger(__name__)
class NodetoolSnapshot(base.RestoreRunner):
"""Implementation of restore using the Nodetool (http://goo.gl/QtXVsM)
utility.
"""
__strategy_name__ = 'nodetoolsnapshot'
def __init__(self, storage, **kwargs):
self._app = service.CassandraApp()
kwargs.update({'restore_location': self._app.cassandra_data_dir})
super(NodetoolSnapshot, self).__init__(storage, **kwargs)
def pre_restore(self):
"""Prepare the data directory for restored files.
The directory itself is not included in the backup archive
(i.e. the archive is rooted inside the data directory).
This is to make sure we can always restore an old backup
even if the standard guest agent data directory changes.
"""
LOG.debug('Initializing a data directory.')
operating_system.create_directory(
self.restore_location,
user=self._app.cassandra_owner, group=self._app.cassandra_owner,
force=True, as_root=True)
def post_restore(self):
"""Updated ownership on the restored files.
"""
LOG.debug('Updating ownership of the restored files.')
operating_system.chown(
self.restore_location,
self._app.cassandra_owner, self._app.cassandra_owner,
recursive=True, force=True, as_root=True)
@property
def base_restore_cmd(self):
"""Command to extract a backup archive into a given location.
Attempt to preserve access modifiers on the archived files.
"""
return 'sudo tar -xpPf - -C "%(restore_location)s"'


@ -399,6 +399,7 @@ class FreshInstanceTasks(FreshInstance, NotifyMixin, ConfigurationMixin):
if backup_id is not None:
backup = bkup_models.Backup.get_by_id(self.context, backup_id)
backup_info = {'id': backup_id,
'instance_id': backup.instance_id,
'location': backup.location,
'type': backup.backup_type,
'checksum': backup.checksum,


@ -13,13 +13,123 @@
# License for the specific language governing permissions and limitations
# under the License.
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from trove.tests.scenario.helpers.test_helper import TestHelper
from trove.tests.scenario.runners.test_runners import TestRunner
class CassandrabHelper(TestHelper):
class CassandraClient(object):
# Cassandra 2.1 only supports protocol versions 3 and lower.
NATIVE_PROTOCOL_VERSION = 3
def __init__(self, contact_points, user, password, keyspace):
super(CassandraClient, self).__init__()
self._cluster = None
self._session = None
self._cluster = Cluster(
contact_points=contact_points,
auth_provider=PlainTextAuthProvider(user, password),
protocol_version=self.NATIVE_PROTOCOL_VERSION)
self._session = self._connect(keyspace)
def _connect(self, keyspace):
if not self._cluster.is_shutdown:
return self._cluster.connect(keyspace)
else:
raise Exception("Cannot perform this operation on a terminated "
"cluster.")
@property
def session(self):
return self._session
def __del__(self):
if self._cluster is not None:
self._cluster.shutdown()
if self._session is not None:
self._session.shutdown()
class CassandraHelper(TestHelper):
DATA_COLUMN_NAME = 'value'
def __init__(self, expected_override_name):
super(CassandrabHelper, self).__init__(expected_override_name)
super(CassandraHelper, self).__init__(expected_override_name)
self._data_cache = dict()
def create_client(self, host, *args, **kwargs):
user = self.get_helper_credentials()
return CassandraClient(
            [host], user['name'], user['password'], user['database'])

    def add_actual_data(self, data_label, data_start, data_size, host,
                        *args, **kwargs):
        client = self.get_client(host, *args, **kwargs)
        self._create_data_table(client, data_label)
        stmt = client.session.prepare("INSERT INTO %s (%s) VALUES (?)"
                                      % (data_label, self.DATA_COLUMN_NAME))
        count = self._count_data_rows(client, data_label)
        if count == 0:
            for value in self._get_dataset(data_size):
                client.session.execute(stmt, [value])

    def _create_data_table(self, client, table_name):
        client.session.execute('CREATE TABLE IF NOT EXISTS %s '
                               '(%s INT PRIMARY KEY)'
                               % (table_name, self.DATA_COLUMN_NAME))

    def _count_data_rows(self, client, table_name):
        rows = client.session.execute('SELECT COUNT(*) FROM %s' % table_name)
        if rows:
            return rows[0][0]
        return 0

    def _get_dataset(self, data_size):
        cache_key = str(data_size)
        if cache_key in self._data_cache:
            return self._data_cache.get(cache_key)
        data = self._generate_dataset(data_size)
        self._data_cache[cache_key] = data
        return data

    def _generate_dataset(self, data_size):
        return range(1, data_size + 1)

    def remove_actual_data(self, data_label, data_start, data_size, host,
                           *args, **kwargs):
        client = self.get_client(host, *args, **kwargs)
        self._drop_table(client, data_label)

    def _drop_table(self, client, table_name):
        client.session.execute('DROP TABLE %s' % table_name)

    def verify_actual_data(self, data_label, data_start, data_size, host,
                           *args, **kwargs):
        expected_data = self._get_dataset(data_size)
        client = self.get_client(host, *args, **kwargs)
        actual_data = self._select_data_rows(client, data_label)
        TestRunner.assert_equal(len(expected_data), len(actual_data),
                                "Unexpected number of result rows.")
        for expected_row in expected_data:
            TestRunner.assert_true(expected_row in actual_data,
                                   "Row not found in the result set: %s"
                                   % expected_row)

    def _select_data_rows(self, client, table_name):
        rows = client.session.execute('SELECT %s FROM %s'
                                      % (self.DATA_COLUMN_NAME, table_name))
        return [value[0] for value in rows]

    def get_helper_credentials(self):
        return {'name': 'lite', 'password': 'litepass', 'database': 'firstdb'}

    def get_valid_database_definitions(self):
        return [{"name": 'db1'}, {"name": 'db2'}, {"name": 'db3'}]

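The `--transform` expression in the backup command rewrites each archived path so that extraction drops files straight back into the live keyspace tree, as described in the commit message (step 3 of the backup procedure). A minimal Python sketch of that rewrite; the helper name and sample paths are illustrative, not part of this change:

```python
import re


def strip_snapshot_segment(path, snapshot_name):
    """Drop the 'snapshots/<snapshot name>/' segment from an archived path,
    mirroring tar's --transform="s#snapshots/<name>/##" expression."""
    return re.sub('snapshots/%s/' % re.escape(snapshot_name), '', path)


# A file captured from a snapshot directory...
archived = 'ks1/users/snapshots/snap1/foo.db'
# ...extracts into the keyspace/table directory itself:
restored = strip_snapshot_segment(archived, 'snap1')  # 'ks1/users/foo.db'
```

Because the archive is rooted inside the data directory, this is the only path surgery needed: extracting over an existing `<data dir>` overwrites the keyspace files in place.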

@@ -200,7 +200,7 @@ class BackupAgentTest(trove_testtools.TestCase):

    def test_backup_impl_MySQLDump(self):
        """This test is for
           guestagent/strategies/backup/mysql_impl
        """
        mysql_dump = mysql_impl.MySQLDump(
            'abc', extra_opts='')
@@ -223,7 +223,7 @@ class BackupAgentTest(trove_testtools.TestCase):
        MySqlApp, 'get_data_dir', return_value='/var/lib/mysql/data')
    def test_backup_impl_InnoBackupEx(self, mock_datadir):
        """This test is for
           guestagent/strategies/backup/mysql_impl
        """
        inno_backup_ex = mysql_impl.InnoBackupEx('innobackupex', extra_opts='')
        self.assertIsNotNone(inno_backup_ex.cmd)

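Step 3 of the backup procedure collects every `*.db` file beneath the per-table `snapshots/<snapshot name>` directories before packaging them into the TAR archive. A rough sketch of that collection under the directory layout quoted in the commit message; the function is hypothetical (the tests below mock this step away as `_find_in_subdirectories`):

```python
import fnmatch
import os


def find_snapshot_files(data_dir, snapshot_name, pattern='*.db'):
    """Yield snapshot file paths relative to data_dir, assuming the layout
    <data dir>/<keyspace>/<table>/snapshots/<snapshot name>/<file>."""
    for root, _dirs, files in os.walk(data_dir):
        parent, leaf = os.path.split(root)
        # Only descend into <table>/snapshots/<snapshot name> directories.
        if leaf == snapshot_name and os.path.basename(parent) == 'snapshots':
            for name in fnmatch.filter(files, pattern):
                yield os.path.relpath(os.path.join(root, name), data_dir)
```

Yielding paths relative to the data directory keeps the archive rooted inside it, so a restore works even if the guest agent's standard data directory later changes.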

@@ -12,13 +12,16 @@
# License for the specific language governing permissions and limitations
# under the License.

import mock
from mock import ANY, DEFAULT, Mock, patch, PropertyMock
from testtools.testcase import ExpectedException

from trove.common import exception
from trove.common import utils
from trove.guestagent.common import configuration
from trove.guestagent.common.configuration import ImportOverrideStrategy
from trove.guestagent.common import operating_system
from trove.guestagent.datastore.experimental.cassandra import (
    service as cass_service
)
from trove.guestagent.strategies.backup import base as backupBase
from trove.guestagent.strategies.backup.mysql_impl import MySqlApp
from trove.guestagent.strategies.restore import base as restoreBase
@@ -49,6 +52,10 @@ BACKUP_REDIS_CLS = ("trove.guestagent.strategies.backup."
                    "experimental.redis_impl.RedisBackup")
RESTORE_REDIS_CLS = ("trove.guestagent.strategies.restore."
                     "experimental.redis_impl.RedisBackup")
BACKUP_NODETOOLSNAPSHOT_CLS = ("trove.guestagent.strategies.backup."
                               "experimental.cassandra_impl.NodetoolSnapshot")
RESTORE_NODETOOLSNAPSHOT_CLS = ("trove.guestagent.strategies.restore."
                                "experimental.cassandra_impl.NodetoolSnapshot")

PIPE = " | "
ZIP = "gzip"
@@ -412,6 +419,113 @@ class GuestAgentBackupTest(trove_testtools.TestCase):
            DECRYPT + PIPE + UNZIP + PIPE + REDISBACKUP_RESTORE)


class CassandraBackupTest(trove_testtools.TestCase):

    _BASE_BACKUP_CMD = ('sudo tar --transform="s#snapshots/%s/##" -cpPf - '
                        '-C "%s" "%s"')
    _BASE_RESTORE_CMD = 'sudo tar -xpPf - -C "%(restore_location)s"'
    _DATA_DIR = 'data_dir'
    _SNAPSHOT_NAME = 'snapshot_name'
    _SNAPSHOT_FILES = {'foo.db', 'bar.db'}
    _RESTORE_LOCATION = {'restore_location': '/var/lib/cassandra'}

    def setUp(self):
        super(CassandraBackupTest, self).setUp()
        self.app_status_patcher = patch(
            'trove.guestagent.datastore.experimental.cassandra.service.'
            'CassandraAppStatus')
        self.addCleanup(self.app_status_patcher.stop)
        self.app_status_patcher.start()
        self.get_data_dirs_patcher = patch.object(
            cass_service.CassandraApp, 'cassandra_data_dir',
            new_callable=PropertyMock)
        self.addCleanup(self.get_data_dirs_patcher.stop)
        data_dir_mock = self.get_data_dirs_patcher.start()
        data_dir_mock.return_value = self._DATA_DIR
        self.os_list_patcher = patch.object(
            operating_system, 'list_files_in_directory',
            return_value=self._SNAPSHOT_FILES)
        self.addCleanup(self.os_list_patcher.stop)
        self.os_list_patcher.start()

    def tearDown(self):
        super(CassandraBackupTest, self).tearDown()

    def test_backup_encrypted_zipped_nodetoolsnapshot_command(self):
        bkp = self._build_backup_runner(True, True)
        bkp._run_pre_backup()
        self.assertIsNotNone(bkp)
        self.assertEqual(self._BASE_BACKUP_CMD % (
            self._SNAPSHOT_NAME,
            self._DATA_DIR,
            '" "'.join(self._SNAPSHOT_FILES)
        ) + PIPE + ZIP + PIPE + ENCRYPT, bkp.command)
        self.assertIn(".gz.enc", bkp.manifest)

    def test_backup_not_encrypted_not_zipped_nodetoolsnapshot_command(self):
        bkp = self._build_backup_runner(False, False)
        bkp._run_pre_backup()
        self.assertIsNotNone(bkp)
        self.assertEqual(self._BASE_BACKUP_CMD % (
            self._SNAPSHOT_NAME,
            self._DATA_DIR,
            '" "'.join(self._SNAPSHOT_FILES)
        ), bkp.command)
        self.assertNotIn(".gz.enc", bkp.manifest)

    def test_backup_not_encrypted_but_zipped_nodetoolsnapshot_command(self):
        bkp = self._build_backup_runner(False, True)
        bkp._run_pre_backup()
        self.assertIsNotNone(bkp)
        self.assertEqual(self._BASE_BACKUP_CMD % (
            self._SNAPSHOT_NAME,
            self._DATA_DIR,
            '" "'.join(self._SNAPSHOT_FILES)
        ) + PIPE + ZIP, bkp.command)
        self.assertIn(".gz", bkp.manifest)
        self.assertNotIn(".enc", bkp.manifest)

    def test_backup_encrypted_but_not_zipped_nodetoolsnapshot_command(self):
        bkp = self._build_backup_runner(True, False)
        bkp._run_pre_backup()
        self.assertIsNotNone(bkp)
        self.assertEqual(self._BASE_BACKUP_CMD % (
            self._SNAPSHOT_NAME,
            self._DATA_DIR,
            '" "'.join(self._SNAPSHOT_FILES)
        ) + PIPE + ENCRYPT, bkp.command)
        self.assertIn(".enc", bkp.manifest)
        self.assertNotIn(".gz", bkp.manifest)

    @mock.patch.object(ImportOverrideStrategy, '_initialize_import_directory')
    def test_restore_encrypted_but_not_zipped_nodetoolsnapshot_command(
            self, _):
        restoreBase.RestoreRunner.is_zipped = False
        restoreBase.RestoreRunner.is_encrypted = True
        restoreBase.RestoreRunner.decrypt_key = CRYPTO_KEY
        RunnerClass = utils.import_class(RESTORE_NODETOOLSNAPSHOT_CLS)
        rstr = RunnerClass(None, restore_location=self._RESTORE_LOCATION,
                           location="filename", checksum="md5")
        self.assertIsNotNone(rstr)
        self.assertEqual(self._BASE_RESTORE_CMD % self._RESTORE_LOCATION,
                         rstr.base_restore_cmd % self._RESTORE_LOCATION)

    @mock.patch.object(ImportOverrideStrategy, '_initialize_import_directory')
    def _build_backup_runner(self, is_encrypted, is_zipped, _):
        backupBase.BackupRunner.is_zipped = is_zipped
        backupBase.BackupRunner.is_encrypted = is_encrypted
        backupBase.BackupRunner.encrypt_key = CRYPTO_KEY
        RunnerClass = utils.import_class(BACKUP_NODETOOLSNAPSHOT_CLS)
        runner = RunnerClass(self._SNAPSHOT_NAME)
        runner._remove_snapshot = mock.MagicMock()
        runner._snapshot_all_keyspaces = mock.MagicMock()
        runner._find_in_subdirectories = mock.MagicMock(
            return_value=self._SNAPSHOT_FILES
        )
        return runner


class CouchbaseBackupTests(trove_testtools.TestCase):

    def setUp(self):

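The four command-composition tests above pin down the stage order: the tar stream is compressed first, then encrypted, which is why the manifest gains `.gz` before `.enc`. A standalone sketch of that composition; the `ENCRYPT` constant here is a stand-in for the strategies' actual openssl stage, and the helper name is illustrative:

```python
PIPE = " | "
ZIP = "gzip"
# Stand-in for the real encryption stage wired up by the backup strategies.
ENCRYPT = "openssl enc -aes-256-cbc -salt -pass pass:default_aes_cbc_key"


def build_backup_command(base_cmd, is_zipped, is_encrypted):
    """Append the optional stages to the base tar command: compress first,
    then encrypt, so a restore can reverse them (decrypt, then unzip)."""
    cmd = base_cmd
    if is_zipped:
        cmd += PIPE + ZIP
    if is_encrypted:
        cmd += PIPE + ENCRYPT
    return cmd
```

Restore pipelines mirror this in reverse, as the Redis assertion above (`DECRYPT + PIPE + UNZIP + PIPE + ...`) shows for another datastore.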

@@ -18,6 +18,7 @@ import string

from mock import ANY
from mock import call
from mock import DEFAULT
from mock import MagicMock
from mock import Mock
from mock import NonCallableMagicMock
@@ -28,7 +29,9 @@ from testtools import ExpectedException

from trove.common.context import TroveContext
from trove.common import exception
from trove.common.instance import ServiceStatuses
from trove.guestagent import backup
from trove.guestagent.common.configuration import ImportOverrideStrategy
from trove.guestagent.common import operating_system
from trove.guestagent.datastore.experimental.cassandra import (
    manager as cass_manager)
from trove.guestagent.datastore.experimental.cassandra import (
@@ -41,6 +44,8 @@ from trove.tests.unittests import trove_testtools


class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):

    __MOUNT_POINT = '/var/lib/cassandra'
    __N_GAK = '_get_available_keyspaces'
    __N_GLU = '_get_listed_users'
    __N_BU = '_build_user'
@@ -133,18 +138,121 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
        self._prepare_dynamic([],
                              is_db_installed=True)

    @patch.object(backup, 'restore')
    def test_prepare_db_restore(self, restore):
        backup_info = {'id': 'backup_id',
                       'instance_id': 'fake-instance-id',
                       'location': 'fake-location',
                       'type': 'InnoBackupEx',
                       'checksum': 'fake-checksum'}

        self._prepare_dynamic(['cassandra'], is_db_installed=False,
                              backup_info=backup_info)
        restore.assert_called_once_with(
            self.context, backup_info, self.__MOUNT_POINT)
    @patch.multiple(operating_system, enable_service_on_boot=DEFAULT,
                    disable_service_on_boot=DEFAULT)
    def test_superuser_password_reset(
            self, enable_service_on_boot, disable_service_on_boot):
        fake_status = MagicMock()
        fake_status.is_running = False
        test_app = cass_service.CassandraApp()
        test_app.status = fake_status

        with patch.multiple(
                test_app,
                start_db=DEFAULT,
                stop_db=DEFAULT,
                restart=DEFAULT,
                _CassandraApp__disable_remote_access=DEFAULT,
                _CassandraApp__enable_remote_access=DEFAULT,
                _CassandraApp__disable_authentication=DEFAULT,
                _CassandraApp__enable_authentication=DEFAULT,
                _CassandraApp__reset_user_password_to_default=DEFAULT,
                secure=DEFAULT) as calls:
            test_app._reset_admin_password()

            disable_service_on_boot.assert_called_once_with(
                test_app.service_candidates)
            calls[
                '_CassandraApp__disable_remote_access'
            ].assert_called_once_with()
            calls[
                '_CassandraApp__disable_authentication'
            ].assert_called_once_with()
            calls['start_db'].assert_called_once_with(update_db=False,
                                                      enable_on_boot=False)
            calls[
                '_CassandraApp__enable_authentication'
            ].assert_called_once_with()
            pw_reset_mock = calls[
                '_CassandraApp__reset_user_password_to_default'
            ]
            pw_reset_mock.assert_called_once_with(test_app._ADMIN_USER)
            calls['secure'].assert_called_once_with(
                update_user=pw_reset_mock.return_value)
            calls['restart'].assert_called_once_with()
            calls['stop_db'].assert_called_once_with()
            calls[
                '_CassandraApp__enable_remote_access'
            ].assert_called_once_with()
            enable_service_on_boot.assert_called_once_with(
                test_app.service_candidates)
    def test_change_cluster_name(self):
        fake_status = MagicMock()
        fake_status.is_running = True
        test_app = cass_service.CassandraApp()
        test_app.status = fake_status

        with patch.multiple(
                test_app,
                start_db=DEFAULT,
                stop_db=DEFAULT,
                restart=DEFAULT,
                _update_cluster_name_property=DEFAULT,
                _CassandraApp__reset_cluster_name=DEFAULT) as calls:
            sample_name = NonCallableMagicMock()
            test_app.change_cluster_name(sample_name)
            calls['_CassandraApp__reset_cluster_name'].assert_called_once_with(
                sample_name)
            calls['_update_cluster_name_property'].assert_called_once_with(
                sample_name)
            calls['restart'].assert_called_once_with()
    @patch.object(cass_service, 'CONF', DEFAULT)
    def test_apply_post_restore_updates(self, conf_mock):
        fake_status = MagicMock()
        fake_status.is_running = False
        test_app = cass_service.CassandraApp()
        test_app.status = fake_status

        with patch.multiple(
                test_app,
                start_db=DEFAULT,
                stop_db=DEFAULT,
                _update_cluster_name_property=DEFAULT,
                _reset_admin_password=DEFAULT,
                change_cluster_name=DEFAULT) as calls:
            backup_info = {'instance_id': 'old_id'}
            conf_mock.guest_id = 'new_id'
            test_app._apply_post_restore_updates(backup_info)
            calls['_update_cluster_name_property'].assert_called_once_with(
                'old_id')
            calls['_reset_admin_password'].assert_called_once_with()
            calls['start_db'].assert_called_once_with(update_db=False)
            calls['change_cluster_name'].assert_called_once_with('new_id')
            calls['stop_db'].assert_called_once_with()
    def _prepare_dynamic(self, packages,
                         config_content='MockContent', device_path='/dev/vdb',
                         is_db_installed=True, backup_info=None,
                         is_root_enabled=False,
                         overrides=None):
        mock_status = MagicMock()
        mock_app = MagicMock()
@@ -160,6 +268,7 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
        mock_app.restart = MagicMock(return_value=None)
        mock_app.start_db = MagicMock(return_value=None)
        mock_app.stop_db = MagicMock(return_value=None)
        mock_app._remove_system_tables = MagicMock(return_value=None)
        os.path.exists = MagicMock(return_value=True)
        volume.VolumeDevice.format = MagicMock(return_value=None)
        volume.VolumeDevice.migrate_data = MagicMock(return_value=None)
@@ -174,7 +283,7 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
                              databases=None,
                              memory_mb='2048', users=None,
                              device_path=device_path,
                              mount_point=self.__MOUNT_POINT,
                              backup_info=backup_info,
                              overrides=None,
                              cluster_config=None)
@@ -182,10 +291,14 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
        # verification/assertion
        mock_status.begin_install.assert_any_call()
        mock_app.install_if_needed.assert_any_call(packages)
        mock_app._remove_system_tables.assert_any_call()
        mock_app.init_storage_structure.assert_any_call('/var/lib/cassandra')
        mock_app.apply_initial_guestagent_configuration.assert_any_call()
        mock_app.start_db.assert_any_call(update_db=False)
        mock_app.stop_db.assert_any_call()
        if backup_info:
            mock_app._apply_post_restore_updates.assert_called_once_with(
                backup_info)

    def test_keyspace_validation(self):
        valid_name = self._get_random_name(32)