From 384576675f8205aa3985c1f576ce767e43b9944d Mon Sep 17 00:00:00 2001 From: Ed Cranford Date: Thu, 22 Aug 2013 10:01:42 -0500 Subject: [PATCH] Conductor proxies host db access for guests Previously, instances updated their status by updating the database on the host directly. Necessarily, each instance would need access to the database to stay updated. Trove's new conductor service eliminates that need by working as a proxy for those instances. By sending a heartbeat to conductor via RPC, conductor updates the database on the host on behalf of the instance. As backups also made use of the host database, the backup code has been refactored to take richer inputs to remove the need to query the host database, and now conductor is also used to submit updates to backup states. Implements: blueprint trove-conductor Change-Id: I4cb34baedd0e3a50051f9e66de95c9028c66e4b5 --- bin/trove-conductor | 62 ++++++++ doc/source/dev/design.rst | 33 ++++ etc/trove/trove-conductor.conf.sample | 8 + etc/trove/trove-guestagent.conf.sample | 27 +--- setup.cfg | 1 + tox.ini | 2 +- trove/backup/models.py | 11 +- trove/common/cfg.py | 2 + trove/common/instance.py | 3 +- trove/conductor/__init__.py | 16 ++ trove/conductor/api.py | 52 ++++++ trove/conductor/manager.py | 68 ++++++++ trove/db/models.py | 16 +- trove/guestagent/api.py | 12 +- trove/guestagent/backup/__init__.py | 8 +- trove/guestagent/backup/backupagent.py | 103 ++++++------ trove/guestagent/datastore/mysql/manager.py | 28 ++-- trove/guestagent/datastore/service.py | 26 +-- trove/taskmanager/api.py | 4 +- trove/taskmanager/manager.py | 4 +- trove/taskmanager/models.py | 26 +-- trove/tests/api/instances_resize.py | 3 +- trove/tests/fakes/guestagent.py | 7 +- .../unittests/backup/test_backupagent.py | 98 +++++++----- trove/tests/unittests/conductor/__init__.py | 13 ++ .../tests/unittests/conductor/test_methods.py | 148 ++++++++++++++++++ trove/tests/unittests/guestagent/test_api.py | 15 +- .../tests/unittests/guestagent/test_dbaas.py | 29 ++-- .../unittests/guestagent/test_manager.py | 26 ++- trove/tests/unittests/quota/test_quota.py | 4 +- .../unittests/taskmanager/test_models.py | 4 +- 31 files changed, 660 insertions(+), 199 deletions(-) create mode 100755 bin/trove-conductor create mode 100644 etc/trove/trove-conductor.conf.sample create mode 100644 trove/conductor/__init__.py create mode 100644 trove/conductor/api.py create mode 100644 trove/conductor/manager.py create mode 100644 trove/tests/unittests/conductor/__init__.py create mode 100644 trove/tests/unittests/conductor/test_methods.py diff --git a/bin/trove-conductor b/bin/trove-conductor new file mode 100755 index 0000000000..d8e7340e1e --- /dev/null +++ b/bin/trove-conductor @@ -0,0 +1,62 @@ +#!/usr/bin/env python +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Rackspace Hosting +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import eventlet +eventlet.monkey_patch(all=True, thread=False) + +import gettext +import traceback +import sys + + +gettext.install('trove', unicode=1) + + +from trove.common import cfg +from trove.common import debug_utils +from trove.common.rpc import service as rpc_service +from trove.db import get_db_api +from trove.openstack.common import log as logging +from trove.openstack.common import service as openstack_service + +CONF = cfg.CONF + + +def main(): + get_db_api().configure_db(CONF) + manager = 'trove.conductor.manager.Manager' + topic = CONF.conductor_queue + server = rpc_service.RpcService(manager=manager, topic=topic) + launcher = openstack_service.launch(server, + workers=CONF.trove_conductor_workers) + launcher.wait() + + +if __name__ == '__main__': + cfg.parse_args(sys.argv) + logging.setup(None) + + debug_utils.setup() + + if not debug_utils.enabled(): + eventlet.monkey_patch(thread=True) + try: + main() + except RuntimeError as error: + print traceback.format_exc() + sys.exit("ERROR: %s" % error) diff --git a/doc/source/dev/design.rst b/doc/source/dev/design.rst index ad50532dfe..5b63dca431 100644 --- a/doc/source/dev/design.rst +++ b/doc/source/dev/design.rst @@ -91,4 +91,37 @@ bus and performs the requested operation. * Actual handling is usually done in the dbaas.py module +Trove-conductor +=============== + +Conductor is a service that runs on the host, responsible for recieving +messages from guest instances to update information on the host. +For example, instance statuses and the current status of a backup. +With conductor, guest instances do not need a direct connection to the +host's database. Conductor listens for RPC messages through the message +bus and performs the relevant operation. + +* Similar to guest-agent in that it is a service that listens to a + RabbitMQ topic. The difference is conductor lives on the host, not + the guest. +* Guest agents communicate to conductor by putting messages on the + topic defined in cfg as conductor_queue. By default this is + "trove-conductor". +* Entry point - Trove/bin/trove-conductor +* Runs as RpcService configured by + Trove/etc/trove/trove-conductor.conf.sample which defines + trove.conductor.manager.Manager as the manager. This is the entry + point for requests arriving on the queue. +* As guestagent above, requests are pushed to MQ from another component + using _cast() (synchronous), generally of the form + {"method": "", "args": {}} +* Actual database update work is done by trove/conductor/manager.py +* The "heartbeat" method updates the status of an instance. This is + used to report that instance has changed from NEW to BUILDING to + ACTIVE and so on. +* The "update_backup" method changes the details of a backup, including + its current status, size of the backup, type, and checksum. + + + .. Trove - Database as a Service: https://wiki.openstack.org/wiki/Trove diff --git a/etc/trove/trove-conductor.conf.sample b/etc/trove/trove-conductor.conf.sample new file mode 100644 index 0000000000..f8603ce70d --- /dev/null +++ b/etc/trove/trove-conductor.conf.sample @@ -0,0 +1,8 @@ +[DEFAULT] +control_exchange = trove +trove_auth_url = http://0.0.0.0:5000/v2.0 +nova_proxy_admin_pass = 3de4922d8b6ac5a1aad9 +nova_proxy_admin_tenant_name = admin +nova_proxy_admin_user = admin +sql_connection = mysql://root:e1a2c042c828d3566d0a@localhost/trove +rabbit_password = f7999d1955c5014aa32c diff --git a/etc/trove/trove-guestagent.conf.sample b/etc/trove/trove-guestagent.conf.sample index 77649d9cba..d9da78e9ab 100644 --- a/etc/trove/trove-guestagent.conf.sample +++ b/etc/trove/trove-guestagent.conf.sample @@ -14,34 +14,12 @@ bind_port = 8778 # AMQP Connection info rabbit_password=f7999d1955c5014aa32c -# SQLAlchemy connection string for the reference implementation -# registry server. Any valid SQLAlchemy connection string is fine. -# See: http://www.sqlalchemy.org/docs/05/reference/sqlalchemy/connections.html#sqlalchemy.create_engine -#sql_connection = sqlite:///trove_test.sqlite -sql_connection = mysql://root:e1a2c042c828d3566d0a@10.0.0.1/trove?charset=utf8 -#sql_connection = postgresql://trove:trove@10.0.0.1/trove - -#Allow SQLAlchemy to log all queries -sql_query_logging = False - -# Period in seconds after which SQLAlchemy should reestablish its connection -# to the database. -# -# MySQL uses a default `wait_timeout` of 8 hours, after which it will drop -# idle connections. This can result in 'MySQL Gone Away' exceptions. If you -# notice this, you can lower this value to ensure that SQLAlchemy reconnects -# before MySQL can drop the connection. -sql_idle_timeout = 3600 - -#DB Api Implementation -db_api_implementation = "trove.db.sqlalchemy.api" - # Path to the extensions api_extensions_path = trove/extensions/routes # Configuration options for talking to nova via the novaclient. # These options are for an admin user in your keystone config. -# It proxy's the token received from the user to send to nova via this admin users creds, +# It proxies the token received from the user to send to nova via this admin users creds, # basically acting like the client via that proxy token. nova_proxy_admin_user = admin nova_proxy_admin_pass = 3de4922d8b6ac5a1aad9 @@ -62,6 +40,9 @@ root_grant_option = True # used by passlib to generate root password #default_password_length = 36 +# For communicating with trove-conductor +control_exchange = trove + # ============ kombu connection options ======================== rabbit_host=10.0.0.1 diff --git a/setup.cfg b/setup.cfg index 43e328495e..98dad17541 100644 --- a/setup.cfg +++ b/setup.cfg @@ -24,6 +24,7 @@ packages = scripts = bin/trove-api bin/trove-fake-mode + bin/trove-conductor bin/trove-manage bin/trove-mgmt-taskmanager bin/trove-taskmanager diff --git a/tox.ini b/tox.ini index 87b4670419..502db4a6dc 100644 --- a/tox.ini +++ b/tox.ini @@ -36,4 +36,4 @@ show-source = True ignore = F401,F403,F821,H301,H306,H401,H402,H403,H404,H702 builtins = _ exclude=.venv,.tox,dist,doc,openstack,*egg,rsdns,tools,etc,build -filename=*.py,trove-* \ No newline at end of file +filename=*.py,trove-* diff --git a/trove/backup/models.py b/trove/backup/models.py index 9112121cf3..74040fcae6 100644 --- a/trove/backup/models.py +++ b/trove/backup/models.py @@ -58,7 +58,7 @@ class Backup(object): # parse the ID from the Ref instance_id = utils.get_id_from_href(instance) - # verify that the instance exist and can perform actions + # verify that the instance exists and can perform actions from trove.instance.models import Instance instance_model = Instance.load(context, instance_id) instance_model.validate_can_perform_action() @@ -76,7 +76,14 @@ class Backup(object): LOG.exception("Unable to create Backup record:") raise exception.BackupCreationError(str(ex)) - api.API(context).create_backup(db_info.id, instance_id) + backup_info = {'id': db_info.id, + 'name': name, + 'description': description, + 'instance_id': instance_id, + 'backup_type': db_info.backup_type, + 'checksum': db_info.checksum, + } + api.API(context).create_backup(backup_info, instance_id) return db_info return run_with_quotas(context.tenant, diff --git a/trove/common/cfg.py b/trove/common/cfg.py index d31bc03f36..2cd978c7dd 100644 --- a/trove/common/cfg.py +++ b/trove/common/cfg.py @@ -108,6 +108,8 @@ common_opts = [ default='trove.quota.quota.DbQuotaDriver', help='default driver to use for quota checks'), cfg.StrOpt('taskmanager_queue', default='taskmanager'), + cfg.StrOpt('conductor_queue', default='trove-conductor'), + cfg.IntOpt('trove_conductor_workers', default=1), cfg.BoolOpt('use_nova_server_volume', default=False), cfg.BoolOpt('use_heat', default=False), cfg.StrOpt('device_path', default='/dev/vdb'), diff --git a/trove/common/instance.py b/trove/common/instance.py index 9108d1b572..c691026869 100644 --- a/trove/common/instance.py +++ b/trove/common/instance.py @@ -67,7 +67,8 @@ class ServiceStatus(object): @staticmethod def from_description(desc): all_items = ServiceStatus._lookup.items() - status_codes = [code for (code, status) in all_items if status == desc] + status_codes = [code for (code, status) in all_items + if status.description == desc] if not status_codes: msg = 'Status description %s is not a valid ServiceStatus.' raise ValueError(msg % desc) diff --git a/trove/conductor/__init__.py b/trove/conductor/__init__.py new file mode 100644 index 0000000000..9c2513da75 --- /dev/null +++ b/trove/conductor/__init__.py @@ -0,0 +1,16 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Rackspace Hosting +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/trove/conductor/api.py b/trove/conductor/api.py new file mode 100644 index 0000000000..d0a93abc01 --- /dev/null +++ b/trove/conductor/api.py @@ -0,0 +1,52 @@ +# Copyright 2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import traceback +import sys + +from trove.common import cfg +from trove.openstack.common.rpc import proxy +from trove.openstack.common import log as logging + + +CONF = cfg.CONF +LOG = logging.getLogger(__name__) +RPC_API_VERSION = "1.0" + + +class API(proxy.RpcProxy): + """API for interacting with trove conductor.""" + + def __init__(self, context): + self.context = context + super(API, self).__init__(self._get_routing_key(), RPC_API_VERSION) + + def _get_routing_key(self): + """Create the routing key for conductor.""" + return CONF.conductor_queue + + def heartbeat(self, instance_id, payload): + LOG.debug("Making async call to cast heartbeat for instance: %s" + % instance_id) + self.cast(self.context, self.make_msg("heartbeat", + instance_id=instance_id, + payload=payload)) + + def update_backup(self, instance_id, backup_id, **backup_fields): + LOG.debug("Making async call to cast update_backup for instance: %s" + % instance_id) + self.cast(self.context, self.make_msg("update_backup", + instance_id=instance_id, + backup_id=backup_id, + **backup_fields)) diff --git a/trove/conductor/manager.py b/trove/conductor/manager.py new file mode 100644 index 0000000000..1d4b2271d4 --- /dev/null +++ b/trove/conductor/manager.py @@ -0,0 +1,68 @@ +# Copyright 2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from trove.backup import models as bkup_models +from trove.common.context import TroveContext +from trove.common.instance import ServiceStatus +from trove.instance import models as t_models +from trove.openstack.common import periodic_task +from trove.openstack.common import log as logging +from trove.common import cfg + +LOG = logging.getLogger(__name__) +RPC_API_VERSION = "1.0" +CONF = cfg.CONF + + +class Manager(periodic_task.PeriodicTasks): + + def __init__(self): + super(Manager, self).__init__() + self.admin_context = TroveContext( + user=CONF.nova_proxy_admin_user, + auth_token=CONF.nova_proxy_admin_pass, + tenant=CONF.nova_proxy_admin_tenant_name) + + def heartbeat(self, context, instance_id, payload): + LOG.debug("Instance ID: %s" % str(instance_id)) + LOG.debug("Payload: %s" % str(payload)) + status = t_models.InstanceServiceStatus.find_by( + instance_id=instance_id) + if payload.get('service_status') is not None: + status.set_status(ServiceStatus.from_description( + payload['service_status'])) + status.save() + + def update_backup(self, context, instance_id, backup_id, + **backup_fields): + LOG.debug("Instance ID: %s" % str(instance_id)) + LOG.debug("Backup ID: %s" % str(backup_id)) + backup = bkup_models.DBBackup.find_by(id=backup_id) + # TODO(datsun180b): use context to verify tenant matches + + # Some verification based on IDs + if backup_id != backup.id: + LOG.error("Backup IDs mismatch! Expected %s, found %s" % + (backup_id, backup.id)) + return + if instance_id != backup.instance_id: + LOG.error("Backup instance IDs mismatch! Expected %s, found %s" % + (instance_id, backup.instance_id)) + return + + for k, v in backup_fields.items(): + if hasattr(backup, k): + LOG.debug("Backup %s: %s" % (k, v)) + setattr(backup, k, v) + backup.save() diff --git a/trove/db/models.py b/trove/db/models.py index 8584baa7db..c0e45a18e5 100644 --- a/trove/db/models.py +++ b/trove/db/models.py @@ -29,15 +29,17 @@ class DatabaseModelBase(models.ModelBase): @classmethod def create(cls, **values): - if 'id' not in values: - values['id'] = utils.generate_uuid() - if hasattr(cls, 'deleted') and 'deleted' not in values: - values['deleted'] = False - values['created'] = utils.utcnow() - instance = cls(**values).save() + init_vals = { + 'id': utils.generate_uuid(), + 'created': utils.utcnow(), + } + if hasattr(cls, 'deleted'): + init_vals['deleted'] = False + init_vals.update(values) + instance = cls(**init_vals) if not instance.is_valid(): raise exception.InvalidModelError(errors=instance.errors) - return instance + return instance.save() @property def db_api(self): diff --git a/trove/guestagent/api.py b/trove/guestagent/api.py index 8be215b4bb..d9bd5d7114 100644 --- a/trove/guestagent/api.py +++ b/trove/guestagent/api.py @@ -214,7 +214,7 @@ class API(proxy.RpcProxy): def prepare(self, memory_mb, packages, databases, users, device_path='/dev/vdb', mount_point='/mnt/volume', - backup_id=None, config_contents=None, root_password=None): + backup_info=None, config_contents=None, root_password=None): """Make an asynchronous call to prepare the guest as a database container optionally includes a backup id for restores """ @@ -222,7 +222,7 @@ class API(proxy.RpcProxy): self._cast_with_consumer( "prepare", packages=packages, databases=databases, memory_mb=memory_mb, users=users, device_path=device_path, - mount_point=mount_point, backup_id=backup_id, + mount_point=mount_point, backup_info=backup_info, config_contents=config_contents, root_password=root_password) def restart(self): @@ -267,9 +267,9 @@ class API(proxy.RpcProxy): """Make a synchronous call to update the guest agent.""" self._call("update_guest", AGENT_HIGH_TIMEOUT) - def create_backup(self, backup_id): + def create_backup(self, backup_info): """Make async call to create a full backup of this instance""" LOG.debug(_("Create Backup %(backup_id)s " - "for Instance %(instance_id)s") % {'backup_id': backup_id, - 'instance_id': self.id}) - self._cast("create_backup", backup_id=backup_id) + "for Instance %(instance_id)s") % + {'backup_id': backup_info['id'], 'instance_id': self.id}) + self._cast("create_backup", backup_info=backup_info) diff --git a/trove/guestagent/backup/__init__.py b/trove/guestagent/backup/__init__.py index 7fe47509b5..4309bed973 100644 --- a/trove/guestagent/backup/__init__.py +++ b/trove/guestagent/backup/__init__.py @@ -3,7 +3,7 @@ from trove.guestagent.backup.backupagent import BackupAgent AGENT = BackupAgent() -def backup(context, backup_id): +def backup(context, backup_info): """ Main entry point for starting a backup based on the given backup id. This will create a backup for this DB instance and will then store the backup @@ -12,10 +12,10 @@ def backup(context, backup_id): :param context: the context token which contains the users details :param backup_id: the id of the persisted backup object """ - return AGENT.execute_backup(context, backup_id) + return AGENT.execute_backup(context, backup_info) -def restore(context, backup_id, restore_location): +def restore(context, backup_info, restore_location): """ Main entry point for restoring a backup based on the given backup id. This will transfer backup data to this instance an will carry out the @@ -24,4 +24,4 @@ def restore(context, backup_id, restore_location): :param context: the context token which contains the users details :param backup_id: the id of the persisted backup object """ - return AGENT.execute_restore(context, backup_id, restore_location) + return AGENT.execute_restore(context, backup_info, restore_location) diff --git a/trove/guestagent/backup/backupagent.py b/trove/guestagent/backup/backupagent.py index 81a1ef3b53..7ab4059f40 100644 --- a/trove/guestagent/backup/backupagent.py +++ b/trove/guestagent/backup/backupagent.py @@ -15,9 +15,11 @@ # import logging -from trove.backup.models import DBBackup from trove.backup.models import BackupState -from trove.common import cfg, utils +from trove.common import cfg +from trove.common import context as trove_context +from trove.common import utils +from trove.conductor import api as conductor_api from trove.guestagent.dbaas import get_filesystem_volume_stats from trove.guestagent.datastore.mysql.service import ADMIN_USER_NAME from trove.guestagent.datastore.mysql.service import get_auth_password @@ -45,15 +47,14 @@ class BackupAgent(object): raise UnknownBackupType("Unknown Backup type: %s" % backup_type) return runner - def execute_backup(self, context, backup_id, runner=RUNNER): - LOG.debug("Searching for backup instance %s", backup_id) - backup = DBBackup.find_by(id=backup_id) - LOG.info("Setting task state to %s for instance %s", - BackupState.NEW, backup.instance_id) - backup.state = BackupState.NEW - backup.save() + def execute_backup(self, context, backup_info, runner=RUNNER): + LOG.debug("Searching for backup instance %s", backup_info['id']) + ctxt = trove_context.TroveContext( + user=CONF.nova_proxy_admin_user, + auth_token=CONF.nova_proxy_admin_pass) + conductor = conductor_api.API(ctxt) - LOG.info("Running backup %s", backup_id) + LOG.info("Running backup %s", backup_info['id']) user = ADMIN_USER_NAME password = get_auth_password() swiftStorage = get_storage_strategy( @@ -62,43 +63,50 @@ class BackupAgent(object): # Store the size of the filesystem before the backup. stats = get_filesystem_volume_stats(CONF.mount_point) - backup.size = stats.get('used', 0.0) - backup.state = BackupState.BUILDING - backup.save() + conductor.update_backup(CONF.guest_id, + backup_id=backup_info['id'], + size=stats.get('used', 0.0), + state=BackupState.BUILDING) - try: - with runner(filename=backup_id, user=user, password=password)\ - as bkup: - LOG.info("Starting Backup %s", backup_id) + with runner(filename=backup_info['id'], + user=user, password=password) as bkup: + try: + LOG.info("Starting Backup %s", backup_info['id']) success, note, checksum, location = swiftStorage.save( BACKUP_CONTAINER, bkup) - LOG.info("Backup %s completed status: %s", backup_id, success) - LOG.info("Backup %s file size: %s", backup_id, bkup.content_length) - LOG.info('Backup %s swift checksum: %s', backup_id, checksum) - LOG.info('Backup %s location: %s', backup_id, location) + LOG.info("Backup %s completed status: %s", backup_info['id'], + success) + LOG.info("Backup %s file size: %s", backup_info['id'], + bkup.content_length) + LOG.info('Backup %s file swift checksum: %s', + backup_info['id'], checksum) + LOG.info('Backup %s location: %s', backup_info['id'], + location) - if not success: - raise BackupError(backup.note) + if not success: + raise BackupError(note) - except Exception as e: - LOG.error(e) - LOG.error("Error saving %s Backup", backup_id) - backup.state = BackupState.FAILED - backup.save() - raise + except Exception as e: + LOG.error(e) + LOG.error("Error saving %s Backup", backup_info['id']) + conductor.update_backup(CONF.guest_id, + backup_id=backup_info['id'], + state=BackupState.FAILED) + raise - else: - LOG.info("Saving %s Backup Info to model", backup_id) - backup.state = BackupState.COMPLETED - backup.checksum = checksum - backup.location = location - backup.note = note - backup.backup_type = bkup.backup_type - backup.save() + else: + LOG.info("Saving %s Backup Info to model", backup_info['id']) + conductor.update_backup(CONF.guest_id, + backup_id=backup_info['id'], + checksum=checksum, + location=location, + note=note, + backup_type=bkup.backup_type, + state=BackupState.COMPLETED) - def execute_restore(self, context, backup_id, restore_location): + def execute_restore(self, context, backup_info, restore_location): try: LOG.debug("Cleaning out restore location: %s", restore_location) @@ -106,11 +114,8 @@ class BackupAgent(object): "0777", restore_location) utils.clean_out(restore_location) - LOG.debug("Finding backup %s to restore", backup_id) - backup = DBBackup.find_by(id=backup_id) - - LOG.debug("Getting Restore Runner of type %s", backup.backup_type) - restore_runner = self._get_restore_runner(backup.backup_type) + LOG.debug("Getting Restore Runner of type %s", backup_info['type']) + restore_runner = self._get_restore_runner(backup_info['type']) LOG.debug("Getting Storage Strategy") storage_strategy = get_storage_strategy( @@ -119,17 +124,17 @@ class BackupAgent(object): LOG.debug("Preparing storage to download stream.") download_stream = storage_strategy.load(context, - backup.location, + backup_info['location'], restore_runner.is_zipped, - backup.checksum) + backup_info['checksum']) with restore_runner(restore_stream=download_stream, restore_location=restore_location) as runner: LOG.debug("Restoring instance from backup %s to %s", - backup_id, restore_location) + backup_info['id'], restore_location) content_size = runner.restore() LOG.info("Restore from backup %s completed successfully to %s", - backup_id, restore_location) + backup_info['id'], restore_location) LOG.info("Restore size: %s", content_size) utils.execute_with_timeout("sudo", "chown", "-R", @@ -137,8 +142,8 @@ class BackupAgent(object): except Exception as e: LOG.error(e) - LOG.error("Error restoring backup %s", backup_id) + LOG.error("Error restoring backup %s", backup_info['id']) raise else: - LOG.info("Restored Backup %s", backup_id) + LOG.info("Restored Backup %s", backup_info['id']) diff --git a/trove/guestagent/datastore/mysql/manager.py b/trove/guestagent/datastore/mysql/manager.py index 2163411b06..fe0c30a603 100644 --- a/trove/guestagent/datastore/mysql/manager.py +++ b/trove/guestagent/datastore/mysql/manager.py @@ -73,19 +73,20 @@ class Manager(periodic_task.PeriodicTasks): def is_root_enabled(self, context): return MySqlAdmin().is_root_enabled() - def _perform_restore(self, backup_id, context, restore_location, app): - LOG.info(_("Restoring database from backup %s") % backup_id) + def _perform_restore(self, backup_info, context, restore_location, app): + LOG.info(_("Restoring database from backup %s") % backup_info['id']) try: - backup.restore(context, backup_id, restore_location) + backup.restore(context, backup_info, restore_location) except Exception as e: LOG.error(e) - LOG.error("Error performing restore from backup %s", backup_id) + LOG.error("Error performing restore from backup %s", + backup_info['id']) app.status.set_status(rd_instance.ServiceStatuses.FAILED) raise LOG.info(_("Restored database successfully")) def prepare(self, context, packages, databases, memory_mb, users, - device_path=None, mount_point=None, backup_id=None, + device_path=None, mount_point=None, backup_info=None, config_contents=None, root_password=None): """Makes ready DBAAS on a Guest container.""" MySqlAppStatus.get().begin_install() @@ -104,12 +105,14 @@ class Manager(periodic_task.PeriodicTasks): device.mount(mount_point) LOG.debug(_("Mounted the volume.")) app.start_mysql() - if backup_id: - self._perform_restore(backup_id, context, CONF.mount_point, app) + if backup_info: + self._perform_restore(backup_info, context, + CONF.mount_point, app) LOG.info(_("Securing mysql now.")) app.secure(config_contents) - enable_root_on_restore = (backup_id and MySqlAdmin().is_root_enabled()) - if root_password and not backup_id: + enable_root_on_restore = (backup_info and + MySqlAdmin().is_root_enabled()) + if root_password and not backup_info: app.secure_root(secure_remote_root=True) MySqlAdmin().enable_root(root_password) MySqlAdmin().report_root_enabled(context) @@ -145,13 +148,14 @@ class Manager(periodic_task.PeriodicTasks): """ Gets the filesystem stats for the path given """ return dbaas.get_filesystem_volume_stats(fs_path) - def create_backup(self, context, backup_id): + def create_backup(self, context, backup_info): """ Entry point for initiating a backup for this guest agents db instance. The call currently blocks until the backup is complete or errors. If device_path is specified, it will be mounted based to a point specified in configuration. - :param backup_id: the db instance id of the backup task + :param backup_info: a dictionary containing the db instance id of the + backup task, location, type, and other data. """ - backup.backup(context, backup_id) + backup.backup(context, backup_info) diff --git a/trove/guestagent/datastore/service.py b/trove/guestagent/datastore/service.py index 8dfd9c3611..fed97a25b5 100644 --- a/trove/guestagent/datastore/service.py +++ b/trove/guestagent/datastore/service.py @@ -19,7 +19,9 @@ import time from trove.common import cfg +from trove.common import context from trove.common import instance as rd_instance +from trove.conductor import api as conductor_api from trove.instance import models as rd_models from trove.openstack.common import log as logging @@ -55,7 +57,9 @@ class BaseDbStatus(object): def __init__(self): if self._instance is not None: raise RuntimeError("Cannot instantiate twice.") - self.status = self._load_status().status + self.status = rd_models.InstanceServiceStatus( + instance_id=CONF.guest_id, + status=rd_instance.ServiceStatuses.NEW) self.restart_mode = False def begin_install(self): @@ -101,17 +105,17 @@ class BaseDbStatus(object): return (self.status is not None and self.status == rd_instance.ServiceStatuses.RUNNING) - @staticmethod - def _load_status(): - """Loads the status from the database.""" - inst_id = CONF.guest_id - return rd_models.InstanceServiceStatus.find_by(instance_id=inst_id) - def set_status(self, status): - """Changes the status of the DB app in the database.""" - db_status = self._load_status() - db_status.status = status - db_status.save() + """Use conductor to update the DB app status.""" + LOG.debug("Casting set_status message to conductor.") + ctxt = context.TroveContext(user=CONF.nova_proxy_admin_user, + auth_token=CONF.nova_proxy_admin_pass) + + heartbeat = { + 'service_status': status.description, + } + conductor_api.API(ctxt).heartbeat(CONF.guest_id, heartbeat) + LOG.debug("Successfully cast set_status.") self.status = status def update(self): diff --git a/trove/taskmanager/api.py b/trove/taskmanager/api.py index f96eb4fce1..aa9fd0cb94 100644 --- a/trove/taskmanager/api.py +++ b/trove/taskmanager/api.py @@ -87,11 +87,11 @@ class API(proxy.RpcProxy): self.cast(self.context, self.make_msg("delete_instance", instance_id=instance_id)) - def create_backup(self, backup_id, instance_id): + def create_backup(self, backup_info, instance_id): LOG.debug("Making async call to create a backup for instance: %s" % instance_id) self.cast(self.context, self.make_msg("create_backup", - backup_id=backup_id, + backup_info=backup_info, instance_id=instance_id)) def delete_backup(self, backup_id): diff --git a/trove/taskmanager/manager.py b/trove/taskmanager/manager.py index 0b7038cfb9..38f6078642 100644 --- a/trove/taskmanager/manager.py +++ b/trove/taskmanager/manager.py @@ -76,9 +76,9 @@ class Manager(periodic_task.PeriodicTasks): def delete_backup(self, context, backup_id): models.BackupTasks.delete_backup(context, backup_id) - def create_backup(self, context, backup_id, instance_id): + def create_backup(self, context, backup_info, instance_id): instance_tasks = models.BuiltInstanceTasks.load(context, instance_id) - instance_tasks.create_backup(backup_id) + instance_tasks.create_backup(backup_info) def create_instance(self, context, instance_id, name, flavor, image_id, databases, users, datastore_manager, diff --git a/trove/taskmanager/models.py b/trove/taskmanager/models.py index 0f7164ec78..43228bef22 100644 --- a/trove/taskmanager/models.py +++ b/trove/taskmanager/models.py @@ -17,6 +17,7 @@ import os.path from cinderclient import exceptions as cinder_exceptions from eventlet import greenthread from novaclient import exceptions as nova_exceptions +from trove.backup import models as bkup_models from trove.common import cfg from trove.common import template from trove.common import utils @@ -47,7 +48,6 @@ from trove.openstack.common.gettextutils import _ from trove.openstack.common.notifier import api as notifier from trove.openstack.common import timeutils import trove.common.remote as remote -import trove.backup.models LOG = logging.getLogger(__name__) CONF = cfg.CONF @@ -193,8 +193,16 @@ class FreshInstanceTasks(FreshInstance, NotifyMixin, ConfigurationMixin): config = self._render_config(datastore_manager, flavor, self.id) if server: + backup_info = None + if backup_id is not None: + backup = bkup_models.Backup.get_by_id(self.context, backup_id) + backup_info = {'id': backup_id, + 'location': backup.location, + 'type': backup.backup_type, + 'checksum': backup.checksum, + } self._guest_prepare(server, flavor['ram'], volume_info, - packages, databases, users, backup_id, + packages, databases, users, backup_info, config.config_contents, root_password) if not self.db_info.task_status.is_error: @@ -507,14 +515,14 @@ class FreshInstanceTasks(FreshInstance, NotifyMixin, ConfigurationMixin): return server def _guest_prepare(self, server, flavor_ram, volume_info, - packages, databases, users, backup_id=None, + packages, databases, users, backup_info=None, config_contents=None, root_password=None): LOG.info(_("Entering guest_prepare")) # Now wait for the response from the create to do additional work self.guest.prepare(flavor_ram, packages, databases, users, device_path=volume_info['device_path'], mount_point=volume_info['mount_point'], - backup_id=backup_id, + backup_info=backup_info, config_contents=config_contents, root_password=root_password) @@ -733,9 +741,9 @@ class BuiltInstanceTasks(BuiltInstance, NotifyMixin, ConfigurationMixin): action = MigrateAction(self, host) action.execute() - def create_backup(self, backup_id): + def create_backup(self, backup_info): LOG.debug(_("Calling create_backup %s ") % self.id) - self.guest.create_backup(backup_id) + self.guest.create_backup(backup_info) def reboot(self): try: @@ -836,7 +844,7 @@ class BackupTasks(object): @classmethod def delete_backup(cls, context, backup_id): #delete backup from swift - backup = trove.backup.models.Backup.get_by_id(context, backup_id) + backup = bkup_models.Backup.get_by_id(context, backup_id) try: filename = backup.filename if filename: @@ -849,8 +857,8 @@ class BackupTasks(object): backup.delete() else: LOG.exception(_("Exception deleting from swift. " - "Details: %s") % e) - backup.state = trove.backup.models.BackupState.DELETE_FAILED + "Details: %s") % e) + backup.state = bkup_models.BackupState.DELETE_FAILED backup.save() raise TroveError("Failed to delete swift objects") else: diff --git a/trove/tests/api/instances_resize.py b/trove/tests/api/instances_resize.py index cf15df68f6..fdea966ae7 100644 --- a/trove/tests/api/instances_resize.py +++ b/trove/tests/api/instances_resize.py @@ -74,7 +74,8 @@ class ResizeTestBase(TestCase): try: self.instance.update_db(task_status=InstanceTasks.NONE) self.mock.ReplayAll() - self.assertRaises(Exception, self.action.execute) + excs = (Exception) + self.assertRaises(excs, self.action.execute) self.mock.VerifyAll() finally: self.mock.UnsetStubs() diff --git a/trove/tests/fakes/guestagent.py b/trove/tests/fakes/guestagent.py index e2eeffb4d7..644bfef9f1 100644 --- a/trove/tests/fakes/guestagent.py +++ b/trove/tests/fakes/guestagent.py @@ -208,7 +208,7 @@ class FakeGuest(object): return self.users.get((username, hostname), None) def prepare(self, memory_mb, packages, databases, users, device_path=None, - mount_point=None, backup_id=None, config_contents=None, + mount_point=None, backup_info=None, config_contents=None, root_password=None): from trove.instance.models import DBInstance from trove.instance.models import InstanceServiceStatus @@ -296,9 +296,10 @@ class FakeGuest(object): } for db in current_grants] return dbs - def create_backup(self, backup_id): + def create_backup(self, backup_info): from trove.backup.models import Backup, BackupState - backup = Backup.get_by_id(context=None, backup_id=backup_id) + backup = Backup.get_by_id(context=None, + backup_id=backup_info['id']) def finish_create_backup(): backup.state = BackupState.COMPLETED diff --git a/trove/tests/unittests/backup/test_backupagent.py b/trove/tests/unittests/backup/test_backupagent.py index d0e183abcc..a2091a1949 100644 --- a/trove/tests/unittests/backup/test_backupagent.py +++ b/trove/tests/unittests/backup/test_backupagent.py @@ -12,16 +12,18 @@ #See the License for the specific language governing permissions and #limitations under the License. -import hashlib -import os - -import testtools +from mock import Mock +from mockito import when, verify, unstub, mock, any, contains from testtools.matchers import Equals, Is from webob.exc import HTTPNotFound -from mockito import when, verify, unstub, mock, any, contains + +import hashlib +import os +import testtools from trove.common import utils from trove.common.context import TroveContext +from trove.conductor import api as conductor_api from trove.guestagent.strategies.backup import mysql_impl from trove.guestagent.strategies.restore.base import RestoreRunner from trove.backup.models import DBBackup @@ -33,6 +35,8 @@ from trove.guestagent.strategies.backup.base import BackupRunner from trove.guestagent.strategies.backup.base import UnknownBackupType from trove.guestagent.strategies.storage.base import Storage +conductor_api.API.update_backup = Mock() + def create_fake_data(): from random import choice @@ -216,48 +220,60 @@ class BackupAgentTest(testtools.TestCase): starts storage reports status """ - backup = mock(DBBackup) - when(DatabaseModelBase).find_by(id='123').thenReturn(backup) - when(backup).save().thenReturn(backup) - agent = backupagent.BackupAgent() - agent.execute_backup(context=None, backup_id='123', runner=MockBackup) + backup_info = {'id': '123', + 'location': 'fake-location', + 'type': 'InnoBackupEx', + 'checksum': 'fake-checksum', + } + agent.execute_backup(context=None, backup_info=backup_info, + runner=MockBackup) - verify(DatabaseModelBase).find_by(id='123') - self.assertThat(backup.state, Is(BackupState.COMPLETED)) - self.assertThat(backup.location, - Equals('http://mockswift/v1/database_backups/123')) - verify(backup, times=3).save() + self.assertTrue( + conductor_api.API.update_backup.called_once_with( + any(), + backup_id=backup_info['id'], + state=BackupState.NEW)) + + self.assertTrue( + conductor_api.API.update_backup.called_once_with( + any(), + backup_id=backup_info['id'], + size=any(), + state=BackupState.BUILDING)) + + self.assertTrue( + conductor_api.API.update_backup.called_once_with( + any(), + backup_id=backup_info['id'], + checksum=any(), + location=any(), + note=any(), + backup_type=backup_info['type'], + state=BackupState.COMPLETED)) def test_execute_lossy_backup(self): """This test verifies that incomplete writes to swift will fail.""" - backup = mock(DBBackup) when(backupagent).get_auth_password().thenReturn('secret') - when(DatabaseModelBase).find_by(id='123').thenReturn(backup) - when(backup).save().thenReturn(backup) when(MockSwift).save(any(), any()).thenReturn((False, 'Error', 'y', 'z')) agent = backupagent.BackupAgent() + backup_info = {'id': '123', + 'location': 'fake-location', + 'type': 'InnoBackupEx', + 'checksum': 'fake-checksum', + } self.assertRaises(backupagent.BackupError, agent.execute_backup, - context=None, backup_id='123', + context=None, backup_info=backup_info, runner=MockLossyBackup) - self.assertThat(backup.state, Is(BackupState.FAILED)) - verify(backup, times=3).save() - - def test_execute_backup_model_exception(self): - """This test should ensure backup agent - properly handles condition where backup model is not found - """ - when(DatabaseModelBase).find_by(id='123').thenRaise(ModelNotFoundError) - - agent = backupagent.BackupAgent() - # probably should catch this exception and return a backup exception - # also note that since the model is not found there is no way to report - # this error - self.assertRaises(ModelNotFoundError, agent.execute_backup, - context=None, backup_id='123') + #self.assertThat(backup.state, Is(BackupState.FAILED)) + self.assertTrue( + conductor_api.API.update_backup.called_once_with( + any(), + backup_id=backup_info['id'], + state=BackupState.FAILED)) def test_execute_restore(self): """This test should ensure backup agent @@ -282,7 +298,12 @@ class BackupAgentTest(testtools.TestCase): agent = backupagent.BackupAgent() - agent.execute_restore(TroveContext(), '123', '/var/lib/mysql') + bkup_info = {'id': '123', + 'location': 'fake-location', + 'type': 'InnoBackupEx', + 'checksum': 'fake-checksum', + } + agent.execute_restore(TroveContext(), bkup_info, '/var/lib/mysql') def test_restore_unknown(self): backup = mock(DBBackup) @@ -296,6 +317,11 @@ class BackupAgentTest(testtools.TestCase): agent = backupagent.BackupAgent() + bkup_info = {'id': '123', + 'location': backup.location, + 'type': backup.backup_type, + 'checksum': 'fake-checksum', + } self.assertRaises(UnknownBackupType, agent.execute_restore, - context=None, backup_id='123', + context=None, backup_info=bkup_info, restore_location='/var/lib/mysql') diff --git a/trove/tests/unittests/conductor/__init__.py b/trove/tests/unittests/conductor/__init__.py new file mode 100644 index 0000000000..f0ec14305d --- /dev/null +++ b/trove/tests/unittests/conductor/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/trove/tests/unittests/conductor/test_methods.py b/trove/tests/unittests/conductor/test_methods.py new file mode 100644 index 0000000000..a5c808df8d --- /dev/null +++ b/trove/tests/unittests/conductor/test_methods.py @@ -0,0 +1,148 @@ +# Copyright 2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import testtools +from datetime import datetime +from mockito import unstub +from trove.backup import models as bkup_models +from trove.common import context +from trove.common import exception as t_exception +from trove.common import instance as t_instance +from trove.conductor import api as conductor_api +from trove.conductor import manager as conductor_manager +from trove.instance import models as t_models +from trove.instance.tasks import InstanceTasks +from trove.tests.unittests.util import util +from uuid import uuid4 + + +# See LP bug #1255178 +OLD_DBB_SAVE = bkup_models.DBBackup.save + + +def generate_uuid(length=16): + uuid = [] + while len(''.join(uuid)) < length: + uuid.append(str(uuid4())) + return (''.join(uuid))[:length] + + +class ConductorMethodTests(testtools.TestCase): + def setUp(self): + # See LP bug #1255178 + bkup_models.DBBackup.save = OLD_DBB_SAVE + super(ConductorMethodTests, self).setUp() + util.init_db() + self.cond_mgr = conductor_manager.Manager() + self.instance_id = generate_uuid() + + def tearDown(self): + super(ConductorMethodTests, self).tearDown() + unstub() + + def _create_iss(self): + new_id = generate_uuid() + iss = t_models.InstanceServiceStatus( + id=new_id, + instance_id=self.instance_id, + status=t_instance.ServiceStatuses.NEW) + iss.save() + return new_id + + def _get_iss(self, id): + return t_models.InstanceServiceStatus.find_by(id=id) + + def _create_backup(self, name='fake backup'): + new_id = generate_uuid() + backup = bkup_models.DBBackup.create( + id=new_id, + name=name, + description='This is a fake backup object.', + tenant_id=generate_uuid(), + state=bkup_models.BackupState.NEW, + instance_id=self.instance_id) + backup.save() + return new_id + + def _get_backup(self, id): + return bkup_models.DBBackup.find_by(id=id) + + # --- Tests for heartbeat --- + + def test_heartbeat_instance_not_found(self): + new_id = generate_uuid() + self.assertRaises(t_exception.ModelNotFoundError, + self.cond_mgr.heartbeat, None, new_id, {}) + + def test_heartbeat_instance_no_changes(self): + iss_id = self._create_iss() + old_iss = self._get_iss(iss_id) + self.cond_mgr.heartbeat(None, self.instance_id, {}) + new_iss = self._get_iss(iss_id) + self.assertEqual(old_iss.status_id, new_iss.status_id) + self.assertEqual(old_iss.status_description, + new_iss.status_description) + + def test_heartbeat_instance_status_bogus_change(self): + iss_id = self._create_iss() + old_iss = self._get_iss(iss_id) + new_status = 'potato salad' + payload = { + 'service_status': new_status, + } + self.assertRaises(ValueError, self.cond_mgr.heartbeat, + None, self.instance_id, payload) + new_iss = self._get_iss(iss_id) + self.assertEqual(old_iss.status_id, new_iss.status_id) + self.assertEqual(old_iss.status_description, + new_iss.status_description) + + def test_heartbeat_instance_status_changed(self): + iss_id = self._create_iss() + payload = {'service_status': 'building'} + self.cond_mgr.heartbeat(None, self.instance_id, payload) + iss = self._get_iss(iss_id) + self.assertEqual(t_instance.ServiceStatuses.BUILDING, iss.status) + + # --- Tests for update_backup --- + + def test_backup_not_found(self): + new_bkup_id = generate_uuid() + self.assertRaises(t_exception.ModelNotFoundError, + self.cond_mgr.update_backup, + None, self.instance_id, new_bkup_id) + + def test_backup_instance_id_nomatch(self): + new_iid = generate_uuid() + bkup_id = self._create_backup('nomatch') + old_name = self._get_backup(bkup_id).name + self.cond_mgr.update_backup(None, new_iid, bkup_id, + name="remains unchanged") + bkup = self._get_backup(bkup_id) + self.assertEqual(old_name, bkup.name) + + def test_backup_bogus_fields_not_changed(self): + bkup_id = self._create_backup('bogus') + self.cond_mgr.update_backup(None, self.instance_id, bkup_id, + not_a_valid_field="INVALID") + bkup = self._get_backup(bkup_id) + self.assertFalse(hasattr(bkup, 'not_a_valid_field')) + + def test_backup_real_fields_changed(self): + bkup_id = self._create_backup('realrenamed') + new_name = "recently renamed" + self.cond_mgr.update_backup(None, self.instance_id, bkup_id, + name=new_name) + bkup = self._get_backup(bkup_id) + self.assertEqual(new_name, bkup.name) diff --git a/trove/tests/unittests/guestagent/test_api.py b/trove/tests/unittests/guestagent/test_api.py index 31195b2928..ec25d3b184 100644 --- a/trove/tests/unittests/guestagent/test_api.py +++ b/trove/tests/unittests/guestagent/test_api.py @@ -237,9 +237,9 @@ class ApiTest(testtools.TestCase): self._verify_rpc_call(exp_msg) def test_create_backup(self): - exp_msg = RpcMsgMatcher('create_backup', 'backup_id') + exp_msg = RpcMsgMatcher('create_backup', 'backup_info') self._mock_rpc_cast(exp_msg) - self.api.create_backup('123') + self.api.create_backup({'id': '123'}) self._verify_rpc_cast(exp_msg) def _verify_rpc_connection_and_cast(self, rpc, mock_conn, exp_msg): @@ -254,8 +254,8 @@ class ApiTest(testtools.TestCase): when(mock_conn).create_consumer(any(), any(), any()).thenReturn(None) exp_msg = RpcMsgMatcher('prepare', 'memory_mb', 'packages', 'databases', 'users', 'device_path', - 'mount_point', 'backup_id', 'config_contents', - 'root_password') + 'mount_point', 'backup_info', + 'config_contents', 'root_password') when(rpc).cast(any(), any(), exp_msg).thenReturn(None) @@ -270,12 +270,13 @@ class ApiTest(testtools.TestCase): when(mock_conn).create_consumer(any(), any(), any()).thenReturn(None) exp_msg = RpcMsgMatcher('prepare', 'memory_mb', 'packages', 'databases', 'users', 'device_path', - 'mount_point', 'backup_id', 'config_contents', - 'root_password') + 'mount_point', 'backup_info', + 'config_contents', 'root_password') when(rpc).cast(any(), any(), exp_msg).thenReturn(None) + bkup = {'id': 'backup_id_123'} self.api.prepare('2048', 'package1', 'db1', 'user1', '/dev/vdt', - '/mnt/opt', 'backup_id_123', 'cont', '1-2-3-4') + '/mnt/opt', bkup, 'cont', '1-2-3-4') self._verify_rpc_connection_and_cast(rpc, mock_conn, exp_msg) diff --git a/trove/tests/unittests/guestagent/test_dbaas.py b/trove/tests/unittests/guestagent/test_dbaas.py index 750d8a3b24..2e49c477ce 100644 --- a/trove/tests/unittests/guestagent/test_dbaas.py +++ b/trove/tests/unittests/guestagent/test_dbaas.py @@ -13,7 +13,7 @@ # under the License. import os -from random import randint +from uuid import uuid4 import time from mock import Mock from mock import MagicMock @@ -37,6 +37,7 @@ import trove from trove.common.context import TroveContext from trove.common import utils from trove.common import instance as rd_instance +from trove.conductor import api as conductor_api import trove.guestagent.datastore.mysql.service as dbaas from trove.guestagent import dbaas as dbaas_sr from trove.guestagent import pkg @@ -65,6 +66,9 @@ FAKE_USER = [{"_name": "random", "_password": "guesswhat", "_databases": [FAKE_DB]}] +conductor_api.API.heartbeat = Mock() + + class FakeAppStatus(MySqlAppStatus): def __init__(self, id, status): @@ -74,9 +78,6 @@ class FakeAppStatus(MySqlAppStatus): def _get_actual_db_status(self): return self.next_fake_status - def _load_status(self): - return InstanceServiceStatus.find_by(instance_id=self.id) - def set_next_status(self, next_status): self.next_fake_status = next_status @@ -455,7 +456,7 @@ class MySqlAppTest(testtools.TestCase): self.orig_utils_execute_with_timeout = dbaas.utils.execute_with_timeout self.orig_time_sleep = dbaas.time.sleep util.init_db() - self.FAKE_ID = randint(1, 10000) + self.FAKE_ID = str(uuid4()) InstanceServiceStatus.create(instance_id=self.FAKE_ID, status=rd_instance.ServiceStatuses.NEW) self.appStatus = FakeAppStatus(self.FAKE_ID, @@ -523,7 +524,8 @@ class MySqlAppTest(testtools.TestCase): rd_instance.ServiceStatuses.SHUTDOWN) self.mySqlApp.stop_db(True) - self.assert_reported_status(rd_instance.ServiceStatuses.SHUTDOWN) + self.assertTrue(conductor_api.API.heartbeat.called_once_with( + self.FAKE_ID, {'service_status': 'shutdown'})) def test_stop_mysql_error(self): @@ -543,7 +545,8 @@ class MySqlAppTest(testtools.TestCase): self.assertTrue(self.mySqlApp.stop_db.called) self.assertTrue(self.mySqlApp.start_mysql.called) - self.assert_reported_status(rd_instance.ServiceStatuses.RUNNING) + self.assertTrue(conductor_api.API.heartbeat.called_once_with( + self.FAKE_ID, {'service_status': 'running'})) def test_restart_mysql_wont_start_up(self): @@ -589,8 +592,9 @@ class MySqlAppTest(testtools.TestCase): self.mySqlApp._enable_mysql_on_boot = Mock() self.appStatus.set_next_status(rd_instance.ServiceStatuses.RUNNING) - self.mySqlApp.start_mysql(True) - self.assert_reported_status(rd_instance.ServiceStatuses.RUNNING) + self.mySqlApp.start_mysql(update_db=True) + self.assertTrue(conductor_api.API.heartbeat.called_once_with( + self.FAKE_ID, {'service_status': 'running'})) def test_start_mysql_runs_forever(self): @@ -600,7 +604,8 @@ class MySqlAppTest(testtools.TestCase): self.appStatus.set_next_status(rd_instance.ServiceStatuses.SHUTDOWN) self.assertRaises(RuntimeError, self.mySqlApp.start_mysql) - self.assert_reported_status(rd_instance.ServiceStatuses.SHUTDOWN) + self.assertTrue(conductor_api.API.heartbeat.called_once_with( + self.FAKE_ID, {'service_status': 'shutdown'})) def test_start_mysql_error(self): @@ -1003,7 +1008,7 @@ class BaseDbStatusTest(testtools.TestCase): super(BaseDbStatusTest, self).setUp() util.init_db() self.orig_dbaas_time_sleep = dbaas.time.sleep - self.FAKE_ID = randint(1, 10000) + self.FAKE_ID = str(uuid4()) InstanceServiceStatus.create(instance_id=self.FAKE_ID, status=rd_instance.ServiceStatuses.NEW) dbaas.CONF.guest_id = self.FAKE_ID @@ -1122,7 +1127,7 @@ class MySqlAppStatusTest(testtools.TestCase): self.orig_load_mysqld_options = dbaas.load_mysqld_options self.orig_dbaas_os_path_exists = dbaas.os.path.exists self.orig_dbaas_time_sleep = dbaas.time.sleep - self.FAKE_ID = randint(1, 10000) + self.FAKE_ID = str(uuid4()) InstanceServiceStatus.create(instance_id=self.FAKE_ID, status=rd_instance.ServiceStatuses.NEW) dbaas.CONF.guest_id = self.FAKE_ID diff --git a/trove/tests/unittests/guestagent/test_manager.py b/trove/tests/unittests/guestagent/test_manager.py index ca86790cc8..0bf0af5269 100644 --- a/trove/tests/unittests/guestagent/test_manager.py +++ b/trove/tests/unittests/guestagent/test_manager.py @@ -140,6 +140,13 @@ class GuestAgentManagerTest(testtools.TestCase): # covering all outcomes is starting to cause trouble here COUNT = 1 if device_path else 0 + backup_info = None + if backup_id is not None: + backup_info = {'id': backup_id, + 'location': 'fake-location', + 'type': 'InnoBackupEx', + 'checksum': 'fake-checksum', + } # TODO(juice): this should stub an instance of the MySqlAppStatus mock_status = mock() @@ -150,8 +157,10 @@ class GuestAgentManagerTest(testtools.TestCase): when(VolumeDevice).mount().thenReturn(None) when(dbaas.MySqlApp).stop_db().thenReturn(None) when(dbaas.MySqlApp).start_mysql().thenReturn(None) - when(dbaas.MySqlApp).install_if_needed().thenReturn(None) - when(backup).restore(self.context, backup_id).thenReturn(None) + when(dbaas.MySqlApp).install_if_needed(any()).thenReturn(None) + when(backup).restore(self.context, + backup_info, + '/var/lib/mysql').thenReturn(None) when(dbaas.MySqlApp).secure(any()).thenReturn(None) when(dbaas.MySqlApp).secure_root(any()).thenReturn(None) (when(pkg.Package).pkg_is_installed(any()). @@ -164,12 +173,14 @@ class GuestAgentManagerTest(testtools.TestCase): when(os.path).exists(any()).thenReturn(True) # invocation - self.manager.prepare(context=self.context, packages=None, + self.manager.prepare(context=self.context, + packages=None, + memory_mb='2048', databases=None, - memory_mb='2048', users=None, + users=None, device_path=device_path, mount_point='/var/lib/mysql', - backup_id=backup_id) + backup_info=backup_info) # verification/assertion verify(mock_status).begin_install() @@ -177,8 +188,9 @@ class GuestAgentManagerTest(testtools.TestCase): verify(dbaas.MySqlApp, times=COUNT).stop_db() verify(VolumeDevice, times=COUNT).migrate_data( any()) - if backup_id: - verify(backup).restore(self.context, backup_id, '/var/lib/mysql') + if backup_info: + verify(backup).restore(self.context, backup_info, '/var/lib/mysql') + verify(dbaas.MySqlApp).install_if_needed(any()) # We dont need to make sure the exact contents are there verify(dbaas.MySqlApp).secure(any()) verify(dbaas.MySqlAdmin, never).create_database() diff --git a/trove/tests/unittests/quota/test_quota.py b/trove/tests/unittests/quota/test_quota.py index c73dcf8fa5..8d0ee4fdab 100644 --- a/trove/tests/unittests/quota/test_quota.py +++ b/trove/tests/unittests/quota/test_quota.py @@ -68,9 +68,9 @@ class Run_with_quotasTest(testtools.TestCase): def test_run_with_quotas_error(self): - f = Mock(side_effect=Exception()) + f = Mock(side_effect=exception.TroveError()) - self.assertRaises(Exception, run_with_quotas, FAKE_TENANT1, + self.assertRaises(exception.TroveError, run_with_quotas, FAKE_TENANT1, {'instances': 1, 'volumes': 5}, f) self.assertTrue(QUOTAS.reserve.called) self.assertTrue(QUOTAS.rollback.called) diff --git a/trove/tests/unittests/taskmanager/test_models.py b/trove/tests/unittests/taskmanager/test_models.py index 9740f4edc1..af2e30313a 100644 --- a/trove/tests/unittests/taskmanager/test_models.py +++ b/trove/tests/unittests/taskmanager/test_models.py @@ -17,12 +17,12 @@ from testtools.matchers import Equals from mockito import mock, when, unstub, any, verify, never from trove.datastore import models as datastore_models from trove.taskmanager import models as taskmanager_models -import trove.common.remote as remote +from trove.backup import models as backup_models +from trove.common import remote from trove.common.instance import ServiceStatuses from trove.instance.models import InstanceServiceStatus from trove.instance.models import DBInstance from trove.instance.tasks import InstanceTasks -import trove.backup.models as backup_models from trove.common.exception import TroveError from swiftclient.client import ClientException from tempfile import NamedTemporaryFile