diff --git a/sahara/conductor/objects.py b/sahara/conductor/objects.py
index 12bacf9a..0a59f412 100644
--- a/sahara/conductor/objects.py
+++ b/sahara/conductor/objects.py
@@ -61,6 +61,7 @@ class Cluster(object):
     node_groups - list of NodeGroup objects
     cluster_template_id
     cluster_template - ClusterTemplate object
+    use_autoconfig
     """

     def has_proxy_gateway(self):
diff --git a/sahara/db/migration/alembic_migrations/versions/023_add_use_autoconfig.py b/sahara/db/migration/alembic_migrations/versions/023_add_use_autoconfig.py
new file mode 100644
index 00000000..b0d9775c
--- /dev/null
+++ b/sahara/db/migration/alembic_migrations/versions/023_add_use_autoconfig.py
@@ -0,0 +1,42 @@
+# Copyright 2015 OpenStack Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""add_use_autoconfig
+
+Revision ID: 023
+Revises: 022
+Create Date: 2015-04-24 14:51:39.582085
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '023'
+down_revision = '022'
+
+from alembic import op
+import sqlalchemy as sa
+
+
+def upgrade():
+    op.add_column('clusters',
+                  sa.Column('use_autoconfig', sa.Boolean()))
+    op.add_column('cluster_templates',
+                  sa.Column('use_autoconfig', sa.Boolean()))
+    op.add_column('node_group_templates',
+                  sa.Column('use_autoconfig', sa.Boolean()))
+    op.add_column('node_groups',
+                  sa.Column('use_autoconfig', sa.Boolean()))
+    op.add_column('templates_relations',
+                  sa.Column('use_autoconfig', sa.Boolean()))
diff --git a/sahara/db/sqlalchemy/models.py b/sahara/db/sqlalchemy/models.py
index 10aa8858..ed97dc3a 100644
--- a/sahara/db/sqlalchemy/models.py
+++ b/sahara/db/sqlalchemy/models.py
@@ -67,6 +67,7 @@ class Cluster(mb.SaharaBase):
     extra = sa.Column(st.JsonDictType())
     rollback_info = sa.Column(st.JsonDictType())
     sahara_info = sa.Column(st.JsonDictType())
+    use_autoconfig = sa.Column(sa.Boolean(), default=True)
     provision_progress = relationship('ClusterProvisionStep',
                                       cascade="all,delete",
                                       backref='cluster',
@@ -109,6 +110,8 @@ class NodeGroup(mb.SaharaBase):
     volume_mount_prefix = sa.Column(sa.String(80))
     volume_type = sa.Column(sa.String(255))
     count = sa.Column(sa.Integer, nullable=False)
+    use_autoconfig = sa.Column(sa.Boolean(), default=True)
+
     instances = relationship('Instance', cascade="all,delete",
                              backref='node_group',
                              order_by="Instance.instance_name", lazy='joined')
@@ -176,6 +179,7 @@ class ClusterTemplate(mb.SaharaBase):
     node_groups = relationship('TemplatesRelation', cascade="all,delete",
                                backref='cluster_template', lazy='joined')
     is_default = sa.Column(sa.Boolean(), default=False)
+    use_autoconfig = sa.Column(sa.Boolean(), default=True)

     def to_dict(self):
         d = super(ClusterTemplate, self).to_dict()
@@ -215,6 +219,7 @@ class NodeGroupTemplate(mb.SaharaBase):
     is_proxy_gateway = sa.Column(sa.Boolean())
     volume_local_to_instance = sa.Column(sa.Boolean())
     is_default = sa.Column(sa.Boolean(), default=False)
+    use_autoconfig = sa.Column(sa.Boolean(), default=True)


 class TemplatesRelation(mb.SaharaBase):
@@ -238,6 +243,7 @@ class TemplatesRelation(mb.SaharaBase):
     volume_mount_prefix = sa.Column(sa.String(80))
     volume_type = sa.Column(sa.String(255))
     count = sa.Column(sa.Integer, nullable=False)
+    use_autoconfig = sa.Column(sa.Boolean(), default=True)
     cluster_template_id = sa.Column(sa.String(36),
                                     sa.ForeignKey('cluster_templates.id'))
     node_group_template_id = sa.Column(sa.String(36),
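
A note on the two defaults above: `default=True` in models.py is a client-side SQLAlchemy default, applied only when Sahara itself inserts a row, while migration 023 adds the columns as nullable with no server_default, so rows created before the upgrade read use_autoconfig as NULL rather than True. A minimal standalone sketch of that distinction (plain SQLAlchemy, not Sahara code):

    import sqlalchemy as sa

    metadata = sa.MetaData()
    clusters = sa.Table(
        'clusters', metadata,
        sa.Column('id', sa.Integer, primary_key=True),
        # Client-side default: filled in by SQLAlchemy at INSERT time only.
        sa.Column('use_autoconfig', sa.Boolean(), default=True))

    engine = sa.create_engine('sqlite://')
    metadata.create_all(engine)

    with engine.connect() as conn:
        # INSERT issued through SQLAlchemy -> use_autoconfig becomes True.
        conn.execute(clusters.insert())
        # Raw SQL bypasses client-side defaults -> use_autoconfig stays NULL,
        # just like rows that existed before migration 023 ran.
        conn.execute(sa.text("INSERT INTO clusters (id) VALUES (42)"))
        print(conn.execute(clusters.select()).fetchall())
        # [(1, True), (42, None)]
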
diff --git a/sahara/plugins/provisioning.py b/sahara/plugins/provisioning.py
index cb341322..cfffc1bc 100644
--- a/sahara/plugins/provisioning.py
+++ b/sahara/plugins/provisioning.py
@@ -90,6 +90,10 @@ class ProvisioningPluginBase(plugins_base.PluginInterface):
     def on_terminate_cluster(self, cluster):
         pass

+    @plugins_base.optional
+    def recommend_configs(self, cluster):
+        pass
+
     def to_dict(self):
         res = super(ProvisioningPluginBase, self).to_dict()
         res['versions'] = self.get_versions()
diff --git a/sahara/plugins/recommendations_utils.py b/sahara/plugins/recommendations_utils.py
new file mode 100644
index 00000000..446ead12
--- /dev/null
+++ b/sahara/plugins/recommendations_utils.py
@@ -0,0 +1,352 @@
+# Copyright (c) 2015 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import abc
+
+from oslo_log import log as logging
+import six
+
+from sahara import conductor as cond
+from sahara import context
+from sahara.utils.openstack import nova
+
+conductor = cond.API
+
+LOG = logging.getLogger(__name__)
+
+
+@six.add_metaclass(abc.ABCMeta)
+class AutoConfigsProvider(object):
+    def __init__(self, mapper, plugin_configs, cluster, extra_spec=None):
+        """Abstract base class providing general autoconfiguration utilities.
+
+        :param mapper: dictionary describing which cluster configs and
+            node configs to tune. It should contain two dicts, keyed
+            'node_configs' and 'cluster_configs', each mapping the
+            abstract name of a config to a tuple
+            (correct_applicable_target, correct_name). This representation
+            allows the same AutoConfigsProvider to be reused for plugins
+            with nearly identical configs and configuration principles.
+        :param plugin_configs: all plugin configs of the specified plugin
+        :param cluster: the cluster to be configured
+        :param extra_spec: extra information useful for autoconfiguration
+        """
+        self.plugin_configs = plugin_configs
+        self.cluster = cluster
+        self.node_configs_to_update = mapper.get('node_configs', {})
+        self.cluster_configs_to_update = mapper.get('cluster_configs', {})
+        self.extra_spec = {} if not extra_spec else extra_spec
+ """ + pass + + @abc.abstractmethod + def _get_recommended_cluster_configs(self): + """Method calculates and returns recommended configs for cluster. + + It's not required to update cluster_configs of cluster using the + conductor api in this method, because it will be done in the method + apply_cluster_configs. + + :return: dictionary with calculated recommended configs for + cluster. + """ + pass + + def _can_be_recommended(self, configs_list, node_group=None): + """Method calculates and returns True, when it's possible to + + automatically configure provided list of configs configs_list. + Otherwise, method should return False. + + :param configs_list: list of configs which we want to configure + :param node_group: optional argument, which should be provided if + some config can be used in node_configs of some node_group + :return: True if all configs can be configured and False otherwise + """ + # cluster configs is Frozen Dict, so let's call to_dict() + cl_configs = self.cluster.cluster_configs.to_dict() + for ncfg in configs_list: + section, name = self._get_correct_section_and_name(ncfg) + if section in cl_configs and name in cl_configs[section]: + return False + + if not node_group: + return True + + cl_configs = node_group.node_configs.to_dict() + for ncfg in configs_list: + section, name = self._get_correct_section_and_name(ncfg) + if section in cl_configs and name in cl_configs[section]: + return False + return True + + def _get_correct_section_and_name(self, config_name): + """Calculates and returns correct applicable target and name from + + abstract name of config. + :param config_name: abstract name of config. + :return: correct applicable target and name for config_name + """ + section, name = None, None + if config_name in self.cluster_configs_to_update: + section = self.cluster_configs_to_update[config_name][0] + name = self.cluster_configs_to_update[config_name][1] + elif config_name in self.node_configs_to_update: + section = self.node_configs_to_update[config_name][0] + name = self.node_configs_to_update[config_name][1] + return section, name + + def _get_default_config_value(self, config_name): + """Calculates and return default value of config from + + abstract name of config. + :param config_name: abstract name of config. + :return: default config value for config_name. + """ + section, name = self._get_correct_section_and_name(config_name) + for config in self.plugin_configs: + if config.applicable_target == section and config.name == name: + return config.default_value + + def _merge_configs(self, current_configs, proposed_configs): + """Correctly merges old configs and new extra configs""" + result = {} + for (section, configs) in six.iteritems(proposed_configs): + cfg_values = {} + if section in current_configs: + cfg_values = (current_configs[section] if + current_configs[section] else {}) + cfg_values.update(configs) + result.update({section: cfg_values}) + for (section, configs) in six.iteritems(current_configs): + if section not in result: + result.update({section: configs}) + return result + + def apply_node_configs(self, node_group): + """Method applies configs for node_group using conductor api, + + which were calculated with recommend_node_configs method. + :param node_group: NodeGroup Sahara resource. + :return: None. 
+ """ + if not node_group.use_autoconfig or not self.cluster.use_autoconfig: + return + to_update = self.node_configs_to_update + recommended_node_configs = self._get_recommended_node_configs( + node_group) + if not recommended_node_configs: + # Nothing to configure + return + current_dict = node_group.node_configs.to_dict() + configuration = {} + for ncfg in six.iterkeys(to_update): + if ncfg not in recommended_node_configs: + continue + n_section = to_update[ncfg][0] + n_name = to_update[ncfg][1] + proposed_config_value = recommended_node_configs[ncfg] + if n_section not in configuration: + configuration.update({n_section: {}}) + configuration[n_section].update({n_name: proposed_config_value}) + current_dict = self._merge_configs(current_dict, configuration) + conductor.node_group_update(context.ctx(), node_group, + {'node_configs': current_dict}) + + def apply_cluster_configs(self): + """Method applies configs for cluster using conductor api, which were + + calculated with recommend_cluster_configs method. + :return: None. + """ + cluster = self.cluster + if not cluster.use_autoconfig: + return + to_update = self.cluster_configs_to_update + recommended_cluster_configs = self._get_recommended_cluster_configs() + if not recommended_cluster_configs: + # Nothing to configure + return + current_dict = cluster.cluster_configs.to_dict() + configuration = {} + for ncfg in six.iterkeys(to_update): + if ncfg not in recommended_cluster_configs: + continue + n_section = to_update[ncfg][0] + n_name = to_update[ncfg][1] + proposed_config_value = recommended_cluster_configs[ncfg] + if n_section not in configuration: + configuration.update({n_section: {}}) + configuration[n_section].update({n_name: proposed_config_value}) + current_dict = self._merge_configs(current_dict, configuration) + conductor.cluster_update(context.ctx(), cluster, + {'cluster_configs': current_dict}) + + def apply_recommended_configs(self): + """Method applies recommended configs for cluster and for all + + node_groups using conductor api. + :return: None. + """ + for ng in self.cluster.node_groups: + self.apply_node_configs(ng) + self.apply_cluster_configs() + configs = list(self.cluster_configs_to_update.keys()) + configs.extend(list(self.node_configs_to_update.keys())) + LOG.debug("Following configs were auto-configured: {configs}".format( + configs=configs)) + + +class HadoopAutoConfigsProvider(AutoConfigsProvider): + def __init__(self, mapper, plugin_configs, cluster, extra_spec=None, + hbase=False): + super(HadoopAutoConfigsProvider, self).__init__( + mapper, plugin_configs, cluster, extra_spec) + self.requested_flavors = {} + self.is_hbase_enabled = hbase + + def _get_java_opts(self, value): + return "-Xmx%dm" % int(value) + + def _transform_mb_to_gb(self, mb): + return mb / 1024. + + def _transform_gb_to_mb(self, gb): + return gb * 1024. 
+
+    def _get_min_size_of_container(self, ram):
+        if ram <= 4:
+            return 256
+        if ram <= 8:
+            return 512
+        if ram <= 24:
+            return 1024
+        return 2048
+
+    def _get_os_ram_recommendation(self, ram):
+        upper_bounds = [4, 8, 16, 24, 48, 64, 72, 96, 128, 256]
+        reserve_for_os = [1, 2, 2, 4, 6, 8, 8, 12, 24, 32]
+        for (upper, reserve) in zip(upper_bounds, reserve_for_os):
+            if ram <= upper:
+                return reserve
+        return 64
+
+    def _get_hbase_ram_recommendations(self, ram):
+        if not self.is_hbase_enabled:
+            return 0
+        upper_bounds = [4, 8, 16, 24, 48, 64, 72, 96, 128, 256]
+        reserve_for_hbase = [1, 1, 2, 4, 8, 8, 8, 16, 24, 32]
+        for (upper, reserve) in zip(upper_bounds, reserve_for_hbase):
+            if ram <= upper:
+                return reserve
+        return 64
+
+    def _get_node_group_data(self, node_group):
+        if node_group.flavor_id not in self.requested_flavors:
+            flavor = nova.get_flavor(id=node_group.flavor_id)
+            self.requested_flavors[node_group.flavor_id] = flavor
+        else:
+            flavor = self.requested_flavors[node_group.flavor_id]
+        cpu = flavor.vcpus
+        ram = flavor.ram
+        data = {}
+        # config recommendations were taken from Ambari code
+        os = self._get_os_ram_recommendation(self._transform_mb_to_gb(ram))
+        hbase = self._get_hbase_ram_recommendations(
+            self._transform_mb_to_gb(ram))
+        reserved_ram = self._transform_gb_to_mb(os + hbase)
+        min_container_size = self._get_min_size_of_container(
+            self._transform_mb_to_gb(ram))
+
+        # we use a large number of containers so that users can run at
+        # least 4 jobs at the same time on clusters based on small flavors
+        data["containers"] = int(max(
+            8, min(2 * cpu, ram / min_container_size)))
+        data["ramPerContainer"] = (ram - reserved_ram) / data["containers"]
+        data["ramPerContainer"] = max(data["ramPerContainer"],
+                                      min_container_size)
+        data["ramPerContainer"] = min(2048, int(data["ramPerContainer"]))
+
+        data["mapMemory"] = int(data["ramPerContainer"])
+        data["reduceMemory"] = int(2 * data["ramPerContainer"])
+        data["amMemory"] = int(min(data["mapMemory"], data["reduceMemory"]))
+
+        return data
+
+    def _get_recommended_node_configs(self, node_group):
+        """Calculates recommended MapReduce and YARN configs for a node_group.
+
+        :param node_group: NodeGroup Sahara resource
+        :return: dictionary with recommended MapReduce and YARN configs
+        """
+        configs_to_update = list(self.node_configs_to_update.keys())
+        if not self._can_be_recommended(configs_to_update, node_group):
+            return {}
+        data = self._get_node_group_data(node_group)
+        r = {}
+        r['yarn.nodemanager.resource.memory-mb'] = (data['containers'] *
+                                                    data['ramPerContainer'])
+        r['yarn.scheduler.minimum-allocation-mb'] = data['ramPerContainer']
+        r['yarn.scheduler.maximum-allocation-mb'] = (data['containers'] *
+                                                     data['ramPerContainer'])
+        r['yarn.nodemanager.vmem-check-enabled'] = "false"
+        r['yarn.app.mapreduce.am.resource.mb'] = data['amMemory']
+        r['yarn.app.mapreduce.am.command-opts'] = self._get_java_opts(
+            0.8 * data['amMemory'])
+        r['mapreduce.map.memory.mb'] = data['mapMemory']
+        r['mapreduce.reduce.memory.mb'] = data['reduceMemory']
+        r['mapreduce.map.java.opts'] = self._get_java_opts(
+            0.8 * data['mapMemory'])
+        r['mapreduce.reduce.java.opts'] = self._get_java_opts(
+            0.8 * data['reduceMemory'])
+        r['mapreduce.task.io.sort.mb'] = int(min(
+            0.4 * data['mapMemory'], 1024))
+        return r
+ """ + if not self._can_be_recommended(['dfs.replication']): + return {} + datanode_count = 0 + datanode_proc_name = "datanode" + if 'datanode_process_name' in self.extra_spec: + datanode_proc_name = self.extra_spec['datanode_process_name'] + for ng in self.cluster.node_groups: + if datanode_proc_name in ng.node_processes: + datanode_count += ng.count + replica = 'dfs.replication' + recommended_value = self._get_default_config_value(replica) + if recommended_value: + return {replica: min(recommended_value, datanode_count)} + else: + return {} diff --git a/sahara/plugins/vanilla/plugin.py b/sahara/plugins/vanilla/plugin.py index 431605fb..34f7cc24 100644 --- a/sahara/plugins/vanilla/plugin.py +++ b/sahara/plugins/vanilla/plugin.py @@ -92,3 +92,7 @@ class VanillaProvider(p.ProvisioningPluginBase): def on_terminate_cluster(self, cluster): return self._get_version_handler( cluster.hadoop_version).on_terminate_cluster(cluster) + + def recommend_configs(self, cluster): + return self._get_version_handler( + cluster.hadoop_version).recommend_configs(cluster) diff --git a/sahara/plugins/vanilla/v1_2_1/versionhandler.py b/sahara/plugins/vanilla/v1_2_1/versionhandler.py index 49ed7bfb..a91a2d0d 100644 --- a/sahara/plugins/vanilla/v1_2_1/versionhandler.py +++ b/sahara/plugins/vanilla/v1_2_1/versionhandler.py @@ -594,3 +594,7 @@ class VersionHandler(avm.AbstractVersionHandler): def on_terminate_cluster(self, cluster): proxy.delete_proxy_user_for_cluster(cluster) + + def recommend_configs(self, cluster): + # We don't support any recommendations in Vanilla 1 plugin + pass diff --git a/sahara/plugins/vanilla/v2_6_0/versionhandler.py b/sahara/plugins/vanilla/v2_6_0/versionhandler.py index ea321968..ca96820d 100644 --- a/sahara/plugins/vanilla/v2_6_0/versionhandler.py +++ b/sahara/plugins/vanilla/v2_6_0/versionhandler.py @@ -18,6 +18,7 @@ from oslo_log import log as logging from sahara import conductor from sahara import context +from sahara.plugins import recommendations_utils as ru from sahara.plugins import utils from sahara.plugins.vanilla import abstractversionhandler as avm from sahara.plugins.vanilla.hadoop2 import config as c @@ -189,3 +190,34 @@ class VersionHandler(avm.AbstractVersionHandler): def get_open_ports(self, node_group): return c.get_open_ports(node_group) + + def recommend_configs(self, cluster): + yarn_configs = [ + 'yarn.nodemanager.resource.memory-mb', + 'yarn.scheduler.minimum-allocation-mb', + 'yarn.scheduler.maximum-allocation-mb', + 'yarn.nodemanager.vmem-check-enabled', + ] + mapred_configs = [ + 'yarn.app.mapreduce.am.resource.mb', + 'yarn.app.mapreduce.am.command-opts', + 'mapreduce.map.memory.mb', + 'mapreduce.reduce.memory.mb', + 'mapreduce.map.java.opts', + 'mapreduce.reduce.java.opts', + 'mapreduce.task.io.sort.mb', + ] + configs_to_configure = { + 'cluster_configs': { + 'dfs.replication': ('HDFS', 'dfs.replication') + }, + 'node_configs': { + } + } + for mapr in mapred_configs: + configs_to_configure['node_configs'][mapr] = ("MapReduce", mapr) + for yarn in yarn_configs: + configs_to_configure['node_configs'][yarn] = ('YARN', yarn) + provider = ru.HadoopAutoConfigsProvider( + configs_to_configure, self.get_plugin_configs(), cluster) + provider.apply_recommended_configs() diff --git a/sahara/service/api.py b/sahara/service/api.py index 57f59314..5b261663 100644 --- a/sahara/service/api.py +++ b/sahara/service/api.py @@ -76,6 +76,7 @@ def scale_cluster(id, data): try: cluster = g.change_cluster_status(cluster, "Validating") quotas.check_scaling(cluster, to_be_enlarged, 
diff --git a/sahara/plugins/vanilla/plugin.py b/sahara/plugins/vanilla/plugin.py
index 431605fb..34f7cc24 100644
--- a/sahara/plugins/vanilla/plugin.py
+++ b/sahara/plugins/vanilla/plugin.py
@@ -92,3 +92,7 @@ class VanillaProvider(p.ProvisioningPluginBase):
     def on_terminate_cluster(self, cluster):
         return self._get_version_handler(
             cluster.hadoop_version).on_terminate_cluster(cluster)
+
+    def recommend_configs(self, cluster):
+        return self._get_version_handler(
+            cluster.hadoop_version).recommend_configs(cluster)
diff --git a/sahara/plugins/vanilla/v1_2_1/versionhandler.py b/sahara/plugins/vanilla/v1_2_1/versionhandler.py
index 49ed7bfb..a91a2d0d 100644
--- a/sahara/plugins/vanilla/v1_2_1/versionhandler.py
+++ b/sahara/plugins/vanilla/v1_2_1/versionhandler.py
@@ -594,3 +594,7 @@ class VersionHandler(avm.AbstractVersionHandler):

     def on_terminate_cluster(self, cluster):
         proxy.delete_proxy_user_for_cluster(cluster)
+
+    def recommend_configs(self, cluster):
+        # We don't support any recommendations in the Vanilla 1 plugin
+        pass
diff --git a/sahara/plugins/vanilla/v2_6_0/versionhandler.py b/sahara/plugins/vanilla/v2_6_0/versionhandler.py
index ea321968..ca96820d 100644
--- a/sahara/plugins/vanilla/v2_6_0/versionhandler.py
+++ b/sahara/plugins/vanilla/v2_6_0/versionhandler.py
@@ -18,6 +18,7 @@ from oslo_log import log as logging

 from sahara import conductor
 from sahara import context
+from sahara.plugins import recommendations_utils as ru
 from sahara.plugins import utils
 from sahara.plugins.vanilla import abstractversionhandler as avm
 from sahara.plugins.vanilla.hadoop2 import config as c
@@ -189,3 +190,34 @@ class VersionHandler(avm.AbstractVersionHandler):

     def get_open_ports(self, node_group):
         return c.get_open_ports(node_group)
+
+    def recommend_configs(self, cluster):
+        yarn_configs = [
+            'yarn.nodemanager.resource.memory-mb',
+            'yarn.scheduler.minimum-allocation-mb',
+            'yarn.scheduler.maximum-allocation-mb',
+            'yarn.nodemanager.vmem-check-enabled',
+        ]
+        mapred_configs = [
+            'yarn.app.mapreduce.am.resource.mb',
+            'yarn.app.mapreduce.am.command-opts',
+            'mapreduce.map.memory.mb',
+            'mapreduce.reduce.memory.mb',
+            'mapreduce.map.java.opts',
+            'mapreduce.reduce.java.opts',
+            'mapreduce.task.io.sort.mb',
+        ]
+        configs_to_configure = {
+            'cluster_configs': {
+                'dfs.replication': ('HDFS', 'dfs.replication')
+            },
+            'node_configs': {
+            }
+        }
+        for mapr in mapred_configs:
+            configs_to_configure['node_configs'][mapr] = ("MapReduce", mapr)
+        for yarn in yarn_configs:
+            configs_to_configure['node_configs'][yarn] = ('YARN', yarn)
+        provider = ru.HadoopAutoConfigsProvider(
+            configs_to_configure, self.get_plugin_configs(), cluster)
+        provider.apply_recommended_configs()
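
The mapper/extra_spec contract is what makes HadoopAutoConfigsProvider reusable beyond the vanilla plugin. As a sketch, a hypothetical version handler for a plugin whose datanode process is named differently, and which deploys HBase, might wire it up like this (the process name, the trimmed-down mapper, and the plugin itself are illustrative, not part of this patch):

    from sahara.plugins import recommendations_utils as ru

    def recommend_configs(self, cluster):
        # Hypothetical mapper: abstract config name -> (section, real name).
        configs_to_configure = {
            'cluster_configs': {
                'dfs.replication': ('HDFS', 'dfs.replication'),
            },
            'node_configs': {
                'yarn.nodemanager.resource.memory-mb': (
                    'YARN', 'yarn.nodemanager.resource.memory-mb'),
            },
        }
        provider = ru.HadoopAutoConfigsProvider(
            configs_to_configure, self.get_plugin_configs(), cluster,
            # Tell the provider how to count datanodes in this plugin.
            extra_spec={'datanode_process_name': 'HDFS_DATANODE'},
            # Reserve extra RAM per node for HBase.
            hbase=True)
        provider.apply_recommended_configs()
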
diff --git a/sahara/service/api.py b/sahara/service/api.py
index 57f59314..5b261663 100644
--- a/sahara/service/api.py
+++ b/sahara/service/api.py
@@ -76,6 +76,7 @@ def scale_cluster(id, data):
     try:
         cluster = g.change_cluster_status(cluster, "Validating")
         quotas.check_scaling(cluster, to_be_enlarged,
                              additional)
+        plugin.recommend_configs(cluster)
         plugin.validate_scaling(cluster, to_be_enlarged,
                                 additional)
     except Exception as e:
         with excutils.save_and_reraise_exception():
@@ -127,6 +128,7 @@ def _cluster_create(values, plugin):

     # validating cluster
     try:
+        plugin.recommend_configs(cluster)
         cluster = g.change_cluster_status(cluster, "Validating")
         quotas.check_cluster(cluster)
         plugin.validate(cluster)
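
Because recommend_configs is declared with @plugins_base.optional and ProvisioningPluginBase supplies a no-op body, api.py can call it unconditionally on both code paths above: plugins that predate the hook simply inherit the do-nothing default. A minimal illustration of that pattern (simplified names, not the actual Sahara classes):

    class ProvisioningBase(object):
        # Optional SPI hook with a no-op default, so the service layer
        # may call it on every plugin without guarding.
        def recommend_configs(self, cluster):
            pass

    class LegacyPlugin(ProvisioningBase):
        pass  # predates autoconfig; inherits the no-op

    class ModernPlugin(ProvisioningBase):
        def recommend_configs(self, cluster):
            print("tuning %s" % cluster)

    for plugin in (LegacyPlugin(), ModernPlugin()):
        plugin.recommend_configs("demo-cluster")  # never raises
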
diff --git a/sahara/tests/unit/conductor/manager/test_clusters.py b/sahara/tests/unit/conductor/manager/test_clusters.py
index 714ec39a..19a9c78e 100644
--- a/sahara/tests/unit/conductor/manager/test_clusters.py
+++ b/sahara/tests/unit/conductor/manager/test_clusters.py
@@ -36,14 +36,16 @@ SAMPLE_CLUSTER = {
             "flavor_id": "42",
             "node_processes": ["p1", "p2"],
             "count": 1,
-            "security_groups": None
+            "security_groups": None,
+            'use_autoconfig': True,
         },
         {
             "name": "ng_2",
             "flavor_id": "42",
             "node_processes": ["p3", "p4"],
             "count": 3,
-            "security_groups": ["group1", "group2"]
+            "security_groups": ["group1", "group2"],
+            'use_autoconfig': True,
         }
     ],
     "cluster_configs": {
diff --git a/sahara/tests/unit/conductor/manager/test_templates.py b/sahara/tests/unit/conductor/manager/test_templates.py
index 27915f35..d32c9d9b 100644
--- a/sahara/tests/unit/conductor/manager/test_templates.py
+++ b/sahara/tests/unit/conductor/manager/test_templates.py
@@ -55,7 +55,8 @@ SAMPLE_NGT = {
     "auto_security_group": False,
     "availability_zone": "here",
     "is_proxy_gateway": False,
-    "volume_local_to_instance": False
+    "volume_local_to_instance": False,
+    'use_autoconfig': True,
 }

 SAMPLE_CLT = {
@@ -80,6 +81,7 @@ SAMPLE_CLT = {
             "floating_ip_pool": None,
             "security_groups": None,
             "availability_zone": None,
+            'use_autoconfig': True,
         },
         {
             "name": "ng_2",
@@ -89,6 +91,7 @@ SAMPLE_CLT = {
             "floating_ip_pool": None,
             "security_groups": ["group1", "group2"],
             "availability_zone": None,
+            'use_autoconfig': True,
         }
     ],
diff --git a/sahara/tests/unit/db/migration/test_migrations.py b/sahara/tests/unit/db/migration/test_migrations.py
index 4e1eb0b1..c803324a 100644
--- a/sahara/tests/unit/db/migration/test_migrations.py
+++ b/sahara/tests/unit/db/migration/test_migrations.py
@@ -484,6 +484,18 @@ class SaharaMigrationsCheckers(object):
         self.assertColumnCount(engine, 'job_interface_arguments', columns)
         self.assertColumnsExist(engine, 'job_interface_arguments', columns)

+    def _check_023(self, engine, data):
+        self.assertColumnExists(engine, 'clusters',
+                                'use_autoconfig')
+        self.assertColumnExists(engine, 'cluster_templates',
+                                'use_autoconfig')
+        self.assertColumnExists(engine, 'node_group_templates',
+                                'use_autoconfig')
+        self.assertColumnExists(engine, 'node_groups',
+                                'use_autoconfig')
+        self.assertColumnExists(engine, 'templates_relations',
+                                'use_autoconfig')
+

 class TestMigrationsMySQL(SaharaMigrationsCheckers,
                           base.BaseWalkMigrationTestCase,
diff --git a/sahara/tests/unit/plugins/test_provide_recommendations.py b/sahara/tests/unit/plugins/test_provide_recommendations.py
new file mode 100644
index 00000000..6e0d912f
--- /dev/null
+++ b/sahara/tests/unit/plugins/test_provide_recommendations.py
@@ -0,0 +1,274 @@
+# Copyright (c) 2015 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import mock
+import six
+
+from sahara import conductor as cond
+from sahara import context
+from sahara.plugins import recommendations_utils as ru
+from sahara.tests.unit import base as b
+
+conductor = cond.API
+
+
+class Configs(object):
+    def __init__(self, configs):
+        self.configs = configs
+
+    def to_dict(self):
+        return self.configs
+
+
+class FakeObject(object):
+    def __init__(self, **kwargs):
+        for attr in six.iterkeys(kwargs):
+            setattr(self, attr, kwargs.get(attr))
+
+
+class TestProvidingRecommendations(b.SaharaWithDbTestCase):
+    @mock.patch('sahara.utils.openstack.nova.get_flavor')
+    def test_get_recommended_node_configs_medium_flavor(
+            self, fake_flavor):
+        ng = FakeObject(flavor_id="fake_flavor", node_configs=Configs({}))
+        cl = FakeObject(cluster_configs=Configs({}))
+        fake_flavor.return_value = FakeObject(ram=4096, vcpus=2)
+        observed = ru.HadoopAutoConfigsProvider(
+            {}, [], cl)._get_recommended_node_configs(ng)
+        self.assertEqual({
+            'mapreduce.reduce.memory.mb': 768,
+            'mapreduce.map.java.opts': '-Xmx307m',
+            'mapreduce.map.memory.mb': 384,
+            'mapreduce.reduce.java.opts': '-Xmx614m',
+            'yarn.app.mapreduce.am.resource.mb': 384,
+            'yarn.app.mapreduce.am.command-opts': '-Xmx307m',
+            'mapreduce.task.io.sort.mb': 153,
+            'yarn.nodemanager.resource.memory-mb': 3072,
+            'yarn.scheduler.minimum-allocation-mb': 384,
+            'yarn.scheduler.maximum-allocation-mb': 3072,
+            'yarn.nodemanager.vmem-check-enabled': 'false'
+        }, observed)
+
+    @mock.patch('sahara.utils.openstack.nova.get_flavor')
+    def test_get_recommended_node_configs_small_flavor(
+            self, fake_flavor):
+        ng = FakeObject(flavor_id="fake_flavor", node_configs=Configs({}))
+        cl = FakeObject(cluster_configs=Configs({}))
+        fake_flavor.return_value = FakeObject(ram=2048, vcpus=1)
+        observed = ru.HadoopAutoConfigsProvider(
+            {'node_configs': {}, 'cluster_configs': {}}, [], cl
+        )._get_recommended_node_configs(ng)
+        self.assertEqual({
+            'mapreduce.reduce.java.opts': '-Xmx409m',
+            'yarn.app.mapreduce.am.resource.mb': 256,
+            'mapreduce.reduce.memory.mb': 512,
+            'mapreduce.map.java.opts': '-Xmx204m',
+            'yarn.app.mapreduce.am.command-opts': '-Xmx204m',
+            'mapreduce.task.io.sort.mb': 102,
+            'mapreduce.map.memory.mb': 256,
+            'yarn.nodemanager.resource.memory-mb': 2048,
+            'yarn.scheduler.minimum-allocation-mb': 256,
+            'yarn.nodemanager.vmem-check-enabled': 'false',
+            'yarn.scheduler.maximum-allocation-mb': 2048,
+        }, observed)
+
+    def test_merge_configs(self):
+        provider = ru.HadoopAutoConfigsProvider({}, None, None)
+        initial_configs = {
+            'cat': {
+                'talk': 'meow',
+            },
+            'bond': {
+                'name': 'james'
+            }
+        }
+
+        extra_configs = {
+            'dog': {
+                'talk': 'woof'
+            },
+            'bond': {
+                'extra_name': 'james bond'
+            }
+        }
+
+        expected = {
+            'cat': {
+                'talk': 'meow',
+            },
+            'dog': {
+                'talk': 'woof'
+            },
+            'bond': {
+                'name': 'james',
+                'extra_name': 'james bond'
+            }
+        }
+        self.assertEqual(
+            expected, provider._merge_configs(initial_configs, extra_configs))
+
+    @mock.patch('sahara.utils.openstack.nova.get_flavor')
+    @mock.patch('sahara.plugins.recommendations_utils.conductor.'
+                'node_group_update')
+    @mock.patch('sahara.plugins.recommendations_utils.conductor.'
+                'cluster_update')
+    def test_apply_recommended_configs(self, cond_cluster, cond_node_group,
+                                       fake_flavor):
+        fake_flavor.return_value = FakeObject(ram=2048, vcpus=1)
+        to_tune = {
+            'cluster_configs': {
+                'dfs.replication': ('dfs', 'replica')
+            },
+            'node_configs': {
+                'mapreduce.task.io.sort.mb': ('bond', 'extra_name')
+            }
+        }
+
+        fake_plugin_configs = [
+            FakeObject(applicable_target='dfs', name='replica',
+                       default_value=3)]
+        fake_ng = FakeObject(
+            use_autoconfig=True,
+            count=2,
+            node_processes=['dog_datanode'],
+            flavor_id='fake_id',
+            node_configs=Configs({
+                'bond': {
+                    'name': 'james'
+                }
+            })
+        )
+        fake_cluster = FakeObject(
+            cluster_configs=Configs({
+                'cat': {
+                    'talk': 'meow',
+                }
+            }),
+            node_groups=[fake_ng],
+            use_autoconfig=True,
+        )
+        v = ru.HadoopAutoConfigsProvider(
+            to_tune, fake_plugin_configs, fake_cluster,
+            {'datanode_process_name': "dog_datanode"})
+
+        v.apply_recommended_configs()
+        self.assertEqual([mock.call(context.ctx(), fake_cluster, {
+            'cluster_configs': {
+                'cat': {
+                    'talk': 'meow'
+                },
+                'dfs': {
+                    'replica': 2
+                }
+            }
+        })], cond_cluster.call_args_list)
+        self.assertEqual([mock.call(context.ctx(), fake_ng, {
+            'node_configs': {
+                'bond': {
+                    'name': 'james',
+                    'extra_name': 102
+                }
+            }
+        })], cond_node_group.call_args_list)
+
+    @mock.patch('sahara.utils.openstack.nova.get_flavor')
+    @mock.patch('sahara.plugins.recommendations_utils.conductor.'
+                'node_group_update')
+    @mock.patch('sahara.plugins.recommendations_utils.conductor.'
+                'cluster_update')
+    def test_apply_recommended_configs_no_updates(
+            self, cond_cluster, cond_node_group, fake_flavor):
+        fake_flavor.return_value = FakeObject(ram=2048, vcpus=1)
+        to_tune = {
+            'cluster_configs': {
+                'dfs.replication': ('dfs', 'replica')
+            },
+            'node_configs': {
+                'mapreduce.task.io.sort.mb': ('bond', 'extra_name')
+            }
+        }
+
+        fake_plugin_configs = [
+            FakeObject(applicable_target='dfs', name='replica',
+                       default_value=3)]
+        fake_ng = FakeObject(
+            use_autoconfig=True,
+            count=2,
+            node_processes=['dog_datanode'],
+            flavor_id='fake_id',
+            node_configs=Configs({
+                'bond': {
+                    'extra_name': 'james bond'
+                }
+            })
+        )
+        fake_cluster = FakeObject(
+            cluster_configs=Configs({
+                'dfs': {
+                    'replica': 1
+                }
+            }),
+            node_groups=[fake_ng],
+            use_autoconfig=True,
+        )
+        v = ru.HadoopAutoConfigsProvider(
+            to_tune, fake_plugin_configs, fake_cluster)
+        v.apply_recommended_configs()
+        self.assertEqual(0, cond_cluster.call_count)
+        self.assertEqual(0, cond_node_group.call_count)
+
+    def test_correct_use_autoconfig_value(self):
+        ctx = context.ctx()
+        ngt1 = conductor.node_group_template_create(ctx, {
+            'name': 'ngt1',
+            'flavor_id': '1',
+            'plugin_name': 'vanilla',
+            'hadoop_version': '1'
+        })
+        ngt2 = conductor.node_group_template_create(ctx, {
+            'name': 'ngt2',
+            'flavor_id': '2',
+            'plugin_name': 'vanilla',
+            'hadoop_version': '1',
+            'use_autoconfig': False
+        })
+        self.assertTrue(ngt1.use_autoconfig)
+        self.assertFalse(ngt2.use_autoconfig)
+        clt = conductor.cluster_template_create(ctx, {
+            'name': "clt1",
+            'plugin_name': 'vanilla',
+            'hadoop_version': '1',
+            'node_groups': [
+                {
+                    'count': 3,
+                    "node_group_template_id": ngt1.id
+                },
+                {
+                    'count': 1,
+                    'node_group_template_id': ngt2.id
+                }
+            ],
+            'use_autoconfig': False
+        })
+        cluster = conductor.cluster_create(ctx, {
+            'name': 'stupid',
+            'cluster_template_id': clt.id
+        })
+        self.assertFalse(cluster.use_autoconfig)
+        for ng in cluster.node_groups:
+            if ng.name == 'ngt1':
+                self.assertTrue(ng.use_autoconfig)
+            else:
+                self.assertFalse(ng.use_autoconfig)
diff --git a/sahara/tests/unit/plugins/vanilla/v2_6_0/test_versionhandler.py b/sahara/tests/unit/plugins/vanilla/v2_6_0/test_versionhandler.py
new file mode 100644
index 00000000..673f4ad1
--- /dev/null
+++ b/sahara/tests/unit/plugins/vanilla/v2_6_0/test_versionhandler.py
@@ -0,0 +1,63 @@
+# Copyright (c) 2015 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import mock
+import testtools
+
+from sahara.plugins.vanilla.v2_6_0 import versionhandler
+
+CONFIGURATION_SCHEMA = {
+    'cluster_configs': {
+        'dfs.replication': ('HDFS', 'dfs.replication')
+    },
+    'node_configs': {
+        'yarn.app.mapreduce.am.command-opts': (
+            'MapReduce', 'yarn.app.mapreduce.am.command-opts'),
+        'yarn.scheduler.maximum-allocation-mb': (
+            'YARN', 'yarn.scheduler.maximum-allocation-mb'),
+        'yarn.app.mapreduce.am.resource.mb': (
+            'MapReduce', 'yarn.app.mapreduce.am.resource.mb'),
+        'yarn.scheduler.minimum-allocation-mb': (
+            'YARN', 'yarn.scheduler.minimum-allocation-mb'),
+        'yarn.nodemanager.vmem-check-enabled': (
+            'YARN', 'yarn.nodemanager.vmem-check-enabled'),
+        'mapreduce.map.java.opts': (
+            'MapReduce', 'mapreduce.map.java.opts'),
+        'mapreduce.reduce.memory.mb': (
+            'MapReduce', 'mapreduce.reduce.memory.mb'),
+        'yarn.nodemanager.resource.memory-mb': (
+            'YARN', 'yarn.nodemanager.resource.memory-mb'),
+        'mapreduce.reduce.java.opts': (
+            'MapReduce', 'mapreduce.reduce.java.opts'),
+        'mapreduce.map.memory.mb': (
+            'MapReduce', 'mapreduce.map.memory.mb'),
+        'mapreduce.task.io.sort.mb': (
+            'MapReduce', 'mapreduce.task.io.sort.mb')
+    }
+}
+
+
+class TestVersionHandler(testtools.TestCase):
+    @mock.patch('sahara.plugins.recommendations_utils.'
+                'HadoopAutoConfigsProvider')
+    @mock.patch('sahara.plugins.vanilla.v2_6_0.versionhandler.VersionHandler.'
+                'get_plugin_configs')
+    def test_recommend_configs(self, fake_plugin_configs, provider):
+        f_cluster, f_configs = mock.Mock(), mock.Mock()
+        fake_plugin_configs.return_value = f_configs
+        versionhandler.VersionHandler().recommend_configs(f_cluster)
+        self.assertEqual([
+            mock.call(CONFIGURATION_SCHEMA, f_configs, f_cluster)
+        ], provider.call_args_list)
diff --git a/sahara/tests/unit/service/test_api.py b/sahara/tests/unit/service/test_api.py
index 6af0d0df..a8fe2379 100644
--- a/sahara/tests/unit/service/test_api.py
+++ b/sahara/tests/unit/service/test_api.py
@@ -120,6 +120,9 @@ class FakePlugin(object):
     def get_configs(self, version):
         return {}

+    def recommend_configs(self, cluster):
+        self.calls_order.append('recommend_configs')
+

 class FakePluginManager(object):
     def __init__(self, calls_order):
@@ -177,7 +180,7 @@ class TestApi(base.SaharaWithDbTestCase):
         self.assertEqual(3, ng_count)
         api.terminate_cluster(result_cluster.id)
         self.assertEqual(
-            ['get_open_ports', 'validate',
+            ['get_open_ports', 'recommend_configs', 'validate',
              'ops.provision_cluster', 'ops.terminate_cluster'],
             self.calls_order)
@@ -204,9 +207,9 @@ class TestApi(base.SaharaWithDbTestCase):
         api.terminate_cluster(result_cluster1.id)
         api.terminate_cluster(result_cluster2.id)
         self.assertEqual(
-            ['get_open_ports', 'validate',
+            ['get_open_ports', 'recommend_configs', 'validate',
              'ops.provision_cluster',
-             'get_open_ports', 'validate',
+             'get_open_ports', 'recommend_configs', 'validate',
              'ops.provision_cluster',
              'ops.terminate_cluster', 'ops.terminate_cluster'],
             self.calls_order)
@@ -249,8 +252,9 @@ class TestApi(base.SaharaWithDbTestCase):
         self.assertEqual(4, ng_count)
         api.terminate_cluster(result_cluster.id)
         self.assertEqual(
-            ['get_open_ports', 'validate', 'ops.provision_cluster',
-             'get_open_ports', 'get_open_ports', 'validate_scaling',
+            ['get_open_ports', 'recommend_configs', 'validate',
+             'ops.provision_cluster', 'get_open_ports', 'get_open_ports',
+             'recommend_configs', 'validate_scaling',
              'ops.provision_scaled_cluster', 'ops.terminate_cluster'],
             self.calls_order)