Merge "Add recommendation support to Cloudera plugin"

Jenkins authored 2015-08-17 13:54:08 +00:00; committed by Gerrit Code Review
commit dc9c9e0752
9 changed files with 134 additions and 13 deletions
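
For orientation, the change threads one new hook, recommend_configs(), through every layer of the CDH plugin. The call chain below is a summary assembled from the hunks that follow, not code from the commit itself:

    # CDHPluginProvider.recommend_configs(cluster)                    (plugin entry point)
    #   -> VersionHandler.recommend_configs(cluster)                  (v5 / v5_3_0 / v5_4_0)
    #     -> PU.recommend_configs(cluster, plugin_configs)            (AbstractPluginUtils)
    #       -> CDHPluginAutoConfigsProvider(AUTO_CONFIGURATION_SCHEMA,
    #                                       plugin_configs, cluster).apply_recommended_configs()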

View File

@@ -41,6 +41,7 @@ clusters:
          - HDFS_NAMENODE
          - YARN_RESOURCEMANAGER
          - SENTRY_SERVER
+         - YARN_NODEMANAGER
          - ZOOKEEPER_SERVER
        auto_security_group: true
      - name: master-additional

@@ -48,6 +49,7 @@ clusters:
        node_processes:
          - OOZIE_SERVER
          - YARN_JOBHISTORY
+         - YARN_NODEMANAGER
          - HDFS_SECONDARYNAMENODE
          - HIVE_METASTORE
          - HIVE_SERVER2

View File

@@ -87,3 +87,7 @@ class CDHPluginProvider(p.ProvisioningPluginBase):
    def get_open_ports(self, node_group):
        return self._get_version_handler(
            node_group.cluster.hadoop_version).get_open_ports(node_group)
+
+    def recommend_configs(self, cluster):
+        return self._get_version_handler(
+            cluster.hadoop_version).recommend_configs(cluster)
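
The new plugin-level hook mirrors the existing get_open_ports() delegation: the provider resolves the version handler for the cluster's hadoop_version and forwards the call. A minimal usage sketch (assuming the class lives in sahara.plugins.cdh.plugin and that `cluster` is an existing cluster object; in practice the plugin instance comes from Sahara's plugin manager rather than direct instantiation):

    from sahara.plugins.cdh import plugin as cdh_plugin  # assumed module path

    cdh = cdh_plugin.CDHPluginProvider()
    cdh.recommend_configs(cluster)  # forwards to the v5 / v5_3_0 / v5_4_0 version handler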

View File

@@ -25,6 +25,7 @@ from sahara import context
from sahara import exceptions as exc
from sahara.i18n import _
from sahara.plugins.cdh import commands as cmd
+from sahara.plugins import recommendations_utils as ru
from sahara.plugins import utils as u
from sahara.utils import cluster_progress_ops as cpo
from sahara.utils import edp as edp_u
@@ -39,6 +40,39 @@ CM_API_PORT = 7180

LOG = logging.getLogger(__name__)

+AUTO_CONFIGURATION_SCHEMA = {
+    'node_configs': {
+        'yarn.scheduler.minimum-allocation-mb': (
+            'RESOURCEMANAGER', 'yarn_scheduler_minimum_allocation_mb'),
+        'mapreduce.reduce.memory.mb': (
+            'YARN_GATEWAY', 'mapreduce_reduce_memory_mb'),
+        'mapreduce.map.memory.mb': (
+            'YARN_GATEWAY', 'mapreduce_map_memory_mb',),
+        'yarn.scheduler.maximum-allocation-mb': (
+            'RESOURCEMANAGER', 'yarn_scheduler_maximum_allocation_mb'),
+        'yarn.app.mapreduce.am.command-opts': (
+            'YARN_GATEWAY', 'yarn_app_mapreduce_am_command_opts'),
+        'yarn.nodemanager.resource.memory-mb': (
+            'NODEMANAGER', 'yarn_nodemanager_resource_memory_mb'),
+        'mapreduce.task.io.sort.mb': (
+            'YARN_GATEWAY', 'io_sort_mb'),
+        'mapreduce.map.java.opts': (
+            'YARN_GATEWAY', 'mapreduce_map_java_opts'),
+        'mapreduce.reduce.java.opts': (
+            'YARN_GATEWAY', 'mapreduce_reduce_java_opts'),
+        'yarn.app.mapreduce.am.resource.mb': (
+            'YARN_GATEWAY', 'yarn_app_mapreduce_am_resource_mb')
+    },
+    'cluster_configs': {
+        'dfs.replication': ('HDFS', 'dfs_replication')
+    }
+}
+
+
+class CDHPluginAutoConfigsProvider(ru.HadoopAutoConfigsProvider):
+    def get_datanode_name(self):
+        return 'HDFS_DATANODE'


class AbstractPluginUtils(object):
    # c_helper and db_helper will be defined in derived classes.
@@ -340,3 +374,8 @@ class AbstractPluginUtils(object):
            raise exc.InvalidDataException(
                _("Unable to find config: {applicable_target: %(target)s, name: "
                  "%(name)s").format(target=service, name=name))
+
+    def recommend_configs(self, cluster, plugin_configs):
+        provider = CDHPluginAutoConfigsProvider(
+            AUTO_CONFIGURATION_SCHEMA, plugin_configs, cluster)
+        provider.apply_recommended_configs()
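
Each key in AUTO_CONFIGURATION_SCHEMA is a generic Hadoop/YARN property that the recommendation engine reasons about; its value is the (Cloudera Manager service/role type, CM config name) pair that the CDH plugin actually writes. A sketch of how the new recommend_configs() entry point is driven, based only on the signatures in this hunk and in the version-handler hunks below (it assumes PluginUtilsV540 derives from AbstractPluginUtils and that `cluster` is an existing cluster object):

    from sahara.plugins.cdh.v5_4_0 import plugin_utils as pu

    PU = pu.PluginUtilsV540()
    # plugin_configs is whatever the version handler's get_plugin_configs() returns
    PU.recommend_configs(cluster, plugin_configs)
    # internally: CDHPluginAutoConfigsProvider(AUTO_CONFIGURATION_SCHEMA,
    #             plugin_configs, cluster).apply_recommended_configs()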

View File

@@ -21,11 +21,13 @@ from sahara.plugins.cdh.v5 import cloudera_utils as cu
from sahara.plugins.cdh.v5 import config_helper as c_helper
from sahara.plugins.cdh.v5 import deploy as dp
from sahara.plugins.cdh.v5 import edp_engine
+from sahara.plugins.cdh.v5 import plugin_utils as pu
from sahara.plugins.cdh.v5 import validation as vl

conductor = conductor.API
CU = cu.ClouderaUtilsV5()
+PU = pu.PluginUtilsV5()


class VersionHandler(avm.AbstractVersionHandler):

@@ -111,3 +113,6 @@ class VersionHandler(avm.AbstractVersionHandler):
    def get_open_ports(self, node_group):
        return dp.get_open_ports(node_group)
+
+    def recommend_configs(self, cluster):
+        PU.recommend_configs(cluster, self.get_plugin_configs())

View File

@@ -22,11 +22,12 @@ from sahara.plugins.cdh.v5_3_0 import cloudera_utils as cu
from sahara.plugins.cdh.v5_3_0 import config_helper as c_helper
from sahara.plugins.cdh.v5_3_0 import deploy as dp
from sahara.plugins.cdh.v5_3_0 import edp_engine
+from sahara.plugins.cdh.v5_3_0 import plugin_utils as pu
from sahara.plugins.cdh.v5_3_0 import validation as vl

conductor = conductor.API
CU = cu.ClouderaUtilsV530()
+PU = pu.PluginUtilsV530()


class VersionHandler(avm.AbstractVersionHandler):

@@ -124,3 +125,6 @@ class VersionHandler(avm.AbstractVersionHandler):
    def get_open_ports(self, node_group):
        return dp.get_open_ports(node_group)
+
+    def recommend_configs(self, cluster):
+        PU.recommend_configs(cluster, self.get_plugin_configs())

View File

@@ -22,11 +22,13 @@ from sahara.plugins.cdh.v5_4_0 import cloudera_utils as cu
from sahara.plugins.cdh.v5_4_0 import config_helper as c_helper
from sahara.plugins.cdh.v5_4_0 import deploy as dp
from sahara.plugins.cdh.v5_4_0 import edp_engine
+from sahara.plugins.cdh.v5_4_0 import plugin_utils as pu
from sahara.plugins.cdh.v5_4_0 import validation as vl

conductor = conductor.API
CU = cu.ClouderaUtilsV540()
+PU = pu.PluginUtilsV540()


class VersionHandler(avm.AbstractVersionHandler):

@@ -126,3 +128,6 @@ class VersionHandler(avm.AbstractVersionHandler):
    def get_open_ports(self, node_group):
        return dp.get_open_ports(node_group)
+
+    def recommend_configs(self, cluster):
+        PU.recommend_configs(cluster, self.get_plugin_configs())

View File

@@ -29,7 +29,7 @@ LOG = logging.getLogger(__name__)

@six.add_metaclass(abc.ABCMeta)
class AutoConfigsProvider(object):
-    def __init__(self, mapper, plugin_configs, cluster, extra_spec=None):
+    def __init__(self, mapper, plugin_configs, cluster):
        """This meta class provides general recommendation utils for cluster
        configuration.

@@ -42,13 +42,11 @@ class AutoConfigsProvider(object):
        with almost same configs and configuring principles.
        :param plugin_configs: all plugins_configs for specified plugin
        :param cluster: cluster which is required to configure
-        :param extra_spec: extra helpful information about AutoConfigs
        """
        self.plugin_configs = plugin_configs
        self.cluster = cluster
        self.node_configs_to_update = mapper.get('node_configs', {})
        self.cluster_configs_to_update = mapper.get('cluster_configs', {})
-        self.extra_spec = {} if not extra_spec else extra_spec

    @abc.abstractmethod
    def _get_recommended_node_configs(self, node_group):

@@ -223,10 +221,9 @@ class AutoConfigsProvider(object):

class HadoopAutoConfigsProvider(AutoConfigsProvider):
-    def __init__(self, mapper, plugin_configs, cluster, extra_spec=None,
-                 hbase=False):
+    def __init__(self, mapper, plugin_configs, cluster, hbase=False):
        super(HadoopAutoConfigsProvider, self).__init__(
-            mapper, plugin_configs, cluster, extra_spec)
+            mapper, plugin_configs, cluster)
        self.requested_flavors = {}
        self.is_hbase_enabled = hbase

@@ -330,6 +327,9 @@ class HadoopAutoConfigsProvider(AutoConfigsProvider):
                0.4 * data['mapMemory'], 1024))
        return r

+    def get_datanode_name(self):
+        return "datanode"
+
    def _get_recommended_cluster_configs(self):
        """Method recommends dfs_replication for cluster.

@@ -338,9 +338,7 @@ class HadoopAutoConfigsProvider(AutoConfigsProvider):
        if not self._can_be_recommended(['dfs.replication']):
            return {}
        datanode_count = 0
-        datanode_proc_name = "datanode"
-        if 'datanode_process_name' in self.extra_spec:
-            datanode_proc_name = self.extra_spec['datanode_process_name']
+        datanode_proc_name = self.get_datanode_name()
        for ng in self.cluster.node_groups:
            if datanode_proc_name in ng.node_processes:
                datanode_count += ng.count
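
The extra_spec side channel is replaced here by a template-method hook: a plugin whose datanode process has a different name now subclasses HadoopAutoConfigsProvider and overrides get_datanode_name(), exactly as CDHPluginAutoConfigsProvider does above with 'HDFS_DATANODE'. A minimal sketch of such a subclass (the class name and the 'MY_DATANODE' process name are made up for illustration):

    from sahara.plugins import recommendations_utils as ru

    class MyPluginAutoConfigsProvider(ru.HadoopAutoConfigsProvider):
        # tell the dfs.replication heuristic which node process counts as a datanode
        def get_datanode_name(self):
            return 'MY_DATANODE'

    # provider = MyPluginAutoConfigsProvider(mapper, plugin_configs, cluster)
    # provider.apply_recommended_configs()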

View File

@@ -0,0 +1,62 @@
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock

from sahara.plugins.cdh import plugin_utils as pu
from sahara.tests.unit import base as b


CONFIGURATION_SCHEMA = {
    'node_configs': {
        'yarn.scheduler.minimum-allocation-mb': (
            'RESOURCEMANAGER', 'yarn_scheduler_minimum_allocation_mb'),
        'mapreduce.reduce.memory.mb': (
            'YARN_GATEWAY', 'mapreduce_reduce_memory_mb'),
        'mapreduce.map.memory.mb': (
            'YARN_GATEWAY', 'mapreduce_map_memory_mb',),
        'yarn.scheduler.maximum-allocation-mb': (
            'RESOURCEMANAGER', 'yarn_scheduler_maximum_allocation_mb'),
        'yarn.app.mapreduce.am.command-opts': (
            'YARN_GATEWAY', 'yarn_app_mapreduce_am_command_opts'),
        'yarn.nodemanager.resource.memory-mb': (
            'NODEMANAGER', 'yarn_nodemanager_resource_memory_mb'),
        'mapreduce.task.io.sort.mb': (
            'YARN_GATEWAY', 'io_sort_mb'),
        'mapreduce.map.java.opts': (
            'YARN_GATEWAY', 'mapreduce_map_java_opts'),
        'mapreduce.reduce.java.opts': (
            'YARN_GATEWAY', 'mapreduce_reduce_java_opts'),
        'yarn.app.mapreduce.am.resource.mb': (
            'YARN_GATEWAY', 'yarn_app_mapreduce_am_resource_mb')
    },
    'cluster_configs': {
        'dfs.replication': ('HDFS', 'dfs_replication')
    }
}


class TestPluginUtils(b.SaharaTestCase):

    @mock.patch('sahara.plugins.cdh.plugin_utils.'
                'CDHPluginAutoConfigsProvider')
    def test_recommend_configs(self, provider):
        plug_utils = pu.AbstractPluginUtils()
        fake_plugin_utils = mock.Mock()
        fake_cluster = mock.Mock()
        plug_utils.recommend_configs(fake_cluster, fake_plugin_utils)
        self.assertEqual([
            mock.call(CONFIGURATION_SCHEMA,
                      fake_plugin_utils, fake_cluster)
        ], provider.call_args_list)
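
The decorator patches CDHPluginAutoConfigsProvider at the name under which plugin_utils resolves it ('sahara.plugins.cdh.plugin_utils.CDHPluginAutoConfigsProvider'), so the mock intercepts the constructor call made inside recommend_configs(). A slightly more direct equivalent of the assertion above (a sketch using the same mock objects from the test) would be:

    # the provider must be built exactly once with the schema, configs and cluster ...
    provider.assert_called_once_with(
        CONFIGURATION_SCHEMA, fake_plugin_utils, fake_cluster)
    # ... and its apply_recommended_configs() must then be invoked
    provider.return_value.apply_recommended_configs.assert_called_once_with()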

View File

@@ -126,6 +126,9 @@ class TestProvidingRecommendations(b.SaharaWithDbTestCase):
                'cluster_update')
    def test_apply_recommended_configs(self, cond_cluster, cond_node_group,
                                       fake_flavor):
+        class TestProvider(ru.HadoopAutoConfigsProvider):
+            def get_datanode_name(self):
+                return "dog_datanode"
        fake_flavor.return_value = FakeObject(ram=2048, vcpus=1)
        to_tune = {
            'cluster_configs': {

@@ -159,9 +162,8 @@ class TestProvidingRecommendations(b.SaharaWithDbTestCase):
            node_groups=[fake_ng],
            use_autoconfig=True,
        )
-        v = ru.HadoopAutoConfigsProvider(
-            to_tune, fake_plugin_configs, fake_cluster,
-            {'datanode_process_name': "dog_datanode"})
+        v = TestProvider(
+            to_tune, fake_plugin_configs, fake_cluster)
        v.apply_recommended_configs()
        self.assertEqual([mock.call(context.ctx(), fake_cluster, {