From c61a3bf5f90291d3bece56aef0432d4b7af03c27 Mon Sep 17 00:00:00 2001 From: Sushil Kumar Date: Wed, 18 Mar 2015 13:59:08 +0000 Subject: [PATCH] Implement clustering for Vertica datastore A specification for this change was submitted for review in https://review.openstack.org/#/c/151279 - HP Vertica Community Edition supports upto a 3-node cluster. - HP Vertica requires a minimum of 3 nodes to achieve fault tolerance. - This patchset provides ability to launch HP Vertica 3-node cluster. - The cluster-show API, would also list the IPs of underlying instances. Code Added: - Added API strategy, taskmanager strategy, and guestagent strategy. - Included unit tests. Workflow for building Vertica cluster is as follows: - Guest instances are booted using new API strategy which then sends control to taskmanager strategy for further communication and guestagent API execution. - Once the guest instances are active in nova, they receive "prepare" message and following steps are performed: - Mount the data disk on device_path. - Check if vertica packages have been installed, install_if_needed(). - Run Vertica pre-install test, prepare_for_install_vertica(). - Get to a status BUILD_PENDING. - Cluster-Taskmanager strategy waits for all the instances in cluster to get to BUILD_PENDING state. - Once all instances in a cluster get to BUILD_PENDING state, taskmanager first, configures passwordless ssh for os-users(root, dbadmin) with the help of guestagent APIs get_keys and authroize_keys. - Once passwordless ssh has been configured, the taskmanager calls install_cluster guestagent API, which installs cluster on member instances and creates a database on the cluster. - Once this method finishes its job then taskmanager calls another guestagent API cluster_complete to notify cluster member of completion of cluster creation. New Files: - A new directory, vertica, has been created, for api, taskmanager, guestagent strategies under trove/common/strategies/cluster/experimental. - Unit-tests for cluster-controller, api and taskmanager code. DocImpact Change-Id: Ide30d1d2a136c7e638532a115db5ff5ab2a75e72 Implements: blueprint implement-vertica-cluster --- etc/trove/trove-guestagent.conf.sample | 6 + etc/trove/trove-taskmanager.conf.sample | 8 + etc/trove/trove.conf.sample | 9 + trove/common/cfg.py | 24 +- .../cluster/experimental/vertica/__init__.py | 0 .../cluster/experimental/vertica/api.py | 189 ++++++++++ .../experimental/vertica/guestagent.py | 52 +++ .../experimental/vertica/taskmanager.py | 191 ++++++++++ .../datastore/experimental/vertica/manager.py | 36 +- .../datastore/experimental/vertica/service.py | 69 ++++ .../datastore/experimental/vertica/system.py | 5 + .../test_cluster_vertica_controller.py | 345 ++++++++++++++++++ .../unittests/cluster/test_vertica_cluster.py | 194 ++++++++++ .../tests/unittests/guestagent/test_dbaas.py | 33 ++ .../guestagent/test_vertica_manager.py | 49 +++ .../taskmanager/test_vertica_clusters.py | 123 +++++++ 16 files changed, 1328 insertions(+), 5 deletions(-) create mode 100644 trove/common/strategies/cluster/experimental/vertica/__init__.py create mode 100644 trove/common/strategies/cluster/experimental/vertica/api.py create mode 100644 trove/common/strategies/cluster/experimental/vertica/guestagent.py create mode 100644 trove/common/strategies/cluster/experimental/vertica/taskmanager.py create mode 100644 trove/tests/unittests/cluster/test_cluster_vertica_controller.py create mode 100644 trove/tests/unittests/cluster/test_vertica_cluster.py create mode 100644 trove/tests/unittests/taskmanager/test_vertica_clusters.py diff --git a/etc/trove/trove-guestagent.conf.sample b/etc/trove/trove-guestagent.conf.sample index 3948947d17..44c0576634 100644 --- a/etc/trove/trove-guestagent.conf.sample +++ b/etc/trove/trove-guestagent.conf.sample @@ -125,3 +125,9 @@ log_file = logfile.txt # replication_namespace = trove.guestagent.strategies.replication.mysql_binlog # replication_user = slave_user # replication_password = slave_password + +[vertica] +# For vertica, following are the defaults needed: +# mount_point = /var/lib/vertica +# readahead_size = 2048 +# guestagent_strategy = trove.common.strategies.cluster.experimental.vertica.guestagent.VerticaGuestAgentStrategy diff --git a/etc/trove/trove-taskmanager.conf.sample b/etc/trove/trove-taskmanager.conf.sample index a4c600f4f0..c0c6551cb8 100644 --- a/etc/trove/trove-taskmanager.conf.sample +++ b/etc/trove/trove-taskmanager.conf.sample @@ -243,3 +243,11 @@ device_path = /dev/vdb [mongodb] volume_support = True device_path = /dev/vdb + +[vertica] +tcp_ports = 5433, 5434, 22, 5444, 5450, 4803 +udp_ports = 5433, 4803, 4804, 6453 +volume_support = True +device_path = /dev/vdb +mount_point = /var/lib/vertica +taskmanager_strategy = trove.common.strategies.cluster.experimental.vertica.taskmanager.VerticaTaskManagerStrategy diff --git a/etc/trove/trove.conf.sample b/etc/trove/trove.conf.sample index 40b07369af..959e53bfbe 100644 --- a/etc/trove/trove.conf.sample +++ b/etc/trove/trove.conf.sample @@ -241,3 +241,12 @@ volume_support = True device_path = /dev/vdb num_config_servers_per_cluster = 1 num_query_routers_per_cluster = 1 + +[vertica] +tcp_ports = 5433, 5434, 22, 5444, 5450, 4803 +udp_ports = 5433, 4803, 4804, 6453 +volume_support = True +device_path = /dev/vdb +cluster_support = True +cluster_member_count = 3 +api_strategy = trove.common.strategies.cluster.experimental.vertica.api.VerticaAPIStrategy diff --git a/trove/common/cfg.py b/trove/common/cfg.py index c124da6459..cbe3616f51 100644 --- a/trove/common/cfg.py +++ b/trove/common/cfg.py @@ -822,11 +822,13 @@ vertica_group = cfg.OptGroup( 'vertica', title='Vertica options', help="Oslo option group designed for Vertica datastore") vertica_opts = [ - cfg.ListOpt('tcp_ports', default=["5433"], + cfg.ListOpt('tcp_ports', + default=["5433", "5434", "22", "5444", "5450", "4803"], help='List of TCP ports and/or port ranges to open ' 'in the security group (only applicable ' 'if trove_security_groups_support is True).'), - cfg.ListOpt('udp_ports', default=["5433"], + cfg.ListOpt('udp_ports', + default=["5433", "4803", "4804", "6453"], help='List of UDP ports and/or port ranges to open ' 'in the security group (only applicable ' 'if trove_security_groups_support is True).'), @@ -851,6 +853,24 @@ vertica_opts = [ help='Namespace to load restore strategies from.'), cfg.IntOpt('readahead_size', default=2048, help='Size(MB) to be set as readahead_size for data volume'), + cfg.BoolOpt('cluster_support', default=True, + help='Enable clusters to be created and managed.'), + cfg.IntOpt('cluster_member_count', default=3, + help='Number of members in Vertica cluster.'), + cfg.StrOpt('api_strategy', + default='trove.common.strategies.cluster.experimental.vertica.' + 'api.VerticaAPIStrategy', + help='Class that implements datastore-specific API logic.'), + cfg.StrOpt('taskmanager_strategy', + default='trove.common.strategies.cluster.experimental.vertica.' + 'taskmanager.VerticaTaskManagerStrategy', + help='Class that implements datastore-specific task manager ' + 'logic.'), + cfg.StrOpt('guestagent_strategy', + default='trove.common.strategies.cluster.experimental.vertica.' + 'guestagent.VerticaGuestAgentStrategy', + help='Class that implements datastore-specific Guest Agent API ' + 'logic.'), ] # RPC version groups diff --git a/trove/common/strategies/cluster/experimental/vertica/__init__.py b/trove/common/strategies/cluster/experimental/vertica/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/trove/common/strategies/cluster/experimental/vertica/api.py b/trove/common/strategies/cluster/experimental/vertica/api.py new file mode 100644 index 0000000000..4cbe471f44 --- /dev/null +++ b/trove/common/strategies/cluster/experimental/vertica/api.py @@ -0,0 +1,189 @@ +#Copyright [2015] Hewlett-Packard Development Company, L.P. +#Licensed under the Apache License, Version 2.0 (the "License"); +#you may not use this file except in compliance with the License. +#You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +#Unless required by applicable law or agreed to in writing, software +#distributed under the License is distributed on an "AS IS" BASIS, +#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +#See the License for the specific language governing permissions and +#limitations under the License. + +from novaclient import exceptions as nova_exceptions + +from trove.cluster import models +from trove.cluster.tasks import ClusterTasks +from trove.cluster.views import ClusterView +from trove.common import cfg +from trove.common import exception +from trove.common import remote +from trove.common.strategies.cluster import base +from trove.common.views import create_links +from trove.extensions.mgmt.clusters.views import MgmtClusterView +from trove.instance import models as inst_models +from trove.openstack.common import log as logging +from trove.quota.quota import check_quotas +from trove.taskmanager import api as task_api + + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + + +class VerticaAPIStrategy(base.BaseAPIStrategy): + + @property + def cluster_class(self): + return VerticaCluster + + @property + def cluster_controller_actions(self): + return {} + + @property + def cluster_view_class(self): + return VerticaClusterView + + @property + def mgmt_cluster_view_class(self): + return VerticaMgmtClusterView + + +class VerticaCluster(models.Cluster): + + @classmethod + def create(cls, context, name, datastore, datastore_version, instances): + LOG.debug("Initiating cluster creation.") + vertica_conf = CONF.get(datastore_version.manager) + num_instances = len(instances) + + # Matching number of instances with configured cluster_member_count + if num_instances != vertica_conf.cluster_member_count: + raise exception.ClusterNumInstancesNotSupported( + num_instances=vertica_conf.cluster_member_count) + + # Checking flavors + flavor_ids = [instance['flavor_id'] for instance in instances] + if len(set(flavor_ids)) != 1: + raise exception.ClusterFlavorsNotEqual() + flavor_id = flavor_ids[0] + nova_client = remote.create_nova_client(context) + try: + flavor = nova_client.flavors.get(flavor_id) + except nova_exceptions.NotFound: + raise exception.FlavorNotFound(uuid=flavor_id) + deltas = {'instances': num_instances} + + # Checking volumes + volume_sizes = [instance['volume_size'] for instance in instances + if instance.get('volume_size', None)] + volume_size = None + if vertica_conf.volume_support: + if len(volume_sizes) != num_instances: + raise exception.ClusterVolumeSizeRequired() + if len(set(volume_sizes)) != 1: + raise exception.ClusterVolumeSizesNotEqual() + volume_size = volume_sizes[0] + models.validate_volume_size(volume_size) + deltas['volumes'] = volume_size * num_instances + else: + if len(volume_sizes) > 0: + raise exception.VolumeNotSupported() + ephemeral_support = vertica_conf.device_path + if ephemeral_support and flavor.ephemeral == 0: + raise exception.LocalStorageNotSpecified(flavor=flavor_id) + + check_quotas(context.tenant, deltas) + + # Updating Cluster Task + db_info = models.DBCluster.create( + name=name, tenant_id=context.tenant, + datastore_version_id=datastore_version.id, + task_status=ClusterTasks.BUILDING_INITIAL) + + member_config = {"id": db_info.id, + "instance_type": "member"} + + # Creating member instances + for i in range(1, num_instances + 1): + instance_name = "%s-member-%s" % (name, str(i)) + inst_models.Instance.create(context, instance_name, + flavor_id, + datastore_version.image_id, + [], [], datastore, + datastore_version, + volume_size, None, + availability_zone=None, + nics=None, + configuration_id=None, + cluster_config=member_config) + + # Calling taskmanager to further proceed for cluster-configuration + task_api.load(context, datastore_version.manager).create_cluster( + db_info.id) + + return VerticaCluster(context, db_info, datastore, datastore_version) + + +class VerticaClusterView(ClusterView): + + def build_instances(self): + instances = [] + ip_list = [] + if self.load_servers: + cluster_instances = self.cluster.instances + else: + cluster_instances = self.cluster.instances_without_server + for instance in cluster_instances: + if instance.type != 'member': + continue + instance_dict = { + "id": instance.id, + "name": instance.name, + "links": create_links("instances", self.req, instance.id) + } + if self.load_servers: + instance_dict["status"] = instance.status + if CONF.get(instance.datastore_version.manager).volume_support: + instance_dict["volume"] = {"size": instance.volume_size} + instance_dict["flavor"] = self._build_flavor_info( + instance.flavor_id) + instance_ips = instance.get_visible_ip_addresses() + if instance_ips: + instance_dict["ip"] = instance_ips + ip_list.append(instance_ips[0]) + instances.append(instance_dict) + ip_list.sort() + return instances, ip_list + + +class VerticaMgmtClusterView(MgmtClusterView): + + def build_instances(self): + instances = [] + ip_list = [] + if self.load_servers: + cluster_instances = self.cluster.instances + else: + cluster_instances = self.cluster.instances_without_server + for instance in cluster_instances: + instance_dict = { + "id": instance.id, + "name": instance.name, + "type": instance.type, + "links": create_links("instances", self.req, instance.id) + } + instance_ips = instance.get_visible_ip_addresses() + if self.load_servers and instance_ips: + instance_dict["ip"] = instance_ips + if self.load_servers: + instance_dict["status"] = instance.status + if CONF.get(instance.datastore_version.manager).volume_support: + instance_dict["volume"] = {"size": instance.volume_size} + instance_dict["flavor"] = self._build_flavor_info( + instance.flavor_id) + instances.append(instance_dict) + ip_list.sort() + return instances, ip_list diff --git a/trove/common/strategies/cluster/experimental/vertica/guestagent.py b/trove/common/strategies/cluster/experimental/vertica/guestagent.py new file mode 100644 index 0000000000..14b35df982 --- /dev/null +++ b/trove/common/strategies/cluster/experimental/vertica/guestagent.py @@ -0,0 +1,52 @@ +#Copyright [2015] Hewlett-Packard Development Company, L.P. +#Licensed under the Apache License, Version 2.0 (the "License"); +#you may not use this file except in compliance with the License. +#You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +#Unless required by applicable law or agreed to in writing, software +#distributed under the License is distributed on an "AS IS" BASIS, +#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +#See the License for the specific language governing permissions and +#limitations under the License. + +from trove.common import cfg +from trove.common.strategies.cluster import base +from trove.guestagent import api as guest_api +from trove.openstack.common import log as logging + + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + + +class VerticaGuestAgentStrategy(base.BaseGuestAgentStrategy): + + @property + def guest_client_class(self): + return VerticaGuestAgentAPI + + +class VerticaGuestAgentAPI(guest_api.API): + + def get_public_keys(self, user): + LOG.debug("Getting public keys for user: %s." % user) + return self._call("get_public_keys", guest_api.AGENT_HIGH_TIMEOUT, + self.version_cap, user=user) + + def authorize_public_keys(self, user, public_keys): + LOG.debug("Authorizing public keys for user: %s." % user) + return self._call("authorize_public_keys", + guest_api.AGENT_HIGH_TIMEOUT, self.version_cap, + user=user, public_keys=public_keys) + + def install_cluster(self, members): + LOG.debug("Installing Vertica cluster on members: %s." % members) + return self._call("install_cluster", CONF.cluster_usage_timeout, + self.version_cap, members=members) + + def cluster_complete(self): + LOG.debug("Notifying cluster install completion.") + return self._call("cluster_complete", guest_api.AGENT_HIGH_TIMEOUT, + self.version_cap) diff --git a/trove/common/strategies/cluster/experimental/vertica/taskmanager.py b/trove/common/strategies/cluster/experimental/vertica/taskmanager.py new file mode 100644 index 0000000000..24ff00b840 --- /dev/null +++ b/trove/common/strategies/cluster/experimental/vertica/taskmanager.py @@ -0,0 +1,191 @@ +#Copyright [2015] Hewlett-Packard Development Company, L.P. +#Licensed under the Apache License, Version 2.0 (the "License"); +#you may not use this file except in compliance with the License. +#You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +#Unless required by applicable law or agreed to in writing, software +#distributed under the License is distributed on an "AS IS" BASIS, +#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +#See the License for the specific language governing permissions and +#limitations under the License. + +from eventlet.timeout import Timeout + +from trove.common import cfg +from trove.common.exception import PollTimeOut +from trove.common.instance import ServiceStatuses +from trove.common.remote import create_guest_client +from trove.common.strategies.cluster import base +from trove.common import utils +from trove.instance.models import DBInstance +from trove.instance.models import Instance +from trove.instance.models import InstanceServiceStatus +from trove.instance.tasks import InstanceTasks +from trove.common.i18n import _ +from trove.openstack.common import log as logging +from trove.taskmanager import api as task_api +import trove.taskmanager.models as task_models + + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF +USAGE_SLEEP_TIME = CONF.usage_sleep_time # seconds. + + +class VerticaTaskManagerStrategy(base.BaseTaskManagerStrategy): + + @property + def task_manager_api_class(self): + return VerticaTaskManagerAPI + + @property + def task_manager_cluster_tasks_class(self): + return VerticaClusterTasks + + +class VerticaClusterTasks(task_models.ClusterTasks): + + def update_statuses_on_failure(self, cluster_id): + + if CONF.update_status_on_fail: + db_instances = DBInstance.find_all( + cluster_id=cluster_id, deleted=False).all() + + for db_instance in db_instances: + db_instance.set_task_status( + InstanceTasks.BUILDING_ERROR_SERVER) + db_instance.save() + + @classmethod + def get_ip(cls, instance): + return instance.get_visible_ip_addresses()[0] + + @classmethod + def get_guest(cls, instance): + return create_guest_client(instance.context, instance.db_info.id, + instance.datastore_version.manager) + + def _all_instances_ready(self, instance_ids, cluster_id): + + def _all_status_ready(ids): + LOG.debug("Checking service status of instance ids: %s." % ids) + for instance_id in ids: + status = InstanceServiceStatus.find_by( + instance_id=instance_id).get_status() + if (status == ServiceStatuses.FAILED or + status == ServiceStatuses.FAILED_TIMEOUT_GUESTAGENT): + # if one has failed, no need to continue polling + LOG.debug("Instance %s in %s, exiting polling." % ( + instance_id, status)) + return True + if (status != ServiceStatuses.RUNNING and + status != ServiceStatuses.BUILD_PENDING): + # if one is not in a ready state, continue polling + LOG.debug("Instance %s in %s, continue polling." % ( + instance_id, status)) + return False + LOG.debug("Instances are ready, exiting polling for: %s." % ids) + return True + + def _instance_ids_with_failures(ids): + LOG.debug("Checking for service status failures for " + "instance ids: %s." % ids) + failed_instance_ids = [] + for instance_id in ids: + status = InstanceServiceStatus.find_by( + instance_id=instance_id).get_status() + if (status == ServiceStatuses.FAILED or + status == ServiceStatuses.FAILED_TIMEOUT_GUESTAGENT): + failed_instance_ids.append(instance_id) + return failed_instance_ids + + LOG.debug("Polling until service status is ready for " + "instance ids: %s." % instance_ids) + try: + utils.poll_until(lambda: instance_ids, + lambda ids: _all_status_ready(ids), + sleep_time=USAGE_SLEEP_TIME, + time_out=CONF.usage_timeout) + except PollTimeOut: + LOG.exception(_("Timeout for all instance service statuses " + "to become ready.")) + self.update_statuses_on_failure(cluster_id) + return False + + failed_ids = _instance_ids_with_failures(instance_ids) + if failed_ids: + LOG.error(_("Some instances failed to become ready: %s.") % + failed_ids) + self.update_statuses_on_failure(cluster_id) + return False + + return True + + def create_cluster(self, context, cluster_id): + LOG.debug("Begin create_cluster for id: %s." % cluster_id) + + def _create_cluster(): + + # Fetch instances by cluster_id against instances table. + db_instances = DBInstance.find_all(cluster_id=cluster_id).all() + instance_ids = [db_instance.id for db_instance in db_instances] + + # Wait for cluster members to get to cluster-ready status. + if not self._all_instances_ready(instance_ids, cluster_id): + return + + LOG.debug("All members ready, proceeding for cluster setup.") + instances = [Instance.load(context, instance_id) for instance_id + in instance_ids] + + member_ips = [self.get_ip(instance) for instance in instances] + guests = [self.get_guest(instance) for instance in instances] + + # Users to be configured for password-less SSH. + authorized_users_without_password = ['root', 'dbadmin'] + + # Configuring password-less SSH for cluster members. + # Strategy for setting up SSH: + # get public keys for user from member-instances in cluster, + # combine them, finally push it back to all instances, + # and member instances add them to authorized keys. + LOG.debug("Configuring password-less SSH on cluster members.") + try: + for user in authorized_users_without_password: + pub_key = [guest.get_public_keys(user) for guest in guests] + for guest in guests: + guest.authorize_public_keys(user, pub_key) + + LOG.debug("Installing cluster with members: %s." % member_ips) + guests[0].install_cluster(member_ips) + + LOG.debug("Finalizing cluster configuration.") + for guest in guests: + guest.cluster_complete() + except Exception: + LOG.exception(_("Error creating cluster.")) + self.update_statuses_on_failure(cluster_id) + + timeout = Timeout(CONF.cluster_usage_timeout) + try: + _create_cluster() + self.reset_task() + except Timeout as t: + if t is not timeout: + raise # not my timeout + LOG.exception(_("Timeout for building cluster.")) + self.update_statuses_on_failure(cluster_id) + finally: + timeout.cancel() + + LOG.debug("End create_cluster for id: %s." % cluster_id) + + +class VerticaTaskManagerAPI(task_api.API): + + def _cast(self, method_name, version, **kwargs): + LOG.debug("Casting %s" % method_name) + cctxt = self.client.prepare(version=version) + cctxt.cast(self.context, method_name, **kwargs) diff --git a/trove/guestagent/datastore/experimental/vertica/manager.py b/trove/guestagent/datastore/experimental/vertica/manager.py index 933acb45cf..6ca4aa51e2 100644 --- a/trove/guestagent/datastore/experimental/vertica/manager.py +++ b/trove/guestagent/datastore/experimental/vertica/manager.py @@ -62,9 +62,16 @@ class Manager(periodic_task.PeriodicTasks): LOG.debug("Mounted the volume.") self.app.install_if_needed(packages) self.app.prepare_for_install_vertica() - self.app.install_vertica() - self.app.create_db() - self.app.complete_install_or_restart() + if cluster_config is None: + self.app.install_vertica() + self.app.create_db() + self.app.complete_install_or_restart() + elif cluster_config['instance_type'] == "member": + self.appStatus.set_status(rd_ins.ServiceStatuses.BUILD_PENDING) + else: + LOG.error(_("Bad cluster configuration; instance type " + "given as %s.") % cluster_config['instance_type']) + raise LOG.info(_('Completed setup of Vertica database instance.')) except Exception: LOG.exception(_('Cannot prepare Vertica database instance.')) @@ -191,3 +198,26 @@ class Manager(periodic_task.PeriodicTasks): LOG.debug("Starting with configuration changes.") raise exception.DatastoreOperationNotSupported( operation='start_db_with_conf_changes', datastore=MANAGER) + + def get_public_keys(self, context, user): + LOG.debug("Retrieving public keys for %s." % user) + return self.app.get_public_keys(user) + + def authorize_public_keys(self, context, user, public_keys): + LOG.debug("Authorizing public keys for %s." % user) + return self.app.authorize_public_keys(user, public_keys) + + def install_cluster(self, context, members): + try: + LOG.debug("Installing cluster on members: %s." % members) + self.app.install_cluster(members) + LOG.debug("install_cluster call has finished.") + except Exception: + LOG.exception(_('Cluster installation failed.')) + self.appStatus.set_status(rd_ins.ServiceStatuses.FAILED) + raise + + def cluster_complete(self, context): + LOG.debug("Cluster creation complete, starting status checks.") + status = self.appStatus._get_actual_db_status() + self.appStatus.set_status(status) diff --git a/trove/guestagent/datastore/experimental/vertica/service.py b/trove/guestagent/datastore/experimental/vertica/service.py index 80f7712db5..dfeff05558 100644 --- a/trove/guestagent/datastore/experimental/vertica/service.py +++ b/trove/guestagent/datastore/experimental/vertica/service.py @@ -219,3 +219,72 @@ class VerticaApp(object): except exception.ProcessExecutionError: LOG.exception(_("Failed to prepare for install_vertica.")) raise + + def get_public_keys(self, user): + """Generates key (if not found), and sends public key for user.""" + LOG.debug("Public keys requested for user: %s." % user) + user_home_directory = os.path.expanduser('~' + user) + public_key_file_name = user_home_directory + '/.ssh/id_rsa.pub' + + try: + key_generate_command = (system.SSH_KEY_GEN % user_home_directory) + system.shell_execute(key_generate_command, user) + except exception.ProcessExecutionError: + LOG.debug("Cannot generate key.") + + try: + read_key_cmd = ("cat %(file)s" % {'file': public_key_file_name}) + out, err = system.shell_execute(read_key_cmd) + except exception.ProcessExecutionError: + LOG.exception(_("Cannot read public key.")) + raise + return out.strip() + + def authorize_public_keys(self, user, public_keys): + """Adds public key to authorized_keys for user.""" + LOG.debug("public keys to be added for user: %s." % (user)) + user_home_directory = os.path.expanduser('~' + user) + authorized_file_name = user_home_directory + '/.ssh/authorized_keys' + + try: + read_key_cmd = ("cat %(file)s" % {'file': authorized_file_name}) + out, err = system.shell_execute(read_key_cmd) + public_keys.append(out.strip()) + except exception.ProcessExecutionError: + LOG.debug("Cannot read authorized_keys.") + all_keys = '\n'.join(public_keys) + "\n" + + try: + with tempfile.NamedTemporaryFile(delete=False) as tempkeyfile: + tempkeyfile.write(all_keys) + copy_key_cmd = (("install -o %(user)s -m 600 %(source)s %(target)s" + ) % {'user': user, 'source': tempkeyfile.name, + 'target': authorized_file_name}) + system.shell_execute(copy_key_cmd) + os.remove(tempkeyfile.name) + except exception.ProcessExecutionError: + LOG.exception(_("Cannot install public keys.")) + os.remove(tempkeyfile.name) + raise + + def _export_conf_to_members(self, members): + """This method exports conf files to other members.""" + try: + for member in members: + COPY_CMD = (system.SEND_CONF_TO_SERVER % (system.VERTICA_CONF, + member, + system.VERTICA_CONF)) + system.shell_execute(COPY_CMD) + except exception.ProcessExecutionError: + LOG.exception(_("Cannot export configuration.")) + raise + + def install_cluster(self, members): + """Installs & configures cluster.""" + cluster_members = ','.join(members) + LOG.debug("Installing cluster with members: %s." % cluster_members) + self.install_vertica(cluster_members) + self._export_conf_to_members(members) + LOG.debug("Creating database with members: %s." % cluster_members) + self.create_db(cluster_members) + LOG.debug("Cluster configured on members: %s." % cluster_members) diff --git a/trove/guestagent/datastore/experimental/vertica/system.py b/trove/guestagent/datastore/experimental/vertica/system.py index 2b9e988edb..31e98850cd 100644 --- a/trove/guestagent/datastore/experimental/vertica/system.py +++ b/trove/guestagent/datastore/experimental/vertica/system.py @@ -24,6 +24,11 @@ STATUS_ACTIVE_DB = "/opt/vertica/bin/adminTools -t show_active_db" STATUS_DB_DOWN = "/opt/vertica/bin/adminTools -t db_status -s DOWN" SET_RESTART_POLICY = ("/opt/vertica/bin/adminTools -t set_restart_policy " "-d %s -p '%s'") +SEND_CONF_TO_SERVER = ("rsync -v -e 'ssh -o " + "UserKnownHostsFile=/dev/null -o " + "StrictHostKeyChecking=no' --perms --owner --group " + "%s %s:%s") +SSH_KEY_GEN = "ssh-keygen -f %s/.ssh/id_rsa -t rsa -N ''" VERTICA_CONF = "/etc/vertica.cnf" INSTALL_TIMEOUT = 1000 diff --git a/trove/tests/unittests/cluster/test_cluster_vertica_controller.py b/trove/tests/unittests/cluster/test_cluster_vertica_controller.py new file mode 100644 index 0000000000..6c8daa1a1d --- /dev/null +++ b/trove/tests/unittests/cluster/test_cluster_vertica_controller.py @@ -0,0 +1,345 @@ +#Copyright [2015] Hewlett-Packard Development Company, L.P. +#Licensed under the Apache License, Version 2.0 (the "License"); +#you may not use this file except in compliance with the License. +#You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +#Unless required by applicable law or agreed to in writing, software +#distributed under the License is distributed on an "AS IS" BASIS, +#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +#See the License for the specific language governing permissions and +#limitations under the License. + +import jsonschema + +from mock import MagicMock +from mock import Mock +from mock import patch +from testtools import TestCase +from testtools.matchers import Is, Equals +from trove.cluster import models +from trove.cluster.models import Cluster +from trove.cluster.service import ClusterController +from trove.cluster import views +import trove.common.cfg as cfg +from trove.common import exception +from trove.common.strategies.cluster import strategy +from trove.common import utils +from trove.datastore import models as datastore_models + + +class TestClusterController(TestCase): + def setUp(self): + super(TestClusterController, self).setUp() + self.controller = ClusterController() + self.cluster = { + "cluster": { + "name": "products", + "datastore": { + "type": "vertica", + "version": "7.1" + }, + "instances": [ + { + "flavorRef": "7", + "volume": { + "size": 1 + }, + }, + { + "flavorRef": "7", + "volume": { + "size": 1 + }, + }, + { + "flavorRef": "7", + "volume": { + "size": 1 + }, + }, + ] + } + } + + def test_get_schema_create(self): + schema = self.controller.get_schema('create', self.cluster) + self.assertIsNotNone(schema) + self.assertTrue('cluster' in schema['properties']) + self.assertTrue('cluster') + + def test_validate_create(self): + body = self.cluster + schema = self.controller.get_schema('create', body) + validator = jsonschema.Draft4Validator(schema) + self.assertTrue(validator.is_valid(body)) + + def test_validate_create_blankname(self): + body = self.cluster + body['cluster']['name'] = " " + schema = self.controller.get_schema('create', body) + validator = jsonschema.Draft4Validator(schema) + self.assertFalse(validator.is_valid(body)) + errors = sorted(validator.iter_errors(body), key=lambda e: e.path) + self.assertThat(len(errors), Is(1)) + self.assertThat(errors[0].message, + Equals("' ' does not match '^.*[0-9a-zA-Z]+.*$'")) + + def test_validate_create_blank_datastore(self): + body = self.cluster + body['cluster']['datastore']['type'] = "" + schema = self.controller.get_schema('create', body) + validator = jsonschema.Draft4Validator(schema) + self.assertFalse(validator.is_valid(body)) + errors = sorted(validator.iter_errors(body), key=lambda e: e.path) + error_messages = [error.message for error in errors] + error_paths = [error.path.pop() for error in errors] + self.assertThat(len(errors), Is(2)) + self.assertIn("'' is too short", error_messages) + self.assertIn("'' does not match '^.*[0-9a-zA-Z]+.*$'", error_messages) + self.assertIn("type", error_paths) + + @patch.object(Cluster, 'create') + @patch.object(datastore_models, 'get_datastore_version') + def test_create_clusters_disabled(self, + mock_get_datastore_version, + mock_cluster_create): + body = self.cluster + tenant_id = Mock() + context = Mock() + + req = Mock() + req.environ = MagicMock() + req.environ.get = Mock(return_value=context) + + datastore_version = Mock() + datastore_version.manager = 'mysql' + mock_get_datastore_version.return_value = (Mock(), datastore_version) + + self.assertRaises(exception.ClusterDatastoreNotSupported, + self.controller.create, + req, + body, + tenant_id) + + @patch.object(Cluster, 'create') + @patch.object(utils, 'get_id_from_href') + @patch.object(datastore_models, 'get_datastore_version') + def test_create_clusters(self, + mock_get_datastore_version, + mock_id_from_href, + mock_cluster_create): + body = self.cluster + tenant_id = Mock() + context = Mock() + + req = Mock() + req.environ = Mock() + req.environ.__getitem__ = Mock(return_value=context) + datastore_version = Mock() + datastore_version.manager = 'vertica' + datastore = Mock() + mock_get_datastore_version.return_value = (datastore, + datastore_version) + instances = [{'volume_size': 1, 'flavor_id': '1234'}, + {'volume_size': 1, 'flavor_id': '1234'}, + {'volume_size': 1, 'flavor_id': '1234'}] + mock_id_from_href.return_value = '1234' + + mock_cluster = Mock() + mock_cluster.instances = [] + mock_cluster.instances_without_server = [] + mock_cluster.datastore_version.manager = 'vertica' + mock_cluster_create.return_value = mock_cluster + + self.controller.create(req, body, tenant_id) + mock_cluster_create.assert_called_with(context, 'products', + datastore, datastore_version, + instances) + + @patch.object(Cluster, 'load') + def test_show_cluster(self, + mock_cluster_load): + tenant_id = Mock() + id = Mock() + context = Mock() + req = Mock() + req.environ = Mock() + req.environ.__getitem__ = Mock(return_value=context) + + mock_cluster = Mock() + mock_cluster.instances = [] + mock_cluster.instances_without_server = [] + mock_cluster.datastore_version.manager = 'vertica' + mock_cluster_load.return_value = mock_cluster + + self.controller.show(req, tenant_id, id) + mock_cluster_load.assert_called_with(context, id) + + @patch.object(Cluster, 'load') + @patch.object(Cluster, 'load_instance') + def test_show_cluster_instance(self, + mock_cluster_load_instance, + mock_cluster_load): + tenant_id = Mock() + cluster_id = Mock() + instance_id = Mock() + context = Mock() + req = Mock() + req.environ = Mock() + req.environ.__getitem__ = Mock(return_value=context) + cluster = Mock() + mock_cluster_load.return_value = cluster + cluster.id = cluster_id + self.controller.show_instance(req, tenant_id, cluster_id, instance_id) + mock_cluster_load_instance.assert_called_with(context, cluster.id, + instance_id) + + @patch.object(Cluster, 'load') + def test_delete_cluster(self, mock_cluster_load): + tenant_id = Mock() + cluster_id = Mock() + req = MagicMock() + cluster = Mock() + mock_cluster_load.return_value = cluster + self.controller.delete(req, tenant_id, cluster_id) + cluster.delete.assert_called + + +class TestClusterControllerWithStrategy(TestCase): + def setUp(self): + super(TestClusterControllerWithStrategy, self).setUp() + self.controller = ClusterController() + self.cluster = { + "cluster": { + "name": "products", + "datastore": { + "type": "vertica", + "version": "7.1" + }, + "instances": [ + { + "flavorRef": "7", + "volume": { + "size": 1 + }, + }, + { + "flavorRef": "7", + "volume": { + "size": 1 + }, + }, + { + "flavorRef": "7", + "volume": { + "size": 1 + }, + }, + ] + } + } + + def tearDown(self): + super(TestClusterControllerWithStrategy, self).tearDown() + cfg.CONF.clear_override('cluster_support', group='vertica') + cfg.CONF.clear_override('api_strategy', group='vertica') + + @patch.object(datastore_models, 'get_datastore_version') + @patch.object(models.Cluster, 'create') + def test_create_clusters_disabled(self, + mock_cluster_create, + mock_get_datastore_version): + + cfg.CONF.set_override('cluster_support', False, group='vertica') + + body = self.cluster + tenant_id = Mock() + context = Mock() + + req = Mock() + req.environ = MagicMock() + req.environ.get = Mock(return_value=context) + + datastore_version = Mock() + datastore_version.manager = 'vertica' + mock_get_datastore_version.return_value = (Mock(), datastore_version) + + self.assertRaises(exception.TroveError, self.controller.create, req, + body, tenant_id) + + @patch.object(views.ClusterView, 'data', return_value={}) + @patch.object(datastore_models, 'get_datastore_version') + @patch.object(models.Cluster, 'create') + def test_create_clusters_enabled(self, + mock_cluster_create, + mock_get_datastore_version, + mock_cluster_view_data): + + cfg.CONF.set_override('cluster_support', True, group='vertica') + + body = self.cluster + tenant_id = Mock() + context = Mock() + + req = Mock() + req.environ = MagicMock() + req.environ.get = Mock(return_value=context) + + datastore_version = Mock() + datastore_version.manager = 'vertica' + mock_get_datastore_version.return_value = (Mock(), datastore_version) + + mock_cluster = Mock() + mock_cluster.datastore_version.manager = 'vertica' + mock_cluster_create.return_value = mock_cluster + self.controller.create(req, body, tenant_id) + + @patch.object(models.Cluster, 'load') + def test_controller_action_no_strategy(self, + mock_cluster_load): + + body = {'do_stuff2': {}} + tenant_id = Mock() + context = Mock() + id = Mock() + + req = Mock() + req.environ = MagicMock() + req.environ.get = Mock(return_value=context) + + cluster = Mock() + cluster.datastore_version.manager = 'vertica' + mock_cluster_load.return_value = cluster + + self.assertRaises(exception.TroveError, self.controller.action, req, + body, tenant_id, id) + + @patch.object(strategy, 'load_api_strategy') + @patch.object(models.Cluster, 'load') + def test_controller_action_found(self, + mock_cluster_load, + mock_cluster_api_strategy): + + body = {'do_stuff': {}} + tenant_id = Mock() + context = Mock() + id = Mock() + + req = Mock() + req.environ = MagicMock() + req.environ.get = Mock(return_value=context) + + cluster = Mock() + cluster.datastore_version.manager = 'vertica' + mock_cluster_load.return_value = cluster + + strat = Mock() + do_stuff_func = Mock() + strat.cluster_controller_actions = \ + {'do_stuff': do_stuff_func} + mock_cluster_api_strategy.return_value = strat + + self.controller.action(req, body, tenant_id, id) + self.assertEqual(1, do_stuff_func.call_count) diff --git a/trove/tests/unittests/cluster/test_vertica_cluster.py b/trove/tests/unittests/cluster/test_vertica_cluster.py new file mode 100644 index 0000000000..f3d54c1cff --- /dev/null +++ b/trove/tests/unittests/cluster/test_vertica_cluster.py @@ -0,0 +1,194 @@ +#Copyright [2015] Hewlett-Packard Development Company, L.P. +#Licensed under the Apache License, Version 2.0 (the "License"); +#you may not use this file except in compliance with the License. +#You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +#Unless required by applicable law or agreed to in writing, software +#distributed under the License is distributed on an "AS IS" BASIS, +#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +#See the License for the specific language governing permissions and +#limitations under the License. + +import uuid + +from mock import Mock +from mock import patch +from testtools import TestCase +from trove.cluster.models import Cluster +from trove.cluster.models import ClusterTasks +from trove.cluster.models import DBCluster +from trove.common import cfg +from trove.common import exception +from trove.common import remote +from trove.common.strategies.cluster.experimental.vertica import ( + api as vertica_api) +from trove.instance import models as inst_models +from trove.quota.quota import QUOTAS +from trove.taskmanager import api as task_api + +CONF = cfg.CONF + + +class ClusterTest(TestCase): + def setUp(self): + super(ClusterTest, self).setUp() + task_api.API.get_client = Mock() + self.cluster_id = str(uuid.uuid4()) + self.cluster_name = "Cluster" + self.cluster_id + self.tenant_id = "23423432" + self.dv_id = "1" + self.db_info = DBCluster(ClusterTasks.NONE, + id=self.cluster_id, + name=self.cluster_name, + tenant_id=self.tenant_id, + datastore_version_id=self.dv_id, + task_id=ClusterTasks.NONE._code) + self.context = Mock() + self.datastore = Mock() + self.dv = Mock() + self.dv.manager = "vertica" + self.datastore_version = self.dv + self.cluster = vertica_api.VerticaCluster(self.context, self.db_info, + self.datastore, + self.datastore_version) + self.instances = [{'volume_size': 1, 'flavor_id': '1234'}, + {'volume_size': 1, 'flavor_id': '1234'}, + {'volume_size': 1, 'flavor_id': '1234'}] + self.volume_support = CONF.get(self.dv.manager).volume_support + self.remote_nova = remote.create_nova_client + + def tearDown(self): + super(ClusterTest, self).tearDown() + CONF.get(self.dv.manager).volume_support = self.volume_support + remote.create_nova_client = self.remote_nova + + def test_create_empty_instances(self): + self.assertRaises(exception.ClusterNumInstancesNotSupported, + Cluster.create, + Mock(), + self.cluster_name, + self.datastore, + self.datastore_version, + [] + ) + + def test_create_flavor_not_specified(self): + instances = self.instances + instances[0]['flavor_id'] = None + self.assertRaises(exception.ClusterFlavorsNotEqual, + Cluster.create, + Mock(), + self.cluster_name, + self.datastore, + self.datastore_version, + instances + ) + + @patch.object(remote, 'create_nova_client') + def test_create_volume_no_specified(self, + mock_client): + instances = self.instances + instances[0]['volume_size'] = None + flavors = Mock() + mock_client.return_value.flavors = flavors + self.assertRaises(exception.ClusterVolumeSizeRequired, + Cluster.create, + Mock(), + self.cluster_name, + self.datastore, + self.datastore_version, + instances + ) + + @patch.object(remote, 'create_nova_client') + def test_create_storage_specified_with_no_volume_support(self, + mock_client): + CONF.get(self.dv.manager).volume_support = False + instances = self.instances + instances[0]['volume_size'] = None + flavors = Mock() + mock_client.return_value.flavors = flavors + self.assertRaises(exception.VolumeNotSupported, + Cluster.create, + Mock(), + self.cluster_name, + self.datastore, + self.datastore_version, + instances + ) + + @patch.object(remote, 'create_nova_client') + def test_create_storage_not_specified_and_no_ephemeral_flavor(self, + mock_client): + class FakeFlavor: + def __init__(self, flavor_id): + self.flavor_id = flavor_id + + @property + def id(self): + return self.flavor.id + + @property + def ephemeral(self): + return 0 + instances = [{'flavor_id': '1234'}, + {'flavor_id': '1234'}, + {'flavor_id': '1234'}] + CONF.get(self.dv.manager).volume_support = False + (mock_client.return_value. + flavors.get.return_value) = FakeFlavor('1234') + self.assertRaises(exception.LocalStorageNotSpecified, + Cluster.create, + Mock(), + self.cluster_name, + self.datastore, + self.datastore_version, + instances + ) + + @patch.object(inst_models.Instance, 'create') + @patch.object(DBCluster, 'create') + @patch.object(task_api, 'load') + @patch.object(QUOTAS, 'check_quotas') + @patch.object(remote, 'create_nova_client') + def test_create(self, mock_client, mock_check_quotas, mock_task_api, + mock_db_create, mock_ins_create): + instances = self.instances + flavors = Mock() + mock_client.return_value.flavors = flavors + self.cluster.create(Mock(), + self.cluster_name, + self.datastore, + self.datastore_version, + instances) + mock_task_api.create_cluster.assert_called + self.assertEqual(3, mock_ins_create.call_count) + + def test_delete_bad_task_status(self): + self.cluster.db_info.task_status = ClusterTasks.BUILDING_INITIAL + self.assertRaises(exception.UnprocessableEntity, + self.cluster.delete) + + @patch.object(task_api.API, 'delete_cluster') + @patch.object(Cluster, 'update_db') + @patch.object(inst_models.DBInstance, 'find_all') + def test_delete_task_status_none(self, + mock_find_all, + mock_update_db, + mock_delete_cluster): + self.cluster.db_info.task_status = ClusterTasks.NONE + self.cluster.delete() + mock_update_db.assert_called_with(task_status=ClusterTasks.DELETING) + + @patch.object(task_api.API, 'delete_cluster') + @patch.object(Cluster, 'update_db') + @patch.object(inst_models.DBInstance, 'find_all') + def test_delete_task_status_deleting(self, + mock_find_all, + mock_update_db, + mock_delete_cluster): + self.cluster.db_info.task_status = ClusterTasks.DELETING + self.cluster.delete() + mock_update_db.assert_called_with(task_status=ClusterTasks.DELETING) diff --git a/trove/tests/unittests/guestagent/test_dbaas.py b/trove/tests/unittests/guestagent/test_dbaas.py index 2f65c9f291..ceba9d7f88 100644 --- a/trove/tests/unittests/guestagent/test_dbaas.py +++ b/trove/tests/unittests/guestagent/test_dbaas.py @@ -2277,3 +2277,36 @@ class VerticaAppTest(testtools.TestCase): mock_status.end_install_or_restart = MagicMock( return_value=None) self.assertRaises(RuntimeError, app.stop_db) + + def test_export_conf_to_members(self): + self.app._export_conf_to_members(members=['member1', 'member2']) + self.assertEqual(vertica_system.shell_execute.call_count, 2) + + def test_authorize_public_keys(self): + user = 'test_user' + keys = ['test_key@machine1', 'test_key@machine2'] + with patch.object(os.path, 'expanduser', + return_value=('/home/' + user)): + self.app.authorize_public_keys(user=user, public_keys=keys) + self.assertEqual(vertica_system.shell_execute.call_count, 2) + vertica_system.shell_execute.assert_any_call( + 'cat ' + '/home/' + user + '/.ssh/authorized_keys') + + def test_get_public_keys(self): + user = 'test_user' + with patch.object(os.path, 'expanduser', + return_value=('/home/' + user)): + self.app.get_public_keys(user=user) + self.assertEqual(vertica_system.shell_execute.call_count, 2) + vertica_system.shell_execute.assert_any_call( + (vertica_system.SSH_KEY_GEN % ('/home/' + user)), user) + vertica_system.shell_execute.assert_any_call( + 'cat ' + '/home/' + user + '/.ssh/id_rsa.pub') + + def test_install_cluster(self): + with patch.object(self.app, 'read_config', + return_value=self.test_config): + self.app.install_cluster(members=['member1', 'member2']) + # Verifying nu,ber of shell calls, + # as command has already been tested in preceeding tests + self.assertEqual(vertica_system.shell_execute.call_count, 5) diff --git a/trove/tests/unittests/guestagent/test_vertica_manager.py b/trove/tests/unittests/guestagent/test_vertica_manager.py index e1a4c68534..9fa0069e58 100644 --- a/trove/tests/unittests/guestagent/test_vertica_manager.py +++ b/trove/tests/unittests/guestagent/test_vertica_manager.py @@ -13,10 +13,14 @@ import testtools from mock import MagicMock +from mock import patch +from trove.common import instance as rd_instance from trove.common.context import TroveContext from trove.guestagent import volume from trove.guestagent.datastore.experimental.vertica.manager import Manager from trove.guestagent.datastore.experimental.vertica.service import VerticaApp +from trove.guestagent.datastore.experimental.vertica.service import ( + VerticaAppStatus) from trove.guestagent.volume import VolumeDevice @@ -144,3 +148,48 @@ class GuestAgentManagerTest(testtools.TestCase): self.manager.stop_db(self.context) #verification/assertion VerticaApp.stop_db.assert_any_call(do_not_start_on_reboot=False) + + @patch.object(VerticaApp, 'install_vertica') + @patch.object(VerticaApp, '_export_conf_to_members') + @patch.object(VerticaApp, 'create_db') + def test_install_cluster(self, mock_install, mock_export, mock_create_db): + members = ['test1', 'test2'] + self.manager.install_cluster(self.context, members) + mock_install.assert_called_with('test1,test2') + mock_export.assert_called_with(members) + mock_create_db.assert_called_with('test1,test2') + + @patch.object(VerticaAppStatus, 'set_status') + @patch.object(VerticaApp, 'install_cluster', + side_effect=RuntimeError("Boom!")) + def test_install_cluster_failure(self, mock_install, mock_set_status): + members = ["test1", "test2"] + self.assertRaises(RuntimeError, self.manager.install_cluster, + self.context, members) + mock_set_status.assert_called_with(rd_instance.ServiceStatuses.FAILED) + + @patch.object(volume.VolumeDevice, 'mount_points', return_value=[]) + @patch.object(volume.VolumeDevice, 'unmount_device', return_value=None) + @patch.object(volume.VolumeDevice, 'mount', return_value=None) + @patch.object(volume.VolumeDevice, 'migrate_data', return_value=None) + @patch.object(volume.VolumeDevice, 'format', return_value=None) + @patch.object(VerticaApp, 'prepare_for_install_vertica') + @patch.object(VerticaApp, 'install_if_needed') + @patch.object(VerticaAppStatus, 'begin_install') + def _prepare_method(self, instance_id, instance_type, *args): + cluster_config = {"id": instance_id, + "instance_type": instance_type} + + # invocation + self.manager.prepare(context=self.context, databases=None, + packages=['vertica'], + memory_mb='2048', users=None, + mount_point='/var/lib/vertica', + overrides=None, + cluster_config=cluster_config) + + @patch.object(VerticaAppStatus, 'set_status') + def test_prepare_member(self, mock_set_status): + self._prepare_method("test-instance-3", "member") + mock_set_status.assert_called_with( + rd_instance.ServiceStatuses.BUILD_PENDING) diff --git a/trove/tests/unittests/taskmanager/test_vertica_clusters.py b/trove/tests/unittests/taskmanager/test_vertica_clusters.py new file mode 100644 index 0000000000..87f67cd7bb --- /dev/null +++ b/trove/tests/unittests/taskmanager/test_vertica_clusters.py @@ -0,0 +1,123 @@ +#Copyright [2015] Hewlett-Packard Development Company, L.P. +#Licensed under the Apache License, Version 2.0 (the "License"); +#you may not use this file except in compliance with the License. +#You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +#Unless required by applicable law or agreed to in writing, software +#distributed under the License is distributed on an "AS IS" BASIS, +#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +#See the License for the specific language governing permissions and +#limitations under the License. + +import datetime + +import testtools +from mock import Mock +from mock import patch + +from trove.cluster.models import ClusterTasks as ClusterTaskStatus +from trove.cluster.models import DBCluster +from trove.common.strategies.cluster.experimental.vertica.taskmanager import \ + VerticaClusterTasks as ClusterTasks +from trove.datastore import models as datastore_models +from trove.instance.models import BaseInstance +from trove.instance.models import DBInstance +from trove.instance.models import Instance +from trove.instance.models import InstanceServiceStatus +from trove.instance.models import InstanceTasks +from trove.taskmanager.models import ServiceStatuses + + +class VerticaClusterTasksTest(testtools.TestCase): + def setUp(self): + super(VerticaClusterTasksTest, self).setUp() + self.cluster_id = "1232" + self.cluster_name = "Cluster-1234" + self.tenant_id = "6789" + self.db_cluster = DBCluster(ClusterTaskStatus.NONE, + id=self.cluster_id, + created=str(datetime.date), + updated=str(datetime.date), + name=self.cluster_name, + task_id=ClusterTaskStatus.NONE._code, + tenant_id=self.tenant_id, + datastore_version_id="1", + deleted=False) + self.dbinst1 = DBInstance(InstanceTasks.NONE, id="1", name="member1", + compute_instance_id="compute-1", + task_id=InstanceTasks.NONE._code, + task_description= + InstanceTasks.NONE._db_text, + volume_id="volume-1", + datastore_version_id="1", + cluster_id=self.cluster_id, + type="member") + self.dbinst2 = DBInstance(InstanceTasks.NONE, id="2", name="member2", + compute_instance_id="compute-2", + task_id=InstanceTasks.NONE._code, + task_description= + InstanceTasks.NONE._db_text, + volume_id="volume-2", + datastore_version_id="1", + cluster_id=self.cluster_id, + type="member") + self.dbinst3 = DBInstance(InstanceTasks.NONE, id="3", name="member3", + compute_instance_id="compute-3", + task_id=InstanceTasks.NONE._code, + task_description= + InstanceTasks.NONE._db_text, + volume_id="volume-3", + datastore_version_id="1", + cluster_id=self.cluster_id, + type="member") + mock_ds1 = Mock() + mock_ds1.name = 'vertica' + mock_dv1 = Mock() + mock_dv1.name = '7.1' + self.clustertasks = ClusterTasks(Mock(), + self.db_cluster, + datastore=mock_ds1, + datastore_version=mock_dv1) + + @patch.object(ClusterTasks, 'update_statuses_on_failure') + @patch.object(InstanceServiceStatus, 'find_by') + def test_all_instances_ready_bad_status(self, + mock_find, mock_update): + (mock_find.return_value. + get_status.return_value) = ServiceStatuses.FAILED + ret_val = self.clustertasks._all_instances_ready(["1", "2", "3", "4"], + self.cluster_id) + mock_update.assert_called_with(self.cluster_id) + self.assertEqual(False, ret_val) + + @patch.object(InstanceServiceStatus, 'find_by') + def test_all_instances_ready(self, mock_find): + (mock_find.return_value. + get_status.return_value) = ServiceStatuses.RUNNING + ret_val = self.clustertasks._all_instances_ready(["1", "2", "3", "4"], + self.cluster_id) + self.assertEqual(True, ret_val) + + @patch.object(ClusterTasks, 'reset_task') + @patch.object(ClusterTasks, 'get_guest') + @patch.object(ClusterTasks, 'get_ip') + @patch.object(ClusterTasks, '_all_instances_ready') + @patch.object(Instance, 'load') + @patch.object(DBInstance, 'find_all') + @patch.object(datastore_models.Datastore, 'load') + @patch.object(datastore_models.DatastoreVersion, 'load_by_uuid') + def test_create_cluster(self, mock_dv, mock_ds, mock_find_all, mock_load, + mock_ready, mock_ip, mock_guest, mock_reset_task): + mock_find_all.return_value.all.return_value = [self.dbinst1] + mock_load.return_value = BaseInstance(Mock(), + self.dbinst1, Mock(), + InstanceServiceStatus( + ServiceStatuses.NEW)) + mock_ip.return_value = "10.0.0.2" + self.clustertasks.create_cluster(Mock(), self.cluster_id) + mock_guest.return_value.install_cluster.assert_called_with(['10.0.0.2'] + ) + mock_reset_task.assert_called() + mock_guest.return_value.cluster_complete.assert_called()