diff --git a/trove/guestagent/datastore/experimental/mariadb/__init__.py b/trove/guestagent/datastore/experimental/mariadb/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/trove/guestagent/datastore/experimental/mariadb/manager.py b/trove/guestagent/datastore/experimental/mariadb/manager.py new file mode 100644 index 0000000000..02e43e6782 --- /dev/null +++ b/trove/guestagent/datastore/experimental/mariadb/manager.py @@ -0,0 +1,47 @@ +# Copyright 2015 Tesora, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from oslo_utils import importutils +from trove.common import cfg +from trove.guestagent.datastore.mysql import manager_base +from trove.guestagent.strategies.replication import get_replication_strategy + +CONF = cfg.CONF +MANAGER = CONF.datastore_manager if CONF.datastore_manager else 'mysql' +REPLICATION_STRATEGY = CONF.get(MANAGER).replication_strategy +REPLICATION_NAMESPACE = CONF.get(MANAGER).replication_namespace +REPLICATION_STRATEGY_CLASS = get_replication_strategy(REPLICATION_STRATEGY, + REPLICATION_NAMESPACE) + +MYSQL_APP = "trove.guestagent.datastore.experimental.mariadb." \ + "service.MySqlApp" +MYSQL_APP_STATUS = "trove.guestagent.datastore.experimental.mariadb." \ + "service.MySqlAppStatus" +MYSQL_ADMIN = "trove.guestagent.datastore.experimental.mariadb.service." \ + "MySqlAdmin" + + +class Manager(manager_base.BaseMySqlManager): + + def __init__(self): + mysql_app = importutils.import_class(MYSQL_APP) + mysql_app_status = importutils.import_class(MYSQL_APP_STATUS) + mysql_admin = importutils.import_class(MYSQL_ADMIN) + + super(Manager, self).__init__(mysql_app, mysql_app_status, + mysql_admin, REPLICATION_STRATEGY, + REPLICATION_NAMESPACE, + REPLICATION_STRATEGY_CLASS, MANAGER) diff --git a/trove/guestagent/datastore/experimental/mariadb/service.py b/trove/guestagent/datastore/experimental/mariadb/service.py new file mode 100644 index 0000000000..d6c7026ed5 --- /dev/null +++ b/trove/guestagent/datastore/experimental/mariadb/service.py @@ -0,0 +1,50 @@ +# Copyright 2015 Tesora, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from oslo_log import log as logging +from trove.guestagent.datastore.mysql import service_base + +LOG = logging.getLogger(__name__) + + +class KeepAliveConnection(service_base.BaseKeepAliveConnection): + pass + + +class MySqlAppStatus(service_base.BaseMySqlAppStatus): + pass + + +class LocalSqlClient(service_base.BaseLocalSqlClient): + pass + + +class MySqlApp(service_base.BaseMySqlApp): + def __init__(self, status): + super(MySqlApp, self).__init__(status, LocalSqlClient, + KeepAliveConnection) + + +class MySqlRootAccess(service_base.BaseMySqlRootAccess): + def __init__(self): + super(MySqlRootAccess, self).__init__(LocalSqlClient, + MySqlApp(MySqlAppStatus.get())) + + +class MySqlAdmin(service_base.BaseMySqlAdmin): + def __init__(self): + super(MySqlAdmin, self).__init__(LocalSqlClient, MySqlRootAccess(), + MySqlApp) diff --git a/trove/guestagent/datastore/experimental/percona/__init__.py b/trove/guestagent/datastore/experimental/percona/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/trove/guestagent/datastore/experimental/percona/manager.py b/trove/guestagent/datastore/experimental/percona/manager.py new file mode 100644 index 0000000000..1664e8538b --- /dev/null +++ b/trove/guestagent/datastore/experimental/percona/manager.py @@ -0,0 +1,47 @@ +# Copyright 2015 Tesora, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from oslo_utils import importutils +from trove.common import cfg +from trove.guestagent.datastore.mysql import manager_base +from trove.guestagent.strategies.replication import get_replication_strategy + +CONF = cfg.CONF +MANAGER = CONF.datastore_manager if CONF.datastore_manager else 'mysql' +REPLICATION_STRATEGY = CONF.get(MANAGER).replication_strategy +REPLICATION_NAMESPACE = CONF.get(MANAGER).replication_namespace +REPLICATION_STRATEGY_CLASS = get_replication_strategy(REPLICATION_STRATEGY, + REPLICATION_NAMESPACE) + +MYSQL_APP = "trove.guestagent.datastore.experimental.percona." \ + "service.MySqlApp" +MYSQL_APP_STATUS = "trove.guestagent.datastore.experimental.percona." \ + "service.MySqlAppStatus" +MYSQL_ADMIN = "trove.guestagent.datastore.experimental.percona." \ + "service.MySqlAdmin" + + +class Manager(manager_base.BaseMySqlManager): + + def __init__(self): + mysql_app = importutils.import_class(MYSQL_APP) + mysql_app_status = importutils.import_class(MYSQL_APP_STATUS) + mysql_admin = importutils.import_class(MYSQL_ADMIN) + + super(Manager, self).__init__(mysql_app, mysql_app_status, + mysql_admin, REPLICATION_STRATEGY, + REPLICATION_NAMESPACE, + REPLICATION_STRATEGY_CLASS, MANAGER) diff --git a/trove/guestagent/datastore/experimental/percona/service.py b/trove/guestagent/datastore/experimental/percona/service.py new file mode 100644 index 0000000000..d6c7026ed5 --- /dev/null +++ b/trove/guestagent/datastore/experimental/percona/service.py @@ -0,0 +1,50 @@ +# Copyright 2015 Tesora, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from oslo_log import log as logging +from trove.guestagent.datastore.mysql import service_base + +LOG = logging.getLogger(__name__) + + +class KeepAliveConnection(service_base.BaseKeepAliveConnection): + pass + + +class MySqlAppStatus(service_base.BaseMySqlAppStatus): + pass + + +class LocalSqlClient(service_base.BaseLocalSqlClient): + pass + + +class MySqlApp(service_base.BaseMySqlApp): + def __init__(self, status): + super(MySqlApp, self).__init__(status, LocalSqlClient, + KeepAliveConnection) + + +class MySqlRootAccess(service_base.BaseMySqlRootAccess): + def __init__(self): + super(MySqlRootAccess, self).__init__(LocalSqlClient, + MySqlApp(MySqlAppStatus.get())) + + +class MySqlAdmin(service_base.BaseMySqlAdmin): + def __init__(self): + super(MySqlAdmin, self).__init__(LocalSqlClient, MySqlRootAccess(), + MySqlApp) diff --git a/trove/guestagent/datastore/mysql/manager.py b/trove/guestagent/datastore/mysql/manager.py index 2e687d427b..6dfeddeb22 100644 --- a/trove/guestagent/datastore/mysql/manager.py +++ b/trove/guestagent/datastore/mysql/manager.py @@ -16,27 +16,11 @@ # under the License. # -import os - -from oslo_log import log as logging -from oslo_service import periodic_task - +from oslo_utils import importutils from trove.common import cfg -from trove.common import exception -from trove.common.i18n import _ -from trove.common import instance as rd_instance -from trove.guestagent import backup -from trove.guestagent.common import operating_system -from trove.guestagent.datastore.mysql import service -from trove.guestagent.datastore.mysql.service import MySqlAdmin -from trove.guestagent.datastore.mysql.service import MySqlApp -from trove.guestagent.datastore.mysql.service import MySqlAppStatus -from trove.guestagent import dbaas +from trove.guestagent.datastore.mysql import manager_base from trove.guestagent.strategies.replication import get_replication_strategy -from trove.guestagent import volume - -LOG = logging.getLogger(__name__) CONF = cfg.CONF MANAGER = CONF.datastore_manager if CONF.datastore_manager else 'mysql' REPLICATION_STRATEGY = CONF.get(MANAGER).replication_strategy @@ -44,309 +28,19 @@ REPLICATION_NAMESPACE = CONF.get(MANAGER).replication_namespace REPLICATION_STRATEGY_CLASS = get_replication_strategy(REPLICATION_STRATEGY, REPLICATION_NAMESPACE) +MYSQL_APP = "trove.guestagent.datastore.mysql.service.MySqlApp" +MYSQL_APP_STATUS = "trove.guestagent.datastore.mysql.service.MySqlAppStatus" +MYSQL_ADMIN = "trove.guestagent.datastore.mysql.service.MySqlAdmin" -class Manager(periodic_task.PeriodicTasks): + +class Manager(manager_base.BaseMySqlManager): def __init__(self): - super(Manager, self).__init__(CONF) + mysql_app = importutils.import_class(MYSQL_APP) + mysql_app_status = importutils.import_class(MYSQL_APP_STATUS) + mysql_admin = importutils.import_class(MYSQL_ADMIN) - @periodic_task.periodic_task - def update_status(self, context): - """Update the status of the MySQL service.""" - MySqlAppStatus.get().update() - - def rpc_ping(self, context): - LOG.debug("Responding to RPC ping.") - return True - - def change_passwords(self, context, users): - return MySqlAdmin().change_passwords(users) - - def update_attributes(self, context, username, hostname, user_attrs): - return MySqlAdmin().update_attributes(username, hostname, user_attrs) - - def reset_configuration(self, context, configuration): - app = MySqlApp(MySqlAppStatus.get()) - app.reset_configuration(configuration) - - def create_database(self, context, databases): - return MySqlAdmin().create_database(databases) - - def create_user(self, context, users): - MySqlAdmin().create_user(users) - - def delete_database(self, context, database): - return MySqlAdmin().delete_database(database) - - def delete_user(self, context, user): - MySqlAdmin().delete_user(user) - - def get_user(self, context, username, hostname): - return MySqlAdmin().get_user(username, hostname) - - def grant_access(self, context, username, hostname, databases): - return MySqlAdmin().grant_access(username, hostname, databases) - - def revoke_access(self, context, username, hostname, database): - return MySqlAdmin().revoke_access(username, hostname, database) - - def list_access(self, context, username, hostname): - return MySqlAdmin().list_access(username, hostname) - - def list_databases(self, context, limit=None, marker=None, - include_marker=False): - return MySqlAdmin().list_databases(limit, marker, - include_marker) - - def list_users(self, context, limit=None, marker=None, - include_marker=False): - return MySqlAdmin().list_users(limit, marker, - include_marker) - - def enable_root(self, context): - return MySqlAdmin().enable_root() - - def is_root_enabled(self, context): - return MySqlAdmin().is_root_enabled() - - def _perform_restore(self, backup_info, context, restore_location, app): - LOG.info(_("Restoring database from backup %s.") % backup_info['id']) - try: - backup.restore(context, backup_info, restore_location) - except Exception: - LOG.exception(_("Error performing restore from backup %s.") % - backup_info['id']) - app.status.set_status(rd_instance.ServiceStatuses.FAILED) - raise - LOG.info(_("Restored database successfully.")) - - def prepare(self, context, packages, databases, memory_mb, users, - device_path=None, mount_point=None, backup_info=None, - config_contents=None, root_password=None, overrides=None, - cluster_config=None, snapshot=None): - """Makes ready DBAAS on a Guest container.""" - MySqlAppStatus.get().begin_install() - # status end_mysql_install set with secure() - app = MySqlApp(MySqlAppStatus.get()) - app.install_if_needed(packages) - if device_path: - # stop and do not update database - app.stop_db() - device = volume.VolumeDevice(device_path) - # unmount if device is already mounted - device.unmount_device(device_path) - device.format() - if os.path.exists(mount_point): - # rsync existing data to a "data" sub-directory - # on the new volume - device.migrate_data(mount_point, target_subdir="data") - # mount the volume - device.mount(mount_point) - operating_system.chown( - mount_point, service.MYSQL_OWNER, service.MYSQL_OWNER, - recursive=False, as_root=True) - - LOG.debug("Mounted the volume at %s." % mount_point) - # We need to temporarily update the default my.cnf so that - # mysql will start after the volume is mounted. Later on it - # will be changed based on the config template - # (see MySqlApp.secure()) and restart. - app.set_data_dir(mount_point + '/data') - app.start_mysql() - if backup_info: - self._perform_restore(backup_info, context, - mount_point + "/data", app) - LOG.debug("Securing MySQL now.") - app.secure(config_contents, overrides) - enable_root_on_restore = (backup_info and - MySqlAdmin().is_root_enabled()) - if root_password and not backup_info: - app.secure_root(secure_remote_root=True) - MySqlAdmin().enable_root(root_password) - elif enable_root_on_restore: - app.secure_root(secure_remote_root=False) - MySqlAppStatus.get().report_root(context, 'root') - else: - app.secure_root(secure_remote_root=True) - - app.complete_install_or_restart() - - if databases: - self.create_database(context, databases) - - if users: - self.create_user(context, users) - - if snapshot: - self.attach_replica(context, snapshot, snapshot['config']) - - LOG.info(_('Completed setup of MySQL database instance.')) - - def restart(self, context): - app = MySqlApp(MySqlAppStatus.get()) - app.restart() - - def start_db_with_conf_changes(self, context, config_contents): - app = MySqlApp(MySqlAppStatus.get()) - app.start_db_with_conf_changes(config_contents) - - def stop_db(self, context, do_not_start_on_reboot=False): - app = MySqlApp(MySqlAppStatus.get()) - app.stop_db(do_not_start_on_reboot=do_not_start_on_reboot) - - def get_filesystem_stats(self, context, fs_path): - """Gets the filesystem stats for the path given.""" - mount_point = CONF.get(MANAGER).mount_point - return dbaas.get_filesystem_volume_stats(mount_point) - - def create_backup(self, context, backup_info): - """ - Entry point for initiating a backup for this guest agents db instance. - The call currently blocks until the backup is complete or errors. If - device_path is specified, it will be mounted based to a point specified - in configuration. - - :param backup_info: a dictionary containing the db instance id of the - backup task, location, type, and other data. - """ - backup.backup(context, backup_info) - - def mount_volume(self, context, device_path=None, mount_point=None): - device = volume.VolumeDevice(device_path) - device.mount(mount_point, write_to_fstab=False) - LOG.debug("Mounted the device %s at the mount point %s." % - (device_path, mount_point)) - - def unmount_volume(self, context, device_path=None, mount_point=None): - device = volume.VolumeDevice(device_path) - device.unmount(mount_point) - LOG.debug("Unmounted the device %s from the mount point %s." % - (device_path, mount_point)) - - def resize_fs(self, context, device_path=None, mount_point=None): - device = volume.VolumeDevice(device_path) - device.resize_fs(mount_point) - LOG.debug("Resized the filesystem %s." % mount_point) - - def update_overrides(self, context, overrides, remove=False): - app = MySqlApp(MySqlAppStatus.get()) - if remove: - app.remove_overrides() - app.update_overrides(overrides) - - def apply_overrides(self, context, overrides): - LOG.debug("Applying overrides (%s)." % overrides) - app = MySqlApp(MySqlAppStatus.get()) - app.apply_overrides(overrides) - - def get_replication_snapshot(self, context, snapshot_info, - replica_source_config=None): - LOG.debug("Getting replication snapshot.") - app = MySqlApp(MySqlAppStatus.get()) - - replication = REPLICATION_STRATEGY_CLASS(context) - replication.enable_as_master(app, replica_source_config) - - snapshot_id, log_position = ( - replication.snapshot_for_replication(context, app, None, - snapshot_info)) - - mount_point = CONF.get(MANAGER).mount_point - volume_stats = dbaas.get_filesystem_volume_stats(mount_point) - - replication_snapshot = { - 'dataset': { - 'datastore_manager': MANAGER, - 'dataset_size': volume_stats.get('used', 0.0), - 'volume_size': volume_stats.get('total', 0.0), - 'snapshot_id': snapshot_id - }, - 'replication_strategy': REPLICATION_STRATEGY, - 'master': replication.get_master_ref(app, snapshot_info), - 'log_position': log_position - } - - return replication_snapshot - - def enable_as_master(self, context, replica_source_config): - LOG.debug("Calling enable_as_master.") - app = MySqlApp(MySqlAppStatus.get()) - replication = REPLICATION_STRATEGY_CLASS(context) - replication.enable_as_master(app, replica_source_config) - - # DEPRECATED: Maintain for API Compatibility - def get_txn_count(self, context): - LOG.debug("Calling get_txn_count") - return MySqlApp(MySqlAppStatus.get()).get_txn_count() - - def get_last_txn(self, context): - LOG.debug("Calling get_last_txn") - return MySqlApp(MySqlAppStatus.get()).get_last_txn() - - def get_latest_txn_id(self, context): - LOG.debug("Calling get_latest_txn_id.") - return MySqlApp(MySqlAppStatus.get()).get_latest_txn_id() - - def wait_for_txn(self, context, txn): - LOG.debug("Calling wait_for_txn.") - MySqlApp(MySqlAppStatus.get()).wait_for_txn(txn) - - def detach_replica(self, context, for_failover=False): - LOG.debug("Detaching replica.") - app = MySqlApp(MySqlAppStatus.get()) - replication = REPLICATION_STRATEGY_CLASS(context) - replica_info = replication.detach_slave(app, for_failover) - return replica_info - - def get_replica_context(self, context): - LOG.debug("Getting replica context.") - app = MySqlApp(MySqlAppStatus.get()) - replication = REPLICATION_STRATEGY_CLASS(context) - replica_info = replication.get_replica_context(app) - return replica_info - - def _validate_slave_for_replication(self, context, replica_info): - if (replica_info['replication_strategy'] != REPLICATION_STRATEGY): - raise exception.IncompatibleReplicationStrategy( - replica_info.update({ - 'guest_strategy': REPLICATION_STRATEGY - })) - - mount_point = CONF.get(MANAGER).mount_point - volume_stats = dbaas.get_filesystem_volume_stats(mount_point) - if (volume_stats.get('total', 0.0) < - replica_info['dataset']['dataset_size']): - raise exception.InsufficientSpaceForReplica( - replica_info.update({ - 'slave_volume_size': volume_stats.get('total', 0.0) - })) - - def attach_replica(self, context, replica_info, slave_config): - LOG.debug("Attaching replica.") - app = MySqlApp(MySqlAppStatus.get()) - try: - if 'replication_strategy' in replica_info: - self._validate_slave_for_replication(context, replica_info) - replication = REPLICATION_STRATEGY_CLASS(context) - replication.enable_as_slave(app, replica_info, slave_config) - except Exception: - LOG.exception("Error enabling replication.") - app.status.set_status(rd_instance.ServiceStatuses.FAILED) - raise - - def make_read_only(self, context, read_only): - LOG.debug("Executing make_read_only(%s)" % read_only) - app = MySqlApp(MySqlAppStatus.get()) - app.make_read_only(read_only) - - def cleanup_source_on_replica_detach(self, context, replica_info): - LOG.debug("Cleaning up the source on the detach of a replica.") - replication = REPLICATION_STRATEGY_CLASS(context) - replication.cleanup_source_on_replica_detach(MySqlAdmin(), - replica_info) - - def demote_replication_master(self, context): - LOG.debug("Demoting replication master.") - app = MySqlApp(MySqlAppStatus.get()) - replication = REPLICATION_STRATEGY_CLASS(context) - replication.demote_master(app) + super(Manager, self).__init__(mysql_app, mysql_app_status, + mysql_admin, REPLICATION_STRATEGY, + REPLICATION_NAMESPACE, + REPLICATION_STRATEGY_CLASS, MANAGER) diff --git a/trove/guestagent/datastore/mysql/manager_base.py b/trove/guestagent/datastore/mysql/manager_base.py new file mode 100644 index 0000000000..7b8b9e118c --- /dev/null +++ b/trove/guestagent/datastore/mysql/manager_base.py @@ -0,0 +1,384 @@ +# Copyright 2013 OpenStack Foundation +# Copyright 2013 Rackspace Hosting +# Copyright 2013 Hewlett-Packard Development Company, L.P. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import os + +from oslo_log import log as logging +from oslo_service import periodic_task + +from trove.common import cfg +from trove.common import exception +from trove.common.i18n import _ +from trove.common import instance as rd_instance +from trove.guestagent import backup +from trove.guestagent.common import operating_system +from trove.guestagent.datastore.mysql import service_base +from trove.guestagent import dbaas +from trove.guestagent.strategies.replication import get_replication_strategy +from trove.guestagent import volume + + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + + +class BaseMySqlManager(periodic_task.PeriodicTasks): + + def __init__(self, mysql_app, mysql_app_status, mysql_admin, + replication_strategy, replication_namespace, + replication_strategy_class, manager): + + super(BaseMySqlManager, self).__init__(CONF) + self._mysql_app = mysql_app + self._mysql_app_status = mysql_app_status + self._mysql_admin = mysql_admin + self._replication_strategy = replication_strategy + self._replication_namespace = replication_namespace + self._replication_strategy_class = replication_strategy_class + self._manager = manager + + @property + def mysql_app(self): + return self._mysql_app + + @property + def mysql_app_status(self): + return self._mysql_app_status + + @property + def mysql_admin(self): + return self._mysql_admin + + @property + def replication_strategy(self): + return self._replication_strategy + + @property + def replication_namespace(self): + return self._replication_namespace + + @property + def replication_strategy_class(self): + return get_replication_strategy(self._replication_strategy, + self._replication_namespace) + + @property + def manager(self): + return self._manager + + @periodic_task.periodic_task + def update_status(self, context): + """Update the status of the MySQL service.""" + self.mysql_app_status.get().update() + + def rpc_ping(self, context): + LOG.debug("Responding to RPC ping.") + return True + + def change_passwords(self, context, users): + return self.mysql_admin().change_passwords(users) + + def update_attributes(self, context, username, hostname, user_attrs): + return self.mysql_admin().update_attributes( + username, hostname, user_attrs) + + def reset_configuration(self, context, configuration): + app = self.mysql_app(self.mysql_app_status.get()) + app.reset_configuration(configuration) + + def create_database(self, context, databases): + return self.mysql_admin().create_database(databases) + + def create_user(self, context, users): + self.mysql_admin().create_user(users) + + def delete_database(self, context, database): + return self.mysql_admin().delete_database(database) + + def delete_user(self, context, user): + self.mysql_admin().delete_user(user) + + def get_user(self, context, username, hostname): + return self.mysql_admin().get_user(username, hostname) + + def grant_access(self, context, username, hostname, databases): + return self.mysql_admin().grant_access(username, hostname, databases) + + def revoke_access(self, context, username, hostname, database): + return self.mysql_admin().revoke_access(username, hostname, database) + + def list_access(self, context, username, hostname): + return self.mysql_admin().list_access(username, hostname) + + def list_databases(self, context, limit=None, marker=None, + include_marker=False): + return self.mysql_admin().list_databases(limit, marker, + include_marker) + + def list_users(self, context, limit=None, marker=None, + include_marker=False): + return self.mysql_admin().list_users(limit, marker, + include_marker) + + def enable_root(self, context): + return self.mysql_admin().enable_root() + + def is_root_enabled(self, context): + return self.mysql_admin().is_root_enabled() + + def _perform_restore(self, backup_info, context, restore_location, app): + LOG.info(_("Restoring database from backup %s.") % backup_info['id']) + try: + backup.restore(context, backup_info, restore_location) + except Exception: + LOG.exception(_("Error performing restore from backup %s.") % + backup_info['id']) + app.status.set_status(rd_instance.ServiceStatuses.FAILED) + raise + LOG.info(_("Restored database successfully.")) + + def prepare(self, context, packages, databases, memory_mb, users, + device_path=None, mount_point=None, backup_info=None, + config_contents=None, root_password=None, overrides=None, + cluster_config=None, snapshot=None): + """Makes ready DBAAS on a Guest container.""" + self.mysql_app_status.get().begin_install() + # status end_mysql_install set with secure() + app = self.mysql_app(self.mysql_app_status.get()) + app.install_if_needed(packages) + if device_path: + # stop and do not update database + app.stop_db() + device = volume.VolumeDevice(device_path) + # unmount if device is already mounted + device.unmount_device(device_path) + device.format() + if os.path.exists(mount_point): + # rsync existing data to a "data" sub-directory + # on the new volume + device.migrate_data(mount_point, target_subdir="data") + # mount the volume + device.mount(mount_point) + operating_system.chown(mount_point, service_base.MYSQL_OWNER, + service_base.MYSQL_OWNER, + recursive=False, as_root=True) + + LOG.debug("Mounted the volume at %s." % mount_point) + # We need to temporarily update the default my.cnf so that + # mysql will start after the volume is mounted. Later on it + # will be changed based on the config template + # (see MySqlApp.secure()) and restart. + app.set_data_dir(mount_point + '/data') + app.start_mysql() + if backup_info: + self._perform_restore(backup_info, context, + mount_point + "/data", app) + LOG.debug("Securing MySQL now.") + app.secure(config_contents, overrides) + enable_root_on_restore = (backup_info and + self.mysql_admin().is_root_enabled()) + if root_password and not backup_info: + app.secure_root(secure_remote_root=True) + self.mysql_admin().enable_root(root_password) + elif enable_root_on_restore: + app.secure_root(secure_remote_root=False) + self.mysql_app_status.get().report_root(context, 'root') + else: + app.secure_root(secure_remote_root=True) + + app.complete_install_or_restart() + + if databases: + self.create_database(context, databases) + + if users: + self.create_user(context, users) + + if snapshot: + self.attach_replica(context, snapshot, snapshot['config']) + + LOG.info(_('Completed setup of MySQL database instance.')) + + def restart(self, context): + app = self.mysql_app(self.mysql_app_status.get()) + app.restart() + + def start_db_with_conf_changes(self, context, config_contents): + app = self.mysql_app(self.mysql_app_status.get()) + app.start_db_with_conf_changes(config_contents) + + def stop_db(self, context, do_not_start_on_reboot=False): + app = self.mysql_app(self.mysql_app_status.get()) + app.stop_db(do_not_start_on_reboot=do_not_start_on_reboot) + + def get_filesystem_stats(self, context, fs_path): + """Gets the filesystem stats for the path given.""" + mount_point = CONF.get(self.manager).mount_point + return dbaas.get_filesystem_volume_stats(mount_point) + + def create_backup(self, context, backup_info): + """ + Entry point for initiating a backup for this guest agents db instance. + The call currently blocks until the backup is complete or errors. If + device_path is specified, it will be mounted based to a point specified + in configuration. + + :param backup_info: a dictionary containing the db instance id of the + backup task, location, type, and other data. + """ + backup.backup(context, backup_info) + + def mount_volume(self, context, device_path=None, mount_point=None): + device = volume.VolumeDevice(device_path) + device.mount(mount_point, write_to_fstab=False) + LOG.debug("Mounted the device %s at the mount point %s." % + (device_path, mount_point)) + + def unmount_volume(self, context, device_path=None, mount_point=None): + device = volume.VolumeDevice(device_path) + device.unmount(mount_point) + LOG.debug("Unmounted the device %s from the mount point %s." % + (device_path, mount_point)) + + def resize_fs(self, context, device_path=None, mount_point=None): + device = volume.VolumeDevice(device_path) + device.resize_fs(mount_point) + LOG.debug("Resized the filesystem %s." % mount_point) + + def update_overrides(self, context, overrides, remove=False): + app = self.mysql_app(self.mysql_app_status.get()) + if remove: + app.remove_overrides() + app.update_overrides(overrides) + + def apply_overrides(self, context, overrides): + LOG.debug("Applying overrides (%s)." % overrides) + app = self.mysql_app(self.mysql_app_status.get()) + app.apply_overrides(overrides) + + def get_replication_snapshot(self, context, snapshot_info, + replica_source_config=None): + LOG.debug("Getting replication snapshot.") + app = self.mysql_app(self.mysql_app_status.get()) + + replication = self.replication_strategy_class(context) + replication.enable_as_master(app, replica_source_config) + + snapshot_id, log_position = ( + replication.snapshot_for_replication(context, app, None, + snapshot_info)) + + mount_point = CONF.get(self.manager).mount_point + volume_stats = dbaas.get_filesystem_volume_stats(mount_point) + + replication_snapshot = { + 'dataset': { + 'datastore_manager': self.manager, + 'dataset_size': volume_stats.get('used', 0.0), + 'volume_size': volume_stats.get('total', 0.0), + 'snapshot_id': snapshot_id + }, + 'replication_strategy': self.replication_strategy, + 'master': replication.get_master_ref(app, snapshot_info), + 'log_position': log_position + } + + return replication_snapshot + + def enable_as_master(self, context, replica_source_config): + LOG.debug("Calling enable_as_master.") + app = self.mysql_app(self.mysql_app_status.get()) + replication = self.replication_strategy_class(context) + replication.enable_as_master(app, replica_source_config) + + # DEPRECATED: Maintain for API Compatibility + def get_txn_count(self, context): + LOG.debug("Calling get_txn_count") + return self.mysql_app(self.mysql_app_status.get()).get_txn_count() + + def get_last_txn(self, context): + LOG.debug("Calling get_last_txn") + return self.mysql_app(self.mysql_app_status.get()).get_last_txn() + + def get_latest_txn_id(self, context): + LOG.debug("Calling get_latest_txn_id.") + return self.mysql_app(self.mysql_app_status.get()).get_latest_txn_id() + + def wait_for_txn(self, context, txn): + LOG.debug("Calling wait_for_txn.") + self.mysql_app(self.mysql_app_status.get()).wait_for_txn(txn) + + def detach_replica(self, context, for_failover=False): + LOG.debug("Detaching replica.") + app = self.mysql_app(self.mysql_app_status.get()) + replication = self.replication_strategy_class(context) + replica_info = replication.detach_slave(app, for_failover) + return replica_info + + def get_replica_context(self, context): + LOG.debug("Getting replica context.") + app = self.mysql_app(self.mysql_app_status.get()) + replication = self.replication_strategy_class(context) + replica_info = replication.get_replica_context(app) + return replica_info + + def _validate_slave_for_replication(self, context, replica_info): + if (replica_info['replication_strategy'] != self.replication_strategy): + raise exception.IncompatibleReplicationStrategy( + replica_info.update({ + 'guest_strategy': self.replication_strategy + })) + + mount_point = CONF.get(self.manager).mount_point + volume_stats = dbaas.get_filesystem_volume_stats(mount_point) + if (volume_stats.get('total', 0.0) < + replica_info['dataset']['dataset_size']): + raise exception.InsufficientSpaceForReplica( + replica_info.update({ + 'slave_volume_size': volume_stats.get('total', 0.0) + })) + + def attach_replica(self, context, replica_info, slave_config): + LOG.debug("Attaching replica.") + app = self.mysql_app(self.mysql_app_status.get()) + try: + if 'replication_strategy' in replica_info: + self._validate_slave_for_replication(context, replica_info) + replication = self.replication_strategy_class(context) + replication.enable_as_slave(app, replica_info, slave_config) + except Exception: + LOG.exception("Error enabling replication.") + app.status.set_status(rd_instance.ServiceStatuses.FAILED) + raise + + def make_read_only(self, context, read_only): + LOG.debug("Executing make_read_only(%s)" % read_only) + app = self.mysql_app(self.mysql_app_status.get()) + app.make_read_only(read_only) + + def cleanup_source_on_replica_detach(self, context, replica_info): + LOG.debug("Cleaning up the source on the detach of a replica.") + replication = self.replication_strategy_class(context) + replication.cleanup_source_on_replica_detach(self.mysql_admin(), + replica_info) + + def demote_replication_master(self, context): + LOG.debug("Demoting replication master.") + app = self.mysql_app(self.mysql_app_status.get()) + replication = self.replication_strategy_class(context) + replication.demote_master(app) diff --git a/trove/guestagent/datastore/mysql/service.py b/trove/guestagent/datastore/mysql/service.py index c59c1e59db..de6bc54b78 100644 --- a/trove/guestagent/datastore/mysql/service.py +++ b/trove/guestagent/datastore/mysql/service.py @@ -16,1018 +16,41 @@ # under the License. # -from collections import defaultdict -import os -import re -import uuid - from oslo_log import log as logging -import sqlalchemy -from sqlalchemy import exc -from sqlalchemy import interfaces -from sqlalchemy.sql.expression import text +from trove.guestagent.datastore.mysql import service_base -from trove.common import cfg -from trove.common.configurations import MySQLConfParser -from trove.common import exception -from trove.common.exception import PollTimeOut -from trove.common.i18n import _ -from trove.common import instance as rd_instance -from trove.common.stream_codecs import IniCodec -from trove.common import utils as utils -from trove.guestagent.common.configuration import ConfigurationManager -from trove.guestagent.common.configuration import ImportOverrideStrategy -from trove.guestagent.common import guestagent_utils -from trove.guestagent.common import operating_system -from trove.guestagent.common import sql_query -from trove.guestagent.datastore import service -from trove.guestagent.db import models -from trove.guestagent import pkg - -ADMIN_USER_NAME = "os_admin" LOG = logging.getLogger(__name__) -FLUSH = text(sql_query.FLUSH) -ENGINE = None -PREPARING = False -UUID = False - -CONF = cfg.CONF -MANAGER = CONF.datastore_manager if CONF.datastore_manager else 'mysql' - -INCLUDE_MARKER_OPERATORS = { - True: ">=", - False: ">" -} - -OS_NAME = operating_system.get_os() -MYSQL_CONFIG = {operating_system.REDHAT: "/etc/my.cnf", - operating_system.DEBIAN: "/etc/mysql/my.cnf", - operating_system.SUSE: "/etc/my.cnf"}[OS_NAME] -MYSQL_SERVICE_CANDIDATES = ["mysql", "mysqld", "mysql-server"] -MYSQL_BIN_CANDIDATES = ["/usr/sbin/mysqld", "/usr/libexec/mysqld"] -MYSQL_OWNER = 'mysql' -CNF_EXT = 'cnf' -CNF_INCLUDE_DIR = '/etc/mysql/conf.d/' -CNF_MASTER = 'master-replication' -CNF_SLAVE = 'slave-replication' +CONF = service_base.CONF -# Create a package impl -packager = pkg.Package() +class KeepAliveConnection(service_base.BaseKeepAliveConnection): + pass -def clear_expired_password(): - """ - Some mysql installations generate random root password - and save it in /root/.mysql_secret, this password is - expired and should be changed by client that supports expired passwords. - """ - LOG.debug("Removing expired password.") - secret_file = "/root/.mysql_secret" - try: - out, err = utils.execute("cat", secret_file, - run_as_root=True, root_helper="sudo") - except exception.ProcessExecutionError: - LOG.exception(_("/root/.mysql_secret does not exist.")) - return - m = re.match('# The random password set for the root user at .*: (.*)', - out) - if m: - try: - out, err = utils.execute("mysqladmin", "-p%s" % m.group(1), - "password", "", run_as_root=True, - root_helper="sudo") - except exception.ProcessExecutionError: - LOG.exception(_("Cannot change mysql password.")) - return - operating_system.remove(secret_file, force=True, as_root=True) - LOG.debug("Expired password removed.") +class MySqlAppStatus(service_base.BaseMySqlAppStatus): + pass -def get_engine(): - """Create the default engine with the updated admin user.""" - # TODO(rnirmal):Based on permissions issues being resolved we may revert - # url = URL(drivername='mysql', host='localhost', - # query={'read_default_file': '/etc/mysql/my.cnf'}) - global ENGINE - if ENGINE: - return ENGINE - pwd = MySqlApp.get_auth_password() - ENGINE = sqlalchemy.create_engine("mysql://%s:%s@localhost:3306" % - (ADMIN_USER_NAME, pwd.strip()), - pool_recycle=7200, - echo=CONF.sql_query_logging, - listeners=[KeepAliveConnection()]) - return ENGINE +class LocalSqlClient(service_base.BaseLocalSqlClient): + pass -def load_mysqld_options(): - # find mysqld bin - for bin in MYSQL_BIN_CANDIDATES: - if os.path.isfile(bin): - mysqld_bin = bin - break - else: - return {} - try: - out, err = utils.execute(mysqld_bin, "--print-defaults", - run_as_root=True, root_helper="sudo") - arglist = re.split("\n", out)[1].split() - args = defaultdict(list) - for item in arglist: - if "=" in item: - key, value = item.split("=", 1) - args[key.lstrip("--")].append(value) - else: - args[item.lstrip("--")].append(None) - return args - except exception.ProcessExecutionError: - return {} - - -class MySqlAppStatus(service.BaseDbStatus): - - @classmethod - def get(cls): - if not cls._instance: - cls._instance = MySqlAppStatus() - return cls._instance - - def _get_actual_db_status(self): - try: - out, err = utils.execute_with_timeout( - "/usr/bin/mysqladmin", - "ping", run_as_root=True, root_helper="sudo", - log_output_on_error=True) - LOG.info(_("MySQL Service Status is RUNNING.")) - return rd_instance.ServiceStatuses.RUNNING - except exception.ProcessExecutionError: - LOG.exception(_("Failed to get database status.")) - try: - out, err = utils.execute_with_timeout("/bin/ps", "-C", - "mysqld", "h") - pid = out.split()[0] - # TODO(rnirmal): Need to create new statuses for instances - # where the mysql service is up, but unresponsive - LOG.info(_('MySQL Service Status %(pid)s is BLOCKED.') % - {'pid': pid}) - return rd_instance.ServiceStatuses.BLOCKED - except exception.ProcessExecutionError: - LOG.exception(_("Process execution failed.")) - mysql_args = load_mysqld_options() - pid_file = mysql_args.get('pid_file', - ['/var/run/mysqld/mysqld.pid'])[0] - if os.path.exists(pid_file): - LOG.info(_("MySQL Service Status is CRASHED.")) - return rd_instance.ServiceStatuses.CRASHED - else: - LOG.info(_("MySQL Service Status is SHUTDOWN.")) - return rd_instance.ServiceStatuses.SHUTDOWN - - -class LocalSqlClient(object): - """A sqlalchemy wrapper to manage transactions.""" - - def __init__(self, engine, use_flush=True): - self.engine = engine - self.use_flush = use_flush - - def __enter__(self): - self.conn = self.engine.connect() - self.trans = self.conn.begin() - return self.conn - - def __exit__(self, type, value, traceback): - if self.trans: - if type is not None: # An error occurred - self.trans.rollback() - else: - if self.use_flush: - self.conn.execute(FLUSH) - self.trans.commit() - self.conn.close() - - def execute(self, t, **kwargs): - try: - return self.conn.execute(t, kwargs) - except Exception: - self.trans.rollback() - self.trans = None - raise - - -class MySqlAdmin(object): - """Handles administrative tasks on the MySQL database.""" - - def _associate_dbs(self, user): - """Internal. Given a MySQLUser, populate its databases attribute.""" - LOG.debug("Associating dbs to user %s at %s." % - (user.name, user.host)) - with LocalSqlClient(get_engine()) as client: - q = sql_query.Query() - q.columns = ["grantee", "table_schema"] - q.tables = ["information_schema.SCHEMA_PRIVILEGES"] - q.group = ["grantee", "table_schema"] - q.where = ["privilege_type != 'USAGE'"] - t = text(str(q)) - db_result = client.execute(t) - for db in db_result: - LOG.debug("\t db: %s." % db) - if db['grantee'] == "'%s'@'%s'" % (user.name, user.host): - mysql_db = models.MySQLDatabase() - mysql_db.name = db['table_schema'] - user.databases.append(mysql_db.serialize()) - - def change_passwords(self, users): - """Change the passwords of one or more existing users.""" - LOG.debug("Changing the password of some users.") - with LocalSqlClient(get_engine()) as client: - for item in users: - LOG.debug("Changing password for user %s." % item) - user_dict = {'_name': item['name'], - '_host': item['host'], - '_password': item['password']} - user = models.MySQLUser() - user.deserialize(user_dict) - LOG.debug("\tDeserialized: %s." % user.__dict__) - uu = sql_query.UpdateUser(user.name, host=user.host, - clear=user.password) - t = text(str(uu)) - client.execute(t) - - def update_attributes(self, username, hostname, user_attrs): - """Change the attributes of an existing user.""" - LOG.debug("Changing user attributes for user %s." % username) - user = self._get_user(username, hostname) - db_access = set() - grantee = set() - with LocalSqlClient(get_engine()) as client: - q = sql_query.Query() - q.columns = ["grantee", "table_schema"] - q.tables = ["information_schema.SCHEMA_PRIVILEGES"] - q.group = ["grantee", "table_schema"] - q.where = ["privilege_type != 'USAGE'"] - t = text(str(q)) - db_result = client.execute(t) - for db in db_result: - grantee.add(db['grantee']) - if db['grantee'] == "'%s'@'%s'" % (user.name, user.host): - db_name = db['table_schema'] - db_access.add(db_name) - with LocalSqlClient(get_engine()) as client: - uu = sql_query.UpdateUser(user.name, host=user.host, - clear=user_attrs.get('password'), - new_user=user_attrs.get('name'), - new_host=user_attrs.get('host')) - t = text(str(uu)) - client.execute(t) - uname = user_attrs.get('name') or username - host = user_attrs.get('host') or hostname - find_user = "'%s'@'%s'" % (uname, host) - if find_user not in grantee: - self.grant_access(uname, host, db_access) - - def create_database(self, databases): - """Create the list of specified databases.""" - with LocalSqlClient(get_engine()) as client: - for item in databases: - mydb = models.ValidatedMySQLDatabase() - mydb.deserialize(item) - cd = sql_query.CreateDatabase(mydb.name, - mydb.character_set, - mydb.collate) - t = text(str(cd)) - client.execute(t) - - def create_user(self, users): - """Create users and grant them privileges for the - specified databases. - """ - with LocalSqlClient(get_engine()) as client: - for item in users: - user = models.MySQLUser() - user.deserialize(item) - # TODO(cp16net):Should users be allowed to create users - # 'os_admin' or 'debian-sys-maint' - g = sql_query.Grant(user=user.name, host=user.host, - clear=user.password) - t = text(str(g)) - client.execute(t) - for database in user.databases: - mydb = models.ValidatedMySQLDatabase() - mydb.deserialize(database) - g = sql_query.Grant(permissions='ALL', database=mydb.name, - user=user.name, host=user.host, - clear=user.password) - t = text(str(g)) - client.execute(t) - - def delete_database(self, database): - """Delete the specified database.""" - with LocalSqlClient(get_engine()) as client: - mydb = models.ValidatedMySQLDatabase() - mydb.deserialize(database) - dd = sql_query.DropDatabase(mydb.name) - t = text(str(dd)) - client.execute(t) - - def delete_user(self, user): - """Delete the specified user.""" - mysql_user = models.MySQLUser() - mysql_user.deserialize(user) - self.delete_user_by_name(mysql_user.name, mysql_user.host) - - def delete_user_by_name(self, name, host='%'): - with LocalSqlClient(get_engine()) as client: - du = sql_query.DropUser(name, host=host) - t = text(str(du)) - LOG.debug("delete_user_by_name: %s", t) - client.execute(t) - - def get_user(self, username, hostname): - user = self._get_user(username, hostname) - if not user: - return None - return user.serialize() - - def _get_user(self, username, hostname): - """Return a single user matching the criteria.""" - user = models.MySQLUser() - try: - user.name = username # Could possibly throw a BadRequest here. - except ValueError as ve: - LOG.exception(_("Error Getting user information")) - raise exception.BadRequest(_("Username %(user)s is not valid" - ": %(reason)s") % - {'user': username, 'reason': ve.message} - ) - with LocalSqlClient(get_engine()) as client: - q = sql_query.Query() - q.columns = ['User', 'Host', 'Password'] - q.tables = ['mysql.user'] - q.where = ["Host != 'localhost'", - "User = '%s'" % username, - "Host = '%s'" % hostname] - q.order = ['User', 'Host'] - t = text(str(q)) - result = client.execute(t).fetchall() - LOG.debug("Getting user information %s." % result) - if len(result) != 1: - return None - found_user = result[0] - user.password = found_user['Password'] - user.host = found_user['Host'] - self._associate_dbs(user) - return user - - def grant_access(self, username, hostname, databases): - """Grant a user permission to use a given database.""" - user = self._get_user(username, hostname) - mydb = models.ValidatedMySQLDatabase() - with LocalSqlClient(get_engine()) as client: - for database in databases: - try: - mydb.name = database - except ValueError: - LOG.exception(_("Error granting access")) - raise exception.BadRequest(_( - "Grant access to %s is not allowed") % database) - - g = sql_query.Grant(permissions='ALL', database=mydb.name, - user=user.name, host=user.host, - hashed=user.password) - t = text(str(g)) - client.execute(t) - - def is_root_enabled(self): - """Return True if root access is enabled; False otherwise.""" - return MySqlRootAccess.is_root_enabled() - - def enable_root(self, root_password=None): - """Enable the root user global access and/or - reset the root password. - """ - return MySqlRootAccess.enable_root(root_password) - - def list_databases(self, limit=None, marker=None, include_marker=False): - """List databases the user created on this mysql instance.""" - LOG.debug("---Listing Databases---") - ignored_database_names = "'%s'" % "', '".join(CONF.ignore_dbs) - LOG.debug("The following database names are on ignore list and will " - "be omitted from the listing: %s" % ignored_database_names) - databases = [] - with LocalSqlClient(get_engine()) as client: - q = sql_query.Query() - q.columns = [ - 'schema_name as name', - 'default_character_set_name as charset', - 'default_collation_name as collation', - ] - q.tables = ['information_schema.schemata'] - q.where = ["schema_name NOT IN (" + ignored_database_names + ")"] - q.order = ['schema_name ASC'] - if limit: - q.limit = limit + 1 - if marker: - q.where.append("schema_name %s '%s'" % - (INCLUDE_MARKER_OPERATORS[include_marker], - marker)) - t = text(str(q)) - database_names = client.execute(t) - next_marker = None - LOG.debug("database_names = %r." % database_names) - for count, database in enumerate(database_names): - if count >= limit: - break - LOG.debug("database = %s." % str(database)) - mysql_db = models.MySQLDatabase() - mysql_db.name = database[0] - next_marker = mysql_db.name - mysql_db.character_set = database[1] - mysql_db.collate = database[2] - databases.append(mysql_db.serialize()) - LOG.debug("databases = " + str(databases)) - if database_names.rowcount <= limit: - next_marker = None - return databases, next_marker - - def list_users(self, limit=None, marker=None, include_marker=False): - """List users that have access to the database.""" - ''' - SELECT - User, - Host, - Marker - FROM - (SELECT - User, - Host, - CONCAT(User, '@', Host) as Marker - FROM mysql.user - ORDER BY 1, 2) as innerquery - WHERE - Marker > :marker - ORDER BY - Marker - LIMIT :limit; - ''' - LOG.debug("---Listing Users---") - users = [] - with LocalSqlClient(get_engine()) as client: - mysql_user = models.MySQLUser() - iq = sql_query.Query() # Inner query. - iq.columns = ['User', 'Host', "CONCAT(User, '@', Host) as Marker"] - iq.tables = ['mysql.user'] - iq.order = ['User', 'Host'] - innerquery = str(iq).rstrip(';') - - oq = sql_query.Query() # Outer query. - oq.columns = ['User', 'Host', 'Marker'] - oq.tables = ['(%s) as innerquery' % innerquery] - oq.where = ["Host != 'localhost'"] - oq.order = ['Marker'] - if marker: - oq.where.append("Marker %s '%s'" % - (INCLUDE_MARKER_OPERATORS[include_marker], - marker)) - if limit: - oq.limit = limit + 1 - t = text(str(oq)) - result = client.execute(t) - next_marker = None - LOG.debug("result = " + str(result)) - for count, row in enumerate(result): - if count >= limit: - break - LOG.debug("user = " + str(row)) - mysql_user = models.MySQLUser() - mysql_user.name = row['User'] - mysql_user.host = row['Host'] - self._associate_dbs(mysql_user) - next_marker = row['Marker'] - users.append(mysql_user.serialize()) - if result.rowcount <= limit: - next_marker = None - LOG.debug("users = " + str(users)) - - return users, next_marker - - def revoke_access(self, username, hostname, database): - """Revoke a user's permission to use a given database.""" - user = self._get_user(username, hostname) - with LocalSqlClient(get_engine()) as client: - r = sql_query.Revoke(database=database, - user=user.name, - host=user.host) - t = text(str(r)) - client.execute(t) - - def list_access(self, username, hostname): - """Show all the databases to which the user has more than - USAGE granted. - """ - user = self._get_user(username, hostname) - return user.databases - - -class KeepAliveConnection(interfaces.PoolListener): - """ - A connection pool listener that ensures live connections are returned - from the connection pool at checkout. This alleviates the problem of - MySQL connections timing out. - """ - - def checkout(self, dbapi_con, con_record, con_proxy): - """Event triggered when a connection is checked out from the pool.""" - try: - try: - dbapi_con.ping(False) - except TypeError: - dbapi_con.ping() - except dbapi_con.OperationalError as ex: - if ex.args[0] in (2006, 2013, 2014, 2045, 2055): - raise exc.DisconnectionError() - else: - raise - - -class MySqlApp(object): - """Prepares DBaaS on a Guest container.""" - - TIME_OUT = 1000 - - configuration_manager = ConfigurationManager( - MYSQL_CONFIG, MYSQL_OWNER, MYSQL_OWNER, IniCodec(), requires_root=True, - override_strategy=ImportOverrideStrategy(CNF_INCLUDE_DIR, CNF_EXT)) - - @classmethod - def get_auth_password(cls): - return cls.configuration_manager.get_value('client').get('password') - - @classmethod - def get_data_dir(cls): - return cls.configuration_manager.get_value( - MySQLConfParser.SERVER_CONF_SECTION).get('datadir') - - @classmethod - def set_data_dir(cls, value): - cls.configuration_manager.apply_system_override( - {MySQLConfParser.SERVER_CONF_SECTION: {'datadir': value}}) - +class MySqlApp(service_base.BaseMySqlApp): def __init__(self, status): - """By default login with root no password for initial setup.""" - self.state_change_wait_time = CONF.state_change_wait_time - self.status = status + super(MySqlApp, self).__init__(status, LocalSqlClient, + KeepAliveConnection) - def _create_admin_user(self, client, password): - """ - Create a os_admin user with a random password - with all privileges similar to the root user. - """ - localhost = "localhost" - g = sql_query.Grant(permissions='ALL', user=ADMIN_USER_NAME, - host=localhost, grant_option=True, clear=password) - t = text(str(g)) - client.execute(t) - @staticmethod - def _generate_root_password(client): - """Generate and set a random root password and forget about it.""" - localhost = "localhost" - uu = sql_query.UpdateUser("root", host=localhost, - clear=utils.generate_random_password()) - t = text(str(uu)) - client.execute(t) +class MySqlRootAccess(service_base.BaseMySqlRootAccess): + def __init__(self): + super(MySqlRootAccess, self).__init__(LocalSqlClient, + MySqlApp(MySqlAppStatus.get())) - def install_if_needed(self, packages): - """Prepare the guest machine with a secure - mysql server installation. - """ - LOG.info(_("Preparing Guest as MySQL Server.")) - if not packager.pkg_is_installed(packages): - LOG.debug("Installing MySQL server.") - self._clear_mysql_config() - # set blank password on pkg configuration stage - pkg_opts = {'root_password': '', - 'root_password_again': ''} - packager.pkg_install(packages, pkg_opts, self.TIME_OUT) - self._create_mysql_confd_dir() - LOG.info(_("Finished installing MySQL server.")) - self.start_mysql() - def complete_install_or_restart(self): - self.status.end_install_or_restart() +class MySqlAdmin(service_base.BaseMySqlAdmin): + def __init__(self): + super(MySqlAdmin, self).__init__(LocalSqlClient, MySqlRootAccess(), + MySqlApp) - def secure(self, config_contents, overrides): - LOG.info(_("Generating admin password.")) - admin_password = utils.generate_random_password() - clear_expired_password() - engine = sqlalchemy.create_engine("mysql://root:@localhost:3306", - echo=True) - with LocalSqlClient(engine) as client: - self._remove_anonymous_user(client) - self._create_admin_user(client, admin_password) - self.stop_db() - - self._reset_configuration(config_contents, admin_password) - self._apply_user_overrides(overrides) - self.start_mysql() - - LOG.debug("MySQL secure complete.") - - def _reset_configuration(self, configuration, admin_password=None): - if not admin_password: - # Take the current admin password from the base configuration file - # if not given. - admin_password = MySqlApp.get_auth_password() - - self.configuration_manager.save_configuration(configuration) - self._save_authentication_properties(admin_password) - self.wipe_ib_logfiles() - - def _save_authentication_properties(self, admin_password): - self.configuration_manager.apply_system_override( - {'client': {'user': ADMIN_USER_NAME, 'password': admin_password}}) - - def secure_root(self, secure_remote_root=True): - with LocalSqlClient(get_engine()) as client: - LOG.info(_("Preserving root access from restore.")) - self._generate_root_password(client) - if secure_remote_root: - self._remove_remote_root_access(client) - - def _clear_mysql_config(self): - """Clear old configs, which can be incompatible with new version.""" - LOG.debug("Clearing old MySQL config.") - random_uuid = str(uuid.uuid4()) - configs = ["/etc/my.cnf", "/etc/mysql/conf.d", "/etc/mysql/my.cnf"] - for config in configs: - try: - old_conf_backup = "%s_%s" % (config, random_uuid) - operating_system.move(config, old_conf_backup, as_root=True) - LOG.debug("%s saved to %s_%s." % - (config, config, random_uuid)) - except exception.ProcessExecutionError: - pass - - def _create_mysql_confd_dir(self): - conf_dir = "/etc/mysql/conf.d" - LOG.debug("Creating %s." % conf_dir) - operating_system.create_directory(conf_dir, as_root=True) - - def _enable_mysql_on_boot(self): - LOG.debug("Enabling MySQL on boot.") - try: - mysql_service = operating_system.service_discovery( - MYSQL_SERVICE_CANDIDATES) - utils.execute_with_timeout(mysql_service['cmd_enable'], shell=True) - except KeyError: - LOG.exception(_("Error enabling MySQL start on boot.")) - raise RuntimeError("Service is not discovered.") - - def _disable_mysql_on_boot(self): - try: - mysql_service = operating_system.service_discovery( - MYSQL_SERVICE_CANDIDATES) - utils.execute_with_timeout(mysql_service['cmd_disable'], - shell=True) - except KeyError: - LOG.exception(_("Error disabling MySQL start on boot.")) - raise RuntimeError("Service is not discovered.") - - def stop_db(self, update_db=False, do_not_start_on_reboot=False): - LOG.info(_("Stopping MySQL.")) - if do_not_start_on_reboot: - self._disable_mysql_on_boot() - try: - mysql_service = operating_system.service_discovery( - MYSQL_SERVICE_CANDIDATES) - utils.execute_with_timeout(mysql_service['cmd_stop'], shell=True) - except KeyError: - LOG.exception(_("Error stopping MySQL.")) - raise RuntimeError("Service is not discovered.") - if not self.status.wait_for_real_status_to_change_to( - rd_instance.ServiceStatuses.SHUTDOWN, - self.state_change_wait_time, update_db): - LOG.error(_("Could not stop MySQL.")) - self.status.end_install_or_restart() - raise RuntimeError("Could not stop MySQL!") - - def _remove_anonymous_user(self, client): - t = text(sql_query.REMOVE_ANON) - client.execute(t) - - def _remove_remote_root_access(self, client): - t = text(sql_query.REMOVE_ROOT) - client.execute(t) - - def restart(self): - try: - self.status.begin_restart() - self.stop_db() - self.start_mysql() - finally: - self.status.end_install_or_restart() - - def update_overrides(self, overrides): - self._apply_user_overrides(overrides) - - def _apply_user_overrides(self, overrides): - # All user-defined values go to the server section of the configuration - # file. - if overrides: - self.configuration_manager.apply_user_override( - {MySQLConfParser.SERVER_CONF_SECTION: overrides}) - - def apply_overrides(self, overrides): - LOG.debug("Applying overrides to MySQL.") - with LocalSqlClient(get_engine()) as client: - LOG.debug("Updating override values in running MySQL.") - for k, v in overrides.iteritems(): - byte_value = guestagent_utils.to_bytes(v) - q = sql_query.SetServerVariable(key=k, value=byte_value) - t = text(str(q)) - try: - client.execute(t) - except exc.OperationalError: - output = {'key': k, 'value': byte_value} - LOG.exception(_("Unable to set %(key)s with value " - "%(value)s.") % output) - - def make_read_only(self, read_only): - with LocalSqlClient(get_engine()) as client: - q = "set global read_only = %s" % read_only - client.execute(text(str(q))) - - def wipe_ib_logfiles(self): - """Destroys the iblogfiles. - - If for some reason the selected log size in the conf changes from the - current size of the files MySQL will fail to start, so we delete the - files to be safe. - """ - LOG.info(_("Wiping ib_logfiles.")) - for index in range(2): - try: - # On restarts, sometimes these are wiped. So it can be a race - # to have MySQL start up before it's restarted and these have - # to be deleted. That's why its ok if they aren't found and - # that is why we use the "force" option to "remove". - operating_system.remove("%s/ib_logfile%d" - % (self.get_data_dir(), index), - force=True, as_root=True) - except exception.ProcessExecutionError: - LOG.exception("Could not delete logfile.") - raise - - def remove_overrides(self): - self.configuration_manager.remove_user_override() - - def _remove_replication_overrides(self, cnf_file): - LOG.info(_("Removing replication configuration file.")) - if os.path.exists(cnf_file): - operating_system.remove(cnf_file, as_root=True) - - def exists_replication_source_overrides(self): - return self.configuration_manager.has_system_override(CNF_MASTER) - - def write_replication_source_overrides(self, overrideValues): - self.configuration_manager.apply_system_override(overrideValues, - CNF_MASTER) - - def write_replication_replica_overrides(self, overrideValues): - self.configuration_manager.apply_system_override(overrideValues, - CNF_SLAVE) - - def remove_replication_source_overrides(self): - self.configuration_manager.remove_system_override(CNF_MASTER) - - def remove_replication_replica_overrides(self): - self.configuration_manager.remove_system_override(CNF_SLAVE) - - def grant_replication_privilege(self, replication_user): - LOG.info(_("Granting Replication Slave privilege.")) - - LOG.debug("grant_replication_privilege: %s" % replication_user) - - with LocalSqlClient(get_engine()) as client: - g = sql_query.Grant(permissions=['REPLICATION SLAVE'], - user=replication_user['name'], - clear=replication_user['password']) - - t = text(str(g)) - client.execute(t) - - def get_port(self): - with LocalSqlClient(get_engine()) as client: - result = client.execute('SELECT @@port').first() - return result[0] - - def get_binlog_position(self): - with LocalSqlClient(get_engine()) as client: - result = client.execute('SHOW MASTER STATUS').first() - binlog_position = { - 'log_file': result['File'], - 'position': result['Position'] - } - return binlog_position - - def execute_on_client(self, sql_statement): - LOG.debug("Executing SQL: %s" % sql_statement) - with LocalSqlClient(get_engine()) as client: - return client.execute(sql_statement) - - def start_slave(self): - LOG.info(_("Starting slave replication.")) - with LocalSqlClient(get_engine()) as client: - client.execute('START SLAVE') - self._wait_for_slave_status("ON", client, 60) - - def stop_slave(self, for_failover): - replication_user = None - LOG.info(_("Stopping slave replication.")) - with LocalSqlClient(get_engine()) as client: - result = client.execute('SHOW SLAVE STATUS') - replication_user = result.first()['Master_User'] - client.execute('STOP SLAVE') - client.execute('RESET SLAVE ALL') - self._wait_for_slave_status("OFF", client, 30) - if not for_failover: - client.execute('DROP USER ' + replication_user) - return { - 'replication_user': replication_user - } - - def stop_master(self): - LOG.info(_("Stopping replication master.")) - with LocalSqlClient(get_engine()) as client: - client.execute('RESET MASTER') - - def _wait_for_slave_status(self, status, client, max_time): - - def verify_slave_status(): - actual_status = client.execute( - "SHOW GLOBAL STATUS like 'slave_running'").first()[1] - return actual_status.upper() == status.upper() - - LOG.debug("Waiting for SLAVE_RUNNING to change to %s.", status) - try: - utils.poll_until(verify_slave_status, sleep_time=3, - time_out=max_time) - LOG.info(_("Replication is now %s.") % status.lower()) - except PollTimeOut: - raise RuntimeError( - _("Replication is not %(status)s after %(max)d seconds.") % { - 'status': status.lower(), 'max': max_time}) - - def start_mysql(self, update_db=False): - LOG.info(_("Starting MySQL.")) - # This is the site of all the trouble in the restart tests. - # Essentially what happens is that mysql start fails, but does not - # die. It is then impossible to kill the original, so - - self._enable_mysql_on_boot() - - try: - mysql_service = operating_system.service_discovery( - MYSQL_SERVICE_CANDIDATES) - utils.execute_with_timeout(mysql_service['cmd_start'], shell=True) - except KeyError: - raise RuntimeError("Service is not discovered.") - except exception.ProcessExecutionError: - # it seems mysql (percona, at least) might come back with [Fail] - # but actually come up ok. we're looking into the timing issue on - # parallel, but for now, we'd like to give it one more chance to - # come up. so regardless of the execute_with_timeout() response, - # we'll assume mysql comes up and check it's status for a while. - pass - if not self.status.wait_for_real_status_to_change_to( - rd_instance.ServiceStatuses.RUNNING, - self.state_change_wait_time, update_db): - LOG.error(_("Start up of MySQL failed.")) - # If it won't start, but won't die either, kill it by hand so we - # don't let a rouge process wander around. - try: - utils.execute_with_timeout("sudo", "pkill", "-9", "mysql") - except exception.ProcessExecutionError: - LOG.exception(_("Error killing stalled MySQL start command.")) - # There's nothing more we can do... - self.status.end_install_or_restart() - raise RuntimeError("Could not start MySQL!") - - def start_db_with_conf_changes(self, config_contents): - LOG.info(_("Starting MySQL with conf changes.")) - LOG.debug("Inside the guest - Status is_running = (%s)." - % self.status.is_running) - if self.status.is_running: - LOG.error(_("Cannot execute start_db_with_conf_changes because " - "MySQL state == %s.") % self.status) - raise RuntimeError("MySQL not stopped.") - LOG.info(_("Resetting configuration.")) - self._reset_configuration(config_contents) - self.start_mysql(True) - - def reset_configuration(self, configuration): - config_contents = configuration['config_contents'] - LOG.info(_("Resetting configuration.")) - self._reset_configuration(config_contents) - - # DEPRECATED: Mantain for API Compatibility - def get_txn_count(self): - LOG.info(_("Retrieving latest txn id.")) - txn_count = 0 - with LocalSqlClient(get_engine()) as client: - result = client.execute('SELECT @@global.gtid_executed').first() - for uuid_set in result[0].split(','): - for interval in uuid_set.split(':')[1:]: - if '-' in interval: - iparts = interval.split('-') - txn_count += int(iparts[1]) - int(iparts[0]) - else: - txn_count += 1 - return txn_count - - def _get_slave_status(self): - with LocalSqlClient(get_engine()) as client: - return client.execute('SHOW SLAVE STATUS').first() - - def _get_master_UUID(self): - slave_status = self._get_slave_status() - return slave_status and slave_status['Master_UUID'] or None - - def _get_gtid_executed(self): - with LocalSqlClient(get_engine()) as client: - return client.execute('SELECT @@global.gtid_executed').first()[0] - - def get_last_txn(self): - master_UUID = self._get_master_UUID() - last_txn_id = '0' - gtid_executed = self._get_gtid_executed() - for gtid_set in gtid_executed.split(','): - uuid_set = gtid_set.split(':') - if uuid_set[0] == master_UUID: - last_txn_id = uuid_set[-1].split('-')[-1] - break - return master_UUID, int(last_txn_id) - - def get_latest_txn_id(self): - LOG.info(_("Retrieving latest txn id.")) - return self._get_gtid_executed() - - def wait_for_txn(self, txn): - LOG.info(_("Waiting on txn '%s'.") % txn) - with LocalSqlClient(get_engine()) as client: - client.execute("SELECT WAIT_UNTIL_SQL_THREAD_AFTER_GTIDS('%s')" - % txn) - - -class MySqlRootAccess(object): - - @classmethod - def is_root_enabled(cls): - """Return True if root access is enabled; False otherwise.""" - with LocalSqlClient(get_engine()) as client: - t = text(sql_query.ROOT_ENABLED) - result = client.execute(t) - LOG.debug("Found %s with remote root access." % result.rowcount) - return result.rowcount != 0 - - @classmethod - def enable_root(cls, root_password=None): - """Enable the root user global access and/or - reset the root password. - """ - user = models.RootUser() - user.name = "root" - user.host = "%" - user.password = root_password or utils.generate_random_password() - with LocalSqlClient(get_engine()) as client: - print(client) - try: - cu = sql_query.CreateUser(user.name, host=user.host) - t = text(str(cu)) - client.execute(t, **cu.keyArgs) - except exc.OperationalError as err: - # Ignore, user is already created, just reset the password - # TODO(rnirmal): More fine grained error checking later on - LOG.debug(err) - with LocalSqlClient(get_engine()) as client: - print(client) - uu = sql_query.UpdateUser(user.name, host=user.host, - clear=user.password) - t = text(str(uu)) - client.execute(t) - - LOG.debug("CONF.root_grant: %s CONF.root_grant_option: %s." % - (CONF.root_grant, CONF.root_grant_option)) - - g = sql_query.Grant(permissions=CONF.root_grant, - user=user.name, - host=user.host, - grant_option=CONF.root_grant_option, - clear=user.password) - - t = text(str(g)) - client.execute(t) - return user.serialize() +get_engine = MySqlApp.get_engine diff --git a/trove/guestagent/datastore/mysql/service_base.py b/trove/guestagent/datastore/mysql/service_base.py new file mode 100644 index 0000000000..abe6ed4467 --- /dev/null +++ b/trove/guestagent/datastore/mysql/service_base.py @@ -0,0 +1,1085 @@ +# Copyright 2013 OpenStack Foundation +# Copyright 2013 Rackspace Hosting +# Copyright 2013 Hewlett-Packard Development Company, L.P. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import abc +from collections import defaultdict +import os +import re +import six +import uuid + +from oslo_log import log as logging +import sqlalchemy +from sqlalchemy import exc +from sqlalchemy import interfaces +from sqlalchemy.sql.expression import text + +from trove.common import cfg +from trove.common.configurations import MySQLConfParser +from trove.common import exception +from trove.common.exception import PollTimeOut +from trove.common.i18n import _ +from trove.common import instance as rd_instance +from trove.common.stream_codecs import IniCodec +from trove.common import utils as utils +from trove.guestagent.common.configuration import ConfigurationManager +from trove.guestagent.common.configuration import ImportOverrideStrategy +from trove.guestagent.common import guestagent_utils +from trove.guestagent.common import operating_system +from trove.guestagent.common import sql_query +from trove.guestagent.datastore import service +from trove.guestagent.db import models +from trove.guestagent import pkg + +ADMIN_USER_NAME = "os_admin" +LOG = logging.getLogger(__name__) +FLUSH = text(sql_query.FLUSH) +ENGINE = None +DATADIR = None +PREPARING = False +UUID = False + +TMP_MYCNF = "/tmp/my.cnf.tmp" +MYSQL_BASE_DIR = "/var/lib/mysql" + +CONF = cfg.CONF +MANAGER = CONF.datastore_manager if CONF.datastore_manager else 'mysql' + +INCLUDE_MARKER_OPERATORS = { + True: ">=", + False: ">" +} + +OS_NAME = operating_system.get_os() +MYSQL_CONFIG = {operating_system.REDHAT: "/etc/my.cnf", + operating_system.DEBIAN: "/etc/mysql/my.cnf", + operating_system.SUSE: "/etc/my.cnf"}[OS_NAME] +MYSQL_SERVICE_CANDIDATES = ["mysql", "mysqld", "mysql-server"] +MYSQL_BIN_CANDIDATES = ["/usr/sbin/mysqld", "/usr/libexec/mysqld"] +MYSQL_OWNER = 'mysql' +CNF_EXT = 'cnf' +CNF_INCLUDE_DIR = '/etc/mysql/conf.d/' +CNF_MASTER = 'master-replication' +CNF_SLAVE = 'slave-replication' + +# Create a package impl +packager = pkg.Package() + + +def clear_expired_password(): + """ + Some mysql installations generate random root password + and save it in /root/.mysql_secret, this password is + expired and should be changed by client that supports expired passwords. + """ + LOG.debug("Removing expired password.") + secret_file = "/root/.mysql_secret" + try: + out, err = utils.execute("cat", secret_file, + run_as_root=True, root_helper="sudo") + except exception.ProcessExecutionError: + LOG.exception(_("/root/.mysql_secret does not exist.")) + return + m = re.match('# The random password set for the root user at .*: (.*)', + out) + if m: + try: + out, err = utils.execute("mysqladmin", "-p%s" % m.group(1), + "password", "", run_as_root=True, + root_helper="sudo") + except exception.ProcessExecutionError: + LOG.exception(_("Cannot change mysql password.")) + return + operating_system.remove(secret_file, force=True, as_root=True) + LOG.debug("Expired password removed.") + + +def load_mysqld_options(): + # find mysqld bin + for bin in MYSQL_BIN_CANDIDATES: + if os.path.isfile(bin): + mysqld_bin = bin + break + else: + return {} + try: + out, err = utils.execute(mysqld_bin, "--print-defaults", + run_as_root=True, root_helper="sudo") + arglist = re.split("\n", out)[1].split() + args = defaultdict(list) + for item in arglist: + if "=" in item: + key, value = item.split("=", 1) + args[key.lstrip("--")].append(value) + else: + args[item.lstrip("--")].append(None) + return args + except exception.ProcessExecutionError: + return {} + + +class BaseMySqlAppStatus(service.BaseDbStatus): + @classmethod + def get(cls): + if not cls._instance: + cls._instance = BaseMySqlAppStatus() + return cls._instance + + def _get_actual_db_status(self): + try: + out, err = utils.execute_with_timeout( + "/usr/bin/mysqladmin", + "ping", run_as_root=True, root_helper="sudo", + log_output_on_error=True) + LOG.info(_("MySQL Service Status is RUNNING.")) + return rd_instance.ServiceStatuses.RUNNING + except exception.ProcessExecutionError: + LOG.exception(_("Failed to get database status.")) + try: + out, err = utils.execute_with_timeout("/bin/ps", "-C", + "mysqld", "h") + pid = out.split()[0] + # TODO(rnirmal): Need to create new statuses for instances + # where the mysql service is up, but unresponsive + LOG.info(_('MySQL Service Status %(pid)s is BLOCKED.') % + {'pid': pid}) + return rd_instance.ServiceStatuses.BLOCKED + except exception.ProcessExecutionError: + LOG.exception(_("Process execution failed.")) + mysql_args = load_mysqld_options() + pid_file = mysql_args.get('pid_file', + ['/var/run/mysqld/mysqld.pid'])[0] + if os.path.exists(pid_file): + LOG.info(_("MySQL Service Status is CRASHED.")) + return rd_instance.ServiceStatuses.CRASHED + else: + LOG.info(_("MySQL Service Status is SHUTDOWN.")) + return rd_instance.ServiceStatuses.SHUTDOWN + + +class BaseLocalSqlClient(object): + """A sqlalchemy wrapper to manage transactions.""" + + def __init__(self, engine, use_flush=True): + self.engine = engine + self.use_flush = use_flush + + def __enter__(self): + self.conn = self.engine.connect() + self.trans = self.conn.begin() + return self.conn + + def __exit__(self, type, value, traceback): + if self.trans: + if type is not None: # An error occurred + self.trans.rollback() + else: + if self.use_flush: + self.conn.execute(FLUSH) + self.trans.commit() + self.conn.close() + + def execute(self, t, **kwargs): + try: + return self.conn.execute(t, kwargs) + except Exception: + self.trans.rollback() + self.trans = None + raise + + +@six.add_metaclass(abc.ABCMeta) +class BaseMySqlAdmin(object): + """Handles administrative tasks on the MySQL database.""" + + def __init__(self, local_sql_client, mysql_root_access, + mysql_app): + self._local_sql_client = local_sql_client + self._mysql_root_access = mysql_root_access + self._mysql_app = mysql_app(local_sql_client) + + @property + def local_sql_client(self): + return self._local_sql_client + + @property + def mysql_root_access(self): + return self._mysql_root_access + + @property + def mysql_app(self): + return self._mysql_app + + def _associate_dbs(self, user): + """Internal. Given a MySQLUser, populate its databases attribute.""" + LOG.debug("Associating dbs to user %s at %s." % + (user.name, user.host)) + with self.local_sql_client(self.mysql_app.get_engine()) as client: + q = sql_query.Query() + q.columns = ["grantee", "table_schema"] + q.tables = ["information_schema.SCHEMA_PRIVILEGES"] + q.group = ["grantee", "table_schema"] + q.where = ["privilege_type != 'USAGE'"] + t = text(str(q)) + db_result = client.execute(t) + for db in db_result: + LOG.debug("\t db: %s." % db) + if db['grantee'] == "'%s'@'%s'" % (user.name, user.host): + mysql_db = models.MySQLDatabase() + mysql_db.name = db['table_schema'] + user.databases.append(mysql_db.serialize()) + + def change_passwords(self, users): + """Change the passwords of one or more existing users.""" + LOG.debug("Changing the password of some users.") + with self.local_sql_client(self.mysql_app.get_engine()) as client: + for item in users: + LOG.debug("Changing password for user %s." % item) + user_dict = {'_name': item['name'], + '_host': item['host'], + '_password': item['password']} + user = models.MySQLUser() + user.deserialize(user_dict) + LOG.debug("\tDeserialized: %s." % user.__dict__) + uu = sql_query.UpdateUser(user.name, host=user.host, + clear=user.password) + t = text(str(uu)) + client.execute(t) + + def update_attributes(self, username, hostname, user_attrs): + """Change the attributes of an existing user.""" + LOG.debug("Changing user attributes for user %s." % username) + user = self._get_user(username, hostname) + db_access = set() + grantee = set() + with self.local_sql_client(self.mysql_app.get_engine()) as client: + q = sql_query.Query() + q.columns = ["grantee", "table_schema"] + q.tables = ["information_schema.SCHEMA_PRIVILEGES"] + q.group = ["grantee", "table_schema"] + q.where = ["privilege_type != 'USAGE'"] + t = text(str(q)) + db_result = client.execute(t) + for db in db_result: + grantee.add(db['grantee']) + if db['grantee'] == "'%s'@'%s'" % (user.name, user.host): + db_name = db['table_schema'] + db_access.add(db_name) + with self.local_sql_client(self.mysql_app.get_engine()) as client: + uu = sql_query.UpdateUser(user.name, host=user.host, + clear=user_attrs.get('password'), + new_user=user_attrs.get('name'), + new_host=user_attrs.get('host')) + t = text(str(uu)) + client.execute(t) + uname = user_attrs.get('name') or username + host = user_attrs.get('host') or hostname + find_user = "'%s'@'%s'" % (uname, host) + if find_user not in grantee: + self.grant_access(uname, host, db_access) + + def create_database(self, databases): + """Create the list of specified databases.""" + with self.local_sql_client(self.mysql_app.get_engine()) as client: + for item in databases: + mydb = models.ValidatedMySQLDatabase() + mydb.deserialize(item) + cd = sql_query.CreateDatabase(mydb.name, + mydb.character_set, + mydb.collate) + t = text(str(cd)) + client.execute(t) + + def create_user(self, users): + """Create users and grant them privileges for the + specified databases. + """ + with self.local_sql_client(self.mysql_app.get_engine()) as client: + for item in users: + user = models.MySQLUser() + user.deserialize(item) + # TODO(cp16net):Should users be allowed to create users + # 'os_admin' or 'debian-sys-maint' + g = sql_query.Grant(user=user.name, host=user.host, + clear=user.password) + t = text(str(g)) + client.execute(t) + for database in user.databases: + mydb = models.ValidatedMySQLDatabase() + mydb.deserialize(database) + g = sql_query.Grant(permissions='ALL', database=mydb.name, + user=user.name, host=user.host, + clear=user.password) + t = text(str(g)) + client.execute(t) + + def delete_database(self, database): + """Delete the specified database.""" + with self.local_sql_client(self.mysql_app.get_engine()) as client: + mydb = models.ValidatedMySQLDatabase() + mydb.deserialize(database) + dd = sql_query.DropDatabase(mydb.name) + t = text(str(dd)) + client.execute(t) + + def delete_user(self, user): + """Delete the specified user.""" + mysql_user = models.MySQLUser() + mysql_user.deserialize(user) + self.delete_user_by_name(mysql_user.name, mysql_user.host) + + def delete_user_by_name(self, name, host='%'): + with self.local_sql_client(self.mysql_app.get_engine()) as client: + du = sql_query.DropUser(name, host=host) + t = text(str(du)) + LOG.debug("delete_user_by_name: %s", t) + client.execute(t) + + def get_user(self, username, hostname): + user = self._get_user(username, hostname) + if not user: + return None + return user.serialize() + + def _get_user(self, username, hostname): + """Return a single user matching the criteria.""" + user = models.MySQLUser() + try: + user.name = username # Could possibly throw a BadRequest here. + except ValueError as ve: + LOG.exception(_("Error Getting user information")) + raise exception.BadRequest(_("Username %(user)s is not valid" + ": %(reason)s") % + {'user': username, 'reason': ve.message} + ) + with self.local_sql_client(self.mysql_app.get_engine()) as client: + q = sql_query.Query() + q.columns = ['User', 'Host', 'Password'] + q.tables = ['mysql.user'] + q.where = ["Host != 'localhost'", + "User = '%s'" % username, + "Host = '%s'" % hostname] + q.order = ['User', 'Host'] + t = text(str(q)) + result = client.execute(t).fetchall() + LOG.debug("Getting user information %s." % result) + if len(result) != 1: + return None + found_user = result[0] + user.password = found_user['Password'] + user.host = found_user['Host'] + self._associate_dbs(user) + return user + + def grant_access(self, username, hostname, databases): + """Grant a user permission to use a given database.""" + user = self._get_user(username, hostname) + mydb = models.ValidatedMySQLDatabase() + with self.local_sql_client(self.mysql_app.get_engine()) as client: + for database in databases: + try: + mydb.name = database + except ValueError: + LOG.exception(_("Error granting access")) + raise exception.BadRequest(_( + "Grant access to %s is not allowed") % database) + + g = sql_query.Grant(permissions='ALL', database=mydb.name, + user=user.name, host=user.host, + hashed=user.password) + t = text(str(g)) + client.execute(t) + + def is_root_enabled(self): + """Return True if root access is enabled; False otherwise.""" + LOG.debug("Class type of mysql_root_access is %s " % + self.mysql_root_access) + return self.mysql_root_access.is_root_enabled() + + def enable_root(self, root_password=None): + """Enable the root user global access and/or + reset the root password. + """ + return self.mysql_root_access.enable_root(root_password) + + def list_databases(self, limit=None, marker=None, include_marker=False): + """List databases the user created on this mysql instance.""" + LOG.debug("---Listing Databases---") + ignored_database_names = "'%s'" % "', '".join(CONF.ignore_dbs) + LOG.debug("The following database names are on ignore list and will " + "be omitted from the listing: %s" % ignored_database_names) + databases = [] + with self.local_sql_client(self.mysql_app.get_engine()) as client: + # If you have an external volume mounted at /var/lib/mysql + # the lost+found directory will show up in mysql as a database + # which will create errors if you try to do any database ops + # on it. So we remove it here if it exists. + q = sql_query.Query() + q.columns = [ + 'schema_name as name', + 'default_character_set_name as charset', + 'default_collation_name as collation', + ] + q.tables = ['information_schema.schemata'] + q.where = ["schema_name NOT IN (" + ignored_database_names + ")"] + q.order = ['schema_name ASC'] + if limit: + q.limit = limit + 1 + if marker: + q.where.append("schema_name %s '%s'" % + (INCLUDE_MARKER_OPERATORS[include_marker], + marker)) + t = text(str(q)) + database_names = client.execute(t) + next_marker = None + LOG.debug("database_names = %r." % database_names) + for count, database in enumerate(database_names): + if count >= limit: + break + LOG.debug("database = %s." % str(database)) + mysql_db = models.MySQLDatabase() + mysql_db.name = database[0] + next_marker = mysql_db.name + mysql_db.character_set = database[1] + mysql_db.collate = database[2] + databases.append(mysql_db.serialize()) + LOG.debug("databases = " + str(databases)) + if database_names.rowcount <= limit: + next_marker = None + return databases, next_marker + + def list_users(self, limit=None, marker=None, include_marker=False): + """List users that have access to the database.""" + ''' + SELECT + User, + Host, + Marker + FROM + (SELECT + User, + Host, + CONCAT(User, '@', Host) as Marker + FROM mysql.user + ORDER BY 1, 2) as innerquery + WHERE + Marker > :marker + ORDER BY + Marker + LIMIT :limit; + ''' + LOG.debug("---Listing Users---") + users = [] + with self.local_sql_client(self.mysql_app.get_engine()) as client: + mysql_user = models.MySQLUser() + iq = sql_query.Query() # Inner query. + iq.columns = ['User', 'Host', "CONCAT(User, '@', Host) as Marker"] + iq.tables = ['mysql.user'] + iq.order = ['User', 'Host'] + innerquery = str(iq).rstrip(';') + + oq = sql_query.Query() # Outer query. + oq.columns = ['User', 'Host', 'Marker'] + oq.tables = ['(%s) as innerquery' % innerquery] + oq.where = ["Host != 'localhost'"] + oq.order = ['Marker'] + if marker: + oq.where.append("Marker %s '%s'" % + (INCLUDE_MARKER_OPERATORS[include_marker], + marker)) + if limit: + oq.limit = limit + 1 + t = text(str(oq)) + result = client.execute(t) + next_marker = None + LOG.debug("result = " + str(result)) + for count, row in enumerate(result): + if count >= limit: + break + LOG.debug("user = " + str(row)) + mysql_user = models.MySQLUser() + mysql_user.name = row['User'] + mysql_user.host = row['Host'] + self._associate_dbs(mysql_user) + next_marker = row['Marker'] + users.append(mysql_user.serialize()) + if result.rowcount <= limit: + next_marker = None + LOG.debug("users = " + str(users)) + + return users, next_marker + + def revoke_access(self, username, hostname, database): + """Revoke a user's permission to use a given database.""" + user = self._get_user(username, hostname) + with self.local_sql_client(self.mysql_app.get_engine()) as client: + r = sql_query.Revoke(database=database, + user=user.name, + host=user.host) + t = text(str(r)) + client.execute(t) + + def list_access(self, username, hostname): + """Show all the databases to which the user has more than + USAGE granted. + """ + user = self._get_user(username, hostname) + return user.databases + + +class BaseKeepAliveConnection(interfaces.PoolListener): + """ + A connection pool listener that ensures live connections are returned + from the connection pool at checkout. This alleviates the problem of + MySQL connections timing out. + """ + + def checkout(self, dbapi_con, con_record, con_proxy): + """Event triggered when a connection is checked out from the pool.""" + try: + try: + dbapi_con.ping(False) + except TypeError: + dbapi_con.ping() + except dbapi_con.OperationalError as ex: + if ex.args[0] in (2006, 2013, 2014, 2045, 2055): + raise exc.DisconnectionError() + else: + raise + + +@six.add_metaclass(abc.ABCMeta) +class BaseMySqlApp(object): + """Prepares DBaaS on a Guest container.""" + + TIME_OUT = 1000 + + @property + def local_sql_client(self): + return self._local_sql_client + + @property + def keep_alive_connection_cls(self): + return self._keep_alive_connection_cls + + configuration_manager = ConfigurationManager( + MYSQL_CONFIG, MYSQL_OWNER, MYSQL_OWNER, IniCodec(), requires_root=True, + override_strategy=ImportOverrideStrategy(CNF_INCLUDE_DIR, CNF_EXT)) + + def get_engine(self): + """Create the default engine with the updated admin user.""" + # TODO(rnirmal):Based on permission issues being resolved we may revert + # url = URL(drivername='mysql', host='localhost', + # query={'read_default_file': '/etc/mysql/my.cnf'}) + global ENGINE + if ENGINE: + return ENGINE + + pwd = self.get_auth_password() + ENGINE = sqlalchemy.create_engine("mysql://%s:%s@localhost:3306" % + (ADMIN_USER_NAME, pwd.strip()), + pool_recycle=7200, + echo=CONF.sql_query_logging, + listeners=[ + self.keep_alive_connection_cls()] + ) + return ENGINE + + @classmethod + def get_auth_password(cls): + return cls.configuration_manager.get_value('client').get('password') + + @classmethod + def get_data_dir(cls): + return cls.configuration_manager.get_value( + MySQLConfParser.SERVER_CONF_SECTION).get('datadir') + + @classmethod + def set_data_dir(cls, value): + cls.configuration_manager.apply_system_override( + {MySQLConfParser.SERVER_CONF_SECTION: {'datadir': value}}) + + def __init__(self, status, local_sql_client, keep_alive_connection_cls): + """By default login with root no password for initial setup.""" + self.state_change_wait_time = CONF.state_change_wait_time + self.status = status + self._local_sql_client = local_sql_client + self._keep_alive_connection_cls = keep_alive_connection_cls + + def _create_admin_user(self, client, password): + """ + Create a os_admin user with a random password + with all privileges similar to the root user. + """ + localhost = "localhost" + g = sql_query.Grant(permissions='ALL', user=ADMIN_USER_NAME, + host=localhost, grant_option=True, clear=password) + t = text(str(g)) + client.execute(t) + + @staticmethod + def _generate_root_password(client): + """Generate and set a random root password and forget about it.""" + localhost = "localhost" + uu = sql_query.UpdateUser("root", host=localhost, + clear=utils.generate_random_password()) + t = text(str(uu)) + client.execute(t) + + def install_if_needed(self, packages): + """Prepare the guest machine with a secure + mysql server installation. + """ + LOG.info(_("Preparing Guest as MySQL Server.")) + if not packager.pkg_is_installed(packages): + LOG.debug("Installing MySQL server.") + self._clear_mysql_config() + # set blank password on pkg configuration stage + pkg_opts = {'root_password': '', + 'root_password_again': ''} + packager.pkg_install(packages, pkg_opts, self.TIME_OUT) + self._create_mysql_confd_dir() + LOG.info(_("Finished installing MySQL server.")) + self.start_mysql() + + def complete_install_or_restart(self): + self.status.end_install_or_restart() + + def secure(self, config_contents, overrides): + LOG.info(_("Generating admin password.")) + admin_password = utils.generate_random_password() + clear_expired_password() + engine = sqlalchemy.create_engine("mysql://root:@localhost:3306", + echo=True) + with self.local_sql_client(engine) as client: + self._remove_anonymous_user(client) + self._create_admin_user(client, admin_password) + + self.stop_db() + + self._reset_configuration(config_contents, admin_password) + self._apply_user_overrides(overrides) + self.start_mysql() + + LOG.debug("MySQL secure complete.") + + def _reset_configuration(self, configuration, admin_password=None): + if not admin_password: + # Take the current admin password from the base configuration file + # if not given. + admin_password = self.get_auth_password() + + self.configuration_manager.save_configuration(configuration) + self._save_authentication_properties(admin_password) + self.wipe_ib_logfiles() + + def _save_authentication_properties(self, admin_password): + self.configuration_manager.apply_system_override( + {'client': {'user': ADMIN_USER_NAME, 'password': admin_password}}) + + def secure_root(self, secure_remote_root=True): + with self.local_sql_client(self.get_engine()) as client: + LOG.info(_("Preserving root access from restore.")) + self._generate_root_password(client) + if secure_remote_root: + self._remove_remote_root_access(client) + + def _clear_mysql_config(self): + """Clear old configs, which can be incompatible with new version.""" + LOG.debug("Clearing old MySQL config.") + random_uuid = str(uuid.uuid4()) + configs = ["/etc/my.cnf", "/etc/mysql/conf.d", "/etc/mysql/my.cnf"] + for config in configs: + try: + old_conf_backup = "%s_%s" % (config, random_uuid) + operating_system.move(config, old_conf_backup, as_root=True) + LOG.debug("%s saved to %s_%s." % + (config, config, random_uuid)) + except exception.ProcessExecutionError: + pass + + def _create_mysql_confd_dir(self): + conf_dir = "/etc/mysql/conf.d" + LOG.debug("Creating %s." % conf_dir) + operating_system.create_directory(conf_dir, as_root=True) + + def _enable_mysql_on_boot(self): + LOG.debug("Enabling MySQL on boot.") + try: + mysql_service = operating_system.service_discovery( + MYSQL_SERVICE_CANDIDATES) + utils.execute_with_timeout(mysql_service['cmd_enable'], shell=True) + except KeyError: + LOG.exception(_("Error enabling MySQL start on boot.")) + raise RuntimeError("Service is not discovered.") + + def _disable_mysql_on_boot(self): + try: + mysql_service = operating_system.service_discovery( + MYSQL_SERVICE_CANDIDATES) + utils.execute_with_timeout(mysql_service['cmd_disable'], + shell=True) + except KeyError: + LOG.exception(_("Error disabling MySQL start on boot.")) + raise RuntimeError("Service is not discovered.") + + def stop_db(self, update_db=False, do_not_start_on_reboot=False): + LOG.info(_("Stopping MySQL.")) + if do_not_start_on_reboot: + self._disable_mysql_on_boot() + try: + mysql_service = operating_system.service_discovery( + MYSQL_SERVICE_CANDIDATES) + utils.execute_with_timeout(mysql_service['cmd_stop'], shell=True) + except KeyError: + LOG.exception(_("Error stopping MySQL.")) + raise RuntimeError("Service is not discovered.") + if not self.status.wait_for_real_status_to_change_to( + rd_instance.ServiceStatuses.SHUTDOWN, + self.state_change_wait_time, update_db): + LOG.error(_("Could not stop MySQL.")) + self.status.end_install_or_restart() + raise RuntimeError("Could not stop MySQL!") + + def _remove_anonymous_user(self, client): + t = text(sql_query.REMOVE_ANON) + client.execute(t) + + def _remove_remote_root_access(self, client): + t = text(sql_query.REMOVE_ROOT) + client.execute(t) + + def restart(self): + try: + self.status.begin_restart() + self.stop_db() + self.start_mysql() + finally: + self.status.end_install_or_restart() + + def update_overrides(self, overrides): + self._apply_user_overrides(overrides) + + def _apply_user_overrides(self, overrides): + # All user-defined values go to the server section of the configuration + # file. + if overrides: + self.configuration_manager.apply_user_override( + {MySQLConfParser.SERVER_CONF_SECTION: overrides}) + + def apply_overrides(self, overrides): + LOG.debug("Applying overrides to MySQL.") + with self.local_sql_client(self.get_engine()) as client: + LOG.debug("Updating override values in running MySQL.") + for k, v in overrides.iteritems(): + byte_value = guestagent_utils.to_bytes(v) + q = sql_query.SetServerVariable(key=k, value=byte_value) + t = text(str(q)) + try: + client.execute(t) + except exc.OperationalError: + output = {'key': k, 'value': byte_value} + LOG.exception(_("Unable to set %(key)s with value " + "%(value)s.") % output) + + def make_read_only(self, read_only): + with self.local_sql_client(self.get_engine()) as client: + q = "set global read_only = %s" % read_only + client.execute(text(str(q))) + + def wipe_ib_logfiles(self): + """Destroys the iblogfiles. + + If for some reason the selected log size in the conf changes from the + current size of the files MySQL will fail to start, so we delete the + files to be safe. + """ + LOG.info(_("Wiping ib_logfiles.")) + for index in range(2): + try: + # On restarts, sometimes these are wiped. So it can be a race + # to have MySQL start up before it's restarted and these have + # to be deleted. That's why its ok if they aren't found and + # that is why we use the "force" option to "remove". + operating_system.remove("%s/ib_logfile%d" + % (self.get_data_dir(), index), + force=True, as_root=True) + except exception.ProcessExecutionError: + LOG.exception("Could not delete logfile.") + raise + + def remove_overrides(self): + self.configuration_manager.remove_user_override() + + def _remove_replication_overrides(self, cnf_file): + LOG.info(_("Removing replication configuration file.")) + if os.path.exists(cnf_file): + operating_system.remove(cnf_file, as_root=True) + + def exists_replication_source_overrides(self): + return self.configuration_manager.has_system_override(CNF_MASTER) + + def write_replication_source_overrides(self, overrideValues): + self.configuration_manager.apply_system_override(overrideValues, + CNF_MASTER) + + def write_replication_replica_overrides(self, overrideValues): + self.configuration_manager.apply_system_override(overrideValues, + CNF_SLAVE) + + def remove_replication_source_overrides(self): + self.configuration_manager.remove_system_override(CNF_MASTER) + + def remove_replication_replica_overrides(self): + self.configuration_manager.remove_system_override(CNF_SLAVE) + + def grant_replication_privilege(self, replication_user): + LOG.info(_("Granting Replication Slave privilege.")) + + LOG.debug("grant_replication_privilege: %s" % replication_user) + + with self.local_sql_client(self.get_engine()) as client: + g = sql_query.Grant(permissions=['REPLICATION SLAVE'], + user=replication_user['name'], + clear=replication_user['password']) + + t = text(str(g)) + client.execute(t) + + def get_port(self): + with self.local_sql_client(self.get_engine()) as client: + result = client.execute('SELECT @@port').first() + return result[0] + + def get_binlog_position(self): + with self.local_sql_client(self.get_engine()) as client: + result = client.execute('SHOW MASTER STATUS').first() + binlog_position = { + 'log_file': result['File'], + 'position': result['Position'] + } + return binlog_position + + def execute_on_client(self, sql_statement): + LOG.debug("Executing SQL: %s" % sql_statement) + with self.local_sql_client(self.get_engine()) as client: + return client.execute(sql_statement) + + def start_slave(self): + LOG.info(_("Starting slave replication.")) + with self.local_sql_client(self.get_engine()) as client: + client.execute('START SLAVE') + self._wait_for_slave_status("ON", client, 60) + + def stop_slave(self, for_failover): + replication_user = None + LOG.info(_("Stopping slave replication.")) + with self.local_sql_client(self.get_engine()) as client: + result = client.execute('SHOW SLAVE STATUS') + replication_user = result.first()['Master_User'] + client.execute('STOP SLAVE') + client.execute('RESET SLAVE ALL') + self._wait_for_slave_status("OFF", client, 30) + if not for_failover: + client.execute('DROP USER ' + replication_user) + return { + 'replication_user': replication_user + } + + def stop_master(self): + LOG.info(_("Stopping replication master.")) + with self.local_sql_client(self.get_engine()) as client: + client.execute('RESET MASTER') + + def _wait_for_slave_status(self, status, client, max_time): + + def verify_slave_status(): + actual_status = client.execute( + "SHOW GLOBAL STATUS like 'slave_running'").first()[1] + return actual_status.upper() == status.upper() + + LOG.debug("Waiting for SLAVE_RUNNING to change to %s.", status) + try: + utils.poll_until(verify_slave_status, sleep_time=3, + time_out=max_time) + LOG.info(_("Replication is now %s.") % status.lower()) + except PollTimeOut: + raise RuntimeError( + _("Replication is not %(status)s after %(max)d seconds.") % { + 'status': status.lower(), 'max': max_time}) + + def start_mysql(self, update_db=False): + LOG.info(_("Starting MySQL.")) + # This is the site of all the trouble in the restart tests. + # Essentially what happens is that mysql start fails, but does not + # die. It is then impossible to kill the original, so + + self._enable_mysql_on_boot() + + try: + mysql_service = operating_system.service_discovery( + MYSQL_SERVICE_CANDIDATES) + utils.execute_with_timeout(mysql_service['cmd_start'], shell=True) + except KeyError: + raise RuntimeError("Service is not discovered.") + except exception.ProcessExecutionError: + # it seems mysql (percona, at least) might come back with [Fail] + # but actually come up ok. we're looking into the timing issue on + # parallel, but for now, we'd like to give it one more chance to + # come up. so regardless of the execute_with_timeout() response, + # we'll assume mysql comes up and check it's status for a while. + pass + if not self.status.wait_for_real_status_to_change_to( + rd_instance.ServiceStatuses.RUNNING, + self.state_change_wait_time, update_db): + LOG.error(_("Start up of MySQL failed.")) + # If it won't start, but won't die either, kill it by hand so we + # don't let a rouge process wander around. + try: + utils.execute_with_timeout("sudo", "pkill", "-9", "mysql") + except exception.ProcessExecutionError: + LOG.exception(_("Error killing stalled MySQL start command.")) + # There's nothing more we can do... + self.status.end_install_or_restart() + raise RuntimeError("Could not start MySQL!") + + def start_db_with_conf_changes(self, config_contents): + LOG.info(_("Starting MySQL with conf changes.")) + LOG.debug("Inside the guest - Status is_running = (%s)." + % self.status.is_running) + if self.status.is_running: + LOG.error(_("Cannot execute start_db_with_conf_changes because " + "MySQL state == %s.") % self.status) + raise RuntimeError("MySQL not stopped.") + LOG.info(_("Resetting configuration.")) + self._reset_configuration(config_contents) + self.start_mysql(True) + + def reset_configuration(self, configuration): + config_contents = configuration['config_contents'] + LOG.info(_("Resetting configuration.")) + self._reset_configuration(config_contents) + + # DEPRECATED: Mantain for API Compatibility + def get_txn_count(self): + LOG.info(_("Retrieving latest txn id.")) + txn_count = 0 + with self.local_sql_client(self.get_engine()) as client: + result = client.execute('SELECT @@global.gtid_executed').first() + for uuid_set in result[0].split(','): + for interval in uuid_set.split(':')[1:]: + if '-' in interval: + iparts = interval.split('-') + txn_count += int(iparts[1]) - int(iparts[0]) + else: + txn_count += 1 + return txn_count + + def _get_slave_status(self): + with self.local_sql_client(self.get_engine()) as client: + return client.execute('SHOW SLAVE STATUS').first() + + def _get_master_UUID(self): + slave_status = self._get_slave_status() + return slave_status and slave_status['Master_UUID'] or None + + def _get_gtid_executed(self): + with self.local_sql_client(self.get_engine()) as client: + return client.execute('SELECT @@global.gtid_executed').first()[0] + + def get_last_txn(self): + master_UUID = self._get_master_UUID() + last_txn_id = '0' + gtid_executed = self._get_gtid_executed() + for gtid_set in gtid_executed.split(','): + uuid_set = gtid_set.split(':') + if uuid_set[0] == master_UUID: + last_txn_id = uuid_set[-1].split('-')[-1] + break + return master_UUID, int(last_txn_id) + + def get_latest_txn_id(self): + LOG.info(_("Retrieving latest txn id.")) + return self._get_gtid_executed() + + def wait_for_txn(self, txn): + LOG.info(_("Waiting on txn '%s'.") % txn) + with self.local_sql_client(self.get_engine()) as client: + client.execute("SELECT WAIT_UNTIL_SQL_THREAD_AFTER_GTIDS('%s')" + % txn) + + +class BaseMySqlRootAccess(object): + + def __init__(self, local_sql_client, mysql_app): + self._local_sql_client = local_sql_client + self._mysql_app = mysql_app + + @property + def mysql_app(self): + return self._mysql_app + + @property + def local_sql_client(self): + return self._local_sql_client + + def is_root_enabled(self): + """Return True if root access is enabled; False otherwise.""" + with self.local_sql_client(self.mysql_app.get_engine()) as client: + t = text(sql_query.ROOT_ENABLED) + result = client.execute(t) + LOG.debug("Found %s with remote root access." % result.rowcount) + return result.rowcount != 0 + + def enable_root(self, root_password=None): + """Enable the root user global access and/or + reset the root password. + """ + user = models.RootUser() + user.name = "root" + user.host = "%" + user.password = root_password or utils.generate_random_password() + with self.local_sql_client(self.mysql_app.get_engine()) as client: + print(client) + try: + cu = sql_query.CreateUser(user.name, host=user.host) + t = text(str(cu)) + client.execute(t, **cu.keyArgs) + except exc.OperationalError as err: + # Ignore, user is already created, just reset the password + # TODO(rnirmal): More fine grained error checking later on + LOG.debug(err) + with self.local_sql_client(self.mysql_app.get_engine()) as client: + print(client) + uu = sql_query.UpdateUser(user.name, host=user.host, + clear=user.password) + t = text(str(uu)) + client.execute(t) + + LOG.debug("CONF.root_grant: %s CONF.root_grant_option: %s." % + (CONF.root_grant, CONF.root_grant_option)) + + g = sql_query.Grant(permissions=CONF.root_grant, + user=user.name, + host=user.host, + grant_option=CONF.root_grant_option, + clear=user.password) + + t = text(str(g)) + client.execute(t) + return user.serialize() diff --git a/trove/guestagent/dbaas.py b/trove/guestagent/dbaas.py index 16fc4e8037..35fe5fec7e 100644 --- a/trove/guestagent/dbaas.py +++ b/trove/guestagent/dbaas.py @@ -37,7 +37,7 @@ defaults = { 'mysql': 'trove.guestagent.datastore.mysql.manager.Manager', 'percona': - 'trove.guestagent.datastore.mysql.manager.Manager', + 'trove.guestagent.datastore.experimental.percona.manager.Manager', 'redis': 'trove.guestagent.datastore.experimental.redis.manager.Manager', 'cassandra': @@ -54,6 +54,8 @@ defaults = { 'trove.guestagent.datastore.experimental.vertica.manager.Manager', 'db2': 'trove.guestagent.datastore.experimental.db2.manager.Manager', + 'mariadb': + 'trove.guestagent.datastore.experimental.mariadb.manager.Manager' } CONF = cfg.CONF diff --git a/trove/guestagent/strategies/backup/mysql_impl.py b/trove/guestagent/strategies/backup/mysql_impl.py index e38e681e69..7d51042fdb 100644 --- a/trove/guestagent/strategies/backup/mysql_impl.py +++ b/trove/guestagent/strategies/backup/mysql_impl.py @@ -20,8 +20,8 @@ import re from oslo_log import log as logging from trove.common.i18n import _ -from trove.guestagent.datastore.mysql.service import ADMIN_USER_NAME from trove.guestagent.datastore.mysql.service import MySqlApp +from trove.guestagent.datastore.mysql.service_base import ADMIN_USER_NAME from trove.guestagent.strategies.backup import base LOG = logging.getLogger(__name__) diff --git a/trove/tests/unittests/guestagent/test_dbaas.py b/trove/tests/unittests/guestagent/test_dbaas.py index 10cc9b8ea6..6150981653 100644 --- a/trove/tests/unittests/guestagent/test_dbaas.py +++ b/trove/tests/unittests/guestagent/test_dbaas.py @@ -70,6 +70,7 @@ from trove.guestagent.datastore.mysql.service import MySqlAdmin from trove.guestagent.datastore.mysql.service import MySqlApp from trove.guestagent.datastore.mysql.service import MySqlAppStatus from trove.guestagent.datastore.mysql.service import MySqlRootAccess +import trove.guestagent.datastore.mysql.service_base as dbaas_base from trove.guestagent.datastore.service import BaseDbStatus from trove.guestagent.db import models from trove.guestagent import dbaas as dbaas_sr @@ -118,30 +119,33 @@ class DbaasTest(testtools.TestCase): def setUp(self): super(DbaasTest, self).setUp() - self.orig_utils_execute_with_timeout = dbaas.utils.execute_with_timeout - self.orig_utils_execute = dbaas.utils.execute + self.orig_utils_execute_with_timeout = \ + dbaas_base.utils.execute_with_timeout + self.orig_utils_execute = dbaas_base.utils.execute def tearDown(self): super(DbaasTest, self).tearDown() - dbaas.utils.execute_with_timeout = self.orig_utils_execute_with_timeout - dbaas.utils.execute = self.orig_utils_execute + dbaas_base.utils.execute_with_timeout = \ + self.orig_utils_execute_with_timeout + dbaas_base.utils.execute = self.orig_utils_execute @patch.object(operating_system, 'remove') def test_clear_expired_password(self, mock_remove): secret_content = ("# The random password set for the " "root user at Wed May 14 14:06:38 2014 " "(local time): somepassword") - with patch.object(dbaas.utils, 'execute', + with patch.object(dbaas_base.utils, 'execute', return_value=(secret_content, None)): - dbaas.clear_expired_password() - self.assertEqual(2, dbaas.utils.execute.call_count) + dbaas_base.clear_expired_password() + self.assertEqual(2, dbaas_base.utils.execute.call_count) self.assertEqual(1, mock_remove.call_count) @patch.object(operating_system, 'remove') def test_no_secret_content_clear_expired_password(self, mock_remove): - with patch.object(dbaas.utils, 'execute', return_value=('', None)): - dbaas.clear_expired_password() - self.assertEqual(1, dbaas.utils.execute.call_count) + with patch.object(dbaas_base.utils, 'execute', + return_value=('', None)): + dbaas_base.clear_expired_password() + self.assertEqual(1, dbaas_base.utils.execute.call_count) mock_remove.assert_not_called() @patch.object(operating_system, 'remove') @@ -150,19 +154,20 @@ class DbaasTest(testtools.TestCase): secret_content = ("# The random password set for the " "root user at Wed May 14 14:06:38 2014 " "(local time): somepassword") - with patch.object(dbaas.utils, 'execute', + with patch.object(dbaas_base.utils, 'execute', side_effect=[(secret_content, None), ProcessExecutionError]): - dbaas.clear_expired_password() - self.assertEqual(2, dbaas.utils.execute.call_count) + dbaas_base.clear_expired_password() + self.assertEqual(2, dbaas_base.utils.execute.call_count) mock_remove.assert_not_called() @patch.object(operating_system, 'remove') - @patch.object(dbaas.utils, 'execute', side_effect=ProcessExecutionError) + @patch.object(dbaas_base.utils, 'execute', + side_effect=ProcessExecutionError) def test_fail_retrieve_secret_content_clear_expired_password(self, mock_execute, mock_remove): - dbaas.clear_expired_password() + dbaas_base.clear_expired_password() self.assertEqual(1, mock_execute.call_count) mock_remove.assert_not_called() @@ -181,7 +186,8 @@ class DbaasTest(testtools.TestCase): def test_service_discovery(self): with patch.object(os.path, 'isfile', return_value=True): - mysql_service = dbaas.operating_system.service_discovery(["mysql"]) + mysql_service = \ + dbaas_base.operating_system.service_discovery(["mysql"]) self.assertIsNotNone(mysql_service['cmd_start']) self.assertIsNotNone(mysql_service['cmd_enable']) @@ -192,8 +198,8 @@ class DbaasTest(testtools.TestCase): "--tmpdir=/tmp --skip-external-locking" with patch.object(os.path, 'isfile', return_value=True): - dbaas.utils.execute = Mock(return_value=(output, None)) - options = dbaas.load_mysqld_options() + dbaas_base.utils.execute = Mock(return_value=(output, None)) + options = dbaas_base.load_mysqld_options() self.assertEqual(5, len(options)) self.assertEqual(["mysql"], options["user"]) @@ -208,8 +214,8 @@ class DbaasTest(testtools.TestCase): "--plugin-load=federated=ha_federated.so") with patch.object(os.path, 'isfile', return_value=True): - dbaas.utils.execute = Mock(return_value=(output, None)) - options = dbaas.load_mysqld_options() + dbaas_base.utils.execute = Mock(return_value=(output, None)) + options = dbaas_base.load_mysqld_options() self.assertEqual(1, len(options)) self.assertEqual(["blackhole=ha_blackhole.so", @@ -219,9 +225,9 @@ class DbaasTest(testtools.TestCase): @patch.object(os.path, 'isfile', return_value=True) def test_load_mysqld_options_error(self, mock_exists): - dbaas.utils.execute = Mock(side_effect=ProcessExecutionError()) + dbaas_base.utils.execute = Mock(side_effect=ProcessExecutionError()) - self.assertFalse(dbaas.load_mysqld_options()) + self.assertFalse(dbaas_base.load_mysqld_options()) class ResultSetStub(object): @@ -242,8 +248,15 @@ class ResultSetStub(object): class MySqlAdminMockTest(testtools.TestCase): + def setUp(self): + super(MySqlAdminMockTest, self).setUp() + dbaas.orig_configuration_manager = dbaas.MySqlApp.configuration_manager + dbaas.MySqlApp.configuration_manager = Mock() + def tearDown(self): super(MySqlAdminMockTest, self).tearDown() + dbaas.MySqlApp.configuration_manager = \ + dbaas.orig_configuration_manager @patch('trove.guestagent.datastore.mysql.service.MySqlApp' '.get_auth_password', return_value='some_password') @@ -279,6 +292,10 @@ class MySqlAdminTest(testtools.TestCase): dbaas.LocalSqlClient.__enter__ = Mock() dbaas.LocalSqlClient.__exit__ = Mock() dbaas.LocalSqlClient.execute = Mock() + # trove.guestagent.common.configuration import ConfigurationManager + dbaas.orig_configuration_manager = dbaas.MySqlApp.configuration_manager + dbaas.MySqlApp.configuration_manager = Mock() + self.mySqlAdmin = MySqlAdmin() def tearDown(self): @@ -291,6 +308,8 @@ class MySqlAdminTest(testtools.TestCase): dbaas.LocalSqlClient.execute = self.orig_LocalSqlClient_execute models.MySQLUser._is_valid_user_name = ( self.orig_MySQLUser_is_valid_user_name) + dbaas.MySqlApp.configuration_manager = \ + dbaas.orig_configuration_manager def test__associate_dbs(self): db_result = [{"grantee": "'test_user'@'%'", "table_schema": "db1"}, @@ -732,7 +751,8 @@ class MySqlAppTest(testtools.TestCase): def setUp(self): super(MySqlAppTest, self).setUp() - self.orig_utils_execute_with_timeout = dbaas.utils.execute_with_timeout + self.orig_utils_execute_with_timeout = \ + dbaas_base.utils.execute_with_timeout self.orig_time_sleep = time.sleep self.orig_unlink = os.unlink self.orig_get_auth_password = MySqlApp.get_auth_password @@ -759,15 +779,20 @@ class MySqlAppTest(testtools.TestCase): self.mock_client.__enter__ = Mock() self.mock_client.__exit__ = Mock() self.mock_client.__enter__.return_value.execute = self.mock_execute + dbaas.orig_configuration_manager = dbaas.MySqlApp.configuration_manager + dbaas.MySqlApp.configuration_manager = Mock() def tearDown(self): super(MySqlAppTest, self).tearDown() - dbaas.utils.execute_with_timeout = self.orig_utils_execute_with_timeout + dbaas_base.utils.execute_with_timeout = \ + self.orig_utils_execute_with_timeout time.sleep = self.orig_time_sleep os.unlink = self.orig_unlink operating_system.service_discovery = self.orig_service_discovery MySqlApp.get_auth_password = self.orig_get_auth_password InstanceServiceStatus.find_by(instance_id=self.FAKE_ID).delete() + dbaas.MySqlApp.configuration_manager = \ + dbaas.orig_configuration_manager def assert_reported_status(self, expected_status): service_status = InstanceServiceStatus.find_by( @@ -802,7 +827,7 @@ class MySqlAppTest(testtools.TestCase): def test_stop_mysql(self): - dbaas.utils.execute_with_timeout = Mock() + dbaas_base.utils.execute_with_timeout = Mock() self.appStatus.set_next_status( rd_instance.ServiceStatuses.SHUTDOWN) @@ -811,7 +836,7 @@ class MySqlAppTest(testtools.TestCase): def test_stop_mysql_with_db_update(self): - dbaas.utils.execute_with_timeout = Mock() + dbaas_base.utils.execute_with_timeout = Mock() self.appStatus.set_next_status( rd_instance.ServiceStatuses.SHUTDOWN) @@ -836,7 +861,7 @@ class MySqlAppTest(testtools.TestCase): def test_stop_mysql_error(self): - dbaas.utils.execute_with_timeout = Mock() + dbaas_base.utils.execute_with_timeout = Mock() self.appStatus.set_next_status(rd_instance.ServiceStatuses.RUNNING) self.mySqlApp.state_change_wait_time = 1 self.assertRaises(RuntimeError, self.mySqlApp.stop_db) @@ -882,14 +907,14 @@ class MySqlAppTest(testtools.TestCase): def test_wipe_ib_logfiles_error(self, get_datadir_mock): mocked = Mock(side_effect=ProcessExecutionError('Error')) - dbaas.utils.execute_with_timeout = mocked + dbaas_base.utils.execute_with_timeout = mocked self.assertRaises(ProcessExecutionError, self.mySqlApp.wipe_ib_logfiles) def test_start_mysql(self): - dbaas.utils.execute_with_timeout = Mock() + dbaas_base.utils.execute_with_timeout = Mock() self.appStatus.set_next_status(rd_instance.ServiceStatuses.RUNNING) self.mySqlApp._enable_mysql_on_boot = Mock() self.mySqlApp.start_mysql() @@ -897,7 +922,7 @@ class MySqlAppTest(testtools.TestCase): def test_start_mysql_with_db_update(self): - dbaas.utils.execute_with_timeout = Mock() + dbaas_base.utils.execute_with_timeout = Mock() self.mySqlApp._enable_mysql_on_boot = Mock() self.appStatus.set_next_status(rd_instance.ServiceStatuses.RUNNING) @@ -909,7 +934,7 @@ class MySqlAppTest(testtools.TestCase): def test_start_mysql_runs_forever(self): - dbaas.utils.execute_with_timeout = Mock() + dbaas_base.utils.execute_with_timeout = Mock() self.mySqlApp._enable_mysql_on_boot = Mock() self.mySqlApp.state_change_wait_time = 1 self.appStatus.set_next_status(rd_instance.ServiceStatuses.SHUTDOWN) @@ -924,7 +949,7 @@ class MySqlAppTest(testtools.TestCase): self.mySqlApp._enable_mysql_on_boot = Mock() mocked = Mock(side_effect=ProcessExecutionError('Error')) - dbaas.utils.execute_with_timeout = mocked + dbaas_base.utils.execute_with_timeout = mocked self.assertRaises(RuntimeError, self.mySqlApp.start_mysql) @@ -976,13 +1001,14 @@ class MySqlAppTest(testtools.TestCase): save_cfg_mock.assert_called_once_with('some junk') apply_mock.assert_called_once_with( - {'client': {'user': dbaas.ADMIN_USER_NAME, + {'client': {'user': dbaas_base.ADMIN_USER_NAME, 'password': auth_pwd_mock.return_value}}) wipe_ib_mock.assert_called_once_with() @patch.object(utils, 'execute_with_timeout') def test__enable_mysql_on_boot(self, mock_execute): - mysql_service = dbaas.operating_system.service_discovery(["mysql"]) + mysql_service = \ + dbaas_base.operating_system.service_discovery(["mysql"]) self.mySqlApp._enable_mysql_on_boot() self.assertEqual(1, mock_execute.call_count) mock_execute.assert_called_with(mysql_service['cmd_enable'], @@ -998,7 +1024,8 @@ class MySqlAppTest(testtools.TestCase): @patch.object(utils, 'execute_with_timeout') def test__disable_mysql_on_boot(self, mock_execute): - mysql_service = dbaas.operating_system.service_discovery(["mysql"]) + mysql_service = \ + dbaas_base.operating_system.service_discovery(["mysql"]) self.mySqlApp._disable_mysql_on_boot() self.assertEqual(1, mock_execute.call_count) mock_execute.assert_called_with(mysql_service['cmd_disable'], @@ -1030,26 +1057,26 @@ class MySqlAppTest(testtools.TestCase): 'apply_system_override') as apply_sys_mock: self.mySqlApp.write_replication_source_overrides('something') apply_sys_mock.assert_called_once_with('something', - dbaas.CNF_MASTER) + dbaas_base.CNF_MASTER) def test_write_replication_replica_overrides(self): with patch.object(self.mySqlApp.configuration_manager, 'apply_system_override') as apply_sys_mock: self.mySqlApp.write_replication_replica_overrides('something') apply_sys_mock.assert_called_once_with('something', - dbaas.CNF_SLAVE) + dbaas_base.CNF_SLAVE) def test_remove_replication_source_overrides(self): with patch.object(self.mySqlApp.configuration_manager, 'remove_system_override') as remove_sys_mock: self.mySqlApp.remove_replication_source_overrides() - remove_sys_mock.assert_called_once_with(dbaas.CNF_MASTER) + remove_sys_mock.assert_called_once_with(dbaas_base.CNF_MASTER) def test_remove_replication_replica_overrides(self): with patch.object(self.mySqlApp.configuration_manager, 'remove_system_override') as remove_sys_mock: self.mySqlApp.remove_replication_replica_overrides() - remove_sys_mock.assert_called_once_with(dbaas.CNF_SLAVE) + remove_sys_mock.assert_called_once_with(dbaas_base.CNF_SLAVE) def test_exists_replication_source_overrides(self): with patch.object(self.mySqlApp.configuration_manager, @@ -1063,7 +1090,7 @@ class MySqlAppTest(testtools.TestCase): return_value=MagicMock(name='get_engine')) def test_grant_replication_privilege(self, *args): replication_user = {'name': 'testUSr', 'password': 'somePwd'} - with patch.object(dbaas, 'LocalSqlClient', + with patch.object(dbaas.MySqlApp, 'local_sql_client', return_value=self.mock_client): self.mySqlApp.grant_replication_privilege(replication_user) args, _ = self.mock_execute.call_args_list[0] @@ -1075,7 +1102,7 @@ class MySqlAppTest(testtools.TestCase): @patch.object(dbaas, 'get_engine', return_value=MagicMock(name='get_engine')) def test_get_port(self, *args): - with patch.object(dbaas, 'LocalSqlClient', + with patch.object(dbaas.MySqlApp, 'local_sql_client', return_value=self.mock_client): self.mySqlApp.get_port() args, _ = self.mock_execute.call_args_list[0] @@ -1088,7 +1115,7 @@ class MySqlAppTest(testtools.TestCase): def test_get_binlog_position(self, *args): result = {'File': 'mysql-bin.003', 'Position': '73'} self.mock_execute.return_value.first = Mock(return_value=result) - with patch.object(dbaas, 'LocalSqlClient', + with patch.object(dbaas.MySqlApp, 'local_sql_client', return_value=self.mock_client): found_result = self.mySqlApp.get_binlog_position() @@ -1103,7 +1130,7 @@ class MySqlAppTest(testtools.TestCase): @patch.object(dbaas, 'get_engine', return_value=MagicMock(name='get_engine')) def test_execute_on_client(self, *args): - with patch.object(dbaas, 'LocalSqlClient', + with patch.object(dbaas.MySqlApp, 'local_sql_client', return_value=self.mock_client): self.mySqlApp.execute_on_client('show tables') args, _ = self.mock_execute.call_args_list[0] @@ -1115,7 +1142,7 @@ class MySqlAppTest(testtools.TestCase): return_value=MagicMock(name='get_engine')) @patch.object(dbaas.MySqlApp, '_wait_for_slave_status') def test_start_slave(self, *args): - with patch.object(dbaas, 'LocalSqlClient', + with patch.object(dbaas.MySqlApp, 'local_sql_client', return_value=self.mock_client): self.mySqlApp.start_slave() args, _ = self.mock_execute.call_args_list[0] @@ -1129,7 +1156,7 @@ class MySqlAppTest(testtools.TestCase): def test_stop_slave_with_failover(self, *args): self.mock_execute.return_value.first = Mock( return_value={'Master_User': 'root'}) - with patch.object(dbaas, 'LocalSqlClient', + with patch.object(dbaas.MySqlApp, 'local_sql_client', return_value=self.mock_client): result = self.mySqlApp.stop_slave(True) self.assertEqual('root', result['replication_user']) @@ -1147,7 +1174,7 @@ class MySqlAppTest(testtools.TestCase): def test_stop_slave_without_failover(self, *args): self.mock_execute.return_value.first = Mock( return_value={'Master_User': 'root'}) - with patch.object(dbaas, 'LocalSqlClient', + with patch.object(dbaas.MySqlApp, 'local_sql_client', return_value=self.mock_client): result = self.mySqlApp.stop_slave(False) self.assertEqual('root', result['replication_user']) @@ -1163,7 +1190,7 @@ class MySqlAppTest(testtools.TestCase): @patch.object(dbaas, 'get_engine', return_value=MagicMock(name='get_engine')) def test_stop_master(self, *args): - with patch.object(dbaas, 'LocalSqlClient', + with patch.object(dbaas.MySqlApp, 'local_sql_client', return_value=self.mock_client): self.mySqlApp.stop_master() args, _ = self.mock_execute.call_args_list[0] @@ -1197,7 +1224,7 @@ class MySqlAppTest(testtools.TestCase): return_value=MagicMock(name='get_engine')) def test__get_slave_status(self, *args): self.mock_execute.return_value.first = Mock(return_value='some_thing') - with patch.object(dbaas, 'LocalSqlClient', + with patch.object(dbaas.MySqlApp, 'local_sql_client', return_value=self.mock_client): result = self.mySqlApp._get_slave_status() self.assertEqual('some_thing', result) @@ -1211,7 +1238,7 @@ class MySqlAppTest(testtools.TestCase): def test_get_latest_txn_id(self, *args): self.mock_execute.return_value.first = Mock(return_value=['some_thing'] ) - with patch.object(dbaas, 'LocalSqlClient', + with patch.object(dbaas.MySqlApp, 'local_sql_client', return_value=self.mock_client): result = self.mySqlApp.get_latest_txn_id() self.assertEqual('some_thing', result) @@ -1223,7 +1250,7 @@ class MySqlAppTest(testtools.TestCase): @patch.object(dbaas, 'get_engine', return_value=MagicMock(name='get_engine')) def test_wait_for_txn(self, *args): - with patch.object(dbaas, 'LocalSqlClient', + with patch.object(dbaas.MySqlApp, 'local_sql_client', return_value=self.mock_client): self.mySqlApp.wait_for_txn('abcd') args, _ = self.mock_execute.call_args_list[0] @@ -1236,7 +1263,7 @@ class MySqlAppTest(testtools.TestCase): def test_get_txn_count(self, *args): self.mock_execute.return_value.first = Mock( return_value=['b1f3f33a-0789-ee1c-43f3-f8373e12f1ea:1']) - with patch.object(dbaas, 'LocalSqlClient', + with patch.object(dbaas.MySqlApp, 'local_sql_client', return_value=self.mock_client): result = self.mySqlApp.get_txn_count() self.assertEqual(1, result) @@ -1251,7 +1278,7 @@ class MySqlAppInstallTest(MySqlAppTest): def setUp(self): super(MySqlAppInstallTest, self).setUp() self.orig_create_engine = sqlalchemy.create_engine - self.orig_pkg_version = dbaas.packager.pkg_version + self.orig_pkg_version = dbaas_base.packager.pkg_version self.orig_utils_execute_with_timeout = utils.execute_with_timeout self.mock_client = Mock() self.mock_execute = Mock() @@ -1262,7 +1289,7 @@ class MySqlAppInstallTest(MySqlAppTest): def tearDown(self): super(MySqlAppInstallTest, self).tearDown() sqlalchemy.create_engine = self.orig_create_engine - dbaas.packager.pkg_version = self.orig_pkg_version + dbaas_base.packager.pkg_version = self.orig_pkg_version utils.execute_with_timeout = self.orig_utils_execute_with_timeout def test_install(self): @@ -1282,7 +1309,7 @@ class MySqlAppInstallTest(MySqlAppTest): return_value='some_password') def test_secure(self, auth_pwd_mock): - dbaas.clear_expired_password = Mock() + dbaas_base.clear_expired_password = Mock() self.mySqlApp.start_mysql = Mock() self.mySqlApp.stop_db = Mock() self.mySqlApp._reset_configuration = Mock() @@ -1306,7 +1333,7 @@ class MySqlAppInstallTest(MySqlAppTest): @patch.object(utils, 'generate_random_password', return_value='some_password') def test_secure_root(self, *args): - with patch.object(dbaas, 'LocalSqlClient', + with patch.object(dbaas.MySqlApp, 'local_sql_client', return_value=self.mock_client): self.mySqlApp.secure_root() update_root_password, _ = self.mock_execute.call_args_list[0] @@ -1344,7 +1371,7 @@ class MySqlAppInstallTest(MySqlAppTest): return_value=MagicMock(name='get_engine')) def test_apply_overrides(self, *args): overrides = {'sort_buffer_size': 1000000} - with patch.object(dbaas, 'LocalSqlClient', + with patch.object(dbaas.MySqlApp, 'local_sql_client', return_value=self.mock_client): self.mySqlApp.apply_overrides(overrides) args, _ = self.mock_execute.call_args_list[0] @@ -1355,7 +1382,7 @@ class MySqlAppInstallTest(MySqlAppTest): @patch.object(dbaas, 'get_engine', return_value=MagicMock(name='get_engine')) def test_make_read_only(self, *args): - with patch.object(dbaas, 'LocalSqlClient', + with patch.object(dbaas.MySqlApp, 'local_sql_client', return_value=self.mock_client): self.mySqlApp.make_read_only('ON') args, _ = self.mock_execute.call_args_list[0] @@ -1380,7 +1407,7 @@ class MySqlAppInstallTest(MySqlAppTest): def test_secure_write_conf_error(self): - dbaas.clear_expired_password = Mock() + dbaas_base.clear_expired_password = Mock() self.mySqlApp.start_mysql = Mock() self.mySqlApp.stop_db = Mock() self.mySqlApp._reset_configuration = Mock( @@ -1443,7 +1470,8 @@ class MySqlAppMockTest(testtools.TestCase): mock_status = MagicMock() mock_status.wait_for_real_status_to_change_to = MagicMock( return_value=True) - dbaas.clear_expired_password = MagicMock(return_value=None) + dbaas_base.clear_expired_password = \ + MagicMock(return_value=None) app = MySqlApp(mock_status) app._reset_configuration = MagicMock() app._write_mycnf = MagicMock(return_value=True) @@ -1469,9 +1497,11 @@ class MySqlAppMockTest(testtools.TestCase): mock_status = MagicMock() mock_status.wait_for_real_status_to_change_to = MagicMock( return_value=True) - dbaas.clear_expired_password = MagicMock(return_value=None) + dbaas_base.clear_expired_password = \ + MagicMock(return_value=None) app = MySqlApp(mock_status) - dbaas.clear_expired_password = MagicMock(return_value=None) + dbaas_base.clear_expired_password = \ + MagicMock(return_value=None) self.assertRaises(RuntimeError, app.secure, None, None) self.assertTrue(mock_conn.execute.called) # At least called twice @@ -1486,10 +1516,14 @@ class MySqlRootStatusTest(testtools.TestCase): def setUp(self): super(MySqlRootStatusTest, self).setUp() self.orig_utils_execute_with_timeout = utils.execute_with_timeout + dbaas.orig_configuration_manager = dbaas.MySqlApp.configuration_manager + dbaas.MySqlApp.configuration_manager = Mock() def tearDown(self): super(MySqlRootStatusTest, self).tearDown() utils.execute_with_timeout = self.orig_utils_execute_with_timeout + dbaas.MySqlApp.configuration_manager = \ + dbaas.orig_configuration_manager @patch.object(dbaas.MySqlApp, 'get_auth_password', return_value='some_password') @@ -1509,7 +1543,7 @@ class MySqlRootStatusTest(testtools.TestCase): mock_rs = MagicMock() mock_rs.rowcount = 0 with patch.object(mock_conn, 'execute', return_value=mock_rs): - self.assertThat(MySqlRootAccess.is_root_enabled(), Equals(False)) + self.assertThat(MySqlRootAccess().is_root_enabled(), Equals(False)) @patch.object(dbaas.MySqlApp, 'get_auth_password', return_value='some_password') @@ -1518,7 +1552,7 @@ class MySqlRootStatusTest(testtools.TestCase): with patch.object(mock_conn, 'execute', return_value=None): # invocation - user_ser = MySqlRootAccess.enable_root() + user_ser = MySqlRootAccess().enable_root() # verification self.assertThat(user_ser, Not(Is(None))) mock_conn.execute.assert_any_call(TextClauseMatcher('CREATE USER'), @@ -1591,8 +1625,8 @@ class ServiceRegistryTest(testtools.TestCase): self.assertEqual('trove.guestagent.datastore.mysql.' 'manager.Manager', test_dict.get('mysql')) - self.assertEqual('trove.guestagent.datastore.mysql.' - 'manager.Manager', + self.assertEqual('trove.guestagent.datastore.experimental.' + 'percona.manager.Manager', test_dict.get('percona')) self.assertEqual('trove.guestagent.datastore.experimental.redis.' 'manager.Manager', @@ -1624,8 +1658,8 @@ class ServiceRegistryTest(testtools.TestCase): self.assertEqual('trove.guestagent.datastore.mysql.' 'manager.Manager123', test_dict.get('mysql')) - self.assertEqual('trove.guestagent.datastore.mysql.' - 'manager.Manager', + self.assertEqual('trove.guestagent.datastore.experimental.' + 'percona.manager.Manager', test_dict.get('percona')) self.assertEqual('trove.guestagent.datastore.experimental.redis.' 'manager.Manager', @@ -1657,8 +1691,8 @@ class ServiceRegistryTest(testtools.TestCase): self.assertEqual('trove.guestagent.datastore.mysql.' 'manager.Manager', test_dict.get('mysql')) - self.assertEqual('trove.guestagent.datastore.mysql.' - 'manager.Manager', + self.assertEqual('trove.guestagent.datastore.experimental.' + 'percona.manager.Manager', test_dict.get('percona')) self.assertEqual('trove.guestagent.datastore.experimental.redis.' 'manager.Manager', @@ -1695,12 +1729,14 @@ class KeepAliveConnectionTest(testtools.TestCase): def setUp(self): super(KeepAliveConnectionTest, self).setUp() - self.orig_utils_execute_with_timeout = dbaas.utils.execute_with_timeout + self.orig_utils_execute_with_timeout = \ + dbaas_base.utils.execute_with_timeout self.orig_LOG_err = dbaas.LOG def tearDown(self): super(KeepAliveConnectionTest, self).tearDown() - dbaas.utils.execute_with_timeout = self.orig_utils_execute_with_timeout + dbaas_base.utils.execute_with_timeout = \ + self.orig_utils_execute_with_timeout dbaas.LOG = self.orig_LOG_err def test_checkout_type_error(self): @@ -1855,9 +1891,10 @@ class MySqlAppStatusTest(testtools.TestCase): def setUp(self): super(MySqlAppStatusTest, self).setUp() util.init_db() - self.orig_utils_execute_with_timeout = dbaas.utils.execute_with_timeout - self.orig_load_mysqld_options = dbaas.load_mysqld_options - self.orig_dbaas_os_path_exists = dbaas.os.path.exists + self.orig_utils_execute_with_timeout = \ + dbaas_base.utils.execute_with_timeout + self.orig_load_mysqld_options = dbaas_base.load_mysqld_options + self.orig_dbaas_base_os_path_exists = dbaas_base.os.path.exists self.orig_dbaas_time_sleep = time.sleep self.FAKE_ID = str(uuid4()) InstanceServiceStatus.create(instance_id=self.FAKE_ID, @@ -1866,18 +1903,19 @@ class MySqlAppStatusTest(testtools.TestCase): def tearDown(self): super(MySqlAppStatusTest, self).tearDown() - dbaas.utils.execute_with_timeout = self.orig_utils_execute_with_timeout - dbaas.load_mysqld_options = self.orig_load_mysqld_options - dbaas.os.path.exists = self.orig_dbaas_os_path_exists + dbaas_base.utils.execute_with_timeout = \ + self.orig_utils_execute_with_timeout + dbaas_base.load_mysqld_options = self.orig_load_mysqld_options + dbaas_base.os.path.exists = self.orig_dbaas_base_os_path_exists time.sleep = self.orig_dbaas_time_sleep InstanceServiceStatus.find_by(instance_id=self.FAKE_ID).delete() dbaas.CONF.guest_id = None def test_get_actual_db_status(self): - dbaas.utils.execute_with_timeout = Mock(return_value=(None, None)) + dbaas_base.utils.execute_with_timeout = Mock(return_value=(None, None)) - self.mySqlAppStatus = MySqlAppStatus() + self.mySqlAppStatus = MySqlAppStatus.get() status = self.mySqlAppStatus._get_actual_db_status() self.assertEqual(rd_instance.ServiceStatuses.RUNNING, status) @@ -1887,31 +1925,31 @@ class MySqlAppStatusTest(testtools.TestCase): @patch.object(os.path, 'exists', return_value=True) def test_get_actual_db_status_error_crashed(self, mock_exists, mock_execute): - dbaas.load_mysqld_options = Mock(return_value={}) - self.mySqlAppStatus = MySqlAppStatus() + dbaas_base.load_mysqld_options = Mock(return_value={}) + self.mySqlAppStatus = MySqlAppStatus.get() status = self.mySqlAppStatus._get_actual_db_status() self.assertEqual(rd_instance.ServiceStatuses.CRASHED, status) def test_get_actual_db_status_error_shutdown(self): mocked = Mock(side_effect=ProcessExecutionError()) - dbaas.utils.execute_with_timeout = mocked - dbaas.load_mysqld_options = Mock(return_value={}) - dbaas.os.path.exists = Mock(return_value=False) + dbaas_base.utils.execute_with_timeout = mocked + dbaas_base.load_mysqld_options = Mock(return_value={}) + dbaas_base.os.path.exists = Mock(return_value=False) - self.mySqlAppStatus = MySqlAppStatus() + self.mySqlAppStatus = MySqlAppStatus.get() status = self.mySqlAppStatus._get_actual_db_status() self.assertEqual(rd_instance.ServiceStatuses.SHUTDOWN, status) def test_get_actual_db_status_error_blocked(self): - dbaas.utils.execute_with_timeout = MagicMock( + dbaas_base.utils.execute_with_timeout = MagicMock( side_effect=[ProcessExecutionError(), ("some output", None)]) - dbaas.load_mysqld_options = Mock() - dbaas.os.path.exists = Mock(return_value=True) + dbaas_base.load_mysqld_options = Mock() + dbaas_base.os.path.exists = Mock(return_value=True) - self.mySqlAppStatus = MySqlAppStatus() + self.mySqlAppStatus = MySqlAppStatus.get() status = self.mySqlAppStatus._get_actual_db_status() self.assertEqual(rd_instance.ServiceStatuses.BLOCKED, status) diff --git a/trove/tests/unittests/guestagent/test_mysql_manager.py b/trove/tests/unittests/guestagent/test_mysql_manager.py index 581cab22dd..be5a7b32d2 100644 --- a/trove/tests/unittests/guestagent/test_mysql_manager.py +++ b/trove/tests/unittests/guestagent/test_mysql_manager.py @@ -26,6 +26,8 @@ from trove.common.exception import ProcessExecutionError from trove.common import instance as rd_instance from trove.guestagent import backup from trove.guestagent.common import operating_system +# TODO(atomic77) The test cases should be made configurable +# to make it easier to test the various derived datastores. from trove.guestagent.datastore.mysql.manager import Manager import trove.guestagent.datastore.mysql.service as dbaas from trove.guestagent import dbaas as base_dbaas @@ -55,9 +57,8 @@ class GuestAgentManagerTest(testtools.TestCase): # set up common mock objects, etc. for replication testing self.patcher_gfvs = patch( 'trove.guestagent.dbaas.get_filesystem_volume_stats') - self.patcher_rs = patch( - 'trove.guestagent.datastore.mysql.manager.' - 'REPLICATION_STRATEGY_CLASS') + self.patcher_rs = patch(self.manager.replication_namespace + "." + + self.manager.replication_strategy) self.mock_gfvs_class = self.patcher_gfvs.start() self.mock_rs_class = self.patcher_rs.start() self.repl_datastore_manager = 'mysql'