Implement recommendations for vanilla 2.6.0

This patch implements base recommendation provider for vanilla 2.6.0. Also
added new fields in database for cluster and node groups which will allow
to switch off autoconfiguration.

Partial-implements blueprint: recommend-configuration
Change-Id: I9abb6b9c04494f4aed9b72479c06e45fe289c1ff
This commit is contained in:
Vitaly Gridnev 2015-04-24 15:39:25 +03:00
parent a13f60d38a
commit 22dbaa7248
15 changed files with 813 additions and 8 deletions

View File

@ -61,6 +61,7 @@ class Cluster(object):
node_groups - list of NodeGroup objects node_groups - list of NodeGroup objects
cluster_template_id cluster_template_id
cluster_template - ClusterTemplate object cluster_template - ClusterTemplate object
use_autoconfig
""" """
def has_proxy_gateway(self): def has_proxy_gateway(self):

View File

@ -0,0 +1,42 @@
# Copyright 2015 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""add_use_autoconfig
Revision ID: 023
Revises: 022
Create Date: 2015-04-24 14:51:39.582085
"""
# revision identifiers, used by Alembic.
revision = '023'
down_revision = '022'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.add_column('clusters',
sa.Column('use_autoconfig', sa.Boolean()))
op.add_column('cluster_templates',
sa.Column('use_autoconfig', sa.Boolean()))
op.add_column('node_group_templates',
sa.Column('use_autoconfig', sa.Boolean()))
op.add_column('node_groups',
sa.Column('use_autoconfig', sa.Boolean()))
op.add_column('templates_relations',
sa.Column('use_autoconfig', sa.Boolean()))

View File

@ -67,6 +67,7 @@ class Cluster(mb.SaharaBase):
extra = sa.Column(st.JsonDictType()) extra = sa.Column(st.JsonDictType())
rollback_info = sa.Column(st.JsonDictType()) rollback_info = sa.Column(st.JsonDictType())
sahara_info = sa.Column(st.JsonDictType()) sahara_info = sa.Column(st.JsonDictType())
use_autoconfig = sa.Column(sa.Boolean(), default=True)
provision_progress = relationship('ClusterProvisionStep', provision_progress = relationship('ClusterProvisionStep',
cascade="all,delete", cascade="all,delete",
backref='cluster', backref='cluster',
@ -109,6 +110,8 @@ class NodeGroup(mb.SaharaBase):
volume_mount_prefix = sa.Column(sa.String(80)) volume_mount_prefix = sa.Column(sa.String(80))
volume_type = sa.Column(sa.String(255)) volume_type = sa.Column(sa.String(255))
count = sa.Column(sa.Integer, nullable=False) count = sa.Column(sa.Integer, nullable=False)
use_autoconfig = sa.Column(sa.Boolean(), default=True)
instances = relationship('Instance', cascade="all,delete", instances = relationship('Instance', cascade="all,delete",
backref='node_group', backref='node_group',
order_by="Instance.instance_name", lazy='joined') order_by="Instance.instance_name", lazy='joined')
@ -176,6 +179,7 @@ class ClusterTemplate(mb.SaharaBase):
node_groups = relationship('TemplatesRelation', cascade="all,delete", node_groups = relationship('TemplatesRelation', cascade="all,delete",
backref='cluster_template', lazy='joined') backref='cluster_template', lazy='joined')
is_default = sa.Column(sa.Boolean(), default=False) is_default = sa.Column(sa.Boolean(), default=False)
use_autoconfig = sa.Column(sa.Boolean(), default=True)
def to_dict(self): def to_dict(self):
d = super(ClusterTemplate, self).to_dict() d = super(ClusterTemplate, self).to_dict()
@ -215,6 +219,7 @@ class NodeGroupTemplate(mb.SaharaBase):
is_proxy_gateway = sa.Column(sa.Boolean()) is_proxy_gateway = sa.Column(sa.Boolean())
volume_local_to_instance = sa.Column(sa.Boolean()) volume_local_to_instance = sa.Column(sa.Boolean())
is_default = sa.Column(sa.Boolean(), default=False) is_default = sa.Column(sa.Boolean(), default=False)
use_autoconfig = sa.Column(sa.Boolean(), default=True)
class TemplatesRelation(mb.SaharaBase): class TemplatesRelation(mb.SaharaBase):
@ -238,6 +243,7 @@ class TemplatesRelation(mb.SaharaBase):
volume_mount_prefix = sa.Column(sa.String(80)) volume_mount_prefix = sa.Column(sa.String(80))
volume_type = sa.Column(sa.String(255)) volume_type = sa.Column(sa.String(255))
count = sa.Column(sa.Integer, nullable=False) count = sa.Column(sa.Integer, nullable=False)
use_autoconfig = sa.Column(sa.Boolean(), default=True)
cluster_template_id = sa.Column(sa.String(36), cluster_template_id = sa.Column(sa.String(36),
sa.ForeignKey('cluster_templates.id')) sa.ForeignKey('cluster_templates.id'))
node_group_template_id = sa.Column(sa.String(36), node_group_template_id = sa.Column(sa.String(36),

View File

@ -90,6 +90,10 @@ class ProvisioningPluginBase(plugins_base.PluginInterface):
def on_terminate_cluster(self, cluster): def on_terminate_cluster(self, cluster):
pass pass
@plugins_base.optional
def recommend_configs(self, cluster):
pass
def to_dict(self): def to_dict(self):
res = super(ProvisioningPluginBase, self).to_dict() res = super(ProvisioningPluginBase, self).to_dict()
res['versions'] = self.get_versions() res['versions'] = self.get_versions()

View File

@ -0,0 +1,352 @@
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
from oslo_log import log as logging
import six
from sahara import conductor as cond
from sahara import context
from sahara.utils.openstack import nova
conductor = cond.API
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class AutoConfigsProvider(object):
def __init__(self, mapper, plugin_configs, cluster, extra_spec=None):
"""This meta class provides general recommendation utils for cluster
configuration.
:param mapper: dictionary, that describes which cluster configs and
node_configs to configure. It should maps to following dicts:
node_configs to configure and cluster_configs to configure. This
dicts should contains abstract names of configs as keys and
tuple (correct_applicable_target, correct_name) as values. Such
representation allows to use same AutoConfigsProvider for plugins
with almost same configs and configuring principles.
:param plugin_configs: all plugins_configs for specified plugin
:param cluster: cluster which is required to configure
:param extra_spec: extra helpful information about AutoConfigs
"""
self.plugin_configs = plugin_configs
self.cluster = cluster
self.node_configs_to_update = mapper.get('node_configs', {})
self.cluster_configs_to_update = mapper.get('cluster_configs', {})
self.extra_spec = {} if not extra_spec else extra_spec
@abc.abstractmethod
def _get_recommended_node_configs(self, node_group):
"""Method calculates and returns recommended configs for node_group.
It's not required to update node_configs of node_group using the
conductor api in this method, because it will be done in the method
apply_node_configs.
:param node_group: NodeGroup Sahara resource.
:return: dictionary with calculated recommended configs for
node_group.
"""
pass
@abc.abstractmethod
def _get_recommended_cluster_configs(self):
"""Method calculates and returns recommended configs for cluster.
It's not required to update cluster_configs of cluster using the
conductor api in this method, because it will be done in the method
apply_cluster_configs.
:return: dictionary with calculated recommended configs for
cluster.
"""
pass
def _can_be_recommended(self, configs_list, node_group=None):
"""Method calculates and returns True, when it's possible to
automatically configure provided list of configs configs_list.
Otherwise, method should return False.
:param configs_list: list of configs which we want to configure
:param node_group: optional argument, which should be provided if
some config can be used in node_configs of some node_group
:return: True if all configs can be configured and False otherwise
"""
# cluster configs is Frozen Dict, so let's call to_dict()
cl_configs = self.cluster.cluster_configs.to_dict()
for ncfg in configs_list:
section, name = self._get_correct_section_and_name(ncfg)
if section in cl_configs and name in cl_configs[section]:
return False
if not node_group:
return True
cl_configs = node_group.node_configs.to_dict()
for ncfg in configs_list:
section, name = self._get_correct_section_and_name(ncfg)
if section in cl_configs and name in cl_configs[section]:
return False
return True
def _get_correct_section_and_name(self, config_name):
"""Calculates and returns correct applicable target and name from
abstract name of config.
:param config_name: abstract name of config.
:return: correct applicable target and name for config_name
"""
section, name = None, None
if config_name in self.cluster_configs_to_update:
section = self.cluster_configs_to_update[config_name][0]
name = self.cluster_configs_to_update[config_name][1]
elif config_name in self.node_configs_to_update:
section = self.node_configs_to_update[config_name][0]
name = self.node_configs_to_update[config_name][1]
return section, name
def _get_default_config_value(self, config_name):
"""Calculates and return default value of config from
abstract name of config.
:param config_name: abstract name of config.
:return: default config value for config_name.
"""
section, name = self._get_correct_section_and_name(config_name)
for config in self.plugin_configs:
if config.applicable_target == section and config.name == name:
return config.default_value
def _merge_configs(self, current_configs, proposed_configs):
"""Correctly merges old configs and new extra configs"""
result = {}
for (section, configs) in six.iteritems(proposed_configs):
cfg_values = {}
if section in current_configs:
cfg_values = (current_configs[section] if
current_configs[section] else {})
cfg_values.update(configs)
result.update({section: cfg_values})
for (section, configs) in six.iteritems(current_configs):
if section not in result:
result.update({section: configs})
return result
def apply_node_configs(self, node_group):
"""Method applies configs for node_group using conductor api,
which were calculated with recommend_node_configs method.
:param node_group: NodeGroup Sahara resource.
:return: None.
"""
if not node_group.use_autoconfig or not self.cluster.use_autoconfig:
return
to_update = self.node_configs_to_update
recommended_node_configs = self._get_recommended_node_configs(
node_group)
if not recommended_node_configs:
# Nothing to configure
return
current_dict = node_group.node_configs.to_dict()
configuration = {}
for ncfg in six.iterkeys(to_update):
if ncfg not in recommended_node_configs:
continue
n_section = to_update[ncfg][0]
n_name = to_update[ncfg][1]
proposed_config_value = recommended_node_configs[ncfg]
if n_section not in configuration:
configuration.update({n_section: {}})
configuration[n_section].update({n_name: proposed_config_value})
current_dict = self._merge_configs(current_dict, configuration)
conductor.node_group_update(context.ctx(), node_group,
{'node_configs': current_dict})
def apply_cluster_configs(self):
"""Method applies configs for cluster using conductor api, which were
calculated with recommend_cluster_configs method.
:return: None.
"""
cluster = self.cluster
if not cluster.use_autoconfig:
return
to_update = self.cluster_configs_to_update
recommended_cluster_configs = self._get_recommended_cluster_configs()
if not recommended_cluster_configs:
# Nothing to configure
return
current_dict = cluster.cluster_configs.to_dict()
configuration = {}
for ncfg in six.iterkeys(to_update):
if ncfg not in recommended_cluster_configs:
continue
n_section = to_update[ncfg][0]
n_name = to_update[ncfg][1]
proposed_config_value = recommended_cluster_configs[ncfg]
if n_section not in configuration:
configuration.update({n_section: {}})
configuration[n_section].update({n_name: proposed_config_value})
current_dict = self._merge_configs(current_dict, configuration)
conductor.cluster_update(context.ctx(), cluster,
{'cluster_configs': current_dict})
def apply_recommended_configs(self):
"""Method applies recommended configs for cluster and for all
node_groups using conductor api.
:return: None.
"""
for ng in self.cluster.node_groups:
self.apply_node_configs(ng)
self.apply_cluster_configs()
configs = list(self.cluster_configs_to_update.keys())
configs.extend(list(self.node_configs_to_update.keys()))
LOG.debug("Following configs were auto-configured: {configs}".format(
configs=configs))
class HadoopAutoConfigsProvider(AutoConfigsProvider):
def __init__(self, mapper, plugin_configs, cluster, extra_spec=None,
hbase=False):
super(HadoopAutoConfigsProvider, self).__init__(
mapper, plugin_configs, cluster, extra_spec)
self.requested_flavors = {}
self.is_hbase_enabled = hbase
def _get_java_opts(self, value):
return "-Xmx%dm" % int(value)
def _transform_mb_to_gb(self, mb):
return mb / 1024.
def _transform_gb_to_mb(self, gb):
return gb * 1024.
def _get_min_size_of_container(self, ram):
if ram <= 4:
return 256
if ram <= 8:
return 512
if ram <= 24:
return 1024
return 2048
def _get_os_ram_recommendation(self, ram):
upper_bounds = [4, 8, 16, 24, 48, 64, 72, 96, 128, 256]
reserve_for_os = [1, 2, 2, 4, 6, 8, 8, 12, 24, 32]
for (upper, reserve) in zip(upper_bounds, reserve_for_os):
if ram <= upper:
return reserve
return 64
def _get_hbase_ram_recommendations(self, ram):
if not self.is_hbase_enabled:
return 0
upper_bounds = [4, 8, 16, 24, 48, 64, 72, 96, 128, 256]
reserve_for_hbase = [1, 1, 2, 4, 8, 8, 8, 16, 24, 32]
for (upper, reserve) in zip(upper_bounds, reserve_for_hbase):
if ram <= upper:
return reserve
return 64
def _get_node_group_data(self, node_group):
if node_group.flavor_id not in self.requested_flavors:
flavor = nova.get_flavor(id=node_group.flavor_id)
self.requested_flavors[node_group.flavor_id] = flavor
else:
flavor = self.requested_flavors[node_group.flavor_id]
cpu = flavor.vcpus
ram = flavor.ram
data = {}
# config recommendations was taken from Ambari code
os = self._get_os_ram_recommendation(self._transform_mb_to_gb(ram))
hbase = self._get_hbase_ram_recommendations(
self._transform_mb_to_gb(ram))
reserved_ram = self._transform_gb_to_mb(os + hbase)
min_container_size = self._get_min_size_of_container(
self._transform_mb_to_gb(ram))
# we use large amount of containers to allow users to run
# at least 4 jobs at same time on clusters based on small flavors
data["containers"] = int(max(
8, min(2 * cpu, ram / min_container_size)))
data["ramPerContainer"] = (ram - reserved_ram) / data["containers"]
data["ramPerContainer"] = max(data["ramPerContainer"],
min_container_size)
data["ramPerContainer"] = min(2048, int(data["ramPerContainer"]))
data["ramPerContainer"] = int(data["ramPerContainer"])
data["mapMemory"] = int(data["ramPerContainer"])
data["reduceMemory"] = int(2 * data["ramPerContainer"])
data["amMemory"] = int(min(data["mapMemory"], data["reduceMemory"]))
return data
def _get_recommended_node_configs(self, node_group):
"""Calculates recommended MapReduce and YARN configs for specified
node_group.
:param node_group: NodeGroup Sahara resource
:return: dictionary with recommended MapReduce and YARN configs
"""
configs_to_update = list(self.node_configs_to_update.keys())
if not self._can_be_recommended(configs_to_update, node_group):
return {}
data = self._get_node_group_data(node_group)
r = {}
r['yarn.nodemanager.resource.memory-mb'] = (data['containers'] *
data['ramPerContainer'])
r['yarn.scheduler.minimum-allocation-mb'] = data['ramPerContainer']
r['yarn.scheduler.maximum-allocation-mb'] = (data['containers'] *
data['ramPerContainer'])
r['yarn.nodemanager.vmem-check-enabled'] = "false"
r['yarn.app.mapreduce.am.resource.mb'] = data['amMemory']
r['yarn.app.mapreduce.am.command-opts'] = self._get_java_opts(
0.8 * data['amMemory'])
r['mapreduce.map.memory.mb'] = data['mapMemory']
r['mapreduce.reduce.memory.mb'] = data['reduceMemory']
r['mapreduce.map.java.opts'] = self._get_java_opts(
0.8 * data['mapMemory'])
r['mapreduce.reduce.java.opts'] = self._get_java_opts(
0.8 * data['reduceMemory'])
r['mapreduce.task.io.sort.mb'] = int(min(
0.4 * data['mapMemory'], 1024))
return r
def _get_recommended_cluster_configs(self):
"""Method recommends dfs_replication for cluster.
:return: recommended value of dfs_replication.
"""
if not self._can_be_recommended(['dfs.replication']):
return {}
datanode_count = 0
datanode_proc_name = "datanode"
if 'datanode_process_name' in self.extra_spec:
datanode_proc_name = self.extra_spec['datanode_process_name']
for ng in self.cluster.node_groups:
if datanode_proc_name in ng.node_processes:
datanode_count += ng.count
replica = 'dfs.replication'
recommended_value = self._get_default_config_value(replica)
if recommended_value:
return {replica: min(recommended_value, datanode_count)}
else:
return {}

View File

@ -92,3 +92,7 @@ class VanillaProvider(p.ProvisioningPluginBase):
def on_terminate_cluster(self, cluster): def on_terminate_cluster(self, cluster):
return self._get_version_handler( return self._get_version_handler(
cluster.hadoop_version).on_terminate_cluster(cluster) cluster.hadoop_version).on_terminate_cluster(cluster)
def recommend_configs(self, cluster):
return self._get_version_handler(
cluster.hadoop_version).recommend_configs(cluster)

View File

@ -594,3 +594,7 @@ class VersionHandler(avm.AbstractVersionHandler):
def on_terminate_cluster(self, cluster): def on_terminate_cluster(self, cluster):
proxy.delete_proxy_user_for_cluster(cluster) proxy.delete_proxy_user_for_cluster(cluster)
def recommend_configs(self, cluster):
# We don't support any recommendations in Vanilla 1 plugin
pass

View File

@ -18,6 +18,7 @@ from oslo_log import log as logging
from sahara import conductor from sahara import conductor
from sahara import context from sahara import context
from sahara.plugins import recommendations_utils as ru
from sahara.plugins import utils from sahara.plugins import utils
from sahara.plugins.vanilla import abstractversionhandler as avm from sahara.plugins.vanilla import abstractversionhandler as avm
from sahara.plugins.vanilla.hadoop2 import config as c from sahara.plugins.vanilla.hadoop2 import config as c
@ -189,3 +190,34 @@ class VersionHandler(avm.AbstractVersionHandler):
def get_open_ports(self, node_group): def get_open_ports(self, node_group):
return c.get_open_ports(node_group) return c.get_open_ports(node_group)
def recommend_configs(self, cluster):
yarn_configs = [
'yarn.nodemanager.resource.memory-mb',
'yarn.scheduler.minimum-allocation-mb',
'yarn.scheduler.maximum-allocation-mb',
'yarn.nodemanager.vmem-check-enabled',
]
mapred_configs = [
'yarn.app.mapreduce.am.resource.mb',
'yarn.app.mapreduce.am.command-opts',
'mapreduce.map.memory.mb',
'mapreduce.reduce.memory.mb',
'mapreduce.map.java.opts',
'mapreduce.reduce.java.opts',
'mapreduce.task.io.sort.mb',
]
configs_to_configure = {
'cluster_configs': {
'dfs.replication': ('HDFS', 'dfs.replication')
},
'node_configs': {
}
}
for mapr in mapred_configs:
configs_to_configure['node_configs'][mapr] = ("MapReduce", mapr)
for yarn in yarn_configs:
configs_to_configure['node_configs'][yarn] = ('YARN', yarn)
provider = ru.HadoopAutoConfigsProvider(
configs_to_configure, self.get_plugin_configs(), cluster)
provider.apply_recommended_configs()

View File

@ -76,6 +76,7 @@ def scale_cluster(id, data):
try: try:
cluster = g.change_cluster_status(cluster, "Validating") cluster = g.change_cluster_status(cluster, "Validating")
quotas.check_scaling(cluster, to_be_enlarged, additional) quotas.check_scaling(cluster, to_be_enlarged, additional)
plugin.recommend_configs(cluster)
plugin.validate_scaling(cluster, to_be_enlarged, additional) plugin.validate_scaling(cluster, to_be_enlarged, additional)
except Exception as e: except Exception as e:
with excutils.save_and_reraise_exception(): with excutils.save_and_reraise_exception():
@ -127,6 +128,7 @@ def _cluster_create(values, plugin):
# validating cluster # validating cluster
try: try:
plugin.recommend_configs(cluster)
cluster = g.change_cluster_status(cluster, "Validating") cluster = g.change_cluster_status(cluster, "Validating")
quotas.check_cluster(cluster) quotas.check_cluster(cluster)
plugin.validate(cluster) plugin.validate(cluster)

View File

@ -36,14 +36,16 @@ SAMPLE_CLUSTER = {
"flavor_id": "42", "flavor_id": "42",
"node_processes": ["p1", "p2"], "node_processes": ["p1", "p2"],
"count": 1, "count": 1,
"security_groups": None "security_groups": None,
'use_autoconfig': True,
}, },
{ {
"name": "ng_2", "name": "ng_2",
"flavor_id": "42", "flavor_id": "42",
"node_processes": ["p3", "p4"], "node_processes": ["p3", "p4"],
"count": 3, "count": 3,
"security_groups": ["group1", "group2"] "security_groups": ["group1", "group2"],
'use_autoconfig': True,
} }
], ],
"cluster_configs": { "cluster_configs": {

View File

@ -55,7 +55,8 @@ SAMPLE_NGT = {
"auto_security_group": False, "auto_security_group": False,
"availability_zone": "here", "availability_zone": "here",
"is_proxy_gateway": False, "is_proxy_gateway": False,
"volume_local_to_instance": False "volume_local_to_instance": False,
'use_autoconfig': True,
} }
SAMPLE_CLT = { SAMPLE_CLT = {
@ -80,6 +81,7 @@ SAMPLE_CLT = {
"floating_ip_pool": None, "floating_ip_pool": None,
"security_groups": None, "security_groups": None,
"availability_zone": None, "availability_zone": None,
'use_autoconfig': True,
}, },
{ {
"name": "ng_2", "name": "ng_2",
@ -89,6 +91,7 @@ SAMPLE_CLT = {
"floating_ip_pool": None, "floating_ip_pool": None,
"security_groups": ["group1", "group2"], "security_groups": ["group1", "group2"],
"availability_zone": None, "availability_zone": None,
'use_autoconfig': True,
} }
], ],

View File

@ -484,6 +484,18 @@ class SaharaMigrationsCheckers(object):
self.assertColumnCount(engine, 'job_interface_arguments', columns) self.assertColumnCount(engine, 'job_interface_arguments', columns)
self.assertColumnsExist(engine, 'job_interface_arguments', columns) self.assertColumnsExist(engine, 'job_interface_arguments', columns)
def _check_023(self, engine, data):
self.assertColumnExists(engine, 'clusters',
'use_autoconfig')
self.assertColumnExists(engine, 'cluster_templates',
'use_autoconfig')
self.assertColumnExists(engine, 'node_group_templates',
'use_autoconfig')
self.assertColumnExists(engine, 'node_groups',
'use_autoconfig')
self.assertColumnExists(engine, 'templates_relations',
'use_autoconfig')
class TestMigrationsMySQL(SaharaMigrationsCheckers, class TestMigrationsMySQL(SaharaMigrationsCheckers,
base.BaseWalkMigrationTestCase, base.BaseWalkMigrationTestCase,

View File

@ -0,0 +1,274 @@
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import six
from sahara import conductor as cond
from sahara import context
from sahara.plugins import recommendations_utils as ru
from sahara.tests.unit import base as b
conductor = cond.API
class Configs(object):
def __init__(self, configs):
self.configs = configs
def to_dict(self):
return self.configs
class FakeObject(object):
def __init__(self, **kwargs):
for attr in six.iterkeys(kwargs):
setattr(self, attr, kwargs.get(attr))
class TestProvidingRecommendations(b.SaharaWithDbTestCase):
@mock.patch('sahara.utils.openstack.nova.get_flavor')
def test_get_recommended_node_configs_medium_flavor(
self, fake_flavor):
ng = FakeObject(flavor_id="fake_flavor", node_configs=Configs({}))
cl = FakeObject(cluster_configs=Configs({}))
fake_flavor.return_value = FakeObject(ram=4096, vcpus=2)
observed = ru.HadoopAutoConfigsProvider(
{}, [], cl)._get_recommended_node_configs(ng)
self.assertEqual({
'mapreduce.reduce.memory.mb': 768,
'mapreduce.map.java.opts': '-Xmx307m',
'mapreduce.map.memory.mb': 384,
'mapreduce.reduce.java.opts': '-Xmx614m',
'yarn.app.mapreduce.am.resource.mb': 384,
'yarn.app.mapreduce.am.command-opts': '-Xmx307m',
'mapreduce.task.io.sort.mb': 153,
'yarn.nodemanager.resource.memory-mb': 3072,
'yarn.scheduler.minimum-allocation-mb': 384,
'yarn.scheduler.maximum-allocation-mb': 3072,
'yarn.nodemanager.vmem-check-enabled': 'false'
}, observed)
@mock.patch('sahara.utils.openstack.nova.get_flavor')
def test_get_recommended_node_configs_small_flavor(
self, fake_flavor):
ng = FakeObject(flavor_id="fake_flavor", node_configs=Configs({}))
cl = FakeObject(cluster_configs=Configs({}))
fake_flavor.return_value = FakeObject(ram=2048, vcpus=1)
observed = ru.HadoopAutoConfigsProvider(
{'node_configs': {}, 'cluster_configs': {}}, [], cl
)._get_recommended_node_configs(ng)
self.assertEqual({
'mapreduce.reduce.java.opts': '-Xmx409m',
'yarn.app.mapreduce.am.resource.mb': 256,
'mapreduce.reduce.memory.mb': 512,
'mapreduce.map.java.opts': '-Xmx204m',
'yarn.app.mapreduce.am.command-opts': '-Xmx204m',
'mapreduce.task.io.sort.mb': 102,
'mapreduce.map.memory.mb': 256,
'yarn.nodemanager.resource.memory-mb': 2048,
'yarn.scheduler.minimum-allocation-mb': 256,
'yarn.nodemanager.vmem-check-enabled': 'false',
'yarn.scheduler.maximum-allocation-mb': 2048,
}, observed)
def test_merge_configs(self):
provider = ru.HadoopAutoConfigsProvider({}, None, None)
initial_configs = {
'cat': {
'talk': 'meow',
},
'bond': {
'name': 'james'
}
}
extra_configs = {
'dog': {
'talk': 'woof'
},
'bond': {
'extra_name': 'james bond'
}
}
expected = {
'cat': {
'talk': 'meow',
},
'dog': {
'talk': 'woof'
},
'bond': {
'name': 'james',
'extra_name': 'james bond'
}
}
self.assertEqual(
expected, provider._merge_configs(initial_configs, extra_configs))
@mock.patch('sahara.utils.openstack.nova.get_flavor')
@mock.patch('sahara.plugins.recommendations_utils.conductor.'
'node_group_update')
@mock.patch('sahara.plugins.recommendations_utils.conductor.'
'cluster_update')
def test_apply_recommended_configs(self, cond_cluster, cond_node_group,
fake_flavor):
fake_flavor.return_value = FakeObject(ram=2048, vcpus=1)
to_tune = {
'cluster_configs': {
'dfs.replication': ('dfs', 'replica')
},
'node_configs': {
'mapreduce.task.io.sort.mb': ('bond', 'extra_name')
}
}
fake_plugin_configs = [
FakeObject(applicable_target='dfs', name='replica',
default_value=3)]
fake_ng = FakeObject(
use_autoconfig=True,
count=2,
node_processes=['dog_datanode'],
flavor_id='fake_id',
node_configs=Configs({
'bond': {
'name': 'james'
}
})
)
fake_cluster = FakeObject(
cluster_configs=Configs({
'cat': {
'talk': 'meow',
}
}),
node_groups=[fake_ng],
use_autoconfig=True,
)
v = ru.HadoopAutoConfigsProvider(
to_tune, fake_plugin_configs, fake_cluster,
{'datanode_process_name': "dog_datanode"})
v.apply_recommended_configs()
self.assertEqual([mock.call(context.ctx(), fake_cluster, {
'cluster_configs': {
'cat': {
'talk': 'meow'
},
'dfs': {
'replica': 2
}
}
})], cond_cluster.call_args_list)
self.assertEqual([mock.call(context.ctx(), fake_ng, {
'node_configs': {
'bond': {
'name': 'james',
'extra_name': 102
}
}
})], cond_node_group.call_args_list)
@mock.patch('sahara.utils.openstack.nova.get_flavor')
@mock.patch('sahara.plugins.recommendations_utils.conductor.'
'node_group_update')
@mock.patch('sahara.plugins.recommendations_utils.conductor.'
'cluster_update')
def test_apply_recommended_configs_no_updates(
self, cond_cluster, cond_node_group, fake_flavor):
fake_flavor.return_value = FakeObject(ram=2048, vcpus=1)
to_tune = {
'cluster_configs': {
'dfs.replication': ('dfs', 'replica')
},
'node_configs': {
'mapreduce.task.io.sort.mb': ('bond', 'extra_name')
}
}
fake_plugin_configs = [
FakeObject(applicable_target='dfs', name='replica',
default_value=3)]
fake_ng = FakeObject(
use_autoconfig=True,
count=2,
node_processes=['dog_datanode'],
flavor_id='fake_id',
node_configs=Configs({
'bond': {
'extra_name': 'james bond'
}
})
)
fake_cluster = FakeObject(
cluster_configs=Configs({
'dfs': {
'replica': 1
}
}),
node_groups=[fake_ng],
use_autoconfig=True,
)
v = ru.HadoopAutoConfigsProvider(
to_tune, fake_plugin_configs, fake_cluster)
v.apply_recommended_configs()
self.assertEqual(0, cond_cluster.call_count)
self.assertEqual(0, cond_node_group.call_count)
def test_correct_use_autoconfig_value(self):
ctx = context.ctx()
ngt1 = conductor.node_group_template_create(ctx, {
'name': 'ngt1',
'flavor_id': '1',
'plugin_name': 'vanilla',
'hadoop_version': '1'
})
ngt2 = conductor.node_group_template_create(ctx, {
'name': 'ngt2',
'flavor_id': '2',
'plugin_name': 'vanilla',
'hadoop_version': '1',
'use_autoconfig': False
})
self.assertTrue(ngt1.use_autoconfig)
self.assertFalse(ngt2.use_autoconfig)
clt = conductor.cluster_template_create(ctx, {
'name': "clt1",
'plugin_name': 'vanilla',
'hadoop_version': '1',
'node_groups': [
{
'count': 3,
"node_group_template_id": ngt1.id
},
{
'count': 1,
'node_group_template_id': ngt2.id
}
],
'use_autoconfig': False
})
cluster = conductor.cluster_create(ctx, {
'name': 'stupid',
'cluster_template_id': clt.id
})
self.assertFalse(cluster.use_autoconfig)
for ng in cluster.node_groups:
if ng.name == 'ngt1':
self.assertTrue(ng.use_autoconfig)
else:
self.assertFalse(ng.use_autoconfig)

View File

@ -0,0 +1,63 @@
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import testtools
from sahara.plugins.vanilla.v2_6_0 import versionhandler
CONFIGURATION_SCHEMA = {
'cluster_configs': {
'dfs.replication': ('HDFS', 'dfs.replication')
},
'node_configs': {
'yarn.app.mapreduce.am.command-opts': (
'MapReduce', 'yarn.app.mapreduce.am.command-opts'),
'yarn.scheduler.maximum-allocation-mb': (
'YARN', 'yarn.scheduler.maximum-allocation-mb'),
'yarn.app.mapreduce.am.resource.mb': (
'MapReduce', 'yarn.app.mapreduce.am.resource.mb'),
'yarn.scheduler.minimum-allocation-mb': (
'YARN', 'yarn.scheduler.minimum-allocation-mb'),
'yarn.nodemanager.vmem-check-enabled': (
'YARN', 'yarn.nodemanager.vmem-check-enabled'),
'mapreduce.map.java.opts': (
'MapReduce', 'mapreduce.map.java.opts'),
'mapreduce.reduce.memory.mb': (
'MapReduce', 'mapreduce.reduce.memory.mb'),
'yarn.nodemanager.resource.memory-mb': (
'YARN', 'yarn.nodemanager.resource.memory-mb'),
'mapreduce.reduce.java.opts': (
'MapReduce', 'mapreduce.reduce.java.opts'),
'mapreduce.map.memory.mb': (
'MapReduce', 'mapreduce.map.memory.mb'),
'mapreduce.task.io.sort.mb': (
'MapReduce', 'mapreduce.task.io.sort.mb')
}
}
class TestVersionHandler(testtools.TestCase):
@mock.patch('sahara.plugins.recommendations_utils.'
'HadoopAutoConfigsProvider')
@mock.patch('sahara.plugins.vanilla.v2_6_0.versionhandler.VersionHandler.'
'get_plugin_configs')
def test_recommend_configs(self, fake_plugin_configs, provider):
f_cluster, f_configs = mock.Mock(), mock.Mock()
fake_plugin_configs.return_value = f_configs
versionhandler.VersionHandler().recommend_configs(f_cluster)
self.assertEqual([
mock.call(CONFIGURATION_SCHEMA, f_configs, f_cluster)
], provider.call_args_list)

View File

@ -120,6 +120,9 @@ class FakePlugin(object):
def get_configs(self, version): def get_configs(self, version):
return {} return {}
def recommend_configs(self, cluster):
self.calls_order.append('recommend_configs')
class FakePluginManager(object): class FakePluginManager(object):
def __init__(self, calls_order): def __init__(self, calls_order):
@ -177,7 +180,7 @@ class TestApi(base.SaharaWithDbTestCase):
self.assertEqual(3, ng_count) self.assertEqual(3, ng_count)
api.terminate_cluster(result_cluster.id) api.terminate_cluster(result_cluster.id)
self.assertEqual( self.assertEqual(
['get_open_ports', 'validate', ['get_open_ports', 'recommend_configs', 'validate',
'ops.provision_cluster', 'ops.provision_cluster',
'ops.terminate_cluster'], self.calls_order) 'ops.terminate_cluster'], self.calls_order)
@ -204,9 +207,9 @@ class TestApi(base.SaharaWithDbTestCase):
api.terminate_cluster(result_cluster1.id) api.terminate_cluster(result_cluster1.id)
api.terminate_cluster(result_cluster2.id) api.terminate_cluster(result_cluster2.id)
self.assertEqual( self.assertEqual(
['get_open_ports', 'validate', ['get_open_ports', 'recommend_configs', 'validate',
'ops.provision_cluster', 'ops.provision_cluster',
'get_open_ports', 'validate', 'get_open_ports', 'recommend_configs', 'validate',
'ops.provision_cluster', 'ops.provision_cluster',
'ops.terminate_cluster', 'ops.terminate_cluster',
'ops.terminate_cluster'], self.calls_order) 'ops.terminate_cluster'], self.calls_order)
@ -249,8 +252,9 @@ class TestApi(base.SaharaWithDbTestCase):
self.assertEqual(4, ng_count) self.assertEqual(4, ng_count)
api.terminate_cluster(result_cluster.id) api.terminate_cluster(result_cluster.id)
self.assertEqual( self.assertEqual(
['get_open_ports', 'validate', 'ops.provision_cluster', ['get_open_ports', 'recommend_configs', 'validate',
'get_open_ports', 'get_open_ports', 'validate_scaling', 'ops.provision_cluster', 'get_open_ports', 'get_open_ports',
'recommend_configs', 'validate_scaling',
'ops.provision_scaled_cluster', 'ops.provision_scaled_cluster',
'ops.terminate_cluster'], self.calls_order) 'ops.terminate_cluster'], self.calls_order)