MySQL Manager Refactor

Creates a class structure for MySQL-derived datastores to avoid
duplication of code for features and capabilities shared in common.

The existing manager and service classes were pulled up into the
manager_base and service_base modules. Module constants in the manager
and service modules were changed to class properties, so that the
various implementations can inject the appropriate dependencies.
The original logic was preserved to the extent possible.

Change-Id: I5217390c9ff0cdb1b781dd8f2291cca23f4e442c
Implements: blueprint mysql-manager-refactor
This commit is contained in:
Alex Tomic
2015-07-09 11:01:01 -04:00
parent 7cf297cd22
commit ca9cc2c6e6
14 changed files with 1832 additions and 1411 deletions

View File

@@ -0,0 +1,47 @@
# Copyright 2015 Tesora, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from oslo_utils import importutils
from trove.common import cfg
from trove.guestagent.datastore.mysql import manager_base
from trove.guestagent.strategies.replication import get_replication_strategy
CONF = cfg.CONF
MANAGER = CONF.datastore_manager if CONF.datastore_manager else 'mysql'
REPLICATION_STRATEGY = CONF.get(MANAGER).replication_strategy
REPLICATION_NAMESPACE = CONF.get(MANAGER).replication_namespace
REPLICATION_STRATEGY_CLASS = get_replication_strategy(REPLICATION_STRATEGY,
REPLICATION_NAMESPACE)
MYSQL_APP = "trove.guestagent.datastore.experimental.mariadb." \
"service.MySqlApp"
MYSQL_APP_STATUS = "trove.guestagent.datastore.experimental.mariadb." \
"service.MySqlAppStatus"
MYSQL_ADMIN = "trove.guestagent.datastore.experimental.mariadb.service." \
"MySqlAdmin"
class Manager(manager_base.BaseMySqlManager):
def __init__(self):
mysql_app = importutils.import_class(MYSQL_APP)
mysql_app_status = importutils.import_class(MYSQL_APP_STATUS)
mysql_admin = importutils.import_class(MYSQL_ADMIN)
super(Manager, self).__init__(mysql_app, mysql_app_status,
mysql_admin, REPLICATION_STRATEGY,
REPLICATION_NAMESPACE,
REPLICATION_STRATEGY_CLASS, MANAGER)

View File

@@ -0,0 +1,50 @@
# Copyright 2015 Tesora, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from oslo_log import log as logging
from trove.guestagent.datastore.mysql import service_base
LOG = logging.getLogger(__name__)
class KeepAliveConnection(service_base.BaseKeepAliveConnection):
pass
class MySqlAppStatus(service_base.BaseMySqlAppStatus):
pass
class LocalSqlClient(service_base.BaseLocalSqlClient):
pass
class MySqlApp(service_base.BaseMySqlApp):
def __init__(self, status):
super(MySqlApp, self).__init__(status, LocalSqlClient,
KeepAliveConnection)
class MySqlRootAccess(service_base.BaseMySqlRootAccess):
def __init__(self):
super(MySqlRootAccess, self).__init__(LocalSqlClient,
MySqlApp(MySqlAppStatus.get()))
class MySqlAdmin(service_base.BaseMySqlAdmin):
def __init__(self):
super(MySqlAdmin, self).__init__(LocalSqlClient, MySqlRootAccess(),
MySqlApp)

View File

@@ -0,0 +1,47 @@
# Copyright 2015 Tesora, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from oslo_utils import importutils
from trove.common import cfg
from trove.guestagent.datastore.mysql import manager_base
from trove.guestagent.strategies.replication import get_replication_strategy
CONF = cfg.CONF
MANAGER = CONF.datastore_manager if CONF.datastore_manager else 'mysql'
REPLICATION_STRATEGY = CONF.get(MANAGER).replication_strategy
REPLICATION_NAMESPACE = CONF.get(MANAGER).replication_namespace
REPLICATION_STRATEGY_CLASS = get_replication_strategy(REPLICATION_STRATEGY,
REPLICATION_NAMESPACE)
MYSQL_APP = "trove.guestagent.datastore.experimental.percona." \
"service.MySqlApp"
MYSQL_APP_STATUS = "trove.guestagent.datastore.experimental.percona." \
"service.MySqlAppStatus"
MYSQL_ADMIN = "trove.guestagent.datastore.experimental.percona." \
"service.MySqlAdmin"
class Manager(manager_base.BaseMySqlManager):
def __init__(self):
mysql_app = importutils.import_class(MYSQL_APP)
mysql_app_status = importutils.import_class(MYSQL_APP_STATUS)
mysql_admin = importutils.import_class(MYSQL_ADMIN)
super(Manager, self).__init__(mysql_app, mysql_app_status,
mysql_admin, REPLICATION_STRATEGY,
REPLICATION_NAMESPACE,
REPLICATION_STRATEGY_CLASS, MANAGER)

View File

@@ -0,0 +1,50 @@
# Copyright 2015 Tesora, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from oslo_log import log as logging
from trove.guestagent.datastore.mysql import service_base
LOG = logging.getLogger(__name__)
class KeepAliveConnection(service_base.BaseKeepAliveConnection):
pass
class MySqlAppStatus(service_base.BaseMySqlAppStatus):
pass
class LocalSqlClient(service_base.BaseLocalSqlClient):
pass
class MySqlApp(service_base.BaseMySqlApp):
def __init__(self, status):
super(MySqlApp, self).__init__(status, LocalSqlClient,
KeepAliveConnection)
class MySqlRootAccess(service_base.BaseMySqlRootAccess):
def __init__(self):
super(MySqlRootAccess, self).__init__(LocalSqlClient,
MySqlApp(MySqlAppStatus.get()))
class MySqlAdmin(service_base.BaseMySqlAdmin):
def __init__(self):
super(MySqlAdmin, self).__init__(LocalSqlClient, MySqlRootAccess(),
MySqlApp)

View File

@@ -16,27 +16,11 @@
# under the License.
#
import os
from oslo_log import log as logging
from oslo_service import periodic_task
from oslo_utils import importutils
from trove.common import cfg
from trove.common import exception
from trove.common.i18n import _
from trove.common import instance as rd_instance
from trove.guestagent import backup
from trove.guestagent.common import operating_system
from trove.guestagent.datastore.mysql import service
from trove.guestagent.datastore.mysql.service import MySqlAdmin
from trove.guestagent.datastore.mysql.service import MySqlApp
from trove.guestagent.datastore.mysql.service import MySqlAppStatus
from trove.guestagent import dbaas
from trove.guestagent.datastore.mysql import manager_base
from trove.guestagent.strategies.replication import get_replication_strategy
from trove.guestagent import volume
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
MANAGER = CONF.datastore_manager if CONF.datastore_manager else 'mysql'
REPLICATION_STRATEGY = CONF.get(MANAGER).replication_strategy
@@ -44,309 +28,19 @@ REPLICATION_NAMESPACE = CONF.get(MANAGER).replication_namespace
REPLICATION_STRATEGY_CLASS = get_replication_strategy(REPLICATION_STRATEGY,
REPLICATION_NAMESPACE)
MYSQL_APP = "trove.guestagent.datastore.mysql.service.MySqlApp"
MYSQL_APP_STATUS = "trove.guestagent.datastore.mysql.service.MySqlAppStatus"
MYSQL_ADMIN = "trove.guestagent.datastore.mysql.service.MySqlAdmin"
class Manager(periodic_task.PeriodicTasks):
class Manager(manager_base.BaseMySqlManager):
def __init__(self):
super(Manager, self).__init__(CONF)
mysql_app = importutils.import_class(MYSQL_APP)
mysql_app_status = importutils.import_class(MYSQL_APP_STATUS)
mysql_admin = importutils.import_class(MYSQL_ADMIN)
@periodic_task.periodic_task
def update_status(self, context):
"""Update the status of the MySQL service."""
MySqlAppStatus.get().update()
def rpc_ping(self, context):
LOG.debug("Responding to RPC ping.")
return True
def change_passwords(self, context, users):
return MySqlAdmin().change_passwords(users)
def update_attributes(self, context, username, hostname, user_attrs):
return MySqlAdmin().update_attributes(username, hostname, user_attrs)
def reset_configuration(self, context, configuration):
app = MySqlApp(MySqlAppStatus.get())
app.reset_configuration(configuration)
def create_database(self, context, databases):
return MySqlAdmin().create_database(databases)
def create_user(self, context, users):
MySqlAdmin().create_user(users)
def delete_database(self, context, database):
return MySqlAdmin().delete_database(database)
def delete_user(self, context, user):
MySqlAdmin().delete_user(user)
def get_user(self, context, username, hostname):
return MySqlAdmin().get_user(username, hostname)
def grant_access(self, context, username, hostname, databases):
return MySqlAdmin().grant_access(username, hostname, databases)
def revoke_access(self, context, username, hostname, database):
return MySqlAdmin().revoke_access(username, hostname, database)
def list_access(self, context, username, hostname):
return MySqlAdmin().list_access(username, hostname)
def list_databases(self, context, limit=None, marker=None,
include_marker=False):
return MySqlAdmin().list_databases(limit, marker,
include_marker)
def list_users(self, context, limit=None, marker=None,
include_marker=False):
return MySqlAdmin().list_users(limit, marker,
include_marker)
def enable_root(self, context):
return MySqlAdmin().enable_root()
def is_root_enabled(self, context):
return MySqlAdmin().is_root_enabled()
def _perform_restore(self, backup_info, context, restore_location, app):
LOG.info(_("Restoring database from backup %s.") % backup_info['id'])
try:
backup.restore(context, backup_info, restore_location)
except Exception:
LOG.exception(_("Error performing restore from backup %s.") %
backup_info['id'])
app.status.set_status(rd_instance.ServiceStatuses.FAILED)
raise
LOG.info(_("Restored database successfully."))
def prepare(self, context, packages, databases, memory_mb, users,
device_path=None, mount_point=None, backup_info=None,
config_contents=None, root_password=None, overrides=None,
cluster_config=None, snapshot=None):
"""Makes ready DBAAS on a Guest container."""
MySqlAppStatus.get().begin_install()
# status end_mysql_install set with secure()
app = MySqlApp(MySqlAppStatus.get())
app.install_if_needed(packages)
if device_path:
# stop and do not update database
app.stop_db()
device = volume.VolumeDevice(device_path)
# unmount if device is already mounted
device.unmount_device(device_path)
device.format()
if os.path.exists(mount_point):
# rsync existing data to a "data" sub-directory
# on the new volume
device.migrate_data(mount_point, target_subdir="data")
# mount the volume
device.mount(mount_point)
operating_system.chown(
mount_point, service.MYSQL_OWNER, service.MYSQL_OWNER,
recursive=False, as_root=True)
LOG.debug("Mounted the volume at %s." % mount_point)
# We need to temporarily update the default my.cnf so that
# mysql will start after the volume is mounted. Later on it
# will be changed based on the config template
# (see MySqlApp.secure()) and restart.
app.set_data_dir(mount_point + '/data')
app.start_mysql()
if backup_info:
self._perform_restore(backup_info, context,
mount_point + "/data", app)
LOG.debug("Securing MySQL now.")
app.secure(config_contents, overrides)
enable_root_on_restore = (backup_info and
MySqlAdmin().is_root_enabled())
if root_password and not backup_info:
app.secure_root(secure_remote_root=True)
MySqlAdmin().enable_root(root_password)
elif enable_root_on_restore:
app.secure_root(secure_remote_root=False)
MySqlAppStatus.get().report_root(context, 'root')
else:
app.secure_root(secure_remote_root=True)
app.complete_install_or_restart()
if databases:
self.create_database(context, databases)
if users:
self.create_user(context, users)
if snapshot:
self.attach_replica(context, snapshot, snapshot['config'])
LOG.info(_('Completed setup of MySQL database instance.'))
def restart(self, context):
app = MySqlApp(MySqlAppStatus.get())
app.restart()
def start_db_with_conf_changes(self, context, config_contents):
app = MySqlApp(MySqlAppStatus.get())
app.start_db_with_conf_changes(config_contents)
def stop_db(self, context, do_not_start_on_reboot=False):
app = MySqlApp(MySqlAppStatus.get())
app.stop_db(do_not_start_on_reboot=do_not_start_on_reboot)
def get_filesystem_stats(self, context, fs_path):
"""Gets the filesystem stats for the path given."""
mount_point = CONF.get(MANAGER).mount_point
return dbaas.get_filesystem_volume_stats(mount_point)
def create_backup(self, context, backup_info):
"""
Entry point for initiating a backup for this guest agents db instance.
The call currently blocks until the backup is complete or errors. If
device_path is specified, it will be mounted based to a point specified
in configuration.
:param backup_info: a dictionary containing the db instance id of the
backup task, location, type, and other data.
"""
backup.backup(context, backup_info)
def mount_volume(self, context, device_path=None, mount_point=None):
device = volume.VolumeDevice(device_path)
device.mount(mount_point, write_to_fstab=False)
LOG.debug("Mounted the device %s at the mount point %s." %
(device_path, mount_point))
def unmount_volume(self, context, device_path=None, mount_point=None):
device = volume.VolumeDevice(device_path)
device.unmount(mount_point)
LOG.debug("Unmounted the device %s from the mount point %s." %
(device_path, mount_point))
def resize_fs(self, context, device_path=None, mount_point=None):
device = volume.VolumeDevice(device_path)
device.resize_fs(mount_point)
LOG.debug("Resized the filesystem %s." % mount_point)
def update_overrides(self, context, overrides, remove=False):
app = MySqlApp(MySqlAppStatus.get())
if remove:
app.remove_overrides()
app.update_overrides(overrides)
def apply_overrides(self, context, overrides):
LOG.debug("Applying overrides (%s)." % overrides)
app = MySqlApp(MySqlAppStatus.get())
app.apply_overrides(overrides)
def get_replication_snapshot(self, context, snapshot_info,
replica_source_config=None):
LOG.debug("Getting replication snapshot.")
app = MySqlApp(MySqlAppStatus.get())
replication = REPLICATION_STRATEGY_CLASS(context)
replication.enable_as_master(app, replica_source_config)
snapshot_id, log_position = (
replication.snapshot_for_replication(context, app, None,
snapshot_info))
mount_point = CONF.get(MANAGER).mount_point
volume_stats = dbaas.get_filesystem_volume_stats(mount_point)
replication_snapshot = {
'dataset': {
'datastore_manager': MANAGER,
'dataset_size': volume_stats.get('used', 0.0),
'volume_size': volume_stats.get('total', 0.0),
'snapshot_id': snapshot_id
},
'replication_strategy': REPLICATION_STRATEGY,
'master': replication.get_master_ref(app, snapshot_info),
'log_position': log_position
}
return replication_snapshot
def enable_as_master(self, context, replica_source_config):
LOG.debug("Calling enable_as_master.")
app = MySqlApp(MySqlAppStatus.get())
replication = REPLICATION_STRATEGY_CLASS(context)
replication.enable_as_master(app, replica_source_config)
# DEPRECATED: Maintain for API Compatibility
def get_txn_count(self, context):
LOG.debug("Calling get_txn_count")
return MySqlApp(MySqlAppStatus.get()).get_txn_count()
def get_last_txn(self, context):
LOG.debug("Calling get_last_txn")
return MySqlApp(MySqlAppStatus.get()).get_last_txn()
def get_latest_txn_id(self, context):
LOG.debug("Calling get_latest_txn_id.")
return MySqlApp(MySqlAppStatus.get()).get_latest_txn_id()
def wait_for_txn(self, context, txn):
LOG.debug("Calling wait_for_txn.")
MySqlApp(MySqlAppStatus.get()).wait_for_txn(txn)
def detach_replica(self, context, for_failover=False):
LOG.debug("Detaching replica.")
app = MySqlApp(MySqlAppStatus.get())
replication = REPLICATION_STRATEGY_CLASS(context)
replica_info = replication.detach_slave(app, for_failover)
return replica_info
def get_replica_context(self, context):
LOG.debug("Getting replica context.")
app = MySqlApp(MySqlAppStatus.get())
replication = REPLICATION_STRATEGY_CLASS(context)
replica_info = replication.get_replica_context(app)
return replica_info
def _validate_slave_for_replication(self, context, replica_info):
if (replica_info['replication_strategy'] != REPLICATION_STRATEGY):
raise exception.IncompatibleReplicationStrategy(
replica_info.update({
'guest_strategy': REPLICATION_STRATEGY
}))
mount_point = CONF.get(MANAGER).mount_point
volume_stats = dbaas.get_filesystem_volume_stats(mount_point)
if (volume_stats.get('total', 0.0) <
replica_info['dataset']['dataset_size']):
raise exception.InsufficientSpaceForReplica(
replica_info.update({
'slave_volume_size': volume_stats.get('total', 0.0)
}))
def attach_replica(self, context, replica_info, slave_config):
LOG.debug("Attaching replica.")
app = MySqlApp(MySqlAppStatus.get())
try:
if 'replication_strategy' in replica_info:
self._validate_slave_for_replication(context, replica_info)
replication = REPLICATION_STRATEGY_CLASS(context)
replication.enable_as_slave(app, replica_info, slave_config)
except Exception:
LOG.exception("Error enabling replication.")
app.status.set_status(rd_instance.ServiceStatuses.FAILED)
raise
def make_read_only(self, context, read_only):
LOG.debug("Executing make_read_only(%s)" % read_only)
app = MySqlApp(MySqlAppStatus.get())
app.make_read_only(read_only)
def cleanup_source_on_replica_detach(self, context, replica_info):
LOG.debug("Cleaning up the source on the detach of a replica.")
replication = REPLICATION_STRATEGY_CLASS(context)
replication.cleanup_source_on_replica_detach(MySqlAdmin(),
replica_info)
def demote_replication_master(self, context):
LOG.debug("Demoting replication master.")
app = MySqlApp(MySqlAppStatus.get())
replication = REPLICATION_STRATEGY_CLASS(context)
replication.demote_master(app)
super(Manager, self).__init__(mysql_app, mysql_app_status,
mysql_admin, REPLICATION_STRATEGY,
REPLICATION_NAMESPACE,
REPLICATION_STRATEGY_CLASS, MANAGER)

View File

@@ -0,0 +1,384 @@
# Copyright 2013 OpenStack Foundation
# Copyright 2013 Rackspace Hosting
# Copyright 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import os
from oslo_log import log as logging
from oslo_service import periodic_task
from trove.common import cfg
from trove.common import exception
from trove.common.i18n import _
from trove.common import instance as rd_instance
from trove.guestagent import backup
from trove.guestagent.common import operating_system
from trove.guestagent.datastore.mysql import service_base
from trove.guestagent import dbaas
from trove.guestagent.strategies.replication import get_replication_strategy
from trove.guestagent import volume
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class BaseMySqlManager(periodic_task.PeriodicTasks):
def __init__(self, mysql_app, mysql_app_status, mysql_admin,
replication_strategy, replication_namespace,
replication_strategy_class, manager):
super(BaseMySqlManager, self).__init__(CONF)
self._mysql_app = mysql_app
self._mysql_app_status = mysql_app_status
self._mysql_admin = mysql_admin
self._replication_strategy = replication_strategy
self._replication_namespace = replication_namespace
self._replication_strategy_class = replication_strategy_class
self._manager = manager
@property
def mysql_app(self):
return self._mysql_app
@property
def mysql_app_status(self):
return self._mysql_app_status
@property
def mysql_admin(self):
return self._mysql_admin
@property
def replication_strategy(self):
return self._replication_strategy
@property
def replication_namespace(self):
return self._replication_namespace
@property
def replication_strategy_class(self):
return get_replication_strategy(self._replication_strategy,
self._replication_namespace)
@property
def manager(self):
return self._manager
@periodic_task.periodic_task
def update_status(self, context):
"""Update the status of the MySQL service."""
self.mysql_app_status.get().update()
def rpc_ping(self, context):
LOG.debug("Responding to RPC ping.")
return True
def change_passwords(self, context, users):
return self.mysql_admin().change_passwords(users)
def update_attributes(self, context, username, hostname, user_attrs):
return self.mysql_admin().update_attributes(
username, hostname, user_attrs)
def reset_configuration(self, context, configuration):
app = self.mysql_app(self.mysql_app_status.get())
app.reset_configuration(configuration)
def create_database(self, context, databases):
return self.mysql_admin().create_database(databases)
def create_user(self, context, users):
self.mysql_admin().create_user(users)
def delete_database(self, context, database):
return self.mysql_admin().delete_database(database)
def delete_user(self, context, user):
self.mysql_admin().delete_user(user)
def get_user(self, context, username, hostname):
return self.mysql_admin().get_user(username, hostname)
def grant_access(self, context, username, hostname, databases):
return self.mysql_admin().grant_access(username, hostname, databases)
def revoke_access(self, context, username, hostname, database):
return self.mysql_admin().revoke_access(username, hostname, database)
def list_access(self, context, username, hostname):
return self.mysql_admin().list_access(username, hostname)
def list_databases(self, context, limit=None, marker=None,
include_marker=False):
return self.mysql_admin().list_databases(limit, marker,
include_marker)
def list_users(self, context, limit=None, marker=None,
include_marker=False):
return self.mysql_admin().list_users(limit, marker,
include_marker)
def enable_root(self, context):
return self.mysql_admin().enable_root()
def is_root_enabled(self, context):
return self.mysql_admin().is_root_enabled()
def _perform_restore(self, backup_info, context, restore_location, app):
LOG.info(_("Restoring database from backup %s.") % backup_info['id'])
try:
backup.restore(context, backup_info, restore_location)
except Exception:
LOG.exception(_("Error performing restore from backup %s.") %
backup_info['id'])
app.status.set_status(rd_instance.ServiceStatuses.FAILED)
raise
LOG.info(_("Restored database successfully."))
def prepare(self, context, packages, databases, memory_mb, users,
device_path=None, mount_point=None, backup_info=None,
config_contents=None, root_password=None, overrides=None,
cluster_config=None, snapshot=None):
"""Makes ready DBAAS on a Guest container."""
self.mysql_app_status.get().begin_install()
# status end_mysql_install set with secure()
app = self.mysql_app(self.mysql_app_status.get())
app.install_if_needed(packages)
if device_path:
# stop and do not update database
app.stop_db()
device = volume.VolumeDevice(device_path)
# unmount if device is already mounted
device.unmount_device(device_path)
device.format()
if os.path.exists(mount_point):
# rsync existing data to a "data" sub-directory
# on the new volume
device.migrate_data(mount_point, target_subdir="data")
# mount the volume
device.mount(mount_point)
operating_system.chown(mount_point, service_base.MYSQL_OWNER,
service_base.MYSQL_OWNER,
recursive=False, as_root=True)
LOG.debug("Mounted the volume at %s." % mount_point)
# We need to temporarily update the default my.cnf so that
# mysql will start after the volume is mounted. Later on it
# will be changed based on the config template
# (see MySqlApp.secure()) and restart.
app.set_data_dir(mount_point + '/data')
app.start_mysql()
if backup_info:
self._perform_restore(backup_info, context,
mount_point + "/data", app)
LOG.debug("Securing MySQL now.")
app.secure(config_contents, overrides)
enable_root_on_restore = (backup_info and
self.mysql_admin().is_root_enabled())
if root_password and not backup_info:
app.secure_root(secure_remote_root=True)
self.mysql_admin().enable_root(root_password)
elif enable_root_on_restore:
app.secure_root(secure_remote_root=False)
self.mysql_app_status.get().report_root(context, 'root')
else:
app.secure_root(secure_remote_root=True)
app.complete_install_or_restart()
if databases:
self.create_database(context, databases)
if users:
self.create_user(context, users)
if snapshot:
self.attach_replica(context, snapshot, snapshot['config'])
LOG.info(_('Completed setup of MySQL database instance.'))
def restart(self, context):
app = self.mysql_app(self.mysql_app_status.get())
app.restart()
def start_db_with_conf_changes(self, context, config_contents):
app = self.mysql_app(self.mysql_app_status.get())
app.start_db_with_conf_changes(config_contents)
def stop_db(self, context, do_not_start_on_reboot=False):
app = self.mysql_app(self.mysql_app_status.get())
app.stop_db(do_not_start_on_reboot=do_not_start_on_reboot)
def get_filesystem_stats(self, context, fs_path):
"""Gets the filesystem stats for the path given."""
mount_point = CONF.get(self.manager).mount_point
return dbaas.get_filesystem_volume_stats(mount_point)
def create_backup(self, context, backup_info):
"""
Entry point for initiating a backup for this guest agents db instance.
The call currently blocks until the backup is complete or errors. If
device_path is specified, it will be mounted based to a point specified
in configuration.
:param backup_info: a dictionary containing the db instance id of the
backup task, location, type, and other data.
"""
backup.backup(context, backup_info)
def mount_volume(self, context, device_path=None, mount_point=None):
device = volume.VolumeDevice(device_path)
device.mount(mount_point, write_to_fstab=False)
LOG.debug("Mounted the device %s at the mount point %s." %
(device_path, mount_point))
def unmount_volume(self, context, device_path=None, mount_point=None):
device = volume.VolumeDevice(device_path)
device.unmount(mount_point)
LOG.debug("Unmounted the device %s from the mount point %s." %
(device_path, mount_point))
def resize_fs(self, context, device_path=None, mount_point=None):
device = volume.VolumeDevice(device_path)
device.resize_fs(mount_point)
LOG.debug("Resized the filesystem %s." % mount_point)
def update_overrides(self, context, overrides, remove=False):
app = self.mysql_app(self.mysql_app_status.get())
if remove:
app.remove_overrides()
app.update_overrides(overrides)
def apply_overrides(self, context, overrides):
LOG.debug("Applying overrides (%s)." % overrides)
app = self.mysql_app(self.mysql_app_status.get())
app.apply_overrides(overrides)
def get_replication_snapshot(self, context, snapshot_info,
replica_source_config=None):
LOG.debug("Getting replication snapshot.")
app = self.mysql_app(self.mysql_app_status.get())
replication = self.replication_strategy_class(context)
replication.enable_as_master(app, replica_source_config)
snapshot_id, log_position = (
replication.snapshot_for_replication(context, app, None,
snapshot_info))
mount_point = CONF.get(self.manager).mount_point
volume_stats = dbaas.get_filesystem_volume_stats(mount_point)
replication_snapshot = {
'dataset': {
'datastore_manager': self.manager,
'dataset_size': volume_stats.get('used', 0.0),
'volume_size': volume_stats.get('total', 0.0),
'snapshot_id': snapshot_id
},
'replication_strategy': self.replication_strategy,
'master': replication.get_master_ref(app, snapshot_info),
'log_position': log_position
}
return replication_snapshot
def enable_as_master(self, context, replica_source_config):
LOG.debug("Calling enable_as_master.")
app = self.mysql_app(self.mysql_app_status.get())
replication = self.replication_strategy_class(context)
replication.enable_as_master(app, replica_source_config)
# DEPRECATED: Maintain for API Compatibility
def get_txn_count(self, context):
LOG.debug("Calling get_txn_count")
return self.mysql_app(self.mysql_app_status.get()).get_txn_count()
def get_last_txn(self, context):
LOG.debug("Calling get_last_txn")
return self.mysql_app(self.mysql_app_status.get()).get_last_txn()
def get_latest_txn_id(self, context):
LOG.debug("Calling get_latest_txn_id.")
return self.mysql_app(self.mysql_app_status.get()).get_latest_txn_id()
def wait_for_txn(self, context, txn):
LOG.debug("Calling wait_for_txn.")
self.mysql_app(self.mysql_app_status.get()).wait_for_txn(txn)
def detach_replica(self, context, for_failover=False):
LOG.debug("Detaching replica.")
app = self.mysql_app(self.mysql_app_status.get())
replication = self.replication_strategy_class(context)
replica_info = replication.detach_slave(app, for_failover)
return replica_info
def get_replica_context(self, context):
LOG.debug("Getting replica context.")
app = self.mysql_app(self.mysql_app_status.get())
replication = self.replication_strategy_class(context)
replica_info = replication.get_replica_context(app)
return replica_info
def _validate_slave_for_replication(self, context, replica_info):
if (replica_info['replication_strategy'] != self.replication_strategy):
raise exception.IncompatibleReplicationStrategy(
replica_info.update({
'guest_strategy': self.replication_strategy
}))
mount_point = CONF.get(self.manager).mount_point
volume_stats = dbaas.get_filesystem_volume_stats(mount_point)
if (volume_stats.get('total', 0.0) <
replica_info['dataset']['dataset_size']):
raise exception.InsufficientSpaceForReplica(
replica_info.update({
'slave_volume_size': volume_stats.get('total', 0.0)
}))
def attach_replica(self, context, replica_info, slave_config):
LOG.debug("Attaching replica.")
app = self.mysql_app(self.mysql_app_status.get())
try:
if 'replication_strategy' in replica_info:
self._validate_slave_for_replication(context, replica_info)
replication = self.replication_strategy_class(context)
replication.enable_as_slave(app, replica_info, slave_config)
except Exception:
LOG.exception("Error enabling replication.")
app.status.set_status(rd_instance.ServiceStatuses.FAILED)
raise
def make_read_only(self, context, read_only):
LOG.debug("Executing make_read_only(%s)" % read_only)
app = self.mysql_app(self.mysql_app_status.get())
app.make_read_only(read_only)
def cleanup_source_on_replica_detach(self, context, replica_info):
LOG.debug("Cleaning up the source on the detach of a replica.")
replication = self.replication_strategy_class(context)
replication.cleanup_source_on_replica_detach(self.mysql_admin(),
replica_info)
def demote_replication_master(self, context):
LOG.debug("Demoting replication master.")
app = self.mysql_app(self.mysql_app_status.get())
replication = self.replication_strategy_class(context)
replication.demote_master(app)

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -37,7 +37,7 @@ defaults = {
'mysql':
'trove.guestagent.datastore.mysql.manager.Manager',
'percona':
'trove.guestagent.datastore.mysql.manager.Manager',
'trove.guestagent.datastore.experimental.percona.manager.Manager',
'redis':
'trove.guestagent.datastore.experimental.redis.manager.Manager',
'cassandra':
@@ -54,6 +54,8 @@ defaults = {
'trove.guestagent.datastore.experimental.vertica.manager.Manager',
'db2':
'trove.guestagent.datastore.experimental.db2.manager.Manager',
'mariadb':
'trove.guestagent.datastore.experimental.mariadb.manager.Manager'
}
CONF = cfg.CONF

View File

@@ -20,8 +20,8 @@ import re
from oslo_log import log as logging
from trove.common.i18n import _
from trove.guestagent.datastore.mysql.service import ADMIN_USER_NAME
from trove.guestagent.datastore.mysql.service import MySqlApp
from trove.guestagent.datastore.mysql.service_base import ADMIN_USER_NAME
from trove.guestagent.strategies.backup import base
LOG = logging.getLogger(__name__)

View File

@@ -70,6 +70,7 @@ from trove.guestagent.datastore.mysql.service import MySqlAdmin
from trove.guestagent.datastore.mysql.service import MySqlApp
from trove.guestagent.datastore.mysql.service import MySqlAppStatus
from trove.guestagent.datastore.mysql.service import MySqlRootAccess
import trove.guestagent.datastore.mysql.service_base as dbaas_base
from trove.guestagent.datastore.service import BaseDbStatus
from trove.guestagent.db import models
from trove.guestagent import dbaas as dbaas_sr
@@ -118,30 +119,33 @@ class DbaasTest(testtools.TestCase):
def setUp(self):
super(DbaasTest, self).setUp()
self.orig_utils_execute_with_timeout = dbaas.utils.execute_with_timeout
self.orig_utils_execute = dbaas.utils.execute
self.orig_utils_execute_with_timeout = \
dbaas_base.utils.execute_with_timeout
self.orig_utils_execute = dbaas_base.utils.execute
def tearDown(self):
super(DbaasTest, self).tearDown()
dbaas.utils.execute_with_timeout = self.orig_utils_execute_with_timeout
dbaas.utils.execute = self.orig_utils_execute
dbaas_base.utils.execute_with_timeout = \
self.orig_utils_execute_with_timeout
dbaas_base.utils.execute = self.orig_utils_execute
@patch.object(operating_system, 'remove')
def test_clear_expired_password(self, mock_remove):
secret_content = ("# The random password set for the "
"root user at Wed May 14 14:06:38 2014 "
"(local time): somepassword")
with patch.object(dbaas.utils, 'execute',
with patch.object(dbaas_base.utils, 'execute',
return_value=(secret_content, None)):
dbaas.clear_expired_password()
self.assertEqual(2, dbaas.utils.execute.call_count)
dbaas_base.clear_expired_password()
self.assertEqual(2, dbaas_base.utils.execute.call_count)
self.assertEqual(1, mock_remove.call_count)
@patch.object(operating_system, 'remove')
def test_no_secret_content_clear_expired_password(self, mock_remove):
with patch.object(dbaas.utils, 'execute', return_value=('', None)):
dbaas.clear_expired_password()
self.assertEqual(1, dbaas.utils.execute.call_count)
with patch.object(dbaas_base.utils, 'execute',
return_value=('', None)):
dbaas_base.clear_expired_password()
self.assertEqual(1, dbaas_base.utils.execute.call_count)
mock_remove.assert_not_called()
@patch.object(operating_system, 'remove')
@@ -150,19 +154,20 @@ class DbaasTest(testtools.TestCase):
secret_content = ("# The random password set for the "
"root user at Wed May 14 14:06:38 2014 "
"(local time): somepassword")
with patch.object(dbaas.utils, 'execute',
with patch.object(dbaas_base.utils, 'execute',
side_effect=[(secret_content, None),
ProcessExecutionError]):
dbaas.clear_expired_password()
self.assertEqual(2, dbaas.utils.execute.call_count)
dbaas_base.clear_expired_password()
self.assertEqual(2, dbaas_base.utils.execute.call_count)
mock_remove.assert_not_called()
@patch.object(operating_system, 'remove')
@patch.object(dbaas.utils, 'execute', side_effect=ProcessExecutionError)
@patch.object(dbaas_base.utils, 'execute',
side_effect=ProcessExecutionError)
def test_fail_retrieve_secret_content_clear_expired_password(self,
mock_execute,
mock_remove):
dbaas.clear_expired_password()
dbaas_base.clear_expired_password()
self.assertEqual(1, mock_execute.call_count)
mock_remove.assert_not_called()
@@ -181,7 +186,8 @@ class DbaasTest(testtools.TestCase):
def test_service_discovery(self):
with patch.object(os.path, 'isfile', return_value=True):
mysql_service = dbaas.operating_system.service_discovery(["mysql"])
mysql_service = \
dbaas_base.operating_system.service_discovery(["mysql"])
self.assertIsNotNone(mysql_service['cmd_start'])
self.assertIsNotNone(mysql_service['cmd_enable'])
@@ -192,8 +198,8 @@ class DbaasTest(testtools.TestCase):
"--tmpdir=/tmp --skip-external-locking"
with patch.object(os.path, 'isfile', return_value=True):
dbaas.utils.execute = Mock(return_value=(output, None))
options = dbaas.load_mysqld_options()
dbaas_base.utils.execute = Mock(return_value=(output, None))
options = dbaas_base.load_mysqld_options()
self.assertEqual(5, len(options))
self.assertEqual(["mysql"], options["user"])
@@ -208,8 +214,8 @@ class DbaasTest(testtools.TestCase):
"--plugin-load=federated=ha_federated.so")
with patch.object(os.path, 'isfile', return_value=True):
dbaas.utils.execute = Mock(return_value=(output, None))
options = dbaas.load_mysqld_options()
dbaas_base.utils.execute = Mock(return_value=(output, None))
options = dbaas_base.load_mysqld_options()
self.assertEqual(1, len(options))
self.assertEqual(["blackhole=ha_blackhole.so",
@@ -219,9 +225,9 @@ class DbaasTest(testtools.TestCase):
@patch.object(os.path, 'isfile', return_value=True)
def test_load_mysqld_options_error(self, mock_exists):
dbaas.utils.execute = Mock(side_effect=ProcessExecutionError())
dbaas_base.utils.execute = Mock(side_effect=ProcessExecutionError())
self.assertFalse(dbaas.load_mysqld_options())
self.assertFalse(dbaas_base.load_mysqld_options())
class ResultSetStub(object):
@@ -242,8 +248,15 @@ class ResultSetStub(object):
class MySqlAdminMockTest(testtools.TestCase):
def setUp(self):
super(MySqlAdminMockTest, self).setUp()
dbaas.orig_configuration_manager = dbaas.MySqlApp.configuration_manager
dbaas.MySqlApp.configuration_manager = Mock()
def tearDown(self):
super(MySqlAdminMockTest, self).tearDown()
dbaas.MySqlApp.configuration_manager = \
dbaas.orig_configuration_manager
@patch('trove.guestagent.datastore.mysql.service.MySqlApp'
'.get_auth_password', return_value='some_password')
@@ -279,6 +292,10 @@ class MySqlAdminTest(testtools.TestCase):
dbaas.LocalSqlClient.__enter__ = Mock()
dbaas.LocalSqlClient.__exit__ = Mock()
dbaas.LocalSqlClient.execute = Mock()
# trove.guestagent.common.configuration import ConfigurationManager
dbaas.orig_configuration_manager = dbaas.MySqlApp.configuration_manager
dbaas.MySqlApp.configuration_manager = Mock()
self.mySqlAdmin = MySqlAdmin()
def tearDown(self):
@@ -291,6 +308,8 @@ class MySqlAdminTest(testtools.TestCase):
dbaas.LocalSqlClient.execute = self.orig_LocalSqlClient_execute
models.MySQLUser._is_valid_user_name = (
self.orig_MySQLUser_is_valid_user_name)
dbaas.MySqlApp.configuration_manager = \
dbaas.orig_configuration_manager
def test__associate_dbs(self):
db_result = [{"grantee": "'test_user'@'%'", "table_schema": "db1"},
@@ -732,7 +751,8 @@ class MySqlAppTest(testtools.TestCase):
def setUp(self):
super(MySqlAppTest, self).setUp()
self.orig_utils_execute_with_timeout = dbaas.utils.execute_with_timeout
self.orig_utils_execute_with_timeout = \
dbaas_base.utils.execute_with_timeout
self.orig_time_sleep = time.sleep
self.orig_unlink = os.unlink
self.orig_get_auth_password = MySqlApp.get_auth_password
@@ -759,15 +779,20 @@ class MySqlAppTest(testtools.TestCase):
self.mock_client.__enter__ = Mock()
self.mock_client.__exit__ = Mock()
self.mock_client.__enter__.return_value.execute = self.mock_execute
dbaas.orig_configuration_manager = dbaas.MySqlApp.configuration_manager
dbaas.MySqlApp.configuration_manager = Mock()
def tearDown(self):
super(MySqlAppTest, self).tearDown()
dbaas.utils.execute_with_timeout = self.orig_utils_execute_with_timeout
dbaas_base.utils.execute_with_timeout = \
self.orig_utils_execute_with_timeout
time.sleep = self.orig_time_sleep
os.unlink = self.orig_unlink
operating_system.service_discovery = self.orig_service_discovery
MySqlApp.get_auth_password = self.orig_get_auth_password
InstanceServiceStatus.find_by(instance_id=self.FAKE_ID).delete()
dbaas.MySqlApp.configuration_manager = \
dbaas.orig_configuration_manager
def assert_reported_status(self, expected_status):
service_status = InstanceServiceStatus.find_by(
@@ -802,7 +827,7 @@ class MySqlAppTest(testtools.TestCase):
def test_stop_mysql(self):
dbaas.utils.execute_with_timeout = Mock()
dbaas_base.utils.execute_with_timeout = Mock()
self.appStatus.set_next_status(
rd_instance.ServiceStatuses.SHUTDOWN)
@@ -811,7 +836,7 @@ class MySqlAppTest(testtools.TestCase):
def test_stop_mysql_with_db_update(self):
dbaas.utils.execute_with_timeout = Mock()
dbaas_base.utils.execute_with_timeout = Mock()
self.appStatus.set_next_status(
rd_instance.ServiceStatuses.SHUTDOWN)
@@ -836,7 +861,7 @@ class MySqlAppTest(testtools.TestCase):
def test_stop_mysql_error(self):
dbaas.utils.execute_with_timeout = Mock()
dbaas_base.utils.execute_with_timeout = Mock()
self.appStatus.set_next_status(rd_instance.ServiceStatuses.RUNNING)
self.mySqlApp.state_change_wait_time = 1
self.assertRaises(RuntimeError, self.mySqlApp.stop_db)
@@ -882,14 +907,14 @@ class MySqlAppTest(testtools.TestCase):
def test_wipe_ib_logfiles_error(self, get_datadir_mock):
mocked = Mock(side_effect=ProcessExecutionError('Error'))
dbaas.utils.execute_with_timeout = mocked
dbaas_base.utils.execute_with_timeout = mocked
self.assertRaises(ProcessExecutionError,
self.mySqlApp.wipe_ib_logfiles)
def test_start_mysql(self):
dbaas.utils.execute_with_timeout = Mock()
dbaas_base.utils.execute_with_timeout = Mock()
self.appStatus.set_next_status(rd_instance.ServiceStatuses.RUNNING)
self.mySqlApp._enable_mysql_on_boot = Mock()
self.mySqlApp.start_mysql()
@@ -897,7 +922,7 @@ class MySqlAppTest(testtools.TestCase):
def test_start_mysql_with_db_update(self):
dbaas.utils.execute_with_timeout = Mock()
dbaas_base.utils.execute_with_timeout = Mock()
self.mySqlApp._enable_mysql_on_boot = Mock()
self.appStatus.set_next_status(rd_instance.ServiceStatuses.RUNNING)
@@ -909,7 +934,7 @@ class MySqlAppTest(testtools.TestCase):
def test_start_mysql_runs_forever(self):
dbaas.utils.execute_with_timeout = Mock()
dbaas_base.utils.execute_with_timeout = Mock()
self.mySqlApp._enable_mysql_on_boot = Mock()
self.mySqlApp.state_change_wait_time = 1
self.appStatus.set_next_status(rd_instance.ServiceStatuses.SHUTDOWN)
@@ -924,7 +949,7 @@ class MySqlAppTest(testtools.TestCase):
self.mySqlApp._enable_mysql_on_boot = Mock()
mocked = Mock(side_effect=ProcessExecutionError('Error'))
dbaas.utils.execute_with_timeout = mocked
dbaas_base.utils.execute_with_timeout = mocked
self.assertRaises(RuntimeError, self.mySqlApp.start_mysql)
@@ -976,13 +1001,14 @@ class MySqlAppTest(testtools.TestCase):
save_cfg_mock.assert_called_once_with('some junk')
apply_mock.assert_called_once_with(
{'client': {'user': dbaas.ADMIN_USER_NAME,
{'client': {'user': dbaas_base.ADMIN_USER_NAME,
'password': auth_pwd_mock.return_value}})
wipe_ib_mock.assert_called_once_with()
@patch.object(utils, 'execute_with_timeout')
def test__enable_mysql_on_boot(self, mock_execute):
mysql_service = dbaas.operating_system.service_discovery(["mysql"])
mysql_service = \
dbaas_base.operating_system.service_discovery(["mysql"])
self.mySqlApp._enable_mysql_on_boot()
self.assertEqual(1, mock_execute.call_count)
mock_execute.assert_called_with(mysql_service['cmd_enable'],
@@ -998,7 +1024,8 @@ class MySqlAppTest(testtools.TestCase):
@patch.object(utils, 'execute_with_timeout')
def test__disable_mysql_on_boot(self, mock_execute):
mysql_service = dbaas.operating_system.service_discovery(["mysql"])
mysql_service = \
dbaas_base.operating_system.service_discovery(["mysql"])
self.mySqlApp._disable_mysql_on_boot()
self.assertEqual(1, mock_execute.call_count)
mock_execute.assert_called_with(mysql_service['cmd_disable'],
@@ -1030,26 +1057,26 @@ class MySqlAppTest(testtools.TestCase):
'apply_system_override') as apply_sys_mock:
self.mySqlApp.write_replication_source_overrides('something')
apply_sys_mock.assert_called_once_with('something',
dbaas.CNF_MASTER)
dbaas_base.CNF_MASTER)
def test_write_replication_replica_overrides(self):
with patch.object(self.mySqlApp.configuration_manager,
'apply_system_override') as apply_sys_mock:
self.mySqlApp.write_replication_replica_overrides('something')
apply_sys_mock.assert_called_once_with('something',
dbaas.CNF_SLAVE)
dbaas_base.CNF_SLAVE)
def test_remove_replication_source_overrides(self):
with patch.object(self.mySqlApp.configuration_manager,
'remove_system_override') as remove_sys_mock:
self.mySqlApp.remove_replication_source_overrides()
remove_sys_mock.assert_called_once_with(dbaas.CNF_MASTER)
remove_sys_mock.assert_called_once_with(dbaas_base.CNF_MASTER)
def test_remove_replication_replica_overrides(self):
with patch.object(self.mySqlApp.configuration_manager,
'remove_system_override') as remove_sys_mock:
self.mySqlApp.remove_replication_replica_overrides()
remove_sys_mock.assert_called_once_with(dbaas.CNF_SLAVE)
remove_sys_mock.assert_called_once_with(dbaas_base.CNF_SLAVE)
def test_exists_replication_source_overrides(self):
with patch.object(self.mySqlApp.configuration_manager,
@@ -1063,7 +1090,7 @@ class MySqlAppTest(testtools.TestCase):
return_value=MagicMock(name='get_engine'))
def test_grant_replication_privilege(self, *args):
replication_user = {'name': 'testUSr', 'password': 'somePwd'}
with patch.object(dbaas, 'LocalSqlClient',
with patch.object(dbaas.MySqlApp, 'local_sql_client',
return_value=self.mock_client):
self.mySqlApp.grant_replication_privilege(replication_user)
args, _ = self.mock_execute.call_args_list[0]
@@ -1075,7 +1102,7 @@ class MySqlAppTest(testtools.TestCase):
@patch.object(dbaas, 'get_engine',
return_value=MagicMock(name='get_engine'))
def test_get_port(self, *args):
with patch.object(dbaas, 'LocalSqlClient',
with patch.object(dbaas.MySqlApp, 'local_sql_client',
return_value=self.mock_client):
self.mySqlApp.get_port()
args, _ = self.mock_execute.call_args_list[0]
@@ -1088,7 +1115,7 @@ class MySqlAppTest(testtools.TestCase):
def test_get_binlog_position(self, *args):
result = {'File': 'mysql-bin.003', 'Position': '73'}
self.mock_execute.return_value.first = Mock(return_value=result)
with patch.object(dbaas, 'LocalSqlClient',
with patch.object(dbaas.MySqlApp, 'local_sql_client',
return_value=self.mock_client):
found_result = self.mySqlApp.get_binlog_position()
@@ -1103,7 +1130,7 @@ class MySqlAppTest(testtools.TestCase):
@patch.object(dbaas, 'get_engine',
return_value=MagicMock(name='get_engine'))
def test_execute_on_client(self, *args):
with patch.object(dbaas, 'LocalSqlClient',
with patch.object(dbaas.MySqlApp, 'local_sql_client',
return_value=self.mock_client):
self.mySqlApp.execute_on_client('show tables')
args, _ = self.mock_execute.call_args_list[0]
@@ -1115,7 +1142,7 @@ class MySqlAppTest(testtools.TestCase):
return_value=MagicMock(name='get_engine'))
@patch.object(dbaas.MySqlApp, '_wait_for_slave_status')
def test_start_slave(self, *args):
with patch.object(dbaas, 'LocalSqlClient',
with patch.object(dbaas.MySqlApp, 'local_sql_client',
return_value=self.mock_client):
self.mySqlApp.start_slave()
args, _ = self.mock_execute.call_args_list[0]
@@ -1129,7 +1156,7 @@ class MySqlAppTest(testtools.TestCase):
def test_stop_slave_with_failover(self, *args):
self.mock_execute.return_value.first = Mock(
return_value={'Master_User': 'root'})
with patch.object(dbaas, 'LocalSqlClient',
with patch.object(dbaas.MySqlApp, 'local_sql_client',
return_value=self.mock_client):
result = self.mySqlApp.stop_slave(True)
self.assertEqual('root', result['replication_user'])
@@ -1147,7 +1174,7 @@ class MySqlAppTest(testtools.TestCase):
def test_stop_slave_without_failover(self, *args):
self.mock_execute.return_value.first = Mock(
return_value={'Master_User': 'root'})
with patch.object(dbaas, 'LocalSqlClient',
with patch.object(dbaas.MySqlApp, 'local_sql_client',
return_value=self.mock_client):
result = self.mySqlApp.stop_slave(False)
self.assertEqual('root', result['replication_user'])
@@ -1163,7 +1190,7 @@ class MySqlAppTest(testtools.TestCase):
@patch.object(dbaas, 'get_engine',
return_value=MagicMock(name='get_engine'))
def test_stop_master(self, *args):
with patch.object(dbaas, 'LocalSqlClient',
with patch.object(dbaas.MySqlApp, 'local_sql_client',
return_value=self.mock_client):
self.mySqlApp.stop_master()
args, _ = self.mock_execute.call_args_list[0]
@@ -1197,7 +1224,7 @@ class MySqlAppTest(testtools.TestCase):
return_value=MagicMock(name='get_engine'))
def test__get_slave_status(self, *args):
self.mock_execute.return_value.first = Mock(return_value='some_thing')
with patch.object(dbaas, 'LocalSqlClient',
with patch.object(dbaas.MySqlApp, 'local_sql_client',
return_value=self.mock_client):
result = self.mySqlApp._get_slave_status()
self.assertEqual('some_thing', result)
@@ -1211,7 +1238,7 @@ class MySqlAppTest(testtools.TestCase):
def test_get_latest_txn_id(self, *args):
self.mock_execute.return_value.first = Mock(return_value=['some_thing']
)
with patch.object(dbaas, 'LocalSqlClient',
with patch.object(dbaas.MySqlApp, 'local_sql_client',
return_value=self.mock_client):
result = self.mySqlApp.get_latest_txn_id()
self.assertEqual('some_thing', result)
@@ -1223,7 +1250,7 @@ class MySqlAppTest(testtools.TestCase):
@patch.object(dbaas, 'get_engine',
return_value=MagicMock(name='get_engine'))
def test_wait_for_txn(self, *args):
with patch.object(dbaas, 'LocalSqlClient',
with patch.object(dbaas.MySqlApp, 'local_sql_client',
return_value=self.mock_client):
self.mySqlApp.wait_for_txn('abcd')
args, _ = self.mock_execute.call_args_list[0]
@@ -1236,7 +1263,7 @@ class MySqlAppTest(testtools.TestCase):
def test_get_txn_count(self, *args):
self.mock_execute.return_value.first = Mock(
return_value=['b1f3f33a-0789-ee1c-43f3-f8373e12f1ea:1'])
with patch.object(dbaas, 'LocalSqlClient',
with patch.object(dbaas.MySqlApp, 'local_sql_client',
return_value=self.mock_client):
result = self.mySqlApp.get_txn_count()
self.assertEqual(1, result)
@@ -1251,7 +1278,7 @@ class MySqlAppInstallTest(MySqlAppTest):
def setUp(self):
super(MySqlAppInstallTest, self).setUp()
self.orig_create_engine = sqlalchemy.create_engine
self.orig_pkg_version = dbaas.packager.pkg_version
self.orig_pkg_version = dbaas_base.packager.pkg_version
self.orig_utils_execute_with_timeout = utils.execute_with_timeout
self.mock_client = Mock()
self.mock_execute = Mock()
@@ -1262,7 +1289,7 @@ class MySqlAppInstallTest(MySqlAppTest):
def tearDown(self):
super(MySqlAppInstallTest, self).tearDown()
sqlalchemy.create_engine = self.orig_create_engine
dbaas.packager.pkg_version = self.orig_pkg_version
dbaas_base.packager.pkg_version = self.orig_pkg_version
utils.execute_with_timeout = self.orig_utils_execute_with_timeout
def test_install(self):
@@ -1282,7 +1309,7 @@ class MySqlAppInstallTest(MySqlAppTest):
return_value='some_password')
def test_secure(self, auth_pwd_mock):
dbaas.clear_expired_password = Mock()
dbaas_base.clear_expired_password = Mock()
self.mySqlApp.start_mysql = Mock()
self.mySqlApp.stop_db = Mock()
self.mySqlApp._reset_configuration = Mock()
@@ -1306,7 +1333,7 @@ class MySqlAppInstallTest(MySqlAppTest):
@patch.object(utils, 'generate_random_password',
return_value='some_password')
def test_secure_root(self, *args):
with patch.object(dbaas, 'LocalSqlClient',
with patch.object(dbaas.MySqlApp, 'local_sql_client',
return_value=self.mock_client):
self.mySqlApp.secure_root()
update_root_password, _ = self.mock_execute.call_args_list[0]
@@ -1344,7 +1371,7 @@ class MySqlAppInstallTest(MySqlAppTest):
return_value=MagicMock(name='get_engine'))
def test_apply_overrides(self, *args):
overrides = {'sort_buffer_size': 1000000}
with patch.object(dbaas, 'LocalSqlClient',
with patch.object(dbaas.MySqlApp, 'local_sql_client',
return_value=self.mock_client):
self.mySqlApp.apply_overrides(overrides)
args, _ = self.mock_execute.call_args_list[0]
@@ -1355,7 +1382,7 @@ class MySqlAppInstallTest(MySqlAppTest):
@patch.object(dbaas, 'get_engine',
return_value=MagicMock(name='get_engine'))
def test_make_read_only(self, *args):
with patch.object(dbaas, 'LocalSqlClient',
with patch.object(dbaas.MySqlApp, 'local_sql_client',
return_value=self.mock_client):
self.mySqlApp.make_read_only('ON')
args, _ = self.mock_execute.call_args_list[0]
@@ -1380,7 +1407,7 @@ class MySqlAppInstallTest(MySqlAppTest):
def test_secure_write_conf_error(self):
dbaas.clear_expired_password = Mock()
dbaas_base.clear_expired_password = Mock()
self.mySqlApp.start_mysql = Mock()
self.mySqlApp.stop_db = Mock()
self.mySqlApp._reset_configuration = Mock(
@@ -1443,7 +1470,8 @@ class MySqlAppMockTest(testtools.TestCase):
mock_status = MagicMock()
mock_status.wait_for_real_status_to_change_to = MagicMock(
return_value=True)
dbaas.clear_expired_password = MagicMock(return_value=None)
dbaas_base.clear_expired_password = \
MagicMock(return_value=None)
app = MySqlApp(mock_status)
app._reset_configuration = MagicMock()
app._write_mycnf = MagicMock(return_value=True)
@@ -1469,9 +1497,11 @@ class MySqlAppMockTest(testtools.TestCase):
mock_status = MagicMock()
mock_status.wait_for_real_status_to_change_to = MagicMock(
return_value=True)
dbaas.clear_expired_password = MagicMock(return_value=None)
dbaas_base.clear_expired_password = \
MagicMock(return_value=None)
app = MySqlApp(mock_status)
dbaas.clear_expired_password = MagicMock(return_value=None)
dbaas_base.clear_expired_password = \
MagicMock(return_value=None)
self.assertRaises(RuntimeError, app.secure, None, None)
self.assertTrue(mock_conn.execute.called)
# At least called twice
@@ -1486,10 +1516,14 @@ class MySqlRootStatusTest(testtools.TestCase):
def setUp(self):
super(MySqlRootStatusTest, self).setUp()
self.orig_utils_execute_with_timeout = utils.execute_with_timeout
dbaas.orig_configuration_manager = dbaas.MySqlApp.configuration_manager
dbaas.MySqlApp.configuration_manager = Mock()
def tearDown(self):
super(MySqlRootStatusTest, self).tearDown()
utils.execute_with_timeout = self.orig_utils_execute_with_timeout
dbaas.MySqlApp.configuration_manager = \
dbaas.orig_configuration_manager
@patch.object(dbaas.MySqlApp, 'get_auth_password',
return_value='some_password')
@@ -1509,7 +1543,7 @@ class MySqlRootStatusTest(testtools.TestCase):
mock_rs = MagicMock()
mock_rs.rowcount = 0
with patch.object(mock_conn, 'execute', return_value=mock_rs):
self.assertThat(MySqlRootAccess.is_root_enabled(), Equals(False))
self.assertThat(MySqlRootAccess().is_root_enabled(), Equals(False))
@patch.object(dbaas.MySqlApp, 'get_auth_password',
return_value='some_password')
@@ -1518,7 +1552,7 @@ class MySqlRootStatusTest(testtools.TestCase):
with patch.object(mock_conn, 'execute', return_value=None):
# invocation
user_ser = MySqlRootAccess.enable_root()
user_ser = MySqlRootAccess().enable_root()
# verification
self.assertThat(user_ser, Not(Is(None)))
mock_conn.execute.assert_any_call(TextClauseMatcher('CREATE USER'),
@@ -1591,8 +1625,8 @@ class ServiceRegistryTest(testtools.TestCase):
self.assertEqual('trove.guestagent.datastore.mysql.'
'manager.Manager',
test_dict.get('mysql'))
self.assertEqual('trove.guestagent.datastore.mysql.'
'manager.Manager',
self.assertEqual('trove.guestagent.datastore.experimental.'
'percona.manager.Manager',
test_dict.get('percona'))
self.assertEqual('trove.guestagent.datastore.experimental.redis.'
'manager.Manager',
@@ -1624,8 +1658,8 @@ class ServiceRegistryTest(testtools.TestCase):
self.assertEqual('trove.guestagent.datastore.mysql.'
'manager.Manager123',
test_dict.get('mysql'))
self.assertEqual('trove.guestagent.datastore.mysql.'
'manager.Manager',
self.assertEqual('trove.guestagent.datastore.experimental.'
'percona.manager.Manager',
test_dict.get('percona'))
self.assertEqual('trove.guestagent.datastore.experimental.redis.'
'manager.Manager',
@@ -1657,8 +1691,8 @@ class ServiceRegistryTest(testtools.TestCase):
self.assertEqual('trove.guestagent.datastore.mysql.'
'manager.Manager',
test_dict.get('mysql'))
self.assertEqual('trove.guestagent.datastore.mysql.'
'manager.Manager',
self.assertEqual('trove.guestagent.datastore.experimental.'
'percona.manager.Manager',
test_dict.get('percona'))
self.assertEqual('trove.guestagent.datastore.experimental.redis.'
'manager.Manager',
@@ -1695,12 +1729,14 @@ class KeepAliveConnectionTest(testtools.TestCase):
def setUp(self):
super(KeepAliveConnectionTest, self).setUp()
self.orig_utils_execute_with_timeout = dbaas.utils.execute_with_timeout
self.orig_utils_execute_with_timeout = \
dbaas_base.utils.execute_with_timeout
self.orig_LOG_err = dbaas.LOG
def tearDown(self):
super(KeepAliveConnectionTest, self).tearDown()
dbaas.utils.execute_with_timeout = self.orig_utils_execute_with_timeout
dbaas_base.utils.execute_with_timeout = \
self.orig_utils_execute_with_timeout
dbaas.LOG = self.orig_LOG_err
def test_checkout_type_error(self):
@@ -1855,9 +1891,10 @@ class MySqlAppStatusTest(testtools.TestCase):
def setUp(self):
super(MySqlAppStatusTest, self).setUp()
util.init_db()
self.orig_utils_execute_with_timeout = dbaas.utils.execute_with_timeout
self.orig_load_mysqld_options = dbaas.load_mysqld_options
self.orig_dbaas_os_path_exists = dbaas.os.path.exists
self.orig_utils_execute_with_timeout = \
dbaas_base.utils.execute_with_timeout
self.orig_load_mysqld_options = dbaas_base.load_mysqld_options
self.orig_dbaas_base_os_path_exists = dbaas_base.os.path.exists
self.orig_dbaas_time_sleep = time.sleep
self.FAKE_ID = str(uuid4())
InstanceServiceStatus.create(instance_id=self.FAKE_ID,
@@ -1866,18 +1903,19 @@ class MySqlAppStatusTest(testtools.TestCase):
def tearDown(self):
super(MySqlAppStatusTest, self).tearDown()
dbaas.utils.execute_with_timeout = self.orig_utils_execute_with_timeout
dbaas.load_mysqld_options = self.orig_load_mysqld_options
dbaas.os.path.exists = self.orig_dbaas_os_path_exists
dbaas_base.utils.execute_with_timeout = \
self.orig_utils_execute_with_timeout
dbaas_base.load_mysqld_options = self.orig_load_mysqld_options
dbaas_base.os.path.exists = self.orig_dbaas_base_os_path_exists
time.sleep = self.orig_dbaas_time_sleep
InstanceServiceStatus.find_by(instance_id=self.FAKE_ID).delete()
dbaas.CONF.guest_id = None
def test_get_actual_db_status(self):
dbaas.utils.execute_with_timeout = Mock(return_value=(None, None))
dbaas_base.utils.execute_with_timeout = Mock(return_value=(None, None))
self.mySqlAppStatus = MySqlAppStatus()
self.mySqlAppStatus = MySqlAppStatus.get()
status = self.mySqlAppStatus._get_actual_db_status()
self.assertEqual(rd_instance.ServiceStatuses.RUNNING, status)
@@ -1887,31 +1925,31 @@ class MySqlAppStatusTest(testtools.TestCase):
@patch.object(os.path, 'exists', return_value=True)
def test_get_actual_db_status_error_crashed(self, mock_exists,
mock_execute):
dbaas.load_mysqld_options = Mock(return_value={})
self.mySqlAppStatus = MySqlAppStatus()
dbaas_base.load_mysqld_options = Mock(return_value={})
self.mySqlAppStatus = MySqlAppStatus.get()
status = self.mySqlAppStatus._get_actual_db_status()
self.assertEqual(rd_instance.ServiceStatuses.CRASHED, status)
def test_get_actual_db_status_error_shutdown(self):
mocked = Mock(side_effect=ProcessExecutionError())
dbaas.utils.execute_with_timeout = mocked
dbaas.load_mysqld_options = Mock(return_value={})
dbaas.os.path.exists = Mock(return_value=False)
dbaas_base.utils.execute_with_timeout = mocked
dbaas_base.load_mysqld_options = Mock(return_value={})
dbaas_base.os.path.exists = Mock(return_value=False)
self.mySqlAppStatus = MySqlAppStatus()
self.mySqlAppStatus = MySqlAppStatus.get()
status = self.mySqlAppStatus._get_actual_db_status()
self.assertEqual(rd_instance.ServiceStatuses.SHUTDOWN, status)
def test_get_actual_db_status_error_blocked(self):
dbaas.utils.execute_with_timeout = MagicMock(
dbaas_base.utils.execute_with_timeout = MagicMock(
side_effect=[ProcessExecutionError(), ("some output", None)])
dbaas.load_mysqld_options = Mock()
dbaas.os.path.exists = Mock(return_value=True)
dbaas_base.load_mysqld_options = Mock()
dbaas_base.os.path.exists = Mock(return_value=True)
self.mySqlAppStatus = MySqlAppStatus()
self.mySqlAppStatus = MySqlAppStatus.get()
status = self.mySqlAppStatus._get_actual_db_status()
self.assertEqual(rd_instance.ServiceStatuses.BLOCKED, status)

View File

@@ -26,6 +26,8 @@ from trove.common.exception import ProcessExecutionError
from trove.common import instance as rd_instance
from trove.guestagent import backup
from trove.guestagent.common import operating_system
# TODO(atomic77) The test cases should be made configurable
# to make it easier to test the various derived datastores.
from trove.guestagent.datastore.mysql.manager import Manager
import trove.guestagent.datastore.mysql.service as dbaas
from trove.guestagent import dbaas as base_dbaas
@@ -55,9 +57,8 @@ class GuestAgentManagerTest(testtools.TestCase):
# set up common mock objects, etc. for replication testing
self.patcher_gfvs = patch(
'trove.guestagent.dbaas.get_filesystem_volume_stats')
self.patcher_rs = patch(
'trove.guestagent.datastore.mysql.manager.'
'REPLICATION_STRATEGY_CLASS')
self.patcher_rs = patch(self.manager.replication_namespace + "." +
self.manager.replication_strategy)
self.mock_gfvs_class = self.patcher_gfvs.start()
self.mock_rs_class = self.patcher_rs.start()
self.repl_datastore_manager = 'mysql'