From ca9cc2c6e6b5cd3d5acda65a96791d6dd9383478 Mon Sep 17 00:00:00 2001 From: Alex Tomic Date: Thu, 9 Jul 2015 11:01:01 -0400 Subject: [PATCH] MySQL Manager Refactor Creates a class structure for MySQL-derived datastores to avoid duplication of code for features and capabilities shared in common. The existing manager and service classes were pulled up into the manager_base and service_base modules. Module constants in the manager and service modules were changed to class properties, so that the various implementations can inject the appropriate dependencies. The original logic was preserved to the extent possible. Change-Id: I5217390c9ff0cdb1b781dd8f2291cca23f4e442c Implements: blueprint mysql-manager-refactor --- .../experimental/mariadb/__init__.py | 0 .../datastore/experimental/mariadb/manager.py | 47 + .../datastore/experimental/mariadb/service.py | 50 + .../experimental/percona/__init__.py | 0 .../datastore/experimental/percona/manager.py | 47 + .../datastore/experimental/percona/service.py | 50 + trove/guestagent/datastore/mysql/manager.py | 334 +---- .../datastore/mysql/manager_base.py | 384 ++++++ trove/guestagent/datastore/mysql/service.py | 1017 +-------------- .../datastore/mysql/service_base.py | 1085 +++++++++++++++++ trove/guestagent/dbaas.py | 4 +- .../strategies/backup/mysql_impl.py | 2 +- .../tests/unittests/guestagent/test_dbaas.py | 216 ++-- .../guestagent/test_mysql_manager.py | 7 +- 14 files changed, 1832 insertions(+), 1411 deletions(-) create mode 100644 trove/guestagent/datastore/experimental/mariadb/__init__.py create mode 100644 trove/guestagent/datastore/experimental/mariadb/manager.py create mode 100644 trove/guestagent/datastore/experimental/mariadb/service.py create mode 100644 trove/guestagent/datastore/experimental/percona/__init__.py create mode 100644 trove/guestagent/datastore/experimental/percona/manager.py create mode 100644 trove/guestagent/datastore/experimental/percona/service.py create mode 100644 trove/guestagent/datastore/mysql/manager_base.py create mode 100644 trove/guestagent/datastore/mysql/service_base.py diff --git a/trove/guestagent/datastore/experimental/mariadb/__init__.py b/trove/guestagent/datastore/experimental/mariadb/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/trove/guestagent/datastore/experimental/mariadb/manager.py b/trove/guestagent/datastore/experimental/mariadb/manager.py new file mode 100644 index 0000000000..02e43e6782 --- /dev/null +++ b/trove/guestagent/datastore/experimental/mariadb/manager.py @@ -0,0 +1,47 @@ +# Copyright 2015 Tesora, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from oslo_utils import importutils +from trove.common import cfg +from trove.guestagent.datastore.mysql import manager_base +from trove.guestagent.strategies.replication import get_replication_strategy + +CONF = cfg.CONF +MANAGER = CONF.datastore_manager if CONF.datastore_manager else 'mysql' +REPLICATION_STRATEGY = CONF.get(MANAGER).replication_strategy +REPLICATION_NAMESPACE = CONF.get(MANAGER).replication_namespace +REPLICATION_STRATEGY_CLASS = get_replication_strategy(REPLICATION_STRATEGY, + REPLICATION_NAMESPACE) + +MYSQL_APP = "trove.guestagent.datastore.experimental.mariadb." \ + "service.MySqlApp" +MYSQL_APP_STATUS = "trove.guestagent.datastore.experimental.mariadb." \ + "service.MySqlAppStatus" +MYSQL_ADMIN = "trove.guestagent.datastore.experimental.mariadb.service." \ + "MySqlAdmin" + + +class Manager(manager_base.BaseMySqlManager): + + def __init__(self): + mysql_app = importutils.import_class(MYSQL_APP) + mysql_app_status = importutils.import_class(MYSQL_APP_STATUS) + mysql_admin = importutils.import_class(MYSQL_ADMIN) + + super(Manager, self).__init__(mysql_app, mysql_app_status, + mysql_admin, REPLICATION_STRATEGY, + REPLICATION_NAMESPACE, + REPLICATION_STRATEGY_CLASS, MANAGER) diff --git a/trove/guestagent/datastore/experimental/mariadb/service.py b/trove/guestagent/datastore/experimental/mariadb/service.py new file mode 100644 index 0000000000..d6c7026ed5 --- /dev/null +++ b/trove/guestagent/datastore/experimental/mariadb/service.py @@ -0,0 +1,50 @@ +# Copyright 2015 Tesora, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from oslo_log import log as logging +from trove.guestagent.datastore.mysql import service_base + +LOG = logging.getLogger(__name__) + + +class KeepAliveConnection(service_base.BaseKeepAliveConnection): + pass + + +class MySqlAppStatus(service_base.BaseMySqlAppStatus): + pass + + +class LocalSqlClient(service_base.BaseLocalSqlClient): + pass + + +class MySqlApp(service_base.BaseMySqlApp): + def __init__(self, status): + super(MySqlApp, self).__init__(status, LocalSqlClient, + KeepAliveConnection) + + +class MySqlRootAccess(service_base.BaseMySqlRootAccess): + def __init__(self): + super(MySqlRootAccess, self).__init__(LocalSqlClient, + MySqlApp(MySqlAppStatus.get())) + + +class MySqlAdmin(service_base.BaseMySqlAdmin): + def __init__(self): + super(MySqlAdmin, self).__init__(LocalSqlClient, MySqlRootAccess(), + MySqlApp) diff --git a/trove/guestagent/datastore/experimental/percona/__init__.py b/trove/guestagent/datastore/experimental/percona/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/trove/guestagent/datastore/experimental/percona/manager.py b/trove/guestagent/datastore/experimental/percona/manager.py new file mode 100644 index 0000000000..1664e8538b --- /dev/null +++ b/trove/guestagent/datastore/experimental/percona/manager.py @@ -0,0 +1,47 @@ +# Copyright 2015 Tesora, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from oslo_utils import importutils +from trove.common import cfg +from trove.guestagent.datastore.mysql import manager_base +from trove.guestagent.strategies.replication import get_replication_strategy + +CONF = cfg.CONF +MANAGER = CONF.datastore_manager if CONF.datastore_manager else 'mysql' +REPLICATION_STRATEGY = CONF.get(MANAGER).replication_strategy +REPLICATION_NAMESPACE = CONF.get(MANAGER).replication_namespace +REPLICATION_STRATEGY_CLASS = get_replication_strategy(REPLICATION_STRATEGY, + REPLICATION_NAMESPACE) + +MYSQL_APP = "trove.guestagent.datastore.experimental.percona." \ + "service.MySqlApp" +MYSQL_APP_STATUS = "trove.guestagent.datastore.experimental.percona." \ + "service.MySqlAppStatus" +MYSQL_ADMIN = "trove.guestagent.datastore.experimental.percona." \ + "service.MySqlAdmin" + + +class Manager(manager_base.BaseMySqlManager): + + def __init__(self): + mysql_app = importutils.import_class(MYSQL_APP) + mysql_app_status = importutils.import_class(MYSQL_APP_STATUS) + mysql_admin = importutils.import_class(MYSQL_ADMIN) + + super(Manager, self).__init__(mysql_app, mysql_app_status, + mysql_admin, REPLICATION_STRATEGY, + REPLICATION_NAMESPACE, + REPLICATION_STRATEGY_CLASS, MANAGER) diff --git a/trove/guestagent/datastore/experimental/percona/service.py b/trove/guestagent/datastore/experimental/percona/service.py new file mode 100644 index 0000000000..d6c7026ed5 --- /dev/null +++ b/trove/guestagent/datastore/experimental/percona/service.py @@ -0,0 +1,50 @@ +# Copyright 2015 Tesora, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from oslo_log import log as logging +from trove.guestagent.datastore.mysql import service_base + +LOG = logging.getLogger(__name__) + + +class KeepAliveConnection(service_base.BaseKeepAliveConnection): + pass + + +class MySqlAppStatus(service_base.BaseMySqlAppStatus): + pass + + +class LocalSqlClient(service_base.BaseLocalSqlClient): + pass + + +class MySqlApp(service_base.BaseMySqlApp): + def __init__(self, status): + super(MySqlApp, self).__init__(status, LocalSqlClient, + KeepAliveConnection) + + +class MySqlRootAccess(service_base.BaseMySqlRootAccess): + def __init__(self): + super(MySqlRootAccess, self).__init__(LocalSqlClient, + MySqlApp(MySqlAppStatus.get())) + + +class MySqlAdmin(service_base.BaseMySqlAdmin): + def __init__(self): + super(MySqlAdmin, self).__init__(LocalSqlClient, MySqlRootAccess(), + MySqlApp) diff --git a/trove/guestagent/datastore/mysql/manager.py b/trove/guestagent/datastore/mysql/manager.py index 2e687d427b..6dfeddeb22 100644 --- a/trove/guestagent/datastore/mysql/manager.py +++ b/trove/guestagent/datastore/mysql/manager.py @@ -16,27 +16,11 @@ # under the License. # -import os - -from oslo_log import log as logging -from oslo_service import periodic_task - +from oslo_utils import importutils from trove.common import cfg -from trove.common import exception -from trove.common.i18n import _ -from trove.common import instance as rd_instance -from trove.guestagent import backup -from trove.guestagent.common import operating_system -from trove.guestagent.datastore.mysql import service -from trove.guestagent.datastore.mysql.service import MySqlAdmin -from trove.guestagent.datastore.mysql.service import MySqlApp -from trove.guestagent.datastore.mysql.service import MySqlAppStatus -from trove.guestagent import dbaas +from trove.guestagent.datastore.mysql import manager_base from trove.guestagent.strategies.replication import get_replication_strategy -from trove.guestagent import volume - -LOG = logging.getLogger(__name__) CONF = cfg.CONF MANAGER = CONF.datastore_manager if CONF.datastore_manager else 'mysql' REPLICATION_STRATEGY = CONF.get(MANAGER).replication_strategy @@ -44,309 +28,19 @@ REPLICATION_NAMESPACE = CONF.get(MANAGER).replication_namespace REPLICATION_STRATEGY_CLASS = get_replication_strategy(REPLICATION_STRATEGY, REPLICATION_NAMESPACE) +MYSQL_APP = "trove.guestagent.datastore.mysql.service.MySqlApp" +MYSQL_APP_STATUS = "trove.guestagent.datastore.mysql.service.MySqlAppStatus" +MYSQL_ADMIN = "trove.guestagent.datastore.mysql.service.MySqlAdmin" -class Manager(periodic_task.PeriodicTasks): + +class Manager(manager_base.BaseMySqlManager): def __init__(self): - super(Manager, self).__init__(CONF) + mysql_app = importutils.import_class(MYSQL_APP) + mysql_app_status = importutils.import_class(MYSQL_APP_STATUS) + mysql_admin = importutils.import_class(MYSQL_ADMIN) - @periodic_task.periodic_task - def update_status(self, context): - """Update the status of the MySQL service.""" - MySqlAppStatus.get().update() - - def rpc_ping(self, context): - LOG.debug("Responding to RPC ping.") - return True - - def change_passwords(self, context, users): - return MySqlAdmin().change_passwords(users) - - def update_attributes(self, context, username, hostname, user_attrs): - return MySqlAdmin().update_attributes(username, hostname, user_attrs) - - def reset_configuration(self, context, configuration): - app = MySqlApp(MySqlAppStatus.get()) - app.reset_configuration(configuration) - - def create_database(self, context, databases): - return MySqlAdmin().create_database(databases) - - def create_user(self, context, users): - MySqlAdmin().create_user(users) - - def delete_database(self, context, database): - return MySqlAdmin().delete_database(database) - - def delete_user(self, context, user): - MySqlAdmin().delete_user(user) - - def get_user(self, context, username, hostname): - return MySqlAdmin().get_user(username, hostname) - - def grant_access(self, context, username, hostname, databases): - return MySqlAdmin().grant_access(username, hostname, databases) - - def revoke_access(self, context, username, hostname, database): - return MySqlAdmin().revoke_access(username, hostname, database) - - def list_access(self, context, username, hostname): - return MySqlAdmin().list_access(username, hostname) - - def list_databases(self, context, limit=None, marker=None, - include_marker=False): - return MySqlAdmin().list_databases(limit, marker, - include_marker) - - def list_users(self, context, limit=None, marker=None, - include_marker=False): - return MySqlAdmin().list_users(limit, marker, - include_marker) - - def enable_root(self, context): - return MySqlAdmin().enable_root() - - def is_root_enabled(self, context): - return MySqlAdmin().is_root_enabled() - - def _perform_restore(self, backup_info, context, restore_location, app): - LOG.info(_("Restoring database from backup %s.") % backup_info['id']) - try: - backup.restore(context, backup_info, restore_location) - except Exception: - LOG.exception(_("Error performing restore from backup %s.") % - backup_info['id']) - app.status.set_status(rd_instance.ServiceStatuses.FAILED) - raise - LOG.info(_("Restored database successfully.")) - - def prepare(self, context, packages, databases, memory_mb, users, - device_path=None, mount_point=None, backup_info=None, - config_contents=None, root_password=None, overrides=None, - cluster_config=None, snapshot=None): - """Makes ready DBAAS on a Guest container.""" - MySqlAppStatus.get().begin_install() - # status end_mysql_install set with secure() - app = MySqlApp(MySqlAppStatus.get()) - app.install_if_needed(packages) - if device_path: - # stop and do not update database - app.stop_db() - device = volume.VolumeDevice(device_path) - # unmount if device is already mounted - device.unmount_device(device_path) - device.format() - if os.path.exists(mount_point): - # rsync existing data to a "data" sub-directory - # on the new volume - device.migrate_data(mount_point, target_subdir="data") - # mount the volume - device.mount(mount_point) - operating_system.chown( - mount_point, service.MYSQL_OWNER, service.MYSQL_OWNER, - recursive=False, as_root=True) - - LOG.debug("Mounted the volume at %s." % mount_point) - # We need to temporarily update the default my.cnf so that - # mysql will start after the volume is mounted. Later on it - # will be changed based on the config template - # (see MySqlApp.secure()) and restart. - app.set_data_dir(mount_point + '/data') - app.start_mysql() - if backup_info: - self._perform_restore(backup_info, context, - mount_point + "/data", app) - LOG.debug("Securing MySQL now.") - app.secure(config_contents, overrides) - enable_root_on_restore = (backup_info and - MySqlAdmin().is_root_enabled()) - if root_password and not backup_info: - app.secure_root(secure_remote_root=True) - MySqlAdmin().enable_root(root_password) - elif enable_root_on_restore: - app.secure_root(secure_remote_root=False) - MySqlAppStatus.get().report_root(context, 'root') - else: - app.secure_root(secure_remote_root=True) - - app.complete_install_or_restart() - - if databases: - self.create_database(context, databases) - - if users: - self.create_user(context, users) - - if snapshot: - self.attach_replica(context, snapshot, snapshot['config']) - - LOG.info(_('Completed setup of MySQL database instance.')) - - def restart(self, context): - app = MySqlApp(MySqlAppStatus.get()) - app.restart() - - def start_db_with_conf_changes(self, context, config_contents): - app = MySqlApp(MySqlAppStatus.get()) - app.start_db_with_conf_changes(config_contents) - - def stop_db(self, context, do_not_start_on_reboot=False): - app = MySqlApp(MySqlAppStatus.get()) - app.stop_db(do_not_start_on_reboot=do_not_start_on_reboot) - - def get_filesystem_stats(self, context, fs_path): - """Gets the filesystem stats for the path given.""" - mount_point = CONF.get(MANAGER).mount_point - return dbaas.get_filesystem_volume_stats(mount_point) - - def create_backup(self, context, backup_info): - """ - Entry point for initiating a backup for this guest agents db instance. - The call currently blocks until the backup is complete or errors. If - device_path is specified, it will be mounted based to a point specified - in configuration. - - :param backup_info: a dictionary containing the db instance id of the - backup task, location, type, and other data. - """ - backup.backup(context, backup_info) - - def mount_volume(self, context, device_path=None, mount_point=None): - device = volume.VolumeDevice(device_path) - device.mount(mount_point, write_to_fstab=False) - LOG.debug("Mounted the device %s at the mount point %s." % - (device_path, mount_point)) - - def unmount_volume(self, context, device_path=None, mount_point=None): - device = volume.VolumeDevice(device_path) - device.unmount(mount_point) - LOG.debug("Unmounted the device %s from the mount point %s." % - (device_path, mount_point)) - - def resize_fs(self, context, device_path=None, mount_point=None): - device = volume.VolumeDevice(device_path) - device.resize_fs(mount_point) - LOG.debug("Resized the filesystem %s." % mount_point) - - def update_overrides(self, context, overrides, remove=False): - app = MySqlApp(MySqlAppStatus.get()) - if remove: - app.remove_overrides() - app.update_overrides(overrides) - - def apply_overrides(self, context, overrides): - LOG.debug("Applying overrides (%s)." % overrides) - app = MySqlApp(MySqlAppStatus.get()) - app.apply_overrides(overrides) - - def get_replication_snapshot(self, context, snapshot_info, - replica_source_config=None): - LOG.debug("Getting replication snapshot.") - app = MySqlApp(MySqlAppStatus.get()) - - replication = REPLICATION_STRATEGY_CLASS(context) - replication.enable_as_master(app, replica_source_config) - - snapshot_id, log_position = ( - replication.snapshot_for_replication(context, app, None, - snapshot_info)) - - mount_point = CONF.get(MANAGER).mount_point - volume_stats = dbaas.get_filesystem_volume_stats(mount_point) - - replication_snapshot = { - 'dataset': { - 'datastore_manager': MANAGER, - 'dataset_size': volume_stats.get('used', 0.0), - 'volume_size': volume_stats.get('total', 0.0), - 'snapshot_id': snapshot_id - }, - 'replication_strategy': REPLICATION_STRATEGY, - 'master': replication.get_master_ref(app, snapshot_info), - 'log_position': log_position - } - - return replication_snapshot - - def enable_as_master(self, context, replica_source_config): - LOG.debug("Calling enable_as_master.") - app = MySqlApp(MySqlAppStatus.get()) - replication = REPLICATION_STRATEGY_CLASS(context) - replication.enable_as_master(app, replica_source_config) - - # DEPRECATED: Maintain for API Compatibility - def get_txn_count(self, context): - LOG.debug("Calling get_txn_count") - return MySqlApp(MySqlAppStatus.get()).get_txn_count() - - def get_last_txn(self, context): - LOG.debug("Calling get_last_txn") - return MySqlApp(MySqlAppStatus.get()).get_last_txn() - - def get_latest_txn_id(self, context): - LOG.debug("Calling get_latest_txn_id.") - return MySqlApp(MySqlAppStatus.get()).get_latest_txn_id() - - def wait_for_txn(self, context, txn): - LOG.debug("Calling wait_for_txn.") - MySqlApp(MySqlAppStatus.get()).wait_for_txn(txn) - - def detach_replica(self, context, for_failover=False): - LOG.debug("Detaching replica.") - app = MySqlApp(MySqlAppStatus.get()) - replication = REPLICATION_STRATEGY_CLASS(context) - replica_info = replication.detach_slave(app, for_failover) - return replica_info - - def get_replica_context(self, context): - LOG.debug("Getting replica context.") - app = MySqlApp(MySqlAppStatus.get()) - replication = REPLICATION_STRATEGY_CLASS(context) - replica_info = replication.get_replica_context(app) - return replica_info - - def _validate_slave_for_replication(self, context, replica_info): - if (replica_info['replication_strategy'] != REPLICATION_STRATEGY): - raise exception.IncompatibleReplicationStrategy( - replica_info.update({ - 'guest_strategy': REPLICATION_STRATEGY - })) - - mount_point = CONF.get(MANAGER).mount_point - volume_stats = dbaas.get_filesystem_volume_stats(mount_point) - if (volume_stats.get('total', 0.0) < - replica_info['dataset']['dataset_size']): - raise exception.InsufficientSpaceForReplica( - replica_info.update({ - 'slave_volume_size': volume_stats.get('total', 0.0) - })) - - def attach_replica(self, context, replica_info, slave_config): - LOG.debug("Attaching replica.") - app = MySqlApp(MySqlAppStatus.get()) - try: - if 'replication_strategy' in replica_info: - self._validate_slave_for_replication(context, replica_info) - replication = REPLICATION_STRATEGY_CLASS(context) - replication.enable_as_slave(app, replica_info, slave_config) - except Exception: - LOG.exception("Error enabling replication.") - app.status.set_status(rd_instance.ServiceStatuses.FAILED) - raise - - def make_read_only(self, context, read_only): - LOG.debug("Executing make_read_only(%s)" % read_only) - app = MySqlApp(MySqlAppStatus.get()) - app.make_read_only(read_only) - - def cleanup_source_on_replica_detach(self, context, replica_info): - LOG.debug("Cleaning up the source on the detach of a replica.") - replication = REPLICATION_STRATEGY_CLASS(context) - replication.cleanup_source_on_replica_detach(MySqlAdmin(), - replica_info) - - def demote_replication_master(self, context): - LOG.debug("Demoting replication master.") - app = MySqlApp(MySqlAppStatus.get()) - replication = REPLICATION_STRATEGY_CLASS(context) - replication.demote_master(app) + super(Manager, self).__init__(mysql_app, mysql_app_status, + mysql_admin, REPLICATION_STRATEGY, + REPLICATION_NAMESPACE, + REPLICATION_STRATEGY_CLASS, MANAGER) diff --git a/trove/guestagent/datastore/mysql/manager_base.py b/trove/guestagent/datastore/mysql/manager_base.py new file mode 100644 index 0000000000..7b8b9e118c --- /dev/null +++ b/trove/guestagent/datastore/mysql/manager_base.py @@ -0,0 +1,384 @@ +# Copyright 2013 OpenStack Foundation +# Copyright 2013 Rackspace Hosting +# Copyright 2013 Hewlett-Packard Development Company, L.P. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import os + +from oslo_log import log as logging +from oslo_service import periodic_task + +from trove.common import cfg +from trove.common import exception +from trove.common.i18n import _ +from trove.common import instance as rd_instance +from trove.guestagent import backup +from trove.guestagent.common import operating_system +from trove.guestagent.datastore.mysql import service_base +from trove.guestagent import dbaas +from trove.guestagent.strategies.replication import get_replication_strategy +from trove.guestagent import volume + + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + + +class BaseMySqlManager(periodic_task.PeriodicTasks): + + def __init__(self, mysql_app, mysql_app_status, mysql_admin, + replication_strategy, replication_namespace, + replication_strategy_class, manager): + + super(BaseMySqlManager, self).__init__(CONF) + self._mysql_app = mysql_app + self._mysql_app_status = mysql_app_status + self._mysql_admin = mysql_admin + self._replication_strategy = replication_strategy + self._replication_namespace = replication_namespace + self._replication_strategy_class = replication_strategy_class + self._manager = manager + + @property + def mysql_app(self): + return self._mysql_app + + @property + def mysql_app_status(self): + return self._mysql_app_status + + @property + def mysql_admin(self): + return self._mysql_admin + + @property + def replication_strategy(self): + return self._replication_strategy + + @property + def replication_namespace(self): + return self._replication_namespace + + @property + def replication_strategy_class(self): + return get_replication_strategy(self._replication_strategy, + self._replication_namespace) + + @property + def manager(self): + return self._manager + + @periodic_task.periodic_task + def update_status(self, context): + """Update the status of the MySQL service.""" + self.mysql_app_status.get().update() + + def rpc_ping(self, context): + LOG.debug("Responding to RPC ping.") + return True + + def change_passwords(self, context, users): + return self.mysql_admin().change_passwords(users) + + def update_attributes(self, context, username, hostname, user_attrs): + return self.mysql_admin().update_attributes( + username, hostname, user_attrs) + + def reset_configuration(self, context, configuration): + app = self.mysql_app(self.mysql_app_status.get()) + app.reset_configuration(configuration) + + def create_database(self, context, databases): + return self.mysql_admin().create_database(databases) + + def create_user(self, context, users): + self.mysql_admin().create_user(users) + + def delete_database(self, context, database): + return self.mysql_admin().delete_database(database) + + def delete_user(self, context, user): + self.mysql_admin().delete_user(user) + + def get_user(self, context, username, hostname): + return self.mysql_admin().get_user(username, hostname) + + def grant_access(self, context, username, hostname, databases): + return self.mysql_admin().grant_access(username, hostname, databases) + + def revoke_access(self, context, username, hostname, database): + return self.mysql_admin().revoke_access(username, hostname, database) + + def list_access(self, context, username, hostname): + return self.mysql_admin().list_access(username, hostname) + + def list_databases(self, context, limit=None, marker=None, + include_marker=False): + return self.mysql_admin().list_databases(limit, marker, + include_marker) + + def list_users(self, context, limit=None, marker=None, + include_marker=False): + return self.mysql_admin().list_users(limit, marker, + include_marker) + + def enable_root(self, context): + return self.mysql_admin().enable_root() + + def is_root_enabled(self, context): + return self.mysql_admin().is_root_enabled() + + def _perform_restore(self, backup_info, context, restore_location, app): + LOG.info(_("Restoring database from backup %s.") % backup_info['id']) + try: + backup.restore(context, backup_info, restore_location) + except Exception: + LOG.exception(_("Error performing restore from backup %s.") % + backup_info['id']) + app.status.set_status(rd_instance.ServiceStatuses.FAILED) + raise + LOG.info(_("Restored database successfully.")) + + def prepare(self, context, packages, databases, memory_mb, users, + device_path=None, mount_point=None, backup_info=None, + config_contents=None, root_password=None, overrides=None, + cluster_config=None, snapshot=None): + """Makes ready DBAAS on a Guest container.""" + self.mysql_app_status.get().begin_install() + # status end_mysql_install set with secure() + app = self.mysql_app(self.mysql_app_status.get()) + app.install_if_needed(packages) + if device_path: + # stop and do not update database + app.stop_db() + device = volume.VolumeDevice(device_path) + # unmount if device is already mounted + device.unmount_device(device_path) + device.format() + if os.path.exists(mount_point): + # rsync existing data to a "data" sub-directory + # on the new volume + device.migrate_data(mount_point, target_subdir="data") + # mount the volume + device.mount(mount_point) + operating_system.chown(mount_point, service_base.MYSQL_OWNER, + service_base.MYSQL_OWNER, + recursive=False, as_root=True) + + LOG.debug("Mounted the volume at %s." % mount_point) + # We need to temporarily update the default my.cnf so that + # mysql will start after the volume is mounted. Later on it + # will be changed based on the config template + # (see MySqlApp.secure()) and restart. + app.set_data_dir(mount_point + '/data') + app.start_mysql() + if backup_info: + self._perform_restore(backup_info, context, + mount_point + "/data", app) + LOG.debug("Securing MySQL now.") + app.secure(config_contents, overrides) + enable_root_on_restore = (backup_info and + self.mysql_admin().is_root_enabled()) + if root_password and not backup_info: + app.secure_root(secure_remote_root=True) + self.mysql_admin().enable_root(root_password) + elif enable_root_on_restore: + app.secure_root(secure_remote_root=False) + self.mysql_app_status.get().report_root(context, 'root') + else: + app.secure_root(secure_remote_root=True) + + app.complete_install_or_restart() + + if databases: + self.create_database(context, databases) + + if users: + self.create_user(context, users) + + if snapshot: + self.attach_replica(context, snapshot, snapshot['config']) + + LOG.info(_('Completed setup of MySQL database instance.')) + + def restart(self, context): + app = self.mysql_app(self.mysql_app_status.get()) + app.restart() + + def start_db_with_conf_changes(self, context, config_contents): + app = self.mysql_app(self.mysql_app_status.get()) + app.start_db_with_conf_changes(config_contents) + + def stop_db(self, context, do_not_start_on_reboot=False): + app = self.mysql_app(self.mysql_app_status.get()) + app.stop_db(do_not_start_on_reboot=do_not_start_on_reboot) + + def get_filesystem_stats(self, context, fs_path): + """Gets the filesystem stats for the path given.""" + mount_point = CONF.get(self.manager).mount_point + return dbaas.get_filesystem_volume_stats(mount_point) + + def create_backup(self, context, backup_info): + """ + Entry point for initiating a backup for this guest agents db instance. + The call currently blocks until the backup is complete or errors. If + device_path is specified, it will be mounted based to a point specified + in configuration. + + :param backup_info: a dictionary containing the db instance id of the + backup task, location, type, and other data. + """ + backup.backup(context, backup_info) + + def mount_volume(self, context, device_path=None, mount_point=None): + device = volume.VolumeDevice(device_path) + device.mount(mount_point, write_to_fstab=False) + LOG.debug("Mounted the device %s at the mount point %s." % + (device_path, mount_point)) + + def unmount_volume(self, context, device_path=None, mount_point=None): + device = volume.VolumeDevice(device_path) + device.unmount(mount_point) + LOG.debug("Unmounted the device %s from the mount point %s." % + (device_path, mount_point)) + + def resize_fs(self, context, device_path=None, mount_point=None): + device = volume.VolumeDevice(device_path) + device.resize_fs(mount_point) + LOG.debug("Resized the filesystem %s." % mount_point) + + def update_overrides(self, context, overrides, remove=False): + app = self.mysql_app(self.mysql_app_status.get()) + if remove: + app.remove_overrides() + app.update_overrides(overrides) + + def apply_overrides(self, context, overrides): + LOG.debug("Applying overrides (%s)." % overrides) + app = self.mysql_app(self.mysql_app_status.get()) + app.apply_overrides(overrides) + + def get_replication_snapshot(self, context, snapshot_info, + replica_source_config=None): + LOG.debug("Getting replication snapshot.") + app = self.mysql_app(self.mysql_app_status.get()) + + replication = self.replication_strategy_class(context) + replication.enable_as_master(app, replica_source_config) + + snapshot_id, log_position = ( + replication.snapshot_for_replication(context, app, None, + snapshot_info)) + + mount_point = CONF.get(self.manager).mount_point + volume_stats = dbaas.get_filesystem_volume_stats(mount_point) + + replication_snapshot = { + 'dataset': { + 'datastore_manager': self.manager, + 'dataset_size': volume_stats.get('used', 0.0), + 'volume_size': volume_stats.get('total', 0.0), + 'snapshot_id': snapshot_id + }, + 'replication_strategy': self.replication_strategy, + 'master': replication.get_master_ref(app, snapshot_info), + 'log_position': log_position + } + + return replication_snapshot + + def enable_as_master(self, context, replica_source_config): + LOG.debug("Calling enable_as_master.") + app = self.mysql_app(self.mysql_app_status.get()) + replication = self.replication_strategy_class(context) + replication.enable_as_master(app, replica_source_config) + + # DEPRECATED: Maintain for API Compatibility + def get_txn_count(self, context): + LOG.debug("Calling get_txn_count") + return self.mysql_app(self.mysql_app_status.get()).get_txn_count() + + def get_last_txn(self, context): + LOG.debug("Calling get_last_txn") + return self.mysql_app(self.mysql_app_status.get()).get_last_txn() + + def get_latest_txn_id(self, context): + LOG.debug("Calling get_latest_txn_id.") + return self.mysql_app(self.mysql_app_status.get()).get_latest_txn_id() + + def wait_for_txn(self, context, txn): + LOG.debug("Calling wait_for_txn.") + self.mysql_app(self.mysql_app_status.get()).wait_for_txn(txn) + + def detach_replica(self, context, for_failover=False): + LOG.debug("Detaching replica.") + app = self.mysql_app(self.mysql_app_status.get()) + replication = self.replication_strategy_class(context) + replica_info = replication.detach_slave(app, for_failover) + return replica_info + + def get_replica_context(self, context): + LOG.debug("Getting replica context.") + app = self.mysql_app(self.mysql_app_status.get()) + replication = self.replication_strategy_class(context) + replica_info = replication.get_replica_context(app) + return replica_info + + def _validate_slave_for_replication(self, context, replica_info): + if (replica_info['replication_strategy'] != self.replication_strategy): + raise exception.IncompatibleReplicationStrategy( + replica_info.update({ + 'guest_strategy': self.replication_strategy + })) + + mount_point = CONF.get(self.manager).mount_point + volume_stats = dbaas.get_filesystem_volume_stats(mount_point) + if (volume_stats.get('total', 0.0) < + replica_info['dataset']['dataset_size']): + raise exception.InsufficientSpaceForReplica( + replica_info.update({ + 'slave_volume_size': volume_stats.get('total', 0.0) + })) + + def attach_replica(self, context, replica_info, slave_config): + LOG.debug("Attaching replica.") + app = self.mysql_app(self.mysql_app_status.get()) + try: + if 'replication_strategy' in replica_info: + self._validate_slave_for_replication(context, replica_info) + replication = self.replication_strategy_class(context) + replication.enable_as_slave(app, replica_info, slave_config) + except Exception: + LOG.exception("Error enabling replication.") + app.status.set_status(rd_instance.ServiceStatuses.FAILED) + raise + + def make_read_only(self, context, read_only): + LOG.debug("Executing make_read_only(%s)" % read_only) + app = self.mysql_app(self.mysql_app_status.get()) + app.make_read_only(read_only) + + def cleanup_source_on_replica_detach(self, context, replica_info): + LOG.debug("Cleaning up the source on the detach of a replica.") + replication = self.replication_strategy_class(context) + replication.cleanup_source_on_replica_detach(self.mysql_admin(), + replica_info) + + def demote_replication_master(self, context): + LOG.debug("Demoting replication master.") + app = self.mysql_app(self.mysql_app_status.get()) + replication = self.replication_strategy_class(context) + replication.demote_master(app) diff --git a/trove/guestagent/datastore/mysql/service.py b/trove/guestagent/datastore/mysql/service.py index c59c1e59db..de6bc54b78 100644 --- a/trove/guestagent/datastore/mysql/service.py +++ b/trove/guestagent/datastore/mysql/service.py @@ -16,1018 +16,41 @@ # under the License. # -from collections import defaultdict -import os -import re -import uuid - from oslo_log import log as logging -import sqlalchemy -from sqlalchemy import exc -from sqlalchemy import interfaces -from sqlalchemy.sql.expression import text +from trove.guestagent.datastore.mysql import service_base -from trove.common import cfg -from trove.common.configurations import MySQLConfParser -from trove.common import exception -from trove.common.exception import PollTimeOut -from trove.common.i18n import _ -from trove.common import instance as rd_instance -from trove.common.stream_codecs import IniCodec -from trove.common import utils as utils -from trove.guestagent.common.configuration import ConfigurationManager -from trove.guestagent.common.configuration import ImportOverrideStrategy -from trove.guestagent.common import guestagent_utils -from trove.guestagent.common import operating_system -from trove.guestagent.common import sql_query -from trove.guestagent.datastore import service -from trove.guestagent.db import models -from trove.guestagent import pkg - -ADMIN_USER_NAME = "os_admin" LOG = logging.getLogger(__name__) -FLUSH = text(sql_query.FLUSH) -ENGINE = None -PREPARING = False -UUID = False - -CONF = cfg.CONF -MANAGER = CONF.datastore_manager if CONF.datastore_manager else 'mysql' - -INCLUDE_MARKER_OPERATORS = { - True: ">=", - False: ">" -} - -OS_NAME = operating_system.get_os() -MYSQL_CONFIG = {operating_system.REDHAT: "/etc/my.cnf", - operating_system.DEBIAN: "/etc/mysql/my.cnf", - operating_system.SUSE: "/etc/my.cnf"}[OS_NAME] -MYSQL_SERVICE_CANDIDATES = ["mysql", "mysqld", "mysql-server"] -MYSQL_BIN_CANDIDATES = ["/usr/sbin/mysqld", "/usr/libexec/mysqld"] -MYSQL_OWNER = 'mysql' -CNF_EXT = 'cnf' -CNF_INCLUDE_DIR = '/etc/mysql/conf.d/' -CNF_MASTER = 'master-replication' -CNF_SLAVE = 'slave-replication' +CONF = service_base.CONF -# Create a package impl -packager = pkg.Package() +class KeepAliveConnection(service_base.BaseKeepAliveConnection): + pass -def clear_expired_password(): - """ - Some mysql installations generate random root password - and save it in /root/.mysql_secret, this password is - expired and should be changed by client that supports expired passwords. - """ - LOG.debug("Removing expired password.") - secret_file = "/root/.mysql_secret" - try: - out, err = utils.execute("cat", secret_file, - run_as_root=True, root_helper="sudo") - except exception.ProcessExecutionError: - LOG.exception(_("/root/.mysql_secret does not exist.")) - return - m = re.match('# The random password set for the root user at .*: (.*)', - out) - if m: - try: - out, err = utils.execute("mysqladmin", "-p%s" % m.group(1), - "password", "", run_as_root=True, - root_helper="sudo") - except exception.ProcessExecutionError: - LOG.exception(_("Cannot change mysql password.")) - return - operating_system.remove(secret_file, force=True, as_root=True) - LOG.debug("Expired password removed.") +class MySqlAppStatus(service_base.BaseMySqlAppStatus): + pass -def get_engine(): - """Create the default engine with the updated admin user.""" - # TODO(rnirmal):Based on permissions issues being resolved we may revert - # url = URL(drivername='mysql', host='localhost', - # query={'read_default_file': '/etc/mysql/my.cnf'}) - global ENGINE - if ENGINE: - return ENGINE - pwd = MySqlApp.get_auth_password() - ENGINE = sqlalchemy.create_engine("mysql://%s:%s@localhost:3306" % - (ADMIN_USER_NAME, pwd.strip()), - pool_recycle=7200, - echo=CONF.sql_query_logging, - listeners=[KeepAliveConnection()]) - return ENGINE +class LocalSqlClient(service_base.BaseLocalSqlClient): + pass -def load_mysqld_options(): - # find mysqld bin - for bin in MYSQL_BIN_CANDIDATES: - if os.path.isfile(bin): - mysqld_bin = bin - break - else: - return {} - try: - out, err = utils.execute(mysqld_bin, "--print-defaults", - run_as_root=True, root_helper="sudo") - arglist = re.split("\n", out)[1].split() - args = defaultdict(list) - for item in arglist: - if "=" in item: - key, value = item.split("=", 1) - args[key.lstrip("--")].append(value) - else: - args[item.lstrip("--")].append(None) - return args - except exception.ProcessExecutionError: - return {} - - -class MySqlAppStatus(service.BaseDbStatus): - - @classmethod - def get(cls): - if not cls._instance: - cls._instance = MySqlAppStatus() - return cls._instance - - def _get_actual_db_status(self): - try: - out, err = utils.execute_with_timeout( - "/usr/bin/mysqladmin", - "ping", run_as_root=True, root_helper="sudo", - log_output_on_error=True) - LOG.info(_("MySQL Service Status is RUNNING.")) - return rd_instance.ServiceStatuses.RUNNING - except exception.ProcessExecutionError: - LOG.exception(_("Failed to get database status.")) - try: - out, err = utils.execute_with_timeout("/bin/ps", "-C", - "mysqld", "h") - pid = out.split()[0] - # TODO(rnirmal): Need to create new statuses for instances - # where the mysql service is up, but unresponsive - LOG.info(_('MySQL Service Status %(pid)s is BLOCKED.') % - {'pid': pid}) - return rd_instance.ServiceStatuses.BLOCKED - except exception.ProcessExecutionError: - LOG.exception(_("Process execution failed.")) - mysql_args = load_mysqld_options() - pid_file = mysql_args.get('pid_file', - ['/var/run/mysqld/mysqld.pid'])[0] - if os.path.exists(pid_file): - LOG.info(_("MySQL Service Status is CRASHED.")) - return rd_instance.ServiceStatuses.CRASHED - else: - LOG.info(_("MySQL Service Status is SHUTDOWN.")) - return rd_instance.ServiceStatuses.SHUTDOWN - - -class LocalSqlClient(object): - """A sqlalchemy wrapper to manage transactions.""" - - def __init__(self, engine, use_flush=True): - self.engine = engine - self.use_flush = use_flush - - def __enter__(self): - self.conn = self.engine.connect() - self.trans = self.conn.begin() - return self.conn - - def __exit__(self, type, value, traceback): - if self.trans: - if type is not None: # An error occurred - self.trans.rollback() - else: - if self.use_flush: - self.conn.execute(FLUSH) - self.trans.commit() - self.conn.close() - - def execute(self, t, **kwargs): - try: - return self.conn.execute(t, kwargs) - except Exception: - self.trans.rollback() - self.trans = None - raise - - -class MySqlAdmin(object): - """Handles administrative tasks on the MySQL database.""" - - def _associate_dbs(self, user): - """Internal. Given a MySQLUser, populate its databases attribute.""" - LOG.debug("Associating dbs to user %s at %s." % - (user.name, user.host)) - with LocalSqlClient(get_engine()) as client: - q = sql_query.Query() - q.columns = ["grantee", "table_schema"] - q.tables = ["information_schema.SCHEMA_PRIVILEGES"] - q.group = ["grantee", "table_schema"] - q.where = ["privilege_type != 'USAGE'"] - t = text(str(q)) - db_result = client.execute(t) - for db in db_result: - LOG.debug("\t db: %s." % db) - if db['grantee'] == "'%s'@'%s'" % (user.name, user.host): - mysql_db = models.MySQLDatabase() - mysql_db.name = db['table_schema'] - user.databases.append(mysql_db.serialize()) - - def change_passwords(self, users): - """Change the passwords of one or more existing users.""" - LOG.debug("Changing the password of some users.") - with LocalSqlClient(get_engine()) as client: - for item in users: - LOG.debug("Changing password for user %s." % item) - user_dict = {'_name': item['name'], - '_host': item['host'], - '_password': item['password']} - user = models.MySQLUser() - user.deserialize(user_dict) - LOG.debug("\tDeserialized: %s." % user.__dict__) - uu = sql_query.UpdateUser(user.name, host=user.host, - clear=user.password) - t = text(str(uu)) - client.execute(t) - - def update_attributes(self, username, hostname, user_attrs): - """Change the attributes of an existing user.""" - LOG.debug("Changing user attributes for user %s." % username) - user = self._get_user(username, hostname) - db_access = set() - grantee = set() - with LocalSqlClient(get_engine()) as client: - q = sql_query.Query() - q.columns = ["grantee", "table_schema"] - q.tables = ["information_schema.SCHEMA_PRIVILEGES"] - q.group = ["grantee", "table_schema"] - q.where = ["privilege_type != 'USAGE'"] - t = text(str(q)) - db_result = client.execute(t) - for db in db_result: - grantee.add(db['grantee']) - if db['grantee'] == "'%s'@'%s'" % (user.name, user.host): - db_name = db['table_schema'] - db_access.add(db_name) - with LocalSqlClient(get_engine()) as client: - uu = sql_query.UpdateUser(user.name, host=user.host, - clear=user_attrs.get('password'), - new_user=user_attrs.get('name'), - new_host=user_attrs.get('host')) - t = text(str(uu)) - client.execute(t) - uname = user_attrs.get('name') or username - host = user_attrs.get('host') or hostname - find_user = "'%s'@'%s'" % (uname, host) - if find_user not in grantee: - self.grant_access(uname, host, db_access) - - def create_database(self, databases): - """Create the list of specified databases.""" - with LocalSqlClient(get_engine()) as client: - for item in databases: - mydb = models.ValidatedMySQLDatabase() - mydb.deserialize(item) - cd = sql_query.CreateDatabase(mydb.name, - mydb.character_set, - mydb.collate) - t = text(str(cd)) - client.execute(t) - - def create_user(self, users): - """Create users and grant them privileges for the - specified databases. - """ - with LocalSqlClient(get_engine()) as client: - for item in users: - user = models.MySQLUser() - user.deserialize(item) - # TODO(cp16net):Should users be allowed to create users - # 'os_admin' or 'debian-sys-maint' - g = sql_query.Grant(user=user.name, host=user.host, - clear=user.password) - t = text(str(g)) - client.execute(t) - for database in user.databases: - mydb = models.ValidatedMySQLDatabase() - mydb.deserialize(database) - g = sql_query.Grant(permissions='ALL', database=mydb.name, - user=user.name, host=user.host, - clear=user.password) - t = text(str(g)) - client.execute(t) - - def delete_database(self, database): - """Delete the specified database.""" - with LocalSqlClient(get_engine()) as client: - mydb = models.ValidatedMySQLDatabase() - mydb.deserialize(database) - dd = sql_query.DropDatabase(mydb.name) - t = text(str(dd)) - client.execute(t) - - def delete_user(self, user): - """Delete the specified user.""" - mysql_user = models.MySQLUser() - mysql_user.deserialize(user) - self.delete_user_by_name(mysql_user.name, mysql_user.host) - - def delete_user_by_name(self, name, host='%'): - with LocalSqlClient(get_engine()) as client: - du = sql_query.DropUser(name, host=host) - t = text(str(du)) - LOG.debug("delete_user_by_name: %s", t) - client.execute(t) - - def get_user(self, username, hostname): - user = self._get_user(username, hostname) - if not user: - return None - return user.serialize() - - def _get_user(self, username, hostname): - """Return a single user matching the criteria.""" - user = models.MySQLUser() - try: - user.name = username # Could possibly throw a BadRequest here. - except ValueError as ve: - LOG.exception(_("Error Getting user information")) - raise exception.BadRequest(_("Username %(user)s is not valid" - ": %(reason)s") % - {'user': username, 'reason': ve.message} - ) - with LocalSqlClient(get_engine()) as client: - q = sql_query.Query() - q.columns = ['User', 'Host', 'Password'] - q.tables = ['mysql.user'] - q.where = ["Host != 'localhost'", - "User = '%s'" % username, - "Host = '%s'" % hostname] - q.order = ['User', 'Host'] - t = text(str(q)) - result = client.execute(t).fetchall() - LOG.debug("Getting user information %s." % result) - if len(result) != 1: - return None - found_user = result[0] - user.password = found_user['Password'] - user.host = found_user['Host'] - self._associate_dbs(user) - return user - - def grant_access(self, username, hostname, databases): - """Grant a user permission to use a given database.""" - user = self._get_user(username, hostname) - mydb = models.ValidatedMySQLDatabase() - with LocalSqlClient(get_engine()) as client: - for database in databases: - try: - mydb.name = database - except ValueError: - LOG.exception(_("Error granting access")) - raise exception.BadRequest(_( - "Grant access to %s is not allowed") % database) - - g = sql_query.Grant(permissions='ALL', database=mydb.name, - user=user.name, host=user.host, - hashed=user.password) - t = text(str(g)) - client.execute(t) - - def is_root_enabled(self): - """Return True if root access is enabled; False otherwise.""" - return MySqlRootAccess.is_root_enabled() - - def enable_root(self, root_password=None): - """Enable the root user global access and/or - reset the root password. - """ - return MySqlRootAccess.enable_root(root_password) - - def list_databases(self, limit=None, marker=None, include_marker=False): - """List databases the user created on this mysql instance.""" - LOG.debug("---Listing Databases---") - ignored_database_names = "'%s'" % "', '".join(CONF.ignore_dbs) - LOG.debug("The following database names are on ignore list and will " - "be omitted from the listing: %s" % ignored_database_names) - databases = [] - with LocalSqlClient(get_engine()) as client: - q = sql_query.Query() - q.columns = [ - 'schema_name as name', - 'default_character_set_name as charset', - 'default_collation_name as collation', - ] - q.tables = ['information_schema.schemata'] - q.where = ["schema_name NOT IN (" + ignored_database_names + ")"] - q.order = ['schema_name ASC'] - if limit: - q.limit = limit + 1 - if marker: - q.where.append("schema_name %s '%s'" % - (INCLUDE_MARKER_OPERATORS[include_marker], - marker)) - t = text(str(q)) - database_names = client.execute(t) - next_marker = None - LOG.debug("database_names = %r." % database_names) - for count, database in enumerate(database_names): - if count >= limit: - break - LOG.debug("database = %s." % str(database)) - mysql_db = models.MySQLDatabase() - mysql_db.name = database[0] - next_marker = mysql_db.name - mysql_db.character_set = database[1] - mysql_db.collate = database[2] - databases.append(mysql_db.serialize()) - LOG.debug("databases = " + str(databases)) - if database_names.rowcount <= limit: - next_marker = None - return databases, next_marker - - def list_users(self, limit=None, marker=None, include_marker=False): - """List users that have access to the database.""" - ''' - SELECT - User, - Host, - Marker - FROM - (SELECT - User, - Host, - CONCAT(User, '@', Host) as Marker - FROM mysql.user - ORDER BY 1, 2) as innerquery - WHERE - Marker > :marker - ORDER BY - Marker - LIMIT :limit; - ''' - LOG.debug("---Listing Users---") - users = [] - with LocalSqlClient(get_engine()) as client: - mysql_user = models.MySQLUser() - iq = sql_query.Query() # Inner query. - iq.columns = ['User', 'Host', "CONCAT(User, '@', Host) as Marker"] - iq.tables = ['mysql.user'] - iq.order = ['User', 'Host'] - innerquery = str(iq).rstrip(';') - - oq = sql_query.Query() # Outer query. - oq.columns = ['User', 'Host', 'Marker'] - oq.tables = ['(%s) as innerquery' % innerquery] - oq.where = ["Host != 'localhost'"] - oq.order = ['Marker'] - if marker: - oq.where.append("Marker %s '%s'" % - (INCLUDE_MARKER_OPERATORS[include_marker], - marker)) - if limit: - oq.limit = limit + 1 - t = text(str(oq)) - result = client.execute(t) - next_marker = None - LOG.debug("result = " + str(result)) - for count, row in enumerate(result): - if count >= limit: - break - LOG.debug("user = " + str(row)) - mysql_user = models.MySQLUser() - mysql_user.name = row['User'] - mysql_user.host = row['Host'] - self._associate_dbs(mysql_user) - next_marker = row['Marker'] - users.append(mysql_user.serialize()) - if result.rowcount <= limit: - next_marker = None - LOG.debug("users = " + str(users)) - - return users, next_marker - - def revoke_access(self, username, hostname, database): - """Revoke a user's permission to use a given database.""" - user = self._get_user(username, hostname) - with LocalSqlClient(get_engine()) as client: - r = sql_query.Revoke(database=database, - user=user.name, - host=user.host) - t = text(str(r)) - client.execute(t) - - def list_access(self, username, hostname): - """Show all the databases to which the user has more than - USAGE granted. - """ - user = self._get_user(username, hostname) - return user.databases - - -class KeepAliveConnection(interfaces.PoolListener): - """ - A connection pool listener that ensures live connections are returned - from the connection pool at checkout. This alleviates the problem of - MySQL connections timing out. - """ - - def checkout(self, dbapi_con, con_record, con_proxy): - """Event triggered when a connection is checked out from the pool.""" - try: - try: - dbapi_con.ping(False) - except TypeError: - dbapi_con.ping() - except dbapi_con.OperationalError as ex: - if ex.args[0] in (2006, 2013, 2014, 2045, 2055): - raise exc.DisconnectionError() - else: - raise - - -class MySqlApp(object): - """Prepares DBaaS on a Guest container.""" - - TIME_OUT = 1000 - - configuration_manager = ConfigurationManager( - MYSQL_CONFIG, MYSQL_OWNER, MYSQL_OWNER, IniCodec(), requires_root=True, - override_strategy=ImportOverrideStrategy(CNF_INCLUDE_DIR, CNF_EXT)) - - @classmethod - def get_auth_password(cls): - return cls.configuration_manager.get_value('client').get('password') - - @classmethod - def get_data_dir(cls): - return cls.configuration_manager.get_value( - MySQLConfParser.SERVER_CONF_SECTION).get('datadir') - - @classmethod - def set_data_dir(cls, value): - cls.configuration_manager.apply_system_override( - {MySQLConfParser.SERVER_CONF_SECTION: {'datadir': value}}) - +class MySqlApp(service_base.BaseMySqlApp): def __init__(self, status): - """By default login with root no password for initial setup.""" - self.state_change_wait_time = CONF.state_change_wait_time - self.status = status + super(MySqlApp, self).__init__(status, LocalSqlClient, + KeepAliveConnection) - def _create_admin_user(self, client, password): - """ - Create a os_admin user with a random password - with all privileges similar to the root user. - """ - localhost = "localhost" - g = sql_query.Grant(permissions='ALL', user=ADMIN_USER_NAME, - host=localhost, grant_option=True, clear=password) - t = text(str(g)) - client.execute(t) - @staticmethod - def _generate_root_password(client): - """Generate and set a random root password and forget about it.""" - localhost = "localhost" - uu = sql_query.UpdateUser("root", host=localhost, - clear=utils.generate_random_password()) - t = text(str(uu)) - client.execute(t) +class MySqlRootAccess(service_base.BaseMySqlRootAccess): + def __init__(self): + super(MySqlRootAccess, self).__init__(LocalSqlClient, + MySqlApp(MySqlAppStatus.get())) - def install_if_needed(self, packages): - """Prepare the guest machine with a secure - mysql server installation. - """ - LOG.info(_("Preparing Guest as MySQL Server.")) - if not packager.pkg_is_installed(packages): - LOG.debug("Installing MySQL server.") - self._clear_mysql_config() - # set blank password on pkg configuration stage - pkg_opts = {'root_password': '', - 'root_password_again': ''} - packager.pkg_install(packages, pkg_opts, self.TIME_OUT) - self._create_mysql_confd_dir() - LOG.info(_("Finished installing MySQL server.")) - self.start_mysql() - def complete_install_or_restart(self): - self.status.end_install_or_restart() +class MySqlAdmin(service_base.BaseMySqlAdmin): + def __init__(self): + super(MySqlAdmin, self).__init__(LocalSqlClient, MySqlRootAccess(), + MySqlApp) - def secure(self, config_contents, overrides): - LOG.info(_("Generating admin password.")) - admin_password = utils.generate_random_password() - clear_expired_password() - engine = sqlalchemy.create_engine("mysql://root:@localhost:3306", - echo=True) - with LocalSqlClient(engine) as client: - self._remove_anonymous_user(client) - self._create_admin_user(client, admin_password) - self.stop_db() - - self._reset_configuration(config_contents, admin_password) - self._apply_user_overrides(overrides) - self.start_mysql() - - LOG.debug("MySQL secure complete.") - - def _reset_configuration(self, configuration, admin_password=None): - if not admin_password: - # Take the current admin password from the base configuration file - # if not given. - admin_password = MySqlApp.get_auth_password() - - self.configuration_manager.save_configuration(configuration) - self._save_authentication_properties(admin_password) - self.wipe_ib_logfiles() - - def _save_authentication_properties(self, admin_password): - self.configuration_manager.apply_system_override( - {'client': {'user': ADMIN_USER_NAME, 'password': admin_password}}) - - def secure_root(self, secure_remote_root=True): - with LocalSqlClient(get_engine()) as client: - LOG.info(_("Preserving root access from restore.")) - self._generate_root_password(client) - if secure_remote_root: - self._remove_remote_root_access(client) - - def _clear_mysql_config(self): - """Clear old configs, which can be incompatible with new version.""" - LOG.debug("Clearing old MySQL config.") - random_uuid = str(uuid.uuid4()) - configs = ["/etc/my.cnf", "/etc/mysql/conf.d", "/etc/mysql/my.cnf"] - for config in configs: - try: - old_conf_backup = "%s_%s" % (config, random_uuid) - operating_system.move(config, old_conf_backup, as_root=True) - LOG.debug("%s saved to %s_%s." % - (config, config, random_uuid)) - except exception.ProcessExecutionError: - pass - - def _create_mysql_confd_dir(self): - conf_dir = "/etc/mysql/conf.d" - LOG.debug("Creating %s." % conf_dir) - operating_system.create_directory(conf_dir, as_root=True) - - def _enable_mysql_on_boot(self): - LOG.debug("Enabling MySQL on boot.") - try: - mysql_service = operating_system.service_discovery( - MYSQL_SERVICE_CANDIDATES) - utils.execute_with_timeout(mysql_service['cmd_enable'], shell=True) - except KeyError: - LOG.exception(_("Error enabling MySQL start on boot.")) - raise RuntimeError("Service is not discovered.") - - def _disable_mysql_on_boot(self): - try: - mysql_service = operating_system.service_discovery( - MYSQL_SERVICE_CANDIDATES) - utils.execute_with_timeout(mysql_service['cmd_disable'], - shell=True) - except KeyError: - LOG.exception(_("Error disabling MySQL start on boot.")) - raise RuntimeError("Service is not discovered.") - - def stop_db(self, update_db=False, do_not_start_on_reboot=False): - LOG.info(_("Stopping MySQL.")) - if do_not_start_on_reboot: - self._disable_mysql_on_boot() - try: - mysql_service = operating_system.service_discovery( - MYSQL_SERVICE_CANDIDATES) - utils.execute_with_timeout(mysql_service['cmd_stop'], shell=True) - except KeyError: - LOG.exception(_("Error stopping MySQL.")) - raise RuntimeError("Service is not discovered.") - if not self.status.wait_for_real_status_to_change_to( - rd_instance.ServiceStatuses.SHUTDOWN, - self.state_change_wait_time, update_db): - LOG.error(_("Could not stop MySQL.")) - self.status.end_install_or_restart() - raise RuntimeError("Could not stop MySQL!") - - def _remove_anonymous_user(self, client): - t = text(sql_query.REMOVE_ANON) - client.execute(t) - - def _remove_remote_root_access(self, client): - t = text(sql_query.REMOVE_ROOT) - client.execute(t) - - def restart(self): - try: - self.status.begin_restart() - self.stop_db() - self.start_mysql() - finally: - self.status.end_install_or_restart() - - def update_overrides(self, overrides): - self._apply_user_overrides(overrides) - - def _apply_user_overrides(self, overrides): - # All user-defined values go to the server section of the configuration - # file. - if overrides: - self.configuration_manager.apply_user_override( - {MySQLConfParser.SERVER_CONF_SECTION: overrides}) - - def apply_overrides(self, overrides): - LOG.debug("Applying overrides to MySQL.") - with LocalSqlClient(get_engine()) as client: - LOG.debug("Updating override values in running MySQL.") - for k, v in overrides.iteritems(): - byte_value = guestagent_utils.to_bytes(v) - q = sql_query.SetServerVariable(key=k, value=byte_value) - t = text(str(q)) - try: - client.execute(t) - except exc.OperationalError: - output = {'key': k, 'value': byte_value} - LOG.exception(_("Unable to set %(key)s with value " - "%(value)s.") % output) - - def make_read_only(self, read_only): - with LocalSqlClient(get_engine()) as client: - q = "set global read_only = %s" % read_only - client.execute(text(str(q))) - - def wipe_ib_logfiles(self): - """Destroys the iblogfiles. - - If for some reason the selected log size in the conf changes from the - current size of the files MySQL will fail to start, so we delete the - files to be safe. - """ - LOG.info(_("Wiping ib_logfiles.")) - for index in range(2): - try: - # On restarts, sometimes these are wiped. So it can be a race - # to have MySQL start up before it's restarted and these have - # to be deleted. That's why its ok if they aren't found and - # that is why we use the "force" option to "remove". - operating_system.remove("%s/ib_logfile%d" - % (self.get_data_dir(), index), - force=True, as_root=True) - except exception.ProcessExecutionError: - LOG.exception("Could not delete logfile.") - raise - - def remove_overrides(self): - self.configuration_manager.remove_user_override() - - def _remove_replication_overrides(self, cnf_file): - LOG.info(_("Removing replication configuration file.")) - if os.path.exists(cnf_file): - operating_system.remove(cnf_file, as_root=True) - - def exists_replication_source_overrides(self): - return self.configuration_manager.has_system_override(CNF_MASTER) - - def write_replication_source_overrides(self, overrideValues): - self.configuration_manager.apply_system_override(overrideValues, - CNF_MASTER) - - def write_replication_replica_overrides(self, overrideValues): - self.configuration_manager.apply_system_override(overrideValues, - CNF_SLAVE) - - def remove_replication_source_overrides(self): - self.configuration_manager.remove_system_override(CNF_MASTER) - - def remove_replication_replica_overrides(self): - self.configuration_manager.remove_system_override(CNF_SLAVE) - - def grant_replication_privilege(self, replication_user): - LOG.info(_("Granting Replication Slave privilege.")) - - LOG.debug("grant_replication_privilege: %s" % replication_user) - - with LocalSqlClient(get_engine()) as client: - g = sql_query.Grant(permissions=['REPLICATION SLAVE'], - user=replication_user['name'], - clear=replication_user['password']) - - t = text(str(g)) - client.execute(t) - - def get_port(self): - with LocalSqlClient(get_engine()) as client: - result = client.execute('SELECT @@port').first() - return result[0] - - def get_binlog_position(self): - with LocalSqlClient(get_engine()) as client: - result = client.execute('SHOW MASTER STATUS').first() - binlog_position = { - 'log_file': result['File'], - 'position': result['Position'] - } - return binlog_position - - def execute_on_client(self, sql_statement): - LOG.debug("Executing SQL: %s" % sql_statement) - with LocalSqlClient(get_engine()) as client: - return client.execute(sql_statement) - - def start_slave(self): - LOG.info(_("Starting slave replication.")) - with LocalSqlClient(get_engine()) as client: - client.execute('START SLAVE') - self._wait_for_slave_status("ON", client, 60) - - def stop_slave(self, for_failover): - replication_user = None - LOG.info(_("Stopping slave replication.")) - with LocalSqlClient(get_engine()) as client: - result = client.execute('SHOW SLAVE STATUS') - replication_user = result.first()['Master_User'] - client.execute('STOP SLAVE') - client.execute('RESET SLAVE ALL') - self._wait_for_slave_status("OFF", client, 30) - if not for_failover: - client.execute('DROP USER ' + replication_user) - return { - 'replication_user': replication_user - } - - def stop_master(self): - LOG.info(_("Stopping replication master.")) - with LocalSqlClient(get_engine()) as client: - client.execute('RESET MASTER') - - def _wait_for_slave_status(self, status, client, max_time): - - def verify_slave_status(): - actual_status = client.execute( - "SHOW GLOBAL STATUS like 'slave_running'").first()[1] - return actual_status.upper() == status.upper() - - LOG.debug("Waiting for SLAVE_RUNNING to change to %s.", status) - try: - utils.poll_until(verify_slave_status, sleep_time=3, - time_out=max_time) - LOG.info(_("Replication is now %s.") % status.lower()) - except PollTimeOut: - raise RuntimeError( - _("Replication is not %(status)s after %(max)d seconds.") % { - 'status': status.lower(), 'max': max_time}) - - def start_mysql(self, update_db=False): - LOG.info(_("Starting MySQL.")) - # This is the site of all the trouble in the restart tests. - # Essentially what happens is that mysql start fails, but does not - # die. It is then impossible to kill the original, so - - self._enable_mysql_on_boot() - - try: - mysql_service = operating_system.service_discovery( - MYSQL_SERVICE_CANDIDATES) - utils.execute_with_timeout(mysql_service['cmd_start'], shell=True) - except KeyError: - raise RuntimeError("Service is not discovered.") - except exception.ProcessExecutionError: - # it seems mysql (percona, at least) might come back with [Fail] - # but actually come up ok. we're looking into the timing issue on - # parallel, but for now, we'd like to give it one more chance to - # come up. so regardless of the execute_with_timeout() response, - # we'll assume mysql comes up and check it's status for a while. - pass - if not self.status.wait_for_real_status_to_change_to( - rd_instance.ServiceStatuses.RUNNING, - self.state_change_wait_time, update_db): - LOG.error(_("Start up of MySQL failed.")) - # If it won't start, but won't die either, kill it by hand so we - # don't let a rouge process wander around. - try: - utils.execute_with_timeout("sudo", "pkill", "-9", "mysql") - except exception.ProcessExecutionError: - LOG.exception(_("Error killing stalled MySQL start command.")) - # There's nothing more we can do... - self.status.end_install_or_restart() - raise RuntimeError("Could not start MySQL!") - - def start_db_with_conf_changes(self, config_contents): - LOG.info(_("Starting MySQL with conf changes.")) - LOG.debug("Inside the guest - Status is_running = (%s)." - % self.status.is_running) - if self.status.is_running: - LOG.error(_("Cannot execute start_db_with_conf_changes because " - "MySQL state == %s.") % self.status) - raise RuntimeError("MySQL not stopped.") - LOG.info(_("Resetting configuration.")) - self._reset_configuration(config_contents) - self.start_mysql(True) - - def reset_configuration(self, configuration): - config_contents = configuration['config_contents'] - LOG.info(_("Resetting configuration.")) - self._reset_configuration(config_contents) - - # DEPRECATED: Mantain for API Compatibility - def get_txn_count(self): - LOG.info(_("Retrieving latest txn id.")) - txn_count = 0 - with LocalSqlClient(get_engine()) as client: - result = client.execute('SELECT @@global.gtid_executed').first() - for uuid_set in result[0].split(','): - for interval in uuid_set.split(':')[1:]: - if '-' in interval: - iparts = interval.split('-') - txn_count += int(iparts[1]) - int(iparts[0]) - else: - txn_count += 1 - return txn_count - - def _get_slave_status(self): - with LocalSqlClient(get_engine()) as client: - return client.execute('SHOW SLAVE STATUS').first() - - def _get_master_UUID(self): - slave_status = self._get_slave_status() - return slave_status and slave_status['Master_UUID'] or None - - def _get_gtid_executed(self): - with LocalSqlClient(get_engine()) as client: - return client.execute('SELECT @@global.gtid_executed').first()[0] - - def get_last_txn(self): - master_UUID = self._get_master_UUID() - last_txn_id = '0' - gtid_executed = self._get_gtid_executed() - for gtid_set in gtid_executed.split(','): - uuid_set = gtid_set.split(':') - if uuid_set[0] == master_UUID: - last_txn_id = uuid_set[-1].split('-')[-1] - break - return master_UUID, int(last_txn_id) - - def get_latest_txn_id(self): - LOG.info(_("Retrieving latest txn id.")) - return self._get_gtid_executed() - - def wait_for_txn(self, txn): - LOG.info(_("Waiting on txn '%s'.") % txn) - with LocalSqlClient(get_engine()) as client: - client.execute("SELECT WAIT_UNTIL_SQL_THREAD_AFTER_GTIDS('%s')" - % txn) - - -class MySqlRootAccess(object): - - @classmethod - def is_root_enabled(cls): - """Return True if root access is enabled; False otherwise.""" - with LocalSqlClient(get_engine()) as client: - t = text(sql_query.ROOT_ENABLED) - result = client.execute(t) - LOG.debug("Found %s with remote root access." % result.rowcount) - return result.rowcount != 0 - - @classmethod - def enable_root(cls, root_password=None): - """Enable the root user global access and/or - reset the root password. - """ - user = models.RootUser() - user.name = "root" - user.host = "%" - user.password = root_password or utils.generate_random_password() - with LocalSqlClient(get_engine()) as client: - print(client) - try: - cu = sql_query.CreateUser(user.name, host=user.host) - t = text(str(cu)) - client.execute(t, **cu.keyArgs) - except exc.OperationalError as err: - # Ignore, user is already created, just reset the password - # TODO(rnirmal): More fine grained error checking later on - LOG.debug(err) - with LocalSqlClient(get_engine()) as client: - print(client) - uu = sql_query.UpdateUser(user.name, host=user.host, - clear=user.password) - t = text(str(uu)) - client.execute(t) - - LOG.debug("CONF.root_grant: %s CONF.root_grant_option: %s." % - (CONF.root_grant, CONF.root_grant_option)) - - g = sql_query.Grant(permissions=CONF.root_grant, - user=user.name, - host=user.host, - grant_option=CONF.root_grant_option, - clear=user.password) - - t = text(str(g)) - client.execute(t) - return user.serialize() +get_engine = MySqlApp.get_engine diff --git a/trove/guestagent/datastore/mysql/service_base.py b/trove/guestagent/datastore/mysql/service_base.py new file mode 100644 index 0000000000..abe6ed4467 --- /dev/null +++ b/trove/guestagent/datastore/mysql/service_base.py @@ -0,0 +1,1085 @@ +# Copyright 2013 OpenStack Foundation +# Copyright 2013 Rackspace Hosting +# Copyright 2013 Hewlett-Packard Development Company, L.P. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import abc +from collections import defaultdict +import os +import re +import six +import uuid + +from oslo_log import log as logging +import sqlalchemy +from sqlalchemy import exc +from sqlalchemy import interfaces +from sqlalchemy.sql.expression import text + +from trove.common import cfg +from trove.common.configurations import MySQLConfParser +from trove.common import exception +from trove.common.exception import PollTimeOut +from trove.common.i18n import _ +from trove.common import instance as rd_instance +from trove.common.stream_codecs import IniCodec +from trove.common import utils as utils +from trove.guestagent.common.configuration import ConfigurationManager +from trove.guestagent.common.configuration import ImportOverrideStrategy +from trove.guestagent.common import guestagent_utils +from trove.guestagent.common import operating_system +from trove.guestagent.common import sql_query +from trove.guestagent.datastore import service +from trove.guestagent.db import models +from trove.guestagent import pkg + +ADMIN_USER_NAME = "os_admin" +LOG = logging.getLogger(__name__) +FLUSH = text(sql_query.FLUSH) +ENGINE = None +DATADIR = None +PREPARING = False +UUID = False + +TMP_MYCNF = "/tmp/my.cnf.tmp" +MYSQL_BASE_DIR = "/var/lib/mysql" + +CONF = cfg.CONF +MANAGER = CONF.datastore_manager if CONF.datastore_manager else 'mysql' + +INCLUDE_MARKER_OPERATORS = { + True: ">=", + False: ">" +} + +OS_NAME = operating_system.get_os() +MYSQL_CONFIG = {operating_system.REDHAT: "/etc/my.cnf", + operating_system.DEBIAN: "/etc/mysql/my.cnf", + operating_system.SUSE: "/etc/my.cnf"}[OS_NAME] +MYSQL_SERVICE_CANDIDATES = ["mysql", "mysqld", "mysql-server"] +MYSQL_BIN_CANDIDATES = ["/usr/sbin/mysqld", "/usr/libexec/mysqld"] +MYSQL_OWNER = 'mysql' +CNF_EXT = 'cnf' +CNF_INCLUDE_DIR = '/etc/mysql/conf.d/' +CNF_MASTER = 'master-replication' +CNF_SLAVE = 'slave-replication' + +# Create a package impl +packager = pkg.Package() + + +def clear_expired_password(): + """ + Some mysql installations generate random root password + and save it in /root/.mysql_secret, this password is + expired and should be changed by client that supports expired passwords. + """ + LOG.debug("Removing expired password.") + secret_file = "/root/.mysql_secret" + try: + out, err = utils.execute("cat", secret_file, + run_as_root=True, root_helper="sudo") + except exception.ProcessExecutionError: + LOG.exception(_("/root/.mysql_secret does not exist.")) + return + m = re.match('# The random password set for the root user at .*: (.*)', + out) + if m: + try: + out, err = utils.execute("mysqladmin", "-p%s" % m.group(1), + "password", "", run_as_root=True, + root_helper="sudo") + except exception.ProcessExecutionError: + LOG.exception(_("Cannot change mysql password.")) + return + operating_system.remove(secret_file, force=True, as_root=True) + LOG.debug("Expired password removed.") + + +def load_mysqld_options(): + # find mysqld bin + for bin in MYSQL_BIN_CANDIDATES: + if os.path.isfile(bin): + mysqld_bin = bin + break + else: + return {} + try: + out, err = utils.execute(mysqld_bin, "--print-defaults", + run_as_root=True, root_helper="sudo") + arglist = re.split("\n", out)[1].split() + args = defaultdict(list) + for item in arglist: + if "=" in item: + key, value = item.split("=", 1) + args[key.lstrip("--")].append(value) + else: + args[item.lstrip("--")].append(None) + return args + except exception.ProcessExecutionError: + return {} + + +class BaseMySqlAppStatus(service.BaseDbStatus): + @classmethod + def get(cls): + if not cls._instance: + cls._instance = BaseMySqlAppStatus() + return cls._instance + + def _get_actual_db_status(self): + try: + out, err = utils.execute_with_timeout( + "/usr/bin/mysqladmin", + "ping", run_as_root=True, root_helper="sudo", + log_output_on_error=True) + LOG.info(_("MySQL Service Status is RUNNING.")) + return rd_instance.ServiceStatuses.RUNNING + except exception.ProcessExecutionError: + LOG.exception(_("Failed to get database status.")) + try: + out, err = utils.execute_with_timeout("/bin/ps", "-C", + "mysqld", "h") + pid = out.split()[0] + # TODO(rnirmal): Need to create new statuses for instances + # where the mysql service is up, but unresponsive + LOG.info(_('MySQL Service Status %(pid)s is BLOCKED.') % + {'pid': pid}) + return rd_instance.ServiceStatuses.BLOCKED + except exception.ProcessExecutionError: + LOG.exception(_("Process execution failed.")) + mysql_args = load_mysqld_options() + pid_file = mysql_args.get('pid_file', + ['/var/run/mysqld/mysqld.pid'])[0] + if os.path.exists(pid_file): + LOG.info(_("MySQL Service Status is CRASHED.")) + return rd_instance.ServiceStatuses.CRASHED + else: + LOG.info(_("MySQL Service Status is SHUTDOWN.")) + return rd_instance.ServiceStatuses.SHUTDOWN + + +class BaseLocalSqlClient(object): + """A sqlalchemy wrapper to manage transactions.""" + + def __init__(self, engine, use_flush=True): + self.engine = engine + self.use_flush = use_flush + + def __enter__(self): + self.conn = self.engine.connect() + self.trans = self.conn.begin() + return self.conn + + def __exit__(self, type, value, traceback): + if self.trans: + if type is not None: # An error occurred + self.trans.rollback() + else: + if self.use_flush: + self.conn.execute(FLUSH) + self.trans.commit() + self.conn.close() + + def execute(self, t, **kwargs): + try: + return self.conn.execute(t, kwargs) + except Exception: + self.trans.rollback() + self.trans = None + raise + + +@six.add_metaclass(abc.ABCMeta) +class BaseMySqlAdmin(object): + """Handles administrative tasks on the MySQL database.""" + + def __init__(self, local_sql_client, mysql_root_access, + mysql_app): + self._local_sql_client = local_sql_client + self._mysql_root_access = mysql_root_access + self._mysql_app = mysql_app(local_sql_client) + + @property + def local_sql_client(self): + return self._local_sql_client + + @property + def mysql_root_access(self): + return self._mysql_root_access + + @property + def mysql_app(self): + return self._mysql_app + + def _associate_dbs(self, user): + """Internal. Given a MySQLUser, populate its databases attribute.""" + LOG.debug("Associating dbs to user %s at %s." % + (user.name, user.host)) + with self.local_sql_client(self.mysql_app.get_engine()) as client: + q = sql_query.Query() + q.columns = ["grantee", "table_schema"] + q.tables = ["information_schema.SCHEMA_PRIVILEGES"] + q.group = ["grantee", "table_schema"] + q.where = ["privilege_type != 'USAGE'"] + t = text(str(q)) + db_result = client.execute(t) + for db in db_result: + LOG.debug("\t db: %s." % db) + if db['grantee'] == "'%s'@'%s'" % (user.name, user.host): + mysql_db = models.MySQLDatabase() + mysql_db.name = db['table_schema'] + user.databases.append(mysql_db.serialize()) + + def change_passwords(self, users): + """Change the passwords of one or more existing users.""" + LOG.debug("Changing the password of some users.") + with self.local_sql_client(self.mysql_app.get_engine()) as client: + for item in users: + LOG.debug("Changing password for user %s." % item) + user_dict = {'_name': item['name'], + '_host': item['host'], + '_password': item['password']} + user = models.MySQLUser() + user.deserialize(user_dict) + LOG.debug("\tDeserialized: %s." % user.__dict__) + uu = sql_query.UpdateUser(user.name, host=user.host, + clear=user.password) + t = text(str(uu)) + client.execute(t) + + def update_attributes(self, username, hostname, user_attrs): + """Change the attributes of an existing user.""" + LOG.debug("Changing user attributes for user %s." % username) + user = self._get_user(username, hostname) + db_access = set() + grantee = set() + with self.local_sql_client(self.mysql_app.get_engine()) as client: + q = sql_query.Query() + q.columns = ["grantee", "table_schema"] + q.tables = ["information_schema.SCHEMA_PRIVILEGES"] + q.group = ["grantee", "table_schema"] + q.where = ["privilege_type != 'USAGE'"] + t = text(str(q)) + db_result = client.execute(t) + for db in db_result: + grantee.add(db['grantee']) + if db['grantee'] == "'%s'@'%s'" % (user.name, user.host): + db_name = db['table_schema'] + db_access.add(db_name) + with self.local_sql_client(self.mysql_app.get_engine()) as client: + uu = sql_query.UpdateUser(user.name, host=user.host, + clear=user_attrs.get('password'), + new_user=user_attrs.get('name'), + new_host=user_attrs.get('host')) + t = text(str(uu)) + client.execute(t) + uname = user_attrs.get('name') or username + host = user_attrs.get('host') or hostname + find_user = "'%s'@'%s'" % (uname, host) + if find_user not in grantee: + self.grant_access(uname, host, db_access) + + def create_database(self, databases): + """Create the list of specified databases.""" + with self.local_sql_client(self.mysql_app.get_engine()) as client: + for item in databases: + mydb = models.ValidatedMySQLDatabase() + mydb.deserialize(item) + cd = sql_query.CreateDatabase(mydb.name, + mydb.character_set, + mydb.collate) + t = text(str(cd)) + client.execute(t) + + def create_user(self, users): + """Create users and grant them privileges for the + specified databases. + """ + with self.local_sql_client(self.mysql_app.get_engine()) as client: + for item in users: + user = models.MySQLUser() + user.deserialize(item) + # TODO(cp16net):Should users be allowed to create users + # 'os_admin' or 'debian-sys-maint' + g = sql_query.Grant(user=user.name, host=user.host, + clear=user.password) + t = text(str(g)) + client.execute(t) + for database in user.databases: + mydb = models.ValidatedMySQLDatabase() + mydb.deserialize(database) + g = sql_query.Grant(permissions='ALL', database=mydb.name, + user=user.name, host=user.host, + clear=user.password) + t = text(str(g)) + client.execute(t) + + def delete_database(self, database): + """Delete the specified database.""" + with self.local_sql_client(self.mysql_app.get_engine()) as client: + mydb = models.ValidatedMySQLDatabase() + mydb.deserialize(database) + dd = sql_query.DropDatabase(mydb.name) + t = text(str(dd)) + client.execute(t) + + def delete_user(self, user): + """Delete the specified user.""" + mysql_user = models.MySQLUser() + mysql_user.deserialize(user) + self.delete_user_by_name(mysql_user.name, mysql_user.host) + + def delete_user_by_name(self, name, host='%'): + with self.local_sql_client(self.mysql_app.get_engine()) as client: + du = sql_query.DropUser(name, host=host) + t = text(str(du)) + LOG.debug("delete_user_by_name: %s", t) + client.execute(t) + + def get_user(self, username, hostname): + user = self._get_user(username, hostname) + if not user: + return None + return user.serialize() + + def _get_user(self, username, hostname): + """Return a single user matching the criteria.""" + user = models.MySQLUser() + try: + user.name = username # Could possibly throw a BadRequest here. + except ValueError as ve: + LOG.exception(_("Error Getting user information")) + raise exception.BadRequest(_("Username %(user)s is not valid" + ": %(reason)s") % + {'user': username, 'reason': ve.message} + ) + with self.local_sql_client(self.mysql_app.get_engine()) as client: + q = sql_query.Query() + q.columns = ['User', 'Host', 'Password'] + q.tables = ['mysql.user'] + q.where = ["Host != 'localhost'", + "User = '%s'" % username, + "Host = '%s'" % hostname] + q.order = ['User', 'Host'] + t = text(str(q)) + result = client.execute(t).fetchall() + LOG.debug("Getting user information %s." % result) + if len(result) != 1: + return None + found_user = result[0] + user.password = found_user['Password'] + user.host = found_user['Host'] + self._associate_dbs(user) + return user + + def grant_access(self, username, hostname, databases): + """Grant a user permission to use a given database.""" + user = self._get_user(username, hostname) + mydb = models.ValidatedMySQLDatabase() + with self.local_sql_client(self.mysql_app.get_engine()) as client: + for database in databases: + try: + mydb.name = database + except ValueError: + LOG.exception(_("Error granting access")) + raise exception.BadRequest(_( + "Grant access to %s is not allowed") % database) + + g = sql_query.Grant(permissions='ALL', database=mydb.name, + user=user.name, host=user.host, + hashed=user.password) + t = text(str(g)) + client.execute(t) + + def is_root_enabled(self): + """Return True if root access is enabled; False otherwise.""" + LOG.debug("Class type of mysql_root_access is %s " % + self.mysql_root_access) + return self.mysql_root_access.is_root_enabled() + + def enable_root(self, root_password=None): + """Enable the root user global access and/or + reset the root password. + """ + return self.mysql_root_access.enable_root(root_password) + + def list_databases(self, limit=None, marker=None, include_marker=False): + """List databases the user created on this mysql instance.""" + LOG.debug("---Listing Databases---") + ignored_database_names = "'%s'" % "', '".join(CONF.ignore_dbs) + LOG.debug("The following database names are on ignore list and will " + "be omitted from the listing: %s" % ignored_database_names) + databases = [] + with self.local_sql_client(self.mysql_app.get_engine()) as client: + # If you have an external volume mounted at /var/lib/mysql + # the lost+found directory will show up in mysql as a database + # which will create errors if you try to do any database ops + # on it. So we remove it here if it exists. + q = sql_query.Query() + q.columns = [ + 'schema_name as name', + 'default_character_set_name as charset', + 'default_collation_name as collation', + ] + q.tables = ['information_schema.schemata'] + q.where = ["schema_name NOT IN (" + ignored_database_names + ")"] + q.order = ['schema_name ASC'] + if limit: + q.limit = limit + 1 + if marker: + q.where.append("schema_name %s '%s'" % + (INCLUDE_MARKER_OPERATORS[include_marker], + marker)) + t = text(str(q)) + database_names = client.execute(t) + next_marker = None + LOG.debug("database_names = %r." % database_names) + for count, database in enumerate(database_names): + if count >= limit: + break + LOG.debug("database = %s." % str(database)) + mysql_db = models.MySQLDatabase() + mysql_db.name = database[0] + next_marker = mysql_db.name + mysql_db.character_set = database[1] + mysql_db.collate = database[2] + databases.append(mysql_db.serialize()) + LOG.debug("databases = " + str(databases)) + if database_names.rowcount <= limit: + next_marker = None + return databases, next_marker + + def list_users(self, limit=None, marker=None, include_marker=False): + """List users that have access to the database.""" + ''' + SELECT + User, + Host, + Marker + FROM + (SELECT + User, + Host, + CONCAT(User, '@', Host) as Marker + FROM mysql.user + ORDER BY 1, 2) as innerquery + WHERE + Marker > :marker + ORDER BY + Marker + LIMIT :limit; + ''' + LOG.debug("---Listing Users---") + users = [] + with self.local_sql_client(self.mysql_app.get_engine()) as client: + mysql_user = models.MySQLUser() + iq = sql_query.Query() # Inner query. + iq.columns = ['User', 'Host', "CONCAT(User, '@', Host) as Marker"] + iq.tables = ['mysql.user'] + iq.order = ['User', 'Host'] + innerquery = str(iq).rstrip(';') + + oq = sql_query.Query() # Outer query. + oq.columns = ['User', 'Host', 'Marker'] + oq.tables = ['(%s) as innerquery' % innerquery] + oq.where = ["Host != 'localhost'"] + oq.order = ['Marker'] + if marker: + oq.where.append("Marker %s '%s'" % + (INCLUDE_MARKER_OPERATORS[include_marker], + marker)) + if limit: + oq.limit = limit + 1 + t = text(str(oq)) + result = client.execute(t) + next_marker = None + LOG.debug("result = " + str(result)) + for count, row in enumerate(result): + if count >= limit: + break + LOG.debug("user = " + str(row)) + mysql_user = models.MySQLUser() + mysql_user.name = row['User'] + mysql_user.host = row['Host'] + self._associate_dbs(mysql_user) + next_marker = row['Marker'] + users.append(mysql_user.serialize()) + if result.rowcount <= limit: + next_marker = None + LOG.debug("users = " + str(users)) + + return users, next_marker + + def revoke_access(self, username, hostname, database): + """Revoke a user's permission to use a given database.""" + user = self._get_user(username, hostname) + with self.local_sql_client(self.mysql_app.get_engine()) as client: + r = sql_query.Revoke(database=database, + user=user.name, + host=user.host) + t = text(str(r)) + client.execute(t) + + def list_access(self, username, hostname): + """Show all the databases to which the user has more than + USAGE granted. + """ + user = self._get_user(username, hostname) + return user.databases + + +class BaseKeepAliveConnection(interfaces.PoolListener): + """ + A connection pool listener that ensures live connections are returned + from the connection pool at checkout. This alleviates the problem of + MySQL connections timing out. + """ + + def checkout(self, dbapi_con, con_record, con_proxy): + """Event triggered when a connection is checked out from the pool.""" + try: + try: + dbapi_con.ping(False) + except TypeError: + dbapi_con.ping() + except dbapi_con.OperationalError as ex: + if ex.args[0] in (2006, 2013, 2014, 2045, 2055): + raise exc.DisconnectionError() + else: + raise + + +@six.add_metaclass(abc.ABCMeta) +class BaseMySqlApp(object): + """Prepares DBaaS on a Guest container.""" + + TIME_OUT = 1000 + + @property + def local_sql_client(self): + return self._local_sql_client + + @property + def keep_alive_connection_cls(self): + return self._keep_alive_connection_cls + + configuration_manager = ConfigurationManager( + MYSQL_CONFIG, MYSQL_OWNER, MYSQL_OWNER, IniCodec(), requires_root=True, + override_strategy=ImportOverrideStrategy(CNF_INCLUDE_DIR, CNF_EXT)) + + def get_engine(self): + """Create the default engine with the updated admin user.""" + # TODO(rnirmal):Based on permission issues being resolved we may revert + # url = URL(drivername='mysql', host='localhost', + # query={'read_default_file': '/etc/mysql/my.cnf'}) + global ENGINE + if ENGINE: + return ENGINE + + pwd = self.get_auth_password() + ENGINE = sqlalchemy.create_engine("mysql://%s:%s@localhost:3306" % + (ADMIN_USER_NAME, pwd.strip()), + pool_recycle=7200, + echo=CONF.sql_query_logging, + listeners=[ + self.keep_alive_connection_cls()] + ) + return ENGINE + + @classmethod + def get_auth_password(cls): + return cls.configuration_manager.get_value('client').get('password') + + @classmethod + def get_data_dir(cls): + return cls.configuration_manager.get_value( + MySQLConfParser.SERVER_CONF_SECTION).get('datadir') + + @classmethod + def set_data_dir(cls, value): + cls.configuration_manager.apply_system_override( + {MySQLConfParser.SERVER_CONF_SECTION: {'datadir': value}}) + + def __init__(self, status, local_sql_client, keep_alive_connection_cls): + """By default login with root no password for initial setup.""" + self.state_change_wait_time = CONF.state_change_wait_time + self.status = status + self._local_sql_client = local_sql_client + self._keep_alive_connection_cls = keep_alive_connection_cls + + def _create_admin_user(self, client, password): + """ + Create a os_admin user with a random password + with all privileges similar to the root user. + """ + localhost = "localhost" + g = sql_query.Grant(permissions='ALL', user=ADMIN_USER_NAME, + host=localhost, grant_option=True, clear=password) + t = text(str(g)) + client.execute(t) + + @staticmethod + def _generate_root_password(client): + """Generate and set a random root password and forget about it.""" + localhost = "localhost" + uu = sql_query.UpdateUser("root", host=localhost, + clear=utils.generate_random_password()) + t = text(str(uu)) + client.execute(t) + + def install_if_needed(self, packages): + """Prepare the guest machine with a secure + mysql server installation. + """ + LOG.info(_("Preparing Guest as MySQL Server.")) + if not packager.pkg_is_installed(packages): + LOG.debug("Installing MySQL server.") + self._clear_mysql_config() + # set blank password on pkg configuration stage + pkg_opts = {'root_password': '', + 'root_password_again': ''} + packager.pkg_install(packages, pkg_opts, self.TIME_OUT) + self._create_mysql_confd_dir() + LOG.info(_("Finished installing MySQL server.")) + self.start_mysql() + + def complete_install_or_restart(self): + self.status.end_install_or_restart() + + def secure(self, config_contents, overrides): + LOG.info(_("Generating admin password.")) + admin_password = utils.generate_random_password() + clear_expired_password() + engine = sqlalchemy.create_engine("mysql://root:@localhost:3306", + echo=True) + with self.local_sql_client(engine) as client: + self._remove_anonymous_user(client) + self._create_admin_user(client, admin_password) + + self.stop_db() + + self._reset_configuration(config_contents, admin_password) + self._apply_user_overrides(overrides) + self.start_mysql() + + LOG.debug("MySQL secure complete.") + + def _reset_configuration(self, configuration, admin_password=None): + if not admin_password: + # Take the current admin password from the base configuration file + # if not given. + admin_password = self.get_auth_password() + + self.configuration_manager.save_configuration(configuration) + self._save_authentication_properties(admin_password) + self.wipe_ib_logfiles() + + def _save_authentication_properties(self, admin_password): + self.configuration_manager.apply_system_override( + {'client': {'user': ADMIN_USER_NAME, 'password': admin_password}}) + + def secure_root(self, secure_remote_root=True): + with self.local_sql_client(self.get_engine()) as client: + LOG.info(_("Preserving root access from restore.")) + self._generate_root_password(client) + if secure_remote_root: + self._remove_remote_root_access(client) + + def _clear_mysql_config(self): + """Clear old configs, which can be incompatible with new version.""" + LOG.debug("Clearing old MySQL config.") + random_uuid = str(uuid.uuid4()) + configs = ["/etc/my.cnf", "/etc/mysql/conf.d", "/etc/mysql/my.cnf"] + for config in configs: + try: + old_conf_backup = "%s_%s" % (config, random_uuid) + operating_system.move(config, old_conf_backup, as_root=True) + LOG.debug("%s saved to %s_%s." % + (config, config, random_uuid)) + except exception.ProcessExecutionError: + pass + + def _create_mysql_confd_dir(self): + conf_dir = "/etc/mysql/conf.d" + LOG.debug("Creating %s." % conf_dir) + operating_system.create_directory(conf_dir, as_root=True) + + def _enable_mysql_on_boot(self): + LOG.debug("Enabling MySQL on boot.") + try: + mysql_service = operating_system.service_discovery( + MYSQL_SERVICE_CANDIDATES) + utils.execute_with_timeout(mysql_service['cmd_enable'], shell=True) + except KeyError: + LOG.exception(_("Error enabling MySQL start on boot.")) + raise RuntimeError("Service is not discovered.") + + def _disable_mysql_on_boot(self): + try: + mysql_service = operating_system.service_discovery( + MYSQL_SERVICE_CANDIDATES) + utils.execute_with_timeout(mysql_service['cmd_disable'], + shell=True) + except KeyError: + LOG.exception(_("Error disabling MySQL start on boot.")) + raise RuntimeError("Service is not discovered.") + + def stop_db(self, update_db=False, do_not_start_on_reboot=False): + LOG.info(_("Stopping MySQL.")) + if do_not_start_on_reboot: + self._disable_mysql_on_boot() + try: + mysql_service = operating_system.service_discovery( + MYSQL_SERVICE_CANDIDATES) + utils.execute_with_timeout(mysql_service['cmd_stop'], shell=True) + except KeyError: + LOG.exception(_("Error stopping MySQL.")) + raise RuntimeError("Service is not discovered.") + if not self.status.wait_for_real_status_to_change_to( + rd_instance.ServiceStatuses.SHUTDOWN, + self.state_change_wait_time, update_db): + LOG.error(_("Could not stop MySQL.")) + self.status.end_install_or_restart() + raise RuntimeError("Could not stop MySQL!") + + def _remove_anonymous_user(self, client): + t = text(sql_query.REMOVE_ANON) + client.execute(t) + + def _remove_remote_root_access(self, client): + t = text(sql_query.REMOVE_ROOT) + client.execute(t) + + def restart(self): + try: + self.status.begin_restart() + self.stop_db() + self.start_mysql() + finally: + self.status.end_install_or_restart() + + def update_overrides(self, overrides): + self._apply_user_overrides(overrides) + + def _apply_user_overrides(self, overrides): + # All user-defined values go to the server section of the configuration + # file. + if overrides: + self.configuration_manager.apply_user_override( + {MySQLConfParser.SERVER_CONF_SECTION: overrides}) + + def apply_overrides(self, overrides): + LOG.debug("Applying overrides to MySQL.") + with self.local_sql_client(self.get_engine()) as client: + LOG.debug("Updating override values in running MySQL.") + for k, v in overrides.iteritems(): + byte_value = guestagent_utils.to_bytes(v) + q = sql_query.SetServerVariable(key=k, value=byte_value) + t = text(str(q)) + try: + client.execute(t) + except exc.OperationalError: + output = {'key': k, 'value': byte_value} + LOG.exception(_("Unable to set %(key)s with value " + "%(value)s.") % output) + + def make_read_only(self, read_only): + with self.local_sql_client(self.get_engine()) as client: + q = "set global read_only = %s" % read_only + client.execute(text(str(q))) + + def wipe_ib_logfiles(self): + """Destroys the iblogfiles. + + If for some reason the selected log size in the conf changes from the + current size of the files MySQL will fail to start, so we delete the + files to be safe. + """ + LOG.info(_("Wiping ib_logfiles.")) + for index in range(2): + try: + # On restarts, sometimes these are wiped. So it can be a race + # to have MySQL start up before it's restarted and these have + # to be deleted. That's why its ok if they aren't found and + # that is why we use the "force" option to "remove". + operating_system.remove("%s/ib_logfile%d" + % (self.get_data_dir(), index), + force=True, as_root=True) + except exception.ProcessExecutionError: + LOG.exception("Could not delete logfile.") + raise + + def remove_overrides(self): + self.configuration_manager.remove_user_override() + + def _remove_replication_overrides(self, cnf_file): + LOG.info(_("Removing replication configuration file.")) + if os.path.exists(cnf_file): + operating_system.remove(cnf_file, as_root=True) + + def exists_replication_source_overrides(self): + return self.configuration_manager.has_system_override(CNF_MASTER) + + def write_replication_source_overrides(self, overrideValues): + self.configuration_manager.apply_system_override(overrideValues, + CNF_MASTER) + + def write_replication_replica_overrides(self, overrideValues): + self.configuration_manager.apply_system_override(overrideValues, + CNF_SLAVE) + + def remove_replication_source_overrides(self): + self.configuration_manager.remove_system_override(CNF_MASTER) + + def remove_replication_replica_overrides(self): + self.configuration_manager.remove_system_override(CNF_SLAVE) + + def grant_replication_privilege(self, replication_user): + LOG.info(_("Granting Replication Slave privilege.")) + + LOG.debug("grant_replication_privilege: %s" % replication_user) + + with self.local_sql_client(self.get_engine()) as client: + g = sql_query.Grant(permissions=['REPLICATION SLAVE'], + user=replication_user['name'], + clear=replication_user['password']) + + t = text(str(g)) + client.execute(t) + + def get_port(self): + with self.local_sql_client(self.get_engine()) as client: + result = client.execute('SELECT @@port').first() + return result[0] + + def get_binlog_position(self): + with self.local_sql_client(self.get_engine()) as client: + result = client.execute('SHOW MASTER STATUS').first() + binlog_position = { + 'log_file': result['File'], + 'position': result['Position'] + } + return binlog_position + + def execute_on_client(self, sql_statement): + LOG.debug("Executing SQL: %s" % sql_statement) + with self.local_sql_client(self.get_engine()) as client: + return client.execute(sql_statement) + + def start_slave(self): + LOG.info(_("Starting slave replication.")) + with self.local_sql_client(self.get_engine()) as client: + client.execute('START SLAVE') + self._wait_for_slave_status("ON", client, 60) + + def stop_slave(self, for_failover): + replication_user = None + LOG.info(_("Stopping slave replication.")) + with self.local_sql_client(self.get_engine()) as client: + result = client.execute('SHOW SLAVE STATUS') + replication_user = result.first()['Master_User'] + client.execute('STOP SLAVE') + client.execute('RESET SLAVE ALL') + self._wait_for_slave_status("OFF", client, 30) + if not for_failover: + client.execute('DROP USER ' + replication_user) + return { + 'replication_user': replication_user + } + + def stop_master(self): + LOG.info(_("Stopping replication master.")) + with self.local_sql_client(self.get_engine()) as client: + client.execute('RESET MASTER') + + def _wait_for_slave_status(self, status, client, max_time): + + def verify_slave_status(): + actual_status = client.execute( + "SHOW GLOBAL STATUS like 'slave_running'").first()[1] + return actual_status.upper() == status.upper() + + LOG.debug("Waiting for SLAVE_RUNNING to change to %s.", status) + try: + utils.poll_until(verify_slave_status, sleep_time=3, + time_out=max_time) + LOG.info(_("Replication is now %s.") % status.lower()) + except PollTimeOut: + raise RuntimeError( + _("Replication is not %(status)s after %(max)d seconds.") % { + 'status': status.lower(), 'max': max_time}) + + def start_mysql(self, update_db=False): + LOG.info(_("Starting MySQL.")) + # This is the site of all the trouble in the restart tests. + # Essentially what happens is that mysql start fails, but does not + # die. It is then impossible to kill the original, so + + self._enable_mysql_on_boot() + + try: + mysql_service = operating_system.service_discovery( + MYSQL_SERVICE_CANDIDATES) + utils.execute_with_timeout(mysql_service['cmd_start'], shell=True) + except KeyError: + raise RuntimeError("Service is not discovered.") + except exception.ProcessExecutionError: + # it seems mysql (percona, at least) might come back with [Fail] + # but actually come up ok. we're looking into the timing issue on + # parallel, but for now, we'd like to give it one more chance to + # come up. so regardless of the execute_with_timeout() response, + # we'll assume mysql comes up and check it's status for a while. + pass + if not self.status.wait_for_real_status_to_change_to( + rd_instance.ServiceStatuses.RUNNING, + self.state_change_wait_time, update_db): + LOG.error(_("Start up of MySQL failed.")) + # If it won't start, but won't die either, kill it by hand so we + # don't let a rouge process wander around. + try: + utils.execute_with_timeout("sudo", "pkill", "-9", "mysql") + except exception.ProcessExecutionError: + LOG.exception(_("Error killing stalled MySQL start command.")) + # There's nothing more we can do... + self.status.end_install_or_restart() + raise RuntimeError("Could not start MySQL!") + + def start_db_with_conf_changes(self, config_contents): + LOG.info(_("Starting MySQL with conf changes.")) + LOG.debug("Inside the guest - Status is_running = (%s)." + % self.status.is_running) + if self.status.is_running: + LOG.error(_("Cannot execute start_db_with_conf_changes because " + "MySQL state == %s.") % self.status) + raise RuntimeError("MySQL not stopped.") + LOG.info(_("Resetting configuration.")) + self._reset_configuration(config_contents) + self.start_mysql(True) + + def reset_configuration(self, configuration): + config_contents = configuration['config_contents'] + LOG.info(_("Resetting configuration.")) + self._reset_configuration(config_contents) + + # DEPRECATED: Mantain for API Compatibility + def get_txn_count(self): + LOG.info(_("Retrieving latest txn id.")) + txn_count = 0 + with self.local_sql_client(self.get_engine()) as client: + result = client.execute('SELECT @@global.gtid_executed').first() + for uuid_set in result[0].split(','): + for interval in uuid_set.split(':')[1:]: + if '-' in interval: + iparts = interval.split('-') + txn_count += int(iparts[1]) - int(iparts[0]) + else: + txn_count += 1 + return txn_count + + def _get_slave_status(self): + with self.local_sql_client(self.get_engine()) as client: + return client.execute('SHOW SLAVE STATUS').first() + + def _get_master_UUID(self): + slave_status = self._get_slave_status() + return slave_status and slave_status['Master_UUID'] or None + + def _get_gtid_executed(self): + with self.local_sql_client(self.get_engine()) as client: + return client.execute('SELECT @@global.gtid_executed').first()[0] + + def get_last_txn(self): + master_UUID = self._get_master_UUID() + last_txn_id = '0' + gtid_executed = self._get_gtid_executed() + for gtid_set in gtid_executed.split(','): + uuid_set = gtid_set.split(':') + if uuid_set[0] == master_UUID: + last_txn_id = uuid_set[-1].split('-')[-1] + break + return master_UUID, int(last_txn_id) + + def get_latest_txn_id(self): + LOG.info(_("Retrieving latest txn id.")) + return self._get_gtid_executed() + + def wait_for_txn(self, txn): + LOG.info(_("Waiting on txn '%s'.") % txn) + with self.local_sql_client(self.get_engine()) as client: + client.execute("SELECT WAIT_UNTIL_SQL_THREAD_AFTER_GTIDS('%s')" + % txn) + + +class BaseMySqlRootAccess(object): + + def __init__(self, local_sql_client, mysql_app): + self._local_sql_client = local_sql_client + self._mysql_app = mysql_app + + @property + def mysql_app(self): + return self._mysql_app + + @property + def local_sql_client(self): + return self._local_sql_client + + def is_root_enabled(self): + """Return True if root access is enabled; False otherwise.""" + with self.local_sql_client(self.mysql_app.get_engine()) as client: + t = text(sql_query.ROOT_ENABLED) + result = client.execute(t) + LOG.debug("Found %s with remote root access." % result.rowcount) + return result.rowcount != 0 + + def enable_root(self, root_password=None): + """Enable the root user global access and/or + reset the root password. + """ + user = models.RootUser() + user.name = "root" + user.host = "%" + user.password = root_password or utils.generate_random_password() + with self.local_sql_client(self.mysql_app.get_engine()) as client: + print(client) + try: + cu = sql_query.CreateUser(user.name, host=user.host) + t = text(str(cu)) + client.execute(t, **cu.keyArgs) + except exc.OperationalError as err: + # Ignore, user is already created, just reset the password + # TODO(rnirmal): More fine grained error checking later on + LOG.debug(err) + with self.local_sql_client(self.mysql_app.get_engine()) as client: + print(client) + uu = sql_query.UpdateUser(user.name, host=user.host, + clear=user.password) + t = text(str(uu)) + client.execute(t) + + LOG.debug("CONF.root_grant: %s CONF.root_grant_option: %s." % + (CONF.root_grant, CONF.root_grant_option)) + + g = sql_query.Grant(permissions=CONF.root_grant, + user=user.name, + host=user.host, + grant_option=CONF.root_grant_option, + clear=user.password) + + t = text(str(g)) + client.execute(t) + return user.serialize() diff --git a/trove/guestagent/dbaas.py b/trove/guestagent/dbaas.py index 16fc4e8037..35fe5fec7e 100644 --- a/trove/guestagent/dbaas.py +++ b/trove/guestagent/dbaas.py @@ -37,7 +37,7 @@ defaults = { 'mysql': 'trove.guestagent.datastore.mysql.manager.Manager', 'percona': - 'trove.guestagent.datastore.mysql.manager.Manager', + 'trove.guestagent.datastore.experimental.percona.manager.Manager', 'redis': 'trove.guestagent.datastore.experimental.redis.manager.Manager', 'cassandra': @@ -54,6 +54,8 @@ defaults = { 'trove.guestagent.datastore.experimental.vertica.manager.Manager', 'db2': 'trove.guestagent.datastore.experimental.db2.manager.Manager', + 'mariadb': + 'trove.guestagent.datastore.experimental.mariadb.manager.Manager' } CONF = cfg.CONF diff --git a/trove/guestagent/strategies/backup/mysql_impl.py b/trove/guestagent/strategies/backup/mysql_impl.py index e38e681e69..7d51042fdb 100644 --- a/trove/guestagent/strategies/backup/mysql_impl.py +++ b/trove/guestagent/strategies/backup/mysql_impl.py @@ -20,8 +20,8 @@ import re from oslo_log import log as logging from trove.common.i18n import _ -from trove.guestagent.datastore.mysql.service import ADMIN_USER_NAME from trove.guestagent.datastore.mysql.service import MySqlApp +from trove.guestagent.datastore.mysql.service_base import ADMIN_USER_NAME from trove.guestagent.strategies.backup import base LOG = logging.getLogger(__name__) diff --git a/trove/tests/unittests/guestagent/test_dbaas.py b/trove/tests/unittests/guestagent/test_dbaas.py index 10cc9b8ea6..6150981653 100644 --- a/trove/tests/unittests/guestagent/test_dbaas.py +++ b/trove/tests/unittests/guestagent/test_dbaas.py @@ -70,6 +70,7 @@ from trove.guestagent.datastore.mysql.service import MySqlAdmin from trove.guestagent.datastore.mysql.service import MySqlApp from trove.guestagent.datastore.mysql.service import MySqlAppStatus from trove.guestagent.datastore.mysql.service import MySqlRootAccess +import trove.guestagent.datastore.mysql.service_base as dbaas_base from trove.guestagent.datastore.service import BaseDbStatus from trove.guestagent.db import models from trove.guestagent import dbaas as dbaas_sr @@ -118,30 +119,33 @@ class DbaasTest(testtools.TestCase): def setUp(self): super(DbaasTest, self).setUp() - self.orig_utils_execute_with_timeout = dbaas.utils.execute_with_timeout - self.orig_utils_execute = dbaas.utils.execute + self.orig_utils_execute_with_timeout = \ + dbaas_base.utils.execute_with_timeout + self.orig_utils_execute = dbaas_base.utils.execute def tearDown(self): super(DbaasTest, self).tearDown() - dbaas.utils.execute_with_timeout = self.orig_utils_execute_with_timeout - dbaas.utils.execute = self.orig_utils_execute + dbaas_base.utils.execute_with_timeout = \ + self.orig_utils_execute_with_timeout + dbaas_base.utils.execute = self.orig_utils_execute @patch.object(operating_system, 'remove') def test_clear_expired_password(self, mock_remove): secret_content = ("# The random password set for the " "root user at Wed May 14 14:06:38 2014 " "(local time): somepassword") - with patch.object(dbaas.utils, 'execute', + with patch.object(dbaas_base.utils, 'execute', return_value=(secret_content, None)): - dbaas.clear_expired_password() - self.assertEqual(2, dbaas.utils.execute.call_count) + dbaas_base.clear_expired_password() + self.assertEqual(2, dbaas_base.utils.execute.call_count) self.assertEqual(1, mock_remove.call_count) @patch.object(operating_system, 'remove') def test_no_secret_content_clear_expired_password(self, mock_remove): - with patch.object(dbaas.utils, 'execute', return_value=('', None)): - dbaas.clear_expired_password() - self.assertEqual(1, dbaas.utils.execute.call_count) + with patch.object(dbaas_base.utils, 'execute', + return_value=('', None)): + dbaas_base.clear_expired_password() + self.assertEqual(1, dbaas_base.utils.execute.call_count) mock_remove.assert_not_called() @patch.object(operating_system, 'remove') @@ -150,19 +154,20 @@ class DbaasTest(testtools.TestCase): secret_content = ("# The random password set for the " "root user at Wed May 14 14:06:38 2014 " "(local time): somepassword") - with patch.object(dbaas.utils, 'execute', + with patch.object(dbaas_base.utils, 'execute', side_effect=[(secret_content, None), ProcessExecutionError]): - dbaas.clear_expired_password() - self.assertEqual(2, dbaas.utils.execute.call_count) + dbaas_base.clear_expired_password() + self.assertEqual(2, dbaas_base.utils.execute.call_count) mock_remove.assert_not_called() @patch.object(operating_system, 'remove') - @patch.object(dbaas.utils, 'execute', side_effect=ProcessExecutionError) + @patch.object(dbaas_base.utils, 'execute', + side_effect=ProcessExecutionError) def test_fail_retrieve_secret_content_clear_expired_password(self, mock_execute, mock_remove): - dbaas.clear_expired_password() + dbaas_base.clear_expired_password() self.assertEqual(1, mock_execute.call_count) mock_remove.assert_not_called() @@ -181,7 +186,8 @@ class DbaasTest(testtools.TestCase): def test_service_discovery(self): with patch.object(os.path, 'isfile', return_value=True): - mysql_service = dbaas.operating_system.service_discovery(["mysql"]) + mysql_service = \ + dbaas_base.operating_system.service_discovery(["mysql"]) self.assertIsNotNone(mysql_service['cmd_start']) self.assertIsNotNone(mysql_service['cmd_enable']) @@ -192,8 +198,8 @@ class DbaasTest(testtools.TestCase): "--tmpdir=/tmp --skip-external-locking" with patch.object(os.path, 'isfile', return_value=True): - dbaas.utils.execute = Mock(return_value=(output, None)) - options = dbaas.load_mysqld_options() + dbaas_base.utils.execute = Mock(return_value=(output, None)) + options = dbaas_base.load_mysqld_options() self.assertEqual(5, len(options)) self.assertEqual(["mysql"], options["user"]) @@ -208,8 +214,8 @@ class DbaasTest(testtools.TestCase): "--plugin-load=federated=ha_federated.so") with patch.object(os.path, 'isfile', return_value=True): - dbaas.utils.execute = Mock(return_value=(output, None)) - options = dbaas.load_mysqld_options() + dbaas_base.utils.execute = Mock(return_value=(output, None)) + options = dbaas_base.load_mysqld_options() self.assertEqual(1, len(options)) self.assertEqual(["blackhole=ha_blackhole.so", @@ -219,9 +225,9 @@ class DbaasTest(testtools.TestCase): @patch.object(os.path, 'isfile', return_value=True) def test_load_mysqld_options_error(self, mock_exists): - dbaas.utils.execute = Mock(side_effect=ProcessExecutionError()) + dbaas_base.utils.execute = Mock(side_effect=ProcessExecutionError()) - self.assertFalse(dbaas.load_mysqld_options()) + self.assertFalse(dbaas_base.load_mysqld_options()) class ResultSetStub(object): @@ -242,8 +248,15 @@ class ResultSetStub(object): class MySqlAdminMockTest(testtools.TestCase): + def setUp(self): + super(MySqlAdminMockTest, self).setUp() + dbaas.orig_configuration_manager = dbaas.MySqlApp.configuration_manager + dbaas.MySqlApp.configuration_manager = Mock() + def tearDown(self): super(MySqlAdminMockTest, self).tearDown() + dbaas.MySqlApp.configuration_manager = \ + dbaas.orig_configuration_manager @patch('trove.guestagent.datastore.mysql.service.MySqlApp' '.get_auth_password', return_value='some_password') @@ -279,6 +292,10 @@ class MySqlAdminTest(testtools.TestCase): dbaas.LocalSqlClient.__enter__ = Mock() dbaas.LocalSqlClient.__exit__ = Mock() dbaas.LocalSqlClient.execute = Mock() + # trove.guestagent.common.configuration import ConfigurationManager + dbaas.orig_configuration_manager = dbaas.MySqlApp.configuration_manager + dbaas.MySqlApp.configuration_manager = Mock() + self.mySqlAdmin = MySqlAdmin() def tearDown(self): @@ -291,6 +308,8 @@ class MySqlAdminTest(testtools.TestCase): dbaas.LocalSqlClient.execute = self.orig_LocalSqlClient_execute models.MySQLUser._is_valid_user_name = ( self.orig_MySQLUser_is_valid_user_name) + dbaas.MySqlApp.configuration_manager = \ + dbaas.orig_configuration_manager def test__associate_dbs(self): db_result = [{"grantee": "'test_user'@'%'", "table_schema": "db1"}, @@ -732,7 +751,8 @@ class MySqlAppTest(testtools.TestCase): def setUp(self): super(MySqlAppTest, self).setUp() - self.orig_utils_execute_with_timeout = dbaas.utils.execute_with_timeout + self.orig_utils_execute_with_timeout = \ + dbaas_base.utils.execute_with_timeout self.orig_time_sleep = time.sleep self.orig_unlink = os.unlink self.orig_get_auth_password = MySqlApp.get_auth_password @@ -759,15 +779,20 @@ class MySqlAppTest(testtools.TestCase): self.mock_client.__enter__ = Mock() self.mock_client.__exit__ = Mock() self.mock_client.__enter__.return_value.execute = self.mock_execute + dbaas.orig_configuration_manager = dbaas.MySqlApp.configuration_manager + dbaas.MySqlApp.configuration_manager = Mock() def tearDown(self): super(MySqlAppTest, self).tearDown() - dbaas.utils.execute_with_timeout = self.orig_utils_execute_with_timeout + dbaas_base.utils.execute_with_timeout = \ + self.orig_utils_execute_with_timeout time.sleep = self.orig_time_sleep os.unlink = self.orig_unlink operating_system.service_discovery = self.orig_service_discovery MySqlApp.get_auth_password = self.orig_get_auth_password InstanceServiceStatus.find_by(instance_id=self.FAKE_ID).delete() + dbaas.MySqlApp.configuration_manager = \ + dbaas.orig_configuration_manager def assert_reported_status(self, expected_status): service_status = InstanceServiceStatus.find_by( @@ -802,7 +827,7 @@ class MySqlAppTest(testtools.TestCase): def test_stop_mysql(self): - dbaas.utils.execute_with_timeout = Mock() + dbaas_base.utils.execute_with_timeout = Mock() self.appStatus.set_next_status( rd_instance.ServiceStatuses.SHUTDOWN) @@ -811,7 +836,7 @@ class MySqlAppTest(testtools.TestCase): def test_stop_mysql_with_db_update(self): - dbaas.utils.execute_with_timeout = Mock() + dbaas_base.utils.execute_with_timeout = Mock() self.appStatus.set_next_status( rd_instance.ServiceStatuses.SHUTDOWN) @@ -836,7 +861,7 @@ class MySqlAppTest(testtools.TestCase): def test_stop_mysql_error(self): - dbaas.utils.execute_with_timeout = Mock() + dbaas_base.utils.execute_with_timeout = Mock() self.appStatus.set_next_status(rd_instance.ServiceStatuses.RUNNING) self.mySqlApp.state_change_wait_time = 1 self.assertRaises(RuntimeError, self.mySqlApp.stop_db) @@ -882,14 +907,14 @@ class MySqlAppTest(testtools.TestCase): def test_wipe_ib_logfiles_error(self, get_datadir_mock): mocked = Mock(side_effect=ProcessExecutionError('Error')) - dbaas.utils.execute_with_timeout = mocked + dbaas_base.utils.execute_with_timeout = mocked self.assertRaises(ProcessExecutionError, self.mySqlApp.wipe_ib_logfiles) def test_start_mysql(self): - dbaas.utils.execute_with_timeout = Mock() + dbaas_base.utils.execute_with_timeout = Mock() self.appStatus.set_next_status(rd_instance.ServiceStatuses.RUNNING) self.mySqlApp._enable_mysql_on_boot = Mock() self.mySqlApp.start_mysql() @@ -897,7 +922,7 @@ class MySqlAppTest(testtools.TestCase): def test_start_mysql_with_db_update(self): - dbaas.utils.execute_with_timeout = Mock() + dbaas_base.utils.execute_with_timeout = Mock() self.mySqlApp._enable_mysql_on_boot = Mock() self.appStatus.set_next_status(rd_instance.ServiceStatuses.RUNNING) @@ -909,7 +934,7 @@ class MySqlAppTest(testtools.TestCase): def test_start_mysql_runs_forever(self): - dbaas.utils.execute_with_timeout = Mock() + dbaas_base.utils.execute_with_timeout = Mock() self.mySqlApp._enable_mysql_on_boot = Mock() self.mySqlApp.state_change_wait_time = 1 self.appStatus.set_next_status(rd_instance.ServiceStatuses.SHUTDOWN) @@ -924,7 +949,7 @@ class MySqlAppTest(testtools.TestCase): self.mySqlApp._enable_mysql_on_boot = Mock() mocked = Mock(side_effect=ProcessExecutionError('Error')) - dbaas.utils.execute_with_timeout = mocked + dbaas_base.utils.execute_with_timeout = mocked self.assertRaises(RuntimeError, self.mySqlApp.start_mysql) @@ -976,13 +1001,14 @@ class MySqlAppTest(testtools.TestCase): save_cfg_mock.assert_called_once_with('some junk') apply_mock.assert_called_once_with( - {'client': {'user': dbaas.ADMIN_USER_NAME, + {'client': {'user': dbaas_base.ADMIN_USER_NAME, 'password': auth_pwd_mock.return_value}}) wipe_ib_mock.assert_called_once_with() @patch.object(utils, 'execute_with_timeout') def test__enable_mysql_on_boot(self, mock_execute): - mysql_service = dbaas.operating_system.service_discovery(["mysql"]) + mysql_service = \ + dbaas_base.operating_system.service_discovery(["mysql"]) self.mySqlApp._enable_mysql_on_boot() self.assertEqual(1, mock_execute.call_count) mock_execute.assert_called_with(mysql_service['cmd_enable'], @@ -998,7 +1024,8 @@ class MySqlAppTest(testtools.TestCase): @patch.object(utils, 'execute_with_timeout') def test__disable_mysql_on_boot(self, mock_execute): - mysql_service = dbaas.operating_system.service_discovery(["mysql"]) + mysql_service = \ + dbaas_base.operating_system.service_discovery(["mysql"]) self.mySqlApp._disable_mysql_on_boot() self.assertEqual(1, mock_execute.call_count) mock_execute.assert_called_with(mysql_service['cmd_disable'], @@ -1030,26 +1057,26 @@ class MySqlAppTest(testtools.TestCase): 'apply_system_override') as apply_sys_mock: self.mySqlApp.write_replication_source_overrides('something') apply_sys_mock.assert_called_once_with('something', - dbaas.CNF_MASTER) + dbaas_base.CNF_MASTER) def test_write_replication_replica_overrides(self): with patch.object(self.mySqlApp.configuration_manager, 'apply_system_override') as apply_sys_mock: self.mySqlApp.write_replication_replica_overrides('something') apply_sys_mock.assert_called_once_with('something', - dbaas.CNF_SLAVE) + dbaas_base.CNF_SLAVE) def test_remove_replication_source_overrides(self): with patch.object(self.mySqlApp.configuration_manager, 'remove_system_override') as remove_sys_mock: self.mySqlApp.remove_replication_source_overrides() - remove_sys_mock.assert_called_once_with(dbaas.CNF_MASTER) + remove_sys_mock.assert_called_once_with(dbaas_base.CNF_MASTER) def test_remove_replication_replica_overrides(self): with patch.object(self.mySqlApp.configuration_manager, 'remove_system_override') as remove_sys_mock: self.mySqlApp.remove_replication_replica_overrides() - remove_sys_mock.assert_called_once_with(dbaas.CNF_SLAVE) + remove_sys_mock.assert_called_once_with(dbaas_base.CNF_SLAVE) def test_exists_replication_source_overrides(self): with patch.object(self.mySqlApp.configuration_manager, @@ -1063,7 +1090,7 @@ class MySqlAppTest(testtools.TestCase): return_value=MagicMock(name='get_engine')) def test_grant_replication_privilege(self, *args): replication_user = {'name': 'testUSr', 'password': 'somePwd'} - with patch.object(dbaas, 'LocalSqlClient', + with patch.object(dbaas.MySqlApp, 'local_sql_client', return_value=self.mock_client): self.mySqlApp.grant_replication_privilege(replication_user) args, _ = self.mock_execute.call_args_list[0] @@ -1075,7 +1102,7 @@ class MySqlAppTest(testtools.TestCase): @patch.object(dbaas, 'get_engine', return_value=MagicMock(name='get_engine')) def test_get_port(self, *args): - with patch.object(dbaas, 'LocalSqlClient', + with patch.object(dbaas.MySqlApp, 'local_sql_client', return_value=self.mock_client): self.mySqlApp.get_port() args, _ = self.mock_execute.call_args_list[0] @@ -1088,7 +1115,7 @@ class MySqlAppTest(testtools.TestCase): def test_get_binlog_position(self, *args): result = {'File': 'mysql-bin.003', 'Position': '73'} self.mock_execute.return_value.first = Mock(return_value=result) - with patch.object(dbaas, 'LocalSqlClient', + with patch.object(dbaas.MySqlApp, 'local_sql_client', return_value=self.mock_client): found_result = self.mySqlApp.get_binlog_position() @@ -1103,7 +1130,7 @@ class MySqlAppTest(testtools.TestCase): @patch.object(dbaas, 'get_engine', return_value=MagicMock(name='get_engine')) def test_execute_on_client(self, *args): - with patch.object(dbaas, 'LocalSqlClient', + with patch.object(dbaas.MySqlApp, 'local_sql_client', return_value=self.mock_client): self.mySqlApp.execute_on_client('show tables') args, _ = self.mock_execute.call_args_list[0] @@ -1115,7 +1142,7 @@ class MySqlAppTest(testtools.TestCase): return_value=MagicMock(name='get_engine')) @patch.object(dbaas.MySqlApp, '_wait_for_slave_status') def test_start_slave(self, *args): - with patch.object(dbaas, 'LocalSqlClient', + with patch.object(dbaas.MySqlApp, 'local_sql_client', return_value=self.mock_client): self.mySqlApp.start_slave() args, _ = self.mock_execute.call_args_list[0] @@ -1129,7 +1156,7 @@ class MySqlAppTest(testtools.TestCase): def test_stop_slave_with_failover(self, *args): self.mock_execute.return_value.first = Mock( return_value={'Master_User': 'root'}) - with patch.object(dbaas, 'LocalSqlClient', + with patch.object(dbaas.MySqlApp, 'local_sql_client', return_value=self.mock_client): result = self.mySqlApp.stop_slave(True) self.assertEqual('root', result['replication_user']) @@ -1147,7 +1174,7 @@ class MySqlAppTest(testtools.TestCase): def test_stop_slave_without_failover(self, *args): self.mock_execute.return_value.first = Mock( return_value={'Master_User': 'root'}) - with patch.object(dbaas, 'LocalSqlClient', + with patch.object(dbaas.MySqlApp, 'local_sql_client', return_value=self.mock_client): result = self.mySqlApp.stop_slave(False) self.assertEqual('root', result['replication_user']) @@ -1163,7 +1190,7 @@ class MySqlAppTest(testtools.TestCase): @patch.object(dbaas, 'get_engine', return_value=MagicMock(name='get_engine')) def test_stop_master(self, *args): - with patch.object(dbaas, 'LocalSqlClient', + with patch.object(dbaas.MySqlApp, 'local_sql_client', return_value=self.mock_client): self.mySqlApp.stop_master() args, _ = self.mock_execute.call_args_list[0] @@ -1197,7 +1224,7 @@ class MySqlAppTest(testtools.TestCase): return_value=MagicMock(name='get_engine')) def test__get_slave_status(self, *args): self.mock_execute.return_value.first = Mock(return_value='some_thing') - with patch.object(dbaas, 'LocalSqlClient', + with patch.object(dbaas.MySqlApp, 'local_sql_client', return_value=self.mock_client): result = self.mySqlApp._get_slave_status() self.assertEqual('some_thing', result) @@ -1211,7 +1238,7 @@ class MySqlAppTest(testtools.TestCase): def test_get_latest_txn_id(self, *args): self.mock_execute.return_value.first = Mock(return_value=['some_thing'] ) - with patch.object(dbaas, 'LocalSqlClient', + with patch.object(dbaas.MySqlApp, 'local_sql_client', return_value=self.mock_client): result = self.mySqlApp.get_latest_txn_id() self.assertEqual('some_thing', result) @@ -1223,7 +1250,7 @@ class MySqlAppTest(testtools.TestCase): @patch.object(dbaas, 'get_engine', return_value=MagicMock(name='get_engine')) def test_wait_for_txn(self, *args): - with patch.object(dbaas, 'LocalSqlClient', + with patch.object(dbaas.MySqlApp, 'local_sql_client', return_value=self.mock_client): self.mySqlApp.wait_for_txn('abcd') args, _ = self.mock_execute.call_args_list[0] @@ -1236,7 +1263,7 @@ class MySqlAppTest(testtools.TestCase): def test_get_txn_count(self, *args): self.mock_execute.return_value.first = Mock( return_value=['b1f3f33a-0789-ee1c-43f3-f8373e12f1ea:1']) - with patch.object(dbaas, 'LocalSqlClient', + with patch.object(dbaas.MySqlApp, 'local_sql_client', return_value=self.mock_client): result = self.mySqlApp.get_txn_count() self.assertEqual(1, result) @@ -1251,7 +1278,7 @@ class MySqlAppInstallTest(MySqlAppTest): def setUp(self): super(MySqlAppInstallTest, self).setUp() self.orig_create_engine = sqlalchemy.create_engine - self.orig_pkg_version = dbaas.packager.pkg_version + self.orig_pkg_version = dbaas_base.packager.pkg_version self.orig_utils_execute_with_timeout = utils.execute_with_timeout self.mock_client = Mock() self.mock_execute = Mock() @@ -1262,7 +1289,7 @@ class MySqlAppInstallTest(MySqlAppTest): def tearDown(self): super(MySqlAppInstallTest, self).tearDown() sqlalchemy.create_engine = self.orig_create_engine - dbaas.packager.pkg_version = self.orig_pkg_version + dbaas_base.packager.pkg_version = self.orig_pkg_version utils.execute_with_timeout = self.orig_utils_execute_with_timeout def test_install(self): @@ -1282,7 +1309,7 @@ class MySqlAppInstallTest(MySqlAppTest): return_value='some_password') def test_secure(self, auth_pwd_mock): - dbaas.clear_expired_password = Mock() + dbaas_base.clear_expired_password = Mock() self.mySqlApp.start_mysql = Mock() self.mySqlApp.stop_db = Mock() self.mySqlApp._reset_configuration = Mock() @@ -1306,7 +1333,7 @@ class MySqlAppInstallTest(MySqlAppTest): @patch.object(utils, 'generate_random_password', return_value='some_password') def test_secure_root(self, *args): - with patch.object(dbaas, 'LocalSqlClient', + with patch.object(dbaas.MySqlApp, 'local_sql_client', return_value=self.mock_client): self.mySqlApp.secure_root() update_root_password, _ = self.mock_execute.call_args_list[0] @@ -1344,7 +1371,7 @@ class MySqlAppInstallTest(MySqlAppTest): return_value=MagicMock(name='get_engine')) def test_apply_overrides(self, *args): overrides = {'sort_buffer_size': 1000000} - with patch.object(dbaas, 'LocalSqlClient', + with patch.object(dbaas.MySqlApp, 'local_sql_client', return_value=self.mock_client): self.mySqlApp.apply_overrides(overrides) args, _ = self.mock_execute.call_args_list[0] @@ -1355,7 +1382,7 @@ class MySqlAppInstallTest(MySqlAppTest): @patch.object(dbaas, 'get_engine', return_value=MagicMock(name='get_engine')) def test_make_read_only(self, *args): - with patch.object(dbaas, 'LocalSqlClient', + with patch.object(dbaas.MySqlApp, 'local_sql_client', return_value=self.mock_client): self.mySqlApp.make_read_only('ON') args, _ = self.mock_execute.call_args_list[0] @@ -1380,7 +1407,7 @@ class MySqlAppInstallTest(MySqlAppTest): def test_secure_write_conf_error(self): - dbaas.clear_expired_password = Mock() + dbaas_base.clear_expired_password = Mock() self.mySqlApp.start_mysql = Mock() self.mySqlApp.stop_db = Mock() self.mySqlApp._reset_configuration = Mock( @@ -1443,7 +1470,8 @@ class MySqlAppMockTest(testtools.TestCase): mock_status = MagicMock() mock_status.wait_for_real_status_to_change_to = MagicMock( return_value=True) - dbaas.clear_expired_password = MagicMock(return_value=None) + dbaas_base.clear_expired_password = \ + MagicMock(return_value=None) app = MySqlApp(mock_status) app._reset_configuration = MagicMock() app._write_mycnf = MagicMock(return_value=True) @@ -1469,9 +1497,11 @@ class MySqlAppMockTest(testtools.TestCase): mock_status = MagicMock() mock_status.wait_for_real_status_to_change_to = MagicMock( return_value=True) - dbaas.clear_expired_password = MagicMock(return_value=None) + dbaas_base.clear_expired_password = \ + MagicMock(return_value=None) app = MySqlApp(mock_status) - dbaas.clear_expired_password = MagicMock(return_value=None) + dbaas_base.clear_expired_password = \ + MagicMock(return_value=None) self.assertRaises(RuntimeError, app.secure, None, None) self.assertTrue(mock_conn.execute.called) # At least called twice @@ -1486,10 +1516,14 @@ class MySqlRootStatusTest(testtools.TestCase): def setUp(self): super(MySqlRootStatusTest, self).setUp() self.orig_utils_execute_with_timeout = utils.execute_with_timeout + dbaas.orig_configuration_manager = dbaas.MySqlApp.configuration_manager + dbaas.MySqlApp.configuration_manager = Mock() def tearDown(self): super(MySqlRootStatusTest, self).tearDown() utils.execute_with_timeout = self.orig_utils_execute_with_timeout + dbaas.MySqlApp.configuration_manager = \ + dbaas.orig_configuration_manager @patch.object(dbaas.MySqlApp, 'get_auth_password', return_value='some_password') @@ -1509,7 +1543,7 @@ class MySqlRootStatusTest(testtools.TestCase): mock_rs = MagicMock() mock_rs.rowcount = 0 with patch.object(mock_conn, 'execute', return_value=mock_rs): - self.assertThat(MySqlRootAccess.is_root_enabled(), Equals(False)) + self.assertThat(MySqlRootAccess().is_root_enabled(), Equals(False)) @patch.object(dbaas.MySqlApp, 'get_auth_password', return_value='some_password') @@ -1518,7 +1552,7 @@ class MySqlRootStatusTest(testtools.TestCase): with patch.object(mock_conn, 'execute', return_value=None): # invocation - user_ser = MySqlRootAccess.enable_root() + user_ser = MySqlRootAccess().enable_root() # verification self.assertThat(user_ser, Not(Is(None))) mock_conn.execute.assert_any_call(TextClauseMatcher('CREATE USER'), @@ -1591,8 +1625,8 @@ class ServiceRegistryTest(testtools.TestCase): self.assertEqual('trove.guestagent.datastore.mysql.' 'manager.Manager', test_dict.get('mysql')) - self.assertEqual('trove.guestagent.datastore.mysql.' - 'manager.Manager', + self.assertEqual('trove.guestagent.datastore.experimental.' + 'percona.manager.Manager', test_dict.get('percona')) self.assertEqual('trove.guestagent.datastore.experimental.redis.' 'manager.Manager', @@ -1624,8 +1658,8 @@ class ServiceRegistryTest(testtools.TestCase): self.assertEqual('trove.guestagent.datastore.mysql.' 'manager.Manager123', test_dict.get('mysql')) - self.assertEqual('trove.guestagent.datastore.mysql.' - 'manager.Manager', + self.assertEqual('trove.guestagent.datastore.experimental.' + 'percona.manager.Manager', test_dict.get('percona')) self.assertEqual('trove.guestagent.datastore.experimental.redis.' 'manager.Manager', @@ -1657,8 +1691,8 @@ class ServiceRegistryTest(testtools.TestCase): self.assertEqual('trove.guestagent.datastore.mysql.' 'manager.Manager', test_dict.get('mysql')) - self.assertEqual('trove.guestagent.datastore.mysql.' - 'manager.Manager', + self.assertEqual('trove.guestagent.datastore.experimental.' + 'percona.manager.Manager', test_dict.get('percona')) self.assertEqual('trove.guestagent.datastore.experimental.redis.' 'manager.Manager', @@ -1695,12 +1729,14 @@ class KeepAliveConnectionTest(testtools.TestCase): def setUp(self): super(KeepAliveConnectionTest, self).setUp() - self.orig_utils_execute_with_timeout = dbaas.utils.execute_with_timeout + self.orig_utils_execute_with_timeout = \ + dbaas_base.utils.execute_with_timeout self.orig_LOG_err = dbaas.LOG def tearDown(self): super(KeepAliveConnectionTest, self).tearDown() - dbaas.utils.execute_with_timeout = self.orig_utils_execute_with_timeout + dbaas_base.utils.execute_with_timeout = \ + self.orig_utils_execute_with_timeout dbaas.LOG = self.orig_LOG_err def test_checkout_type_error(self): @@ -1855,9 +1891,10 @@ class MySqlAppStatusTest(testtools.TestCase): def setUp(self): super(MySqlAppStatusTest, self).setUp() util.init_db() - self.orig_utils_execute_with_timeout = dbaas.utils.execute_with_timeout - self.orig_load_mysqld_options = dbaas.load_mysqld_options - self.orig_dbaas_os_path_exists = dbaas.os.path.exists + self.orig_utils_execute_with_timeout = \ + dbaas_base.utils.execute_with_timeout + self.orig_load_mysqld_options = dbaas_base.load_mysqld_options + self.orig_dbaas_base_os_path_exists = dbaas_base.os.path.exists self.orig_dbaas_time_sleep = time.sleep self.FAKE_ID = str(uuid4()) InstanceServiceStatus.create(instance_id=self.FAKE_ID, @@ -1866,18 +1903,19 @@ class MySqlAppStatusTest(testtools.TestCase): def tearDown(self): super(MySqlAppStatusTest, self).tearDown() - dbaas.utils.execute_with_timeout = self.orig_utils_execute_with_timeout - dbaas.load_mysqld_options = self.orig_load_mysqld_options - dbaas.os.path.exists = self.orig_dbaas_os_path_exists + dbaas_base.utils.execute_with_timeout = \ + self.orig_utils_execute_with_timeout + dbaas_base.load_mysqld_options = self.orig_load_mysqld_options + dbaas_base.os.path.exists = self.orig_dbaas_base_os_path_exists time.sleep = self.orig_dbaas_time_sleep InstanceServiceStatus.find_by(instance_id=self.FAKE_ID).delete() dbaas.CONF.guest_id = None def test_get_actual_db_status(self): - dbaas.utils.execute_with_timeout = Mock(return_value=(None, None)) + dbaas_base.utils.execute_with_timeout = Mock(return_value=(None, None)) - self.mySqlAppStatus = MySqlAppStatus() + self.mySqlAppStatus = MySqlAppStatus.get() status = self.mySqlAppStatus._get_actual_db_status() self.assertEqual(rd_instance.ServiceStatuses.RUNNING, status) @@ -1887,31 +1925,31 @@ class MySqlAppStatusTest(testtools.TestCase): @patch.object(os.path, 'exists', return_value=True) def test_get_actual_db_status_error_crashed(self, mock_exists, mock_execute): - dbaas.load_mysqld_options = Mock(return_value={}) - self.mySqlAppStatus = MySqlAppStatus() + dbaas_base.load_mysqld_options = Mock(return_value={}) + self.mySqlAppStatus = MySqlAppStatus.get() status = self.mySqlAppStatus._get_actual_db_status() self.assertEqual(rd_instance.ServiceStatuses.CRASHED, status) def test_get_actual_db_status_error_shutdown(self): mocked = Mock(side_effect=ProcessExecutionError()) - dbaas.utils.execute_with_timeout = mocked - dbaas.load_mysqld_options = Mock(return_value={}) - dbaas.os.path.exists = Mock(return_value=False) + dbaas_base.utils.execute_with_timeout = mocked + dbaas_base.load_mysqld_options = Mock(return_value={}) + dbaas_base.os.path.exists = Mock(return_value=False) - self.mySqlAppStatus = MySqlAppStatus() + self.mySqlAppStatus = MySqlAppStatus.get() status = self.mySqlAppStatus._get_actual_db_status() self.assertEqual(rd_instance.ServiceStatuses.SHUTDOWN, status) def test_get_actual_db_status_error_blocked(self): - dbaas.utils.execute_with_timeout = MagicMock( + dbaas_base.utils.execute_with_timeout = MagicMock( side_effect=[ProcessExecutionError(), ("some output", None)]) - dbaas.load_mysqld_options = Mock() - dbaas.os.path.exists = Mock(return_value=True) + dbaas_base.load_mysqld_options = Mock() + dbaas_base.os.path.exists = Mock(return_value=True) - self.mySqlAppStatus = MySqlAppStatus() + self.mySqlAppStatus = MySqlAppStatus.get() status = self.mySqlAppStatus._get_actual_db_status() self.assertEqual(rd_instance.ServiceStatuses.BLOCKED, status) diff --git a/trove/tests/unittests/guestagent/test_mysql_manager.py b/trove/tests/unittests/guestagent/test_mysql_manager.py index 581cab22dd..be5a7b32d2 100644 --- a/trove/tests/unittests/guestagent/test_mysql_manager.py +++ b/trove/tests/unittests/guestagent/test_mysql_manager.py @@ -26,6 +26,8 @@ from trove.common.exception import ProcessExecutionError from trove.common import instance as rd_instance from trove.guestagent import backup from trove.guestagent.common import operating_system +# TODO(atomic77) The test cases should be made configurable +# to make it easier to test the various derived datastores. from trove.guestagent.datastore.mysql.manager import Manager import trove.guestagent.datastore.mysql.service as dbaas from trove.guestagent import dbaas as base_dbaas @@ -55,9 +57,8 @@ class GuestAgentManagerTest(testtools.TestCase): # set up common mock objects, etc. for replication testing self.patcher_gfvs = patch( 'trove.guestagent.dbaas.get_filesystem_volume_stats') - self.patcher_rs = patch( - 'trove.guestagent.datastore.mysql.manager.' - 'REPLICATION_STRATEGY_CLASS') + self.patcher_rs = patch(self.manager.replication_namespace + "." + + self.manager.replication_strategy) self.mock_gfvs_class = self.patcher_gfvs.start() self.mock_rs_class = self.patcher_rs.start() self.repl_datastore_manager = 'mysql'