Mysql guest agent functionality for replication

Implement new strategy for replication
Implement replication strategy for mysql binlog replication.
Supporting methods in mysql service
Implement API methods in mysql manager
Define configuration settings for replication

Co-authored by: Nikhil Manchanda <SlickNik@gmail.com>
Co-authored by: Greg Lucas <glucas@tesora.com>

Partially implements: blueprint replication-v1
Change-Id: I70f0b5c37fe3c2d42426029bb627c141965eb524
This commit is contained in:
Morgan Jones 2014-05-27 14:29:41 -07:00 committed by Nikhil Manchanda
parent c9173f22e9
commit 86eacbeabc
19 changed files with 611 additions and 38 deletions

View File

@ -94,3 +94,11 @@ backup_aes_cbc_key = "default_aes_cbc_key"
backup_use_snet = False
backup_chunk_size = 65536
backup_segment_max_size = 2147483648
[mysql]
# Configuration for Replication
replication_strategy = MysqlBinlogReplication
replication_namespace = trove.guestagent.strategies.replication.mysql_binlog
replication_user = slave_user
replication_password = slave_password

View File

@ -174,6 +174,7 @@ if __name__ == "__main__":
from trove.tests.api import instances_resize # noqa
from trove.tests.api import databases # noqa
from trove.tests.api import datastores # noqa
from trove.tests.api import replication # noqa
from trove.tests.api import root # noqa
from trove.tests.api import root_on_create # noqa
from trove.tests.api import users # noqa

View File

@ -299,6 +299,15 @@ mysql_opts = [
'if trove_security_groups_support is True).'),
cfg.StrOpt('backup_strategy', default='InnoBackupEx',
help='Default strategy to perform backups.'),
cfg.StrOpt('replication_strategy', default='MysqlBinlogReplication',
help='Default strategy for replication.'),
cfg.StrOpt('replication_namespace',
default='trove.guestagent.strategies.replication.mysql_binlog',
help='Namespace to load replication strategies from.'),
cfg.StrOpt('replication_user', default='slave_user',
help='Userid for replication slave.', secret=True),
cfg.StrOpt('replication_password', default='NETOU7897NNLOU',
help='Password for replication slave user.', secret=True),
cfg.StrOpt('mount_point', default='/var/lib/mysql',
help="Filesystem path for mounting "
"volumes if volume support is enabled."),
@ -334,6 +343,15 @@ percona_opts = [
'if trove_security_groups_support is True).'),
cfg.StrOpt('backup_strategy', default='InnoBackupEx',
help='Default strategy to perform backups.'),
cfg.StrOpt('replication_strategy', default='MysqlBinlogReplication',
help='Default strategy for replication.'),
cfg.StrOpt('replication_namespace',
default='trove.guestagent.strategies.replication.mysql_binlog',
help='Namespace to load replication strategies from.'),
cfg.StrOpt('replication_user', default='slave_user',
help='Userid for replication slave.'),
cfg.StrOpt('replication_password', default='NETOU7897NNLOU',
help='Password for replication slave user.'),
cfg.StrOpt('mount_point', default='/var/lib/mysql',
help="Filesystem path for mounting "
"volumes if volume support is enabled."),
@ -369,6 +387,8 @@ redis_opts = [
'if trove_security_groups_support is True).'),
cfg.StrOpt('backup_strategy', default=None,
help='Default strategy to perform backups.'),
cfg.StrOpt('replication_strategy', default=None,
help='Default strategy for replication.'),
cfg.StrOpt('mount_point', default='/var/lib/redis',
help="Filesystem path for mounting "
"volumes if volume support is enabled."),
@ -395,6 +415,8 @@ cassandra_opts = [
'if trove_security_groups_support is True).'),
cfg.StrOpt('backup_strategy', default=None,
help='Default strategy to perform backups.'),
cfg.StrOpt('replication_strategy', default=None,
help='Default strategy for replication.'),
cfg.StrOpt('mount_point', default='/var/lib/cassandra',
help="Filesystem path for mounting "
"volumes if volume support is enabled."),
@ -423,6 +445,8 @@ couchbase_opts = [
'if trove_security_groups_support is True).'),
cfg.StrOpt('backup_strategy', default='CbBackup',
help='Default strategy to perform backups.'),
cfg.StrOpt('replication_strategy', default=None,
help='Default strategy for replication.'),
cfg.StrOpt('mount_point', default='/var/lib/couchbase',
help="Filesystem path for mounting "
"volumes if volume support is enabled."),
@ -458,6 +482,8 @@ mongodb_opts = [
'if trove_security_groups_support is True).'),
cfg.StrOpt('backup_strategy', default=None,
help='Default strategy to perform backups.'),
cfg.StrOpt('replication_strategy', default=None,
help='Default strategy for replication.'),
cfg.StrOpt('mount_point', default='/var/lib/mongodb',
help="Filesystem path for mounting "
"volumes if volume support is enabled."),

View File

@ -252,6 +252,12 @@ class VolumeNotSupported(TroveError):
message = _("Volume support is not enabled.")
class ReplicationNotSupported(TroveError):
message = _("Replication is not supported for "
"the '%(datastore)s' datastore.")
class TaskManagerError(TroveError):
message = _("An error occurred communicating with the task manager: "
@ -445,3 +451,15 @@ class NoServiceEndpoint(TroveError):
class EmptyCatalog(NoServiceEndpoint):
"""The service catalog is empty."""
message = _("Empty catalog.")
class IncompatibleReplicationStrategy(TroveError):
message = _("Instance with replication strategy %(guest_strategy)s "
"cannot replicate from instance with replication strategy "
"%(replication_strategy)s.")
class InsufficientSpaceForSlave(TroveError):
message = _("The target instance has only %(slave_volume_size)sG free, "
"but the replication snapshot contains %(dataset_size)sG "
"of data.")

View File

@ -326,8 +326,8 @@ class API(proxy.RpcProxy):
def get_replication_snapshot(self, master_config=None):
LOG.debug("Retrieving replication snapshot from instance %s.", self.id)
self._call("get_replication_snapshot", AGENT_HIGH_TIMEOUT,
master_config=master_config)
return self._call("get_replication_snapshot", AGENT_HIGH_TIMEOUT,
master_config=master_config)
def attach_replication_slave(self, snapshot, slave_config=None):
LOG.debug("Configuring instance %s to replicate from %s.",
@ -337,7 +337,7 @@ class API(proxy.RpcProxy):
def detach_replication_slave(self):
LOG.debug("Detaching slave %s from its master.", self.id)
self._call("detach_replication_slave", AGENT_LOW_TIMEOUT)
self._call("detach_replication_slave", AGENT_HIGH_TIMEOUT)
def demote_replication_master(self):
LOG.debug("Demoting instance %s to non-master.", self.id)

View File

@ -26,6 +26,7 @@ from trove.guestagent import volume
from trove.guestagent.datastore.mysql.service import MySqlAppStatus
from trove.guestagent.datastore.mysql.service import MySqlAdmin
from trove.guestagent.datastore.mysql.service import MySqlApp
from trove.guestagent.strategies.replication import get_replication_strategy
from trove.openstack.common import log as logging
from trove.openstack.common.gettextutils import _
from trove.openstack.common import periodic_task
@ -33,7 +34,11 @@ from trove.openstack.common import periodic_task
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
MANAGER = CONF.datastore_manager
MANAGER = CONF.datastore_manager if CONF.datastore_manager else 'mysql'
REPLICATION_STRATEGY = CONF.get(MANAGER).replication_strategy
REPLICATION_NAMESPACE = CONF.get(MANAGER).replication_namespace
REPLICATION_STRATEGY_CLASS = get_replication_strategy(REPLICATION_STRATEGY,
REPLICATION_NAMESPACE)
class Manager(periodic_task.PeriodicTasks):
@ -166,8 +171,7 @@ class Manager(periodic_task.PeriodicTasks):
def get_filesystem_stats(self, context, fs_path):
"""Gets the filesystem stats for the path given."""
mount_point = CONF.get(
'mysql' if not MANAGER else MANAGER).mount_point
mount_point = CONF.get(MANAGER).mount_point
return dbaas.get_filesystem_volume_stats(mount_point)
def create_backup(self, context, backup_info):
@ -209,22 +213,69 @@ class Manager(periodic_task.PeriodicTasks):
app = MySqlApp(MySqlAppStatus.get())
app.apply_overrides(overrides)
def get_replication_snapshot(self, master_config):
def get_replication_snapshot(self, context, master_config):
LOG.debug("Getting replication snapshot.")
raise exception.DatastoreOperationNotSupported(
operation='get_replication_snapshot', datastore=MANAGER)
app = MySqlApp(MySqlAppStatus.get())
def attach_replication_slave(self, snapshot, slave_config):
replication = REPLICATION_STRATEGY_CLASS(context)
replication.enable_as_master(app, master_config)
snapshot_id, log_position = (
replication.snapshot_for_replication(app, None, master_config))
mount_point = CONF.get(MANAGER).mount_point
volume_stats = dbaas.get_filesystem_volume_stats(mount_point)
replication_snapshot = {
'dataset': {
'datastore_manager': MANAGER,
'dataset_size': volume_stats.get('used', 0.0),
'volume_size': volume_stats.get('total', 0.0),
'snapshot_id': snapshot_id
},
'replication_strategy': REPLICATION_STRATEGY,
'master': replication.get_master_ref(app, master_config),
'log_position': log_position
}
return replication_snapshot
def _validate_slave_for_replication(self, context, snapshot):
if (snapshot['replication_strategy'] != REPLICATION_STRATEGY):
raise exception.IncompatibleReplicationStrategy(
snapshot.update({
'guest_strategy': REPLICATION_STRATEGY
}))
mount_point = CONF.get(MANAGER).mount_point
volume_stats = dbaas.get_filesystem_volume_stats(mount_point)
if (volume_stats.get('total', 0.0) <
snapshot['dataset']['dataset_size']):
raise exception.InsufficientSpaceForSlave(
snapshot.update({
'slave_volume_size': volume_stats.get('total', 0.0)
}))
def attach_replication_slave(self, context, snapshot, slave_config):
LOG.debug("Attaching replication snapshot.")
raise exception.DatastoreOperationNotSupported(
operation='attach_replication_slave', datastore=MANAGER)
app = MySqlApp(MySqlAppStatus.get())
try:
self._validate_slave_for_replication(context, snapshot)
replication = REPLICATION_STRATEGY_CLASS(context)
replication.enable_as_slave(app, snapshot)
except Exception:
LOG.exception("Error enabling replication.")
app.status.set_status(rd_instance.ServiceStatuses.FAILED)
raise
def detach_replication_slave(self):
LOG.debug("Detaching replication slave.")
raise exception.DatastoreOperationNotSupported(
operation='detach_replication_slave', datastore=MANAGER)
def detach_replication_slave(self, context):
LOG.debug("Detaching replication snapshot.")
app = MySqlApp(MySqlAppStatus.get())
replication = REPLICATION_STRATEGY_CLASS(context)
replication.detach_slave(app)
def demote_replication_master(self):
def demote_replication_master(self, context):
LOG.debug("Demoting replication master.")
raise exception.DatastoreOperationNotSupported(
operation='demote_replication_master', datastore=MANAGER)
app = MySqlApp(MySqlAppStatus.get())
replication = REPLICATION_STRATEGY_CLASS(context)
replication.demote_master(app)

View File

@ -29,6 +29,7 @@ from trove.common import cfg
from trove.common import utils as utils
from trove.common import exception
from trove.common import instance as rd_instance
from trove.common.exception import PollTimeOut
from trove.guestagent.common import operating_system
from trove.guestagent.common import sql_query
from trove.guestagent.db import models
@ -48,6 +49,9 @@ TMP_MYCNF = "/tmp/my.cnf.tmp"
MYSQL_BASE_DIR = "/var/lib/mysql"
CONF = cfg.CONF
MANAGER = CONF.datastore_manager if CONF.datastore_manager else 'mysql'
REPLICATION_USER = CONF.get(MANAGER).replication_user
REPLICATION_PASSWORD = CONF.get(MANAGER).replication_password
INCLUDE_MARKER_OPERATORS = {
True: ">=",
@ -59,6 +63,8 @@ MYSQL_SERVICE_CANDIDATES = ["mysql", "mysqld", "mysql-server"]
MYSQL_BIN_CANDIDATES = ["/usr/sbin/mysqld", "/usr/libexec/mysqld"]
MYCNF_OVERRIDES = "/etc/mysql/conf.d/overrides.cnf"
MYCNF_OVERRIDES_TMP = "/tmp/overrides.cnf.tmp"
MYCNF_REPLMASTER = "/etc/mysql/conf.d/replication.cnf"
MYCNF_REPLMASTER_TMP = "/tmp/replication.cnf.tmp"
# Create a package impl
@ -813,7 +819,7 @@ class MySqlApp(object):
MYCNF_OVERRIDES)
LOG.info(_("Setting permissions on overrides.cnf."))
utils.execute_with_timeout("sudo", "chmod", "0711",
utils.execute_with_timeout("sudo", "chmod", "0644",
MYCNF_OVERRIDES)
def _remove_overrides(self):
@ -821,6 +827,111 @@ class MySqlApp(object):
if os.path.exists(MYCNF_OVERRIDES):
utils.execute_with_timeout("sudo", "rm", MYCNF_OVERRIDES)
def write_replication_overrides(self, overrideValues):
LOG.info(_("Writing replication.cnf file."))
with open(MYCNF_REPLMASTER_TMP, 'w') as overrides:
overrides.write(overrideValues)
LOG.debug("Moving temp replication.cnf into correct location.")
utils.execute_with_timeout("sudo", "mv", MYCNF_REPLMASTER_TMP,
MYCNF_REPLMASTER)
LOG.debug("Setting permissions on replication.cnf.")
utils.execute_with_timeout("sudo", "chmod", "0644",
MYCNF_REPLMASTER)
def remove_replication_overrides(self):
LOG.info(_("Removing replication configuration file."))
if os.path.exists(MYCNF_REPLMASTER):
utils.execute_with_timeout("sudo", "rm", MYCNF_REPLMASTER)
def grant_replication_privilege(self):
LOG.info(_("Granting Replication Slave privilege."))
with LocalSqlClient(get_engine()) as client:
g = sql_query.Grant(permissions=['REPLICATION SLAVE'],
user=REPLICATION_USER,
clear=REPLICATION_PASSWORD)
t = text(str(g))
client.execute(t)
def revoke_replication_privilege(self):
LOG.info(_("Revoking Replication Slave privilege."))
with LocalSqlClient(get_engine()) as client:
g = sql_query.Revoke(permissions=['REPLICATION SLAVE'],
user=REPLICATION_USER,
clear=REPLICATION_PASSWORD)
t = text(str(g))
client.execute(t)
def get_port(self):
with LocalSqlClient(get_engine()) as client:
result = client.execute('SELECT @@port').first()
return result[0]
def get_binlog_position(self):
with LocalSqlClient(get_engine()) as client:
result = client.execute('SHOW MASTER STATUS').first()
binlog_position = {
'log_file': result['File'],
'position': result['Position']
}
return binlog_position
def change_master_for_binlog(self, host, port, log_position):
LOG.info(_("Configuring replication from %s.") % host)
change_master_cmd = ("CHANGE MASTER TO MASTER_HOST='%(host)s', "
"MASTER_PORT=%(port)s, "
"MASTER_USER='%(user)s', "
"MASTER_PASSWORD='%(password)s', "
"MASTER_LOG_FILE='%(log_file)s', "
"MASTER_LOG_POS=%(log_pos)s" %
{
'host': host,
'port': port,
'user': REPLICATION_USER,
'password': REPLICATION_PASSWORD,
'log_file': log_position['log_file'],
'log_pos': log_position['position']
})
with LocalSqlClient(get_engine()) as client:
client.execute(change_master_cmd)
def start_slave(self):
LOG.info(_("Starting slave replication."))
with LocalSqlClient(get_engine()) as client:
client.execute('START SLAVE')
self._wait_for_slave_status("ON", client, 60)
def stop_slave(self):
LOG.info(_("Stopping slave replication."))
with LocalSqlClient(get_engine()) as client:
client.execute('STOP SLAVE')
client.execute('RESET SLAVE ALL')
self._wait_for_slave_status("OFF", client, 30)
def _wait_for_slave_status(self, status, client, max_time):
def verify_slave_status():
actual_status = client.execute(
"SHOW GLOBAL STATUS like 'slave_running'").first()[1]
return actual_status.upper() == status.upper()
LOG.debug("Waiting for SLAVE_RUNNING to change to %s.", status)
try:
utils.poll_until(verify_slave_status, sleep_time=3,
time_out=max_time)
LOG.info(_("Replication is now %s.") % status.lower())
except PollTimeOut:
raise RuntimeError(
_("Replication is not %(status)s after %(max)d seconds.") % {
'status': status.lower(), 'max': max_time})
def start_mysql(self, update_db=False):
LOG.info(_("Starting MySQL."))
# This is the site of all the trouble in the restart tests.

View File

@ -0,0 +1,25 @@
# Copyright 2014 Tesora, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from trove.guestagent.strategy import Strategy
from trove.openstack.common import log as logging
LOG = logging.getLogger(__name__)
def get_replication_strategy(replication_driver, ns=__name__):
LOG.debug("Getting replication strategy: %s.", replication_driver)
return Strategy.get_strategy(replication_driver, ns)

View File

@ -0,0 +1,56 @@
# Copyright 2014 Tesora, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import abc
import six
from trove.guestagent.strategy import Strategy
@six.add_metaclass(abc.ABCMeta)
class Replication(Strategy):
"""Base class for Replication Strategy implementation."""
__strategy_type__ = 'replication'
__strategy_ns__ = 'trove.guestagent.strategies.replication'
def __init__(self, context):
self.context = context
super(Replication, self).__init__()
@abc.abstractmethod
def get_master_ref(self, mysql_service, master_config):
"""Get reference to master site for replication strategy."""
@abc.abstractmethod
def snapshot_for_replication(self, mysql_service, location, master_config):
"""Capture snapshot of master db."""
@abc.abstractmethod
def enable_as_master(self, mysql_service, master_config):
"""Configure underlying database to act as master for replication."""
@abc.abstractmethod
def enable_as_slave(self, mysql_service, snapshot):
"""Configure underlying database as a slave of the given master."""
@abc.abstractmethod
def detach_slave(self, mysql_service):
"""Turn off replication on a slave site."""
@abc.abstractmethod
def demote_master(self, mysql_service):
"""Turn off replication on a master site."""

View File

@ -0,0 +1,75 @@
# Copyright 2014 Tesora, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from trove.guestagent.strategies.replication import base
from trove.guestagent.common import operating_system
from trove.openstack.common import log as logging
MASTER_CONFIG = """
[mysqld]
log_bin = /var/lib/mysql/mysql-bin.log
"""
SLAVE_CONFIG = """
[mysqld]
log_bin = /var/lib/mysql/mysql-bin.log
relay_log = /var/lib/mysql/mysql-relay-bin.log
"""
LOG = logging.getLogger(__name__)
class MysqlBinlogReplication(base.Replication):
"""MySql Replication coordinated by binlog position."""
def get_master_ref(self, mysql_service, master_config):
master_ref = {
'host': operating_system.get_ip_address(),
'port': mysql_service.get_port()
}
return master_ref
def snapshot_for_replication(self, mysql_service, location, master_config):
# TODO(mwj): snapshot_id = master_config['snapshot_id']
# Check to see if the snapshot_id exists as a backup. If so, and
# it is suitable for restoring the slave, just use it
# Otherwise, create a new backup of the master site.
snapshot_id = None
log_position = mysql_service.get_binlog_position()
return snapshot_id, log_position
def enable_as_master(self, mysql_service, master_config):
mysql_service.write_replication_overrides(MASTER_CONFIG)
mysql_service.restart()
mysql_service.grant_replication_privilege()
def enable_as_slave(self, mysql_service, snapshot):
mysql_service.write_replication_overrides(SLAVE_CONFIG)
mysql_service.restart()
mysql_service.change_master_for_binlog(
snapshot['master']['host'],
snapshot['master']['port'],
snapshot['log_position'])
mysql_service.start_slave()
def detach_slave(self, mysql_service):
mysql_service.stop_slave()
mysql_service.remove_replication_overrides()
mysql_service.restart()
def demote_master(self, mysql_service):
mysql_service.revoke_replication_privilege()
mysql_service.remove_replication_overrides()
mysql_service.restart()

View File

@ -617,6 +617,7 @@ class Instance(BuiltInstance):
availability_zone=None, nics=None, configuration_id=None,
slave_of_id=None):
datastore_cfg = CONF.get(datastore_version.manager)
client = create_nova_client(context)
try:
flavor = client.flavors.get(flavor_id)
@ -624,14 +625,14 @@ class Instance(BuiltInstance):
raise exception.FlavorNotFound(uuid=flavor_id)
deltas = {'instances': 1}
volume_support = CONF.get(datastore_version.manager).volume_support
volume_support = datastore_cfg.volume_support
if volume_support:
validate_volume_size(volume_size)
deltas['volumes'] = volume_size
else:
if volume_size is not None:
raise exception.VolumeNotSupported()
ephemeral_support = CONF.get(datastore_version.manager).device_path
ephemeral_support = datastore_cfg.device_path
if ephemeral_support:
if flavor.ephemeral == 0:
raise exception.LocalStorageNotSpecified(flavor=flavor_id)
@ -653,6 +654,12 @@ class Instance(BuiltInstance):
datastore1=backup_info.datastore.name,
datastore2=datastore.name)
if slave_of_id:
replication_support = datastore_cfg.replication_strategy
if not replication_support:
raise exception.ReplicationNotSupported(
datastore=datastore.name)
if not nics:
nics = []
if CONF.default_neutron_networks:
@ -702,7 +709,8 @@ class Instance(BuiltInstance):
availability_zone,
root_password,
nics,
overrides)
overrides,
slave_of_id)
return SimpleInstance(context, db_info, datastore_status,
root_password)

View File

@ -207,15 +207,9 @@ class InstanceController(wsgi.Controller):
else:
backup_id = None
if 'availability_zone' in body['instance']:
availability_zone = body['instance']['availability_zone']
else:
availability_zone = None
if 'nics' in body['instance']:
nics = body['instance']['nics']
else:
nics = None
availability_zone = body['instance'].get('availability_zone')
nics = body['instance'].get('nics')
slave_of_id = body['instance'].get('slave_of')
if 'slave_of' in body['instance']:
slave_of_id = body['instance']['slave_of']

View File

@ -92,6 +92,9 @@ class InstanceTasks(object):
'Build error: Secgroup '
'or rule.',
is_error=True)
BUILDING_ERROR_SLAVE = InstanceTask(0x54, 'BUILDING',
'Build error: Replication slave.',
is_error=True)
# Dissuade further additions at run-time.
InstanceTask.__init__ = None

View File

@ -114,7 +114,7 @@ class API(proxy.RpcProxy):
image_id, databases, users, datastore_manager,
packages, volume_size, backup_id=None,
availability_zone=None, root_password=None,
nics=None, overrides=None):
nics=None, overrides=None, slave_of_id=None):
LOG.debug("Making async call to create instance %s " % instance_id)
self.cast(self.context,
self.make_msg("create_instance",
@ -131,7 +131,8 @@ class API(proxy.RpcProxy):
availability_zone=availability_zone,
root_password=root_password,
nics=nics,
overrides=overrides))
overrides=overrides,
slave_of_id=slave_of_id))
def update_overrides(self, instance_id, overrides=None):
LOG.debug("Making async call to update datastore configurations for "

View File

@ -81,13 +81,24 @@ class Manager(periodic_task.PeriodicTasks):
def create_instance(self, context, instance_id, name, flavor,
image_id, databases, users, datastore_manager,
packages, volume_size, backup_id, availability_zone,
root_password, nics, overrides):
root_password, nics, overrides, slave_of_id):
instance_tasks = FreshInstanceTasks.load(context, instance_id)
if slave_of_id:
# We are creating a slave of an existing instance: get a snapshot
# of the master so we can restore the data on the slave.
snapshot = instance_tasks.get_replication_master_snapshot(
context, slave_of_id, backup_id)
backup_id = snapshot['dataset']['snapshot_id']
instance_tasks.create_instance(flavor, image_id, databases, users,
datastore_manager, packages,
volume_size, backup_id,
availability_zone, root_password, nics,
overrides)
availability_zone, root_password,
nics, overrides)
if slave_of_id:
# Enable replication on the newly created slave instance
instance_tasks.attach_replication_slave(snapshot)
def update_overrides(self, context, instance_id, overrides):
instance_tasks = models.BuiltInstanceTasks.load(context, instance_id)

View File

@ -266,11 +266,31 @@ class FreshInstanceTasks(FreshInstance, NotifyMixin, ConfigurationMixin):
"Timeout waiting for instance to become active. "
"No usage create-event was sent.") % self.id)
self.update_statuses_on_time_out()
except Exception:
LOG.exception(_("Failed to send usage create-event for "
"instance %s.") % self.id)
def attach_replication_slave(self, snapshot, slave_config=None):
LOG.debug("Calling attach_replication_slave for %s.", self.id)
try:
self.guest.attach_replication_slave(snapshot, slave_config)
except GuestError as e:
msg = (_("Error attaching instance %s "
"as replication slave.") % self.id)
err = inst_models.InstanceTasks.BUILDING_ERROR_SLAVE
self._log_and_raise(e, msg, err)
def get_replication_master_snapshot(self, context, slave_of_id, backup_id):
try:
master_tasks = BuiltInstanceTasks.load(context, slave_of_id)
snapshot = master_tasks.get_replication_snapshot(backup_id)
return snapshot
except TroveError as e:
msg = (_("Error getting snapshot from "
"replication master %s.") % slave_of_id)
err = inst_models.InstanceTasks.BUILDING_ERROR_SLAVE
self._log_and_raise(e, msg, err)
def report_root_enabled(self):
mysql_models.RootHistory.create(self.context, self.id, 'root')
@ -812,6 +832,18 @@ class BuiltInstanceTasks(BuiltInstance, NotifyMixin, ConfigurationMixin):
LOG.info(_("Initiating backup for instance %s.") % self.id)
self.guest.create_backup(backup_info)
def get_replication_snapshot(self, backup_id):
master_config = {'snapshot_id': backup_id or utils.generate_uuid()}
LOG.debug("Calling get_replication_snapshot on %s.", self.id)
try:
result = self.guest.get_replication_snapshot(master_config)
LOG.debug("Got replication snapshot from %s.", self.id)
return result
except (GuestError, GuestTimeout):
msg = _("Failed to get replication snapshot from %s.") % self.id
LOG.exception(msg)
raise TroveError(msg)
def reboot(self):
try:
LOG.debug("Stopping datastore on instance %s." % self.id)

View File

@ -0,0 +1,134 @@
# Copyright 2014 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from proboscis import test
from proboscis.asserts import assert_equal
from proboscis.asserts import assert_raises
from proboscis.decorators import time_out
from trove.common.utils import generate_uuid
from trove.common.utils import poll_until
from trove.tests.api.instances import instance_info
from trove.tests.api.instances import TIMEOUT_INSTANCE_CREATE
from trove.tests.api.instances import TIMEOUT_INSTANCE_DELETE
from trove.tests.api.instances import WaitForGuestInstallationToFinish
from trove.tests.config import CONFIG
from trove.tests.util.server_connection import create_server_connection
from troveclient.compat import exceptions
class SlaveInstanceTestInfo(object):
"""Stores slave instance information."""
def __init__(self):
self.id = None
self.replicated_db = generate_uuid()
GROUP = "dbaas.api.replication"
slave_instance = SlaveInstanceTestInfo()
@test(depends_on_classes=[WaitForGuestInstallationToFinish],
groups=[GROUP])
class CreateReplicationSlave(object):
@test
def test_create_slave(self):
result = instance_info.dbaas.instances.create(
instance_info.name + "_slave",
instance_info.dbaas_flavor_href,
instance_info.volume,
slave_of=instance_info.id)
assert_equal(200, instance_info.dbaas.last_http_code)
assert_equal("BUILD", result.status)
slave_instance.id = result.id
@test(groups=[GROUP])
class WaitForCreateSlaveToFinish(object):
"""Wait until the instance is created and set up as slave."""
@test(depends_on=[CreateReplicationSlave.test_create_slave])
@time_out(TIMEOUT_INSTANCE_CREATE)
def test_slave_created(self):
def result_is_active():
instance = instance_info.dbaas.instances.get(slave_instance.id)
if instance.status == "ACTIVE":
return True
else:
# If its not ACTIVE, anything but BUILD must be
# an error.
assert_equal("BUILD", instance.status)
if instance_info.volume is not None:
assert_equal(instance.volume.get('used', None), None)
return False
poll_until(result_is_active)
@test(enabled=(not CONFIG.fake_mode),
depends_on=[WaitForCreateSlaveToFinish],
groups=[GROUP])
class VerifySlave(object):
@test
@time_out(5 * 60)
def test_correctly_started_replication(self):
def slave_is_running():
server = create_server_connection(slave_instance.id)
cmd = ("mysqladmin extended-status "
"| awk '/Slave_running/{print $4}'")
stdout, stderr = server.execute(cmd)
return stdout == "ON\n"
poll_until(slave_is_running)
@test(depends_on=[test_correctly_started_replication])
def test_create_db_on_master(self):
databases = [{'name': slave_instance.replicated_db}]
instance_info.dbaas.databases.create(instance_info.id, databases)
assert_equal(202, instance_info.dbaas.last_http_code)
@test(depends_on=[test_create_db_on_master])
@time_out(5 * 60)
def test_database_replicated_on_slave(self):
def db_is_found():
databases = instance_info.dbaas.databases.list(slave_instance.id)
return (slave_instance.replicated_db
in [d.name for d in databases])
poll_until(db_is_found)
@test(groups=[GROUP],
depends_on=[WaitForCreateSlaveToFinish],
runs_after=[VerifySlave])
class DeleteSlaveInstance(object):
@test
@time_out(TIMEOUT_INSTANCE_DELETE)
def test_delete_slave_instance(self):
instance_info.dbaas.instances.delete(slave_instance.id)
assert_equal(202, instance_info.dbaas.last_http_code)
def instance_is_gone():
try:
instance_info.dbaas.instances.get(slave_instance.id)
return False
except exceptions.NotFound:
return True
poll_until(instance_is_gone)
assert_raises(exceptions.NotFound, instance_info.dbaas.instances.get,
slave_instance.id)

View File

@ -323,6 +323,23 @@ class FakeGuest(object):
def apply_overrides(self, overrides):
self.overrides = overrides
def get_replication_snapshot(self, master_config):
return {
'dataset':
{
'datastore_manager': 'mysql',
'dataset_size': '0.0',
'volume_size': '10.0',
'snapshot_id': None
},
'replication_strategy': 'replication_strategy',
'master': '1',
'log_position': '100'
}
def attach_replication_slave(self, snapshot, slave_config):
pass
def get_or_create(id):
if id not in DB:

View File

@ -26,6 +26,7 @@ from trove.tests.api.mgmt import admin_required
from trove.tests.api.mgmt import hosts
from trove.tests.api.mgmt import instances as mgmt_instances
from trove.tests.api.mgmt import storage
from trove.tests.api import replication
from trove.tests.api import root
from trove.tests.api import user_access
from trove.tests.api import users
@ -46,6 +47,7 @@ black_box_groups = [
instances.GROUP_QUOTAS,
instances.GROUP_SECURITY_GROUPS,
backups.GROUP,
replication.GROUP,
configurations.GROUP,
datastores.GROUP,
instances_actions.GROUP_RESIZE,