Add recommendation support to Cloudera plugin
Added support of recommendations to both versions of Cloudera plugin. Also added small refactoring in AutoConfigsProvider. Partial-Implements blueprint: recommend-configuration Change-Id: I25f906d830b803b65ffcc7443978ea3cfef126da
This commit is contained in:
parent
427555e397
commit
aceb9c4a28
@ -41,6 +41,7 @@ clusters:
|
|||||||
- HDFS_NAMENODE
|
- HDFS_NAMENODE
|
||||||
- YARN_RESOURCEMANAGER
|
- YARN_RESOURCEMANAGER
|
||||||
- SENTRY_SERVER
|
- SENTRY_SERVER
|
||||||
|
- YARN_NODEMANAGER
|
||||||
- ZOOKEEPER_SERVER
|
- ZOOKEEPER_SERVER
|
||||||
auto_security_group: true
|
auto_security_group: true
|
||||||
- name: master-additional
|
- name: master-additional
|
||||||
@ -48,6 +49,7 @@ clusters:
|
|||||||
node_processes:
|
node_processes:
|
||||||
- OOZIE_SERVER
|
- OOZIE_SERVER
|
||||||
- YARN_JOBHISTORY
|
- YARN_JOBHISTORY
|
||||||
|
- YARN_NODEMANAGER
|
||||||
- HDFS_SECONDARYNAMENODE
|
- HDFS_SECONDARYNAMENODE
|
||||||
- HIVE_METASTORE
|
- HIVE_METASTORE
|
||||||
- HIVE_SERVER2
|
- HIVE_SERVER2
|
||||||
|
@ -87,3 +87,7 @@ class CDHPluginProvider(p.ProvisioningPluginBase):
|
|||||||
def get_open_ports(self, node_group):
|
def get_open_ports(self, node_group):
|
||||||
return self._get_version_handler(
|
return self._get_version_handler(
|
||||||
node_group.cluster.hadoop_version).get_open_ports(node_group)
|
node_group.cluster.hadoop_version).get_open_ports(node_group)
|
||||||
|
|
||||||
|
def recommend_configs(self, cluster):
|
||||||
|
return self._get_version_handler(
|
||||||
|
cluster.hadoop_version).recommend_configs(cluster)
|
||||||
|
@ -25,6 +25,7 @@ from sahara import context
|
|||||||
from sahara import exceptions as exc
|
from sahara import exceptions as exc
|
||||||
from sahara.i18n import _
|
from sahara.i18n import _
|
||||||
from sahara.plugins.cdh import commands as cmd
|
from sahara.plugins.cdh import commands as cmd
|
||||||
|
from sahara.plugins import recommendations_utils as ru
|
||||||
from sahara.plugins import utils as u
|
from sahara.plugins import utils as u
|
||||||
from sahara.utils import cluster_progress_ops as cpo
|
from sahara.utils import cluster_progress_ops as cpo
|
||||||
from sahara.utils import edp as edp_u
|
from sahara.utils import edp as edp_u
|
||||||
@ -39,6 +40,39 @@ CM_API_PORT = 7180
|
|||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
AUTO_CONFIGURATION_SCHEMA = {
|
||||||
|
'node_configs': {
|
||||||
|
'yarn.scheduler.minimum-allocation-mb': (
|
||||||
|
'RESOURCEMANAGER', 'yarn_scheduler_minimum_allocation_mb'),
|
||||||
|
'mapreduce.reduce.memory.mb': (
|
||||||
|
'YARN_GATEWAY', 'mapreduce_reduce_memory_mb'),
|
||||||
|
'mapreduce.map.memory.mb': (
|
||||||
|
'YARN_GATEWAY', 'mapreduce_map_memory_mb',),
|
||||||
|
'yarn.scheduler.maximum-allocation-mb': (
|
||||||
|
'RESOURCEMANAGER', 'yarn_scheduler_maximum_allocation_mb'),
|
||||||
|
'yarn.app.mapreduce.am.command-opts': (
|
||||||
|
'YARN_GATEWAY', 'yarn_app_mapreduce_am_command_opts'),
|
||||||
|
'yarn.nodemanager.resource.memory-mb': (
|
||||||
|
'NODEMANAGER', 'yarn_nodemanager_resource_memory_mb'),
|
||||||
|
'mapreduce.task.io.sort.mb': (
|
||||||
|
'YARN_GATEWAY', 'io_sort_mb'),
|
||||||
|
'mapreduce.map.java.opts': (
|
||||||
|
'YARN_GATEWAY', 'mapreduce_map_java_opts'),
|
||||||
|
'mapreduce.reduce.java.opts': (
|
||||||
|
'YARN_GATEWAY', 'mapreduce_reduce_java_opts'),
|
||||||
|
'yarn.app.mapreduce.am.resource.mb': (
|
||||||
|
'YARN_GATEWAY', 'yarn_app_mapreduce_am_resource_mb')
|
||||||
|
},
|
||||||
|
'cluster_configs': {
|
||||||
|
'dfs.replication': ('HDFS', 'dfs_replication')
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class CDHPluginAutoConfigsProvider(ru.HadoopAutoConfigsProvider):
|
||||||
|
def get_datanode_name(self):
|
||||||
|
return 'HDFS_DATANODE'
|
||||||
|
|
||||||
|
|
||||||
class AbstractPluginUtils(object):
|
class AbstractPluginUtils(object):
|
||||||
# c_helper and db_helper will be defined in derived classes.
|
# c_helper and db_helper will be defined in derived classes.
|
||||||
@ -339,3 +373,8 @@ class AbstractPluginUtils(object):
|
|||||||
raise exc.InvalidDataException(
|
raise exc.InvalidDataException(
|
||||||
_("Unable to find config: {applicable_target: %(target)s, name: "
|
_("Unable to find config: {applicable_target: %(target)s, name: "
|
||||||
"%(name)s").format(target=service, name=name))
|
"%(name)s").format(target=service, name=name))
|
||||||
|
|
||||||
|
def recommend_configs(self, cluster, plugin_configs):
|
||||||
|
provider = CDHPluginAutoConfigsProvider(
|
||||||
|
AUTO_CONFIGURATION_SCHEMA, plugin_configs, cluster)
|
||||||
|
provider.apply_recommended_configs()
|
||||||
|
@ -21,11 +21,13 @@ from sahara.plugins.cdh.v5 import cloudera_utils as cu
|
|||||||
from sahara.plugins.cdh.v5 import config_helper as c_helper
|
from sahara.plugins.cdh.v5 import config_helper as c_helper
|
||||||
from sahara.plugins.cdh.v5 import deploy as dp
|
from sahara.plugins.cdh.v5 import deploy as dp
|
||||||
from sahara.plugins.cdh.v5 import edp_engine
|
from sahara.plugins.cdh.v5 import edp_engine
|
||||||
|
from sahara.plugins.cdh.v5 import plugin_utils as pu
|
||||||
from sahara.plugins.cdh.v5 import validation as vl
|
from sahara.plugins.cdh.v5 import validation as vl
|
||||||
|
|
||||||
|
|
||||||
conductor = conductor.API
|
conductor = conductor.API
|
||||||
CU = cu.ClouderaUtilsV5()
|
CU = cu.ClouderaUtilsV5()
|
||||||
|
PU = pu.PluginUtilsV5()
|
||||||
|
|
||||||
|
|
||||||
class VersionHandler(avm.AbstractVersionHandler):
|
class VersionHandler(avm.AbstractVersionHandler):
|
||||||
@ -111,3 +113,6 @@ class VersionHandler(avm.AbstractVersionHandler):
|
|||||||
|
|
||||||
def get_open_ports(self, node_group):
|
def get_open_ports(self, node_group):
|
||||||
return dp.get_open_ports(node_group)
|
return dp.get_open_ports(node_group)
|
||||||
|
|
||||||
|
def recommend_configs(self, cluster):
|
||||||
|
PU.recommend_configs(cluster, self.get_plugin_configs())
|
||||||
|
@ -22,11 +22,12 @@ from sahara.plugins.cdh.v5_3_0 import cloudera_utils as cu
|
|||||||
from sahara.plugins.cdh.v5_3_0 import config_helper as c_helper
|
from sahara.plugins.cdh.v5_3_0 import config_helper as c_helper
|
||||||
from sahara.plugins.cdh.v5_3_0 import deploy as dp
|
from sahara.plugins.cdh.v5_3_0 import deploy as dp
|
||||||
from sahara.plugins.cdh.v5_3_0 import edp_engine
|
from sahara.plugins.cdh.v5_3_0 import edp_engine
|
||||||
|
from sahara.plugins.cdh.v5_3_0 import plugin_utils as pu
|
||||||
from sahara.plugins.cdh.v5_3_0 import validation as vl
|
from sahara.plugins.cdh.v5_3_0 import validation as vl
|
||||||
|
|
||||||
|
|
||||||
conductor = conductor.API
|
conductor = conductor.API
|
||||||
CU = cu.ClouderaUtilsV530()
|
CU = cu.ClouderaUtilsV530()
|
||||||
|
PU = pu.PluginUtilsV530()
|
||||||
|
|
||||||
|
|
||||||
class VersionHandler(avm.AbstractVersionHandler):
|
class VersionHandler(avm.AbstractVersionHandler):
|
||||||
@ -124,3 +125,6 @@ class VersionHandler(avm.AbstractVersionHandler):
|
|||||||
|
|
||||||
def get_open_ports(self, node_group):
|
def get_open_ports(self, node_group):
|
||||||
return dp.get_open_ports(node_group)
|
return dp.get_open_ports(node_group)
|
||||||
|
|
||||||
|
def recommend_configs(self, cluster):
|
||||||
|
PU.recommend_configs(cluster, self.get_plugin_configs())
|
||||||
|
@ -22,11 +22,13 @@ from sahara.plugins.cdh.v5_4_0 import cloudera_utils as cu
|
|||||||
from sahara.plugins.cdh.v5_4_0 import config_helper as c_helper
|
from sahara.plugins.cdh.v5_4_0 import config_helper as c_helper
|
||||||
from sahara.plugins.cdh.v5_4_0 import deploy as dp
|
from sahara.plugins.cdh.v5_4_0 import deploy as dp
|
||||||
from sahara.plugins.cdh.v5_4_0 import edp_engine
|
from sahara.plugins.cdh.v5_4_0 import edp_engine
|
||||||
|
from sahara.plugins.cdh.v5_4_0 import plugin_utils as pu
|
||||||
from sahara.plugins.cdh.v5_4_0 import validation as vl
|
from sahara.plugins.cdh.v5_4_0 import validation as vl
|
||||||
|
|
||||||
|
|
||||||
conductor = conductor.API
|
conductor = conductor.API
|
||||||
CU = cu.ClouderaUtilsV540()
|
CU = cu.ClouderaUtilsV540()
|
||||||
|
PU = pu.PluginUtilsV540()
|
||||||
|
|
||||||
|
|
||||||
class VersionHandler(avm.AbstractVersionHandler):
|
class VersionHandler(avm.AbstractVersionHandler):
|
||||||
@ -125,3 +127,6 @@ class VersionHandler(avm.AbstractVersionHandler):
|
|||||||
|
|
||||||
def get_open_ports(self, node_group):
|
def get_open_ports(self, node_group):
|
||||||
return dp.get_open_ports(node_group)
|
return dp.get_open_ports(node_group)
|
||||||
|
|
||||||
|
def recommend_configs(self, cluster):
|
||||||
|
PU.recommend_configs(cluster, self.get_plugin_configs())
|
||||||
|
@ -29,7 +29,7 @@ LOG = logging.getLogger(__name__)
|
|||||||
|
|
||||||
@six.add_metaclass(abc.ABCMeta)
|
@six.add_metaclass(abc.ABCMeta)
|
||||||
class AutoConfigsProvider(object):
|
class AutoConfigsProvider(object):
|
||||||
def __init__(self, mapper, plugin_configs, cluster, extra_spec=None):
|
def __init__(self, mapper, plugin_configs, cluster):
|
||||||
"""This meta class provides general recommendation utils for cluster
|
"""This meta class provides general recommendation utils for cluster
|
||||||
|
|
||||||
configuration.
|
configuration.
|
||||||
@ -42,13 +42,11 @@ class AutoConfigsProvider(object):
|
|||||||
with almost same configs and configuring principles.
|
with almost same configs and configuring principles.
|
||||||
:param plugin_configs: all plugins_configs for specified plugin
|
:param plugin_configs: all plugins_configs for specified plugin
|
||||||
:param cluster: cluster which is required to configure
|
:param cluster: cluster which is required to configure
|
||||||
:param extra_spec: extra helpful information about AutoConfigs
|
|
||||||
"""
|
"""
|
||||||
self.plugin_configs = plugin_configs
|
self.plugin_configs = plugin_configs
|
||||||
self.cluster = cluster
|
self.cluster = cluster
|
||||||
self.node_configs_to_update = mapper.get('node_configs', {})
|
self.node_configs_to_update = mapper.get('node_configs', {})
|
||||||
self.cluster_configs_to_update = mapper.get('cluster_configs', {})
|
self.cluster_configs_to_update = mapper.get('cluster_configs', {})
|
||||||
self.extra_spec = {} if not extra_spec else extra_spec
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def _get_recommended_node_configs(self, node_group):
|
def _get_recommended_node_configs(self, node_group):
|
||||||
@ -223,10 +221,9 @@ class AutoConfigsProvider(object):
|
|||||||
|
|
||||||
|
|
||||||
class HadoopAutoConfigsProvider(AutoConfigsProvider):
|
class HadoopAutoConfigsProvider(AutoConfigsProvider):
|
||||||
def __init__(self, mapper, plugin_configs, cluster, extra_spec=None,
|
def __init__(self, mapper, plugin_configs, cluster, hbase=False):
|
||||||
hbase=False):
|
|
||||||
super(HadoopAutoConfigsProvider, self).__init__(
|
super(HadoopAutoConfigsProvider, self).__init__(
|
||||||
mapper, plugin_configs, cluster, extra_spec)
|
mapper, plugin_configs, cluster)
|
||||||
self.requested_flavors = {}
|
self.requested_flavors = {}
|
||||||
self.is_hbase_enabled = hbase
|
self.is_hbase_enabled = hbase
|
||||||
|
|
||||||
@ -330,6 +327,9 @@ class HadoopAutoConfigsProvider(AutoConfigsProvider):
|
|||||||
0.4 * data['mapMemory'], 1024))
|
0.4 * data['mapMemory'], 1024))
|
||||||
return r
|
return r
|
||||||
|
|
||||||
|
def get_datanode_name(self):
|
||||||
|
return "datanode"
|
||||||
|
|
||||||
def _get_recommended_cluster_configs(self):
|
def _get_recommended_cluster_configs(self):
|
||||||
"""Method recommends dfs_replication for cluster.
|
"""Method recommends dfs_replication for cluster.
|
||||||
|
|
||||||
@ -338,9 +338,7 @@ class HadoopAutoConfigsProvider(AutoConfigsProvider):
|
|||||||
if not self._can_be_recommended(['dfs.replication']):
|
if not self._can_be_recommended(['dfs.replication']):
|
||||||
return {}
|
return {}
|
||||||
datanode_count = 0
|
datanode_count = 0
|
||||||
datanode_proc_name = "datanode"
|
datanode_proc_name = self.get_datanode_name()
|
||||||
if 'datanode_process_name' in self.extra_spec:
|
|
||||||
datanode_proc_name = self.extra_spec['datanode_process_name']
|
|
||||||
for ng in self.cluster.node_groups:
|
for ng in self.cluster.node_groups:
|
||||||
if datanode_proc_name in ng.node_processes:
|
if datanode_proc_name in ng.node_processes:
|
||||||
datanode_count += ng.count
|
datanode_count += ng.count
|
||||||
|
62
sahara/tests/unit/plugins/cdh/test_plugin_utils.py
Normal file
62
sahara/tests/unit/plugins/cdh/test_plugin_utils.py
Normal file
@ -0,0 +1,62 @@
|
|||||||
|
# Copyright (c) 2015 Mirantis Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import mock
|
||||||
|
|
||||||
|
from sahara.plugins.cdh import plugin_utils as pu
|
||||||
|
from sahara.tests.unit import base as b
|
||||||
|
|
||||||
|
|
||||||
|
CONFIGURATION_SCHEMA = {
|
||||||
|
'node_configs': {
|
||||||
|
'yarn.scheduler.minimum-allocation-mb': (
|
||||||
|
'RESOURCEMANAGER', 'yarn_scheduler_minimum_allocation_mb'),
|
||||||
|
'mapreduce.reduce.memory.mb': (
|
||||||
|
'YARN_GATEWAY', 'mapreduce_reduce_memory_mb'),
|
||||||
|
'mapreduce.map.memory.mb': (
|
||||||
|
'YARN_GATEWAY', 'mapreduce_map_memory_mb',),
|
||||||
|
'yarn.scheduler.maximum-allocation-mb': (
|
||||||
|
'RESOURCEMANAGER', 'yarn_scheduler_maximum_allocation_mb'),
|
||||||
|
'yarn.app.mapreduce.am.command-opts': (
|
||||||
|
'YARN_GATEWAY', 'yarn_app_mapreduce_am_command_opts'),
|
||||||
|
'yarn.nodemanager.resource.memory-mb': (
|
||||||
|
'NODEMANAGER', 'yarn_nodemanager_resource_memory_mb'),
|
||||||
|
'mapreduce.task.io.sort.mb': (
|
||||||
|
'YARN_GATEWAY', 'io_sort_mb'),
|
||||||
|
'mapreduce.map.java.opts': (
|
||||||
|
'YARN_GATEWAY', 'mapreduce_map_java_opts'),
|
||||||
|
'mapreduce.reduce.java.opts': (
|
||||||
|
'YARN_GATEWAY', 'mapreduce_reduce_java_opts'),
|
||||||
|
'yarn.app.mapreduce.am.resource.mb': (
|
||||||
|
'YARN_GATEWAY', 'yarn_app_mapreduce_am_resource_mb')
|
||||||
|
},
|
||||||
|
'cluster_configs': {
|
||||||
|
'dfs.replication': ('HDFS', 'dfs_replication')
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class TestPluginUtils(b.SaharaTestCase):
|
||||||
|
@mock.patch('sahara.plugins.cdh.plugin_utils.'
|
||||||
|
'CDHPluginAutoConfigsProvider')
|
||||||
|
def test_recommend_configs(self, provider):
|
||||||
|
plug_utils = pu.AbstractPluginUtils()
|
||||||
|
fake_plugin_utils = mock.Mock()
|
||||||
|
fake_cluster = mock.Mock()
|
||||||
|
plug_utils.recommend_configs(fake_cluster, fake_plugin_utils)
|
||||||
|
self.assertEqual([
|
||||||
|
mock.call(CONFIGURATION_SCHEMA,
|
||||||
|
fake_plugin_utils, fake_cluster)
|
||||||
|
], provider.call_args_list)
|
@ -126,6 +126,9 @@ class TestProvidingRecommendations(b.SaharaWithDbTestCase):
|
|||||||
'cluster_update')
|
'cluster_update')
|
||||||
def test_apply_recommended_configs(self, cond_cluster, cond_node_group,
|
def test_apply_recommended_configs(self, cond_cluster, cond_node_group,
|
||||||
fake_flavor):
|
fake_flavor):
|
||||||
|
class TestProvider(ru.HadoopAutoConfigsProvider):
|
||||||
|
def get_datanode_name(self):
|
||||||
|
return "dog_datanode"
|
||||||
fake_flavor.return_value = FakeObject(ram=2048, vcpus=1)
|
fake_flavor.return_value = FakeObject(ram=2048, vcpus=1)
|
||||||
to_tune = {
|
to_tune = {
|
||||||
'cluster_configs': {
|
'cluster_configs': {
|
||||||
@ -159,9 +162,8 @@ class TestProvidingRecommendations(b.SaharaWithDbTestCase):
|
|||||||
node_groups=[fake_ng],
|
node_groups=[fake_ng],
|
||||||
use_autoconfig=True,
|
use_autoconfig=True,
|
||||||
)
|
)
|
||||||
v = ru.HadoopAutoConfigsProvider(
|
v = TestProvider(
|
||||||
to_tune, fake_plugin_configs, fake_cluster,
|
to_tune, fake_plugin_configs, fake_cluster)
|
||||||
{'datanode_process_name': "dog_datanode"})
|
|
||||||
|
|
||||||
v.apply_recommended_configs()
|
v.apply_recommended_configs()
|
||||||
self.assertEqual([mock.call(context.ctx(), fake_cluster, {
|
self.assertEqual([mock.call(context.ctx(), fake_cluster, {
|
||||||
|
Loading…
Reference in New Issue
Block a user