Merge "Disable autotune configs for scaling old clusters"
This commit is contained in:
commit
e22142d94a
|
@ -88,6 +88,6 @@ class CDHPluginProvider(p.ProvisioningPluginBase):
|
||||||
return self._get_version_handler(
|
return self._get_version_handler(
|
||||||
node_group.cluster.hadoop_version).get_open_ports(node_group)
|
node_group.cluster.hadoop_version).get_open_ports(node_group)
|
||||||
|
|
||||||
def recommend_configs(self, cluster):
|
def recommend_configs(self, cluster, scaling=False):
|
||||||
return self._get_version_handler(
|
return self._get_version_handler(
|
||||||
cluster.hadoop_version).recommend_configs(cluster)
|
cluster.hadoop_version).recommend_configs(cluster, scaling)
|
||||||
|
|
|
@ -377,7 +377,7 @@ class AbstractPluginUtils(object):
|
||||||
_("Unable to find config: {applicable_target: %(target)s, name: "
|
_("Unable to find config: {applicable_target: %(target)s, name: "
|
||||||
"%(name)s").format(target=service, name=name))
|
"%(name)s").format(target=service, name=name))
|
||||||
|
|
||||||
def recommend_configs(self, cluster, plugin_configs):
|
def recommend_configs(self, cluster, plugin_configs, scaling):
|
||||||
provider = CDHPluginAutoConfigsProvider(
|
provider = CDHPluginAutoConfigsProvider(
|
||||||
AUTO_CONFIGURATION_SCHEMA, plugin_configs, cluster)
|
AUTO_CONFIGURATION_SCHEMA, plugin_configs, cluster, scaling)
|
||||||
provider.apply_recommended_configs()
|
provider.apply_recommended_configs()
|
||||||
|
|
|
@ -109,5 +109,5 @@ class VersionHandler(avm.AbstractVersionHandler):
|
||||||
def get_open_ports(self, node_group):
|
def get_open_ports(self, node_group):
|
||||||
return dp.get_open_ports(node_group)
|
return dp.get_open_ports(node_group)
|
||||||
|
|
||||||
def recommend_configs(self, cluster):
|
def recommend_configs(self, cluster, scaling):
|
||||||
PU.recommend_configs(cluster, self.get_plugin_configs())
|
PU.recommend_configs(cluster, self.get_plugin_configs(), scaling)
|
||||||
|
|
|
@ -122,5 +122,5 @@ class VersionHandler(avm.AbstractVersionHandler):
|
||||||
def get_open_ports(self, node_group):
|
def get_open_ports(self, node_group):
|
||||||
return dp.get_open_ports(node_group)
|
return dp.get_open_ports(node_group)
|
||||||
|
|
||||||
def recommend_configs(self, cluster):
|
def recommend_configs(self, cluster, scaling):
|
||||||
PU.recommend_configs(cluster, self.get_plugin_configs())
|
PU.recommend_configs(cluster, self.get_plugin_configs(), scaling)
|
||||||
|
|
|
@ -126,5 +126,5 @@ class VersionHandler(avm.AbstractVersionHandler):
|
||||||
def get_open_ports(self, node_group):
|
def get_open_ports(self, node_group):
|
||||||
return dp.get_open_ports(node_group)
|
return dp.get_open_ports(node_group)
|
||||||
|
|
||||||
def recommend_configs(self, cluster):
|
def recommend_configs(self, cluster, scaling):
|
||||||
PU.recommend_configs(cluster, self.get_plugin_configs())
|
PU.recommend_configs(cluster, self.get_plugin_configs(), scaling)
|
||||||
|
|
|
@ -91,7 +91,7 @@ class ProvisioningPluginBase(plugins_base.PluginInterface):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@plugins_base.optional
|
@plugins_base.optional
|
||||||
def recommend_configs(self, cluster):
|
def recommend_configs(self, cluster, scaling=False):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def get_all_configs(self, hadoop_version):
|
def get_all_configs(self, hadoop_version):
|
||||||
|
|
|
@ -29,7 +29,7 @@ LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
@six.add_metaclass(abc.ABCMeta)
|
@six.add_metaclass(abc.ABCMeta)
|
||||||
class AutoConfigsProvider(object):
|
class AutoConfigsProvider(object):
|
||||||
def __init__(self, mapper, plugin_configs, cluster):
|
def __init__(self, mapper, plugin_configs, cluster, scaling):
|
||||||
"""This meta class provides general recommendation utils for cluster
|
"""This meta class provides general recommendation utils for cluster
|
||||||
|
|
||||||
configuration.
|
configuration.
|
||||||
|
@ -42,11 +42,13 @@ class AutoConfigsProvider(object):
|
||||||
with almost same configs and configuring principles.
|
with almost same configs and configuring principles.
|
||||||
:param plugin_configs: all plugins_configs for specified plugin
|
:param plugin_configs: all plugins_configs for specified plugin
|
||||||
:param cluster: cluster which is required to configure
|
:param cluster: cluster which is required to configure
|
||||||
|
:param scaling: indicates that current cluster operation is scaling
|
||||||
"""
|
"""
|
||||||
self.plugin_configs = plugin_configs
|
self.plugin_configs = plugin_configs
|
||||||
self.cluster = cluster
|
self.cluster = cluster
|
||||||
self.node_configs_to_update = mapper.get('node_configs', {})
|
self.node_configs_to_update = mapper.get('node_configs', {})
|
||||||
self.cluster_configs_to_update = mapper.get('cluster_configs', {})
|
self.cluster_configs_to_update = mapper.get('cluster_configs', {})
|
||||||
|
self.scaling = scaling
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def _get_recommended_node_configs(self, node_group):
|
def _get_recommended_node_configs(self, node_group):
|
||||||
|
@ -146,6 +148,18 @@ class AutoConfigsProvider(object):
|
||||||
result.update({section: configs})
|
result.update({section: configs})
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
def _get_cluster_extra(self):
|
||||||
|
cluster = self.cluster
|
||||||
|
return cluster.extra.to_dict() if cluster.extra else {}
|
||||||
|
|
||||||
|
def finalize_autoconfiguration(self):
|
||||||
|
if not self.cluster.use_autoconfig:
|
||||||
|
return
|
||||||
|
cluster_extra = self._get_cluster_extra()
|
||||||
|
cluster_extra['auto-configured'] = True
|
||||||
|
conductor.cluster_update(
|
||||||
|
context.ctx(), self.cluster, {'extra': cluster_extra})
|
||||||
|
|
||||||
def apply_node_configs(self, node_group):
|
def apply_node_configs(self, node_group):
|
||||||
"""Method applies configs for node_group using conductor api,
|
"""Method applies configs for node_group using conductor api,
|
||||||
|
|
||||||
|
@ -211,6 +225,13 @@ class AutoConfigsProvider(object):
|
||||||
node_groups using conductor api.
|
node_groups using conductor api.
|
||||||
:return: None.
|
:return: None.
|
||||||
"""
|
"""
|
||||||
|
if self.scaling:
|
||||||
|
# Validate cluster is not an old created cluster
|
||||||
|
cluster_extra = self._get_cluster_extra()
|
||||||
|
if 'auto-configured' not in cluster_extra:
|
||||||
|
# Don't configure
|
||||||
|
return
|
||||||
|
|
||||||
for ng in self.cluster.node_groups:
|
for ng in self.cluster.node_groups:
|
||||||
self.apply_node_configs(ng)
|
self.apply_node_configs(ng)
|
||||||
self.apply_cluster_configs()
|
self.apply_cluster_configs()
|
||||||
|
@ -218,12 +239,13 @@ class AutoConfigsProvider(object):
|
||||||
configs.extend(list(self.node_configs_to_update.keys()))
|
configs.extend(list(self.node_configs_to_update.keys()))
|
||||||
LOG.debug("Following configs were auto-configured: {configs}".format(
|
LOG.debug("Following configs were auto-configured: {configs}".format(
|
||||||
configs=configs))
|
configs=configs))
|
||||||
|
self.finalize_autoconfiguration()
|
||||||
|
|
||||||
|
|
||||||
class HadoopAutoConfigsProvider(AutoConfigsProvider):
|
class HadoopAutoConfigsProvider(AutoConfigsProvider):
|
||||||
def __init__(self, mapper, plugin_configs, cluster, hbase=False):
|
def __init__(self, mapper, plugin_configs, cluster, scaling, hbase=False):
|
||||||
super(HadoopAutoConfigsProvider, self).__init__(
|
super(HadoopAutoConfigsProvider, self).__init__(
|
||||||
mapper, plugin_configs, cluster)
|
mapper, plugin_configs, cluster, scaling)
|
||||||
self.requested_flavors = {}
|
self.requested_flavors = {}
|
||||||
self.is_hbase_enabled = hbase
|
self.is_hbase_enabled = hbase
|
||||||
|
|
||||||
|
|
|
@ -548,7 +548,7 @@ class SparkProvider(p.ProvisioningPluginBase):
|
||||||
|
|
||||||
return ports
|
return ports
|
||||||
|
|
||||||
def recommend_configs(self, cluster):
|
def recommend_configs(self, cluster, scaling=False):
|
||||||
want_to_configure = {
|
want_to_configure = {
|
||||||
'cluster_configs': {
|
'cluster_configs': {
|
||||||
'dfs.replication': ('HDFS', 'dfs.replication')
|
'dfs.replication': ('HDFS', 'dfs.replication')
|
||||||
|
@ -556,5 +556,5 @@ class SparkProvider(p.ProvisioningPluginBase):
|
||||||
}
|
}
|
||||||
provider = ru.HadoopAutoConfigsProvider(
|
provider = ru.HadoopAutoConfigsProvider(
|
||||||
want_to_configure, self.get_configs(
|
want_to_configure, self.get_configs(
|
||||||
cluster.hadoop_version), cluster)
|
cluster.hadoop_version), cluster, scaling)
|
||||||
provider.apply_recommended_configs()
|
provider.apply_recommended_configs()
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
from sahara.plugins import recommendations_utils as ru
|
from sahara.plugins import recommendations_utils as ru
|
||||||
|
|
||||||
|
|
||||||
def recommend_configs(cluster, plugin_configs):
|
def recommend_configs(cluster, plugin_configs, scaling):
|
||||||
yarn_configs = [
|
yarn_configs = [
|
||||||
'yarn.nodemanager.resource.memory-mb',
|
'yarn.nodemanager.resource.memory-mb',
|
||||||
'yarn.scheduler.minimum-allocation-mb',
|
'yarn.scheduler.minimum-allocation-mb',
|
||||||
|
@ -44,5 +44,5 @@ def recommend_configs(cluster, plugin_configs):
|
||||||
for yarn in yarn_configs:
|
for yarn in yarn_configs:
|
||||||
configs_to_configure['node_configs'][yarn] = ('YARN', yarn)
|
configs_to_configure['node_configs'][yarn] = ('YARN', yarn)
|
||||||
provider = ru.HadoopAutoConfigsProvider(
|
provider = ru.HadoopAutoConfigsProvider(
|
||||||
configs_to_configure, plugin_configs, cluster)
|
configs_to_configure, plugin_configs, cluster, scaling)
|
||||||
provider.apply_recommended_configs()
|
provider.apply_recommended_configs()
|
||||||
|
|
|
@ -93,6 +93,6 @@ class VanillaProvider(p.ProvisioningPluginBase):
|
||||||
return self._get_version_handler(
|
return self._get_version_handler(
|
||||||
cluster.hadoop_version).on_terminate_cluster(cluster)
|
cluster.hadoop_version).on_terminate_cluster(cluster)
|
||||||
|
|
||||||
def recommend_configs(self, cluster):
|
def recommend_configs(self, cluster, scaling=False):
|
||||||
return self._get_version_handler(
|
return self._get_version_handler(
|
||||||
cluster.hadoop_version).recommend_configs(cluster)
|
cluster.hadoop_version).recommend_configs(cluster, scaling)
|
||||||
|
|
|
@ -141,5 +141,5 @@ class VersionHandler(avm.AbstractVersionHandler):
|
||||||
def get_open_ports(self, node_group):
|
def get_open_ports(self, node_group):
|
||||||
return c.get_open_ports(node_group)
|
return c.get_open_ports(node_group)
|
||||||
|
|
||||||
def recommend_configs(self, cluster):
|
def recommend_configs(self, cluster, scaling):
|
||||||
ru.recommend_configs(cluster, self.get_plugin_configs())
|
ru.recommend_configs(cluster, self.get_plugin_configs(), scaling)
|
||||||
|
|
|
@ -136,5 +136,5 @@ class VersionHandler(avm.AbstractVersionHandler):
|
||||||
def get_open_ports(self, node_group):
|
def get_open_ports(self, node_group):
|
||||||
return c.get_open_ports(node_group)
|
return c.get_open_ports(node_group)
|
||||||
|
|
||||||
def recommend_configs(self, cluster):
|
def recommend_configs(self, cluster, scaling):
|
||||||
ru.recommend_configs(cluster, self.get_plugin_configs())
|
ru.recommend_configs(cluster, self.get_plugin_configs(), scaling)
|
||||||
|
|
|
@ -78,7 +78,7 @@ def scale_cluster(id, data):
|
||||||
cluster = c_u.change_cluster_status(
|
cluster = c_u.change_cluster_status(
|
||||||
cluster, c_u.CLUSTER_STATUS_VALIDATING)
|
cluster, c_u.CLUSTER_STATUS_VALIDATING)
|
||||||
quotas.check_scaling(cluster, to_be_enlarged, additional)
|
quotas.check_scaling(cluster, to_be_enlarged, additional)
|
||||||
plugin.recommend_configs(cluster)
|
plugin.recommend_configs(cluster, scaling=True)
|
||||||
plugin.validate_scaling(cluster, to_be_enlarged, additional)
|
plugin.validate_scaling(cluster, to_be_enlarged, additional)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
with excutils.save_and_reraise_exception():
|
with excutils.save_and_reraise_exception():
|
||||||
|
|
|
@ -55,8 +55,8 @@ class TestPluginUtils(b.SaharaTestCase):
|
||||||
plug_utils = pu.AbstractPluginUtils()
|
plug_utils = pu.AbstractPluginUtils()
|
||||||
fake_plugin_utils = mock.Mock()
|
fake_plugin_utils = mock.Mock()
|
||||||
fake_cluster = mock.Mock()
|
fake_cluster = mock.Mock()
|
||||||
plug_utils.recommend_configs(fake_cluster, fake_plugin_utils)
|
plug_utils.recommend_configs(fake_cluster, fake_plugin_utils, False)
|
||||||
self.assertEqual([
|
self.assertEqual([
|
||||||
mock.call(CONFIGURATION_SCHEMA,
|
mock.call(CONFIGURATION_SCHEMA,
|
||||||
fake_plugin_utils, fake_cluster)
|
fake_plugin_utils, fake_cluster, False)
|
||||||
], provider.call_args_list)
|
], provider.call_args_list)
|
||||||
|
|
|
@ -46,7 +46,7 @@ class TestProvidingRecommendations(b.SaharaWithDbTestCase):
|
||||||
cl = FakeObject(cluster_configs=Configs({}))
|
cl = FakeObject(cluster_configs=Configs({}))
|
||||||
fake_flavor.return_value = FakeObject(ram=4096, vcpus=2)
|
fake_flavor.return_value = FakeObject(ram=4096, vcpus=2)
|
||||||
observed = ru.HadoopAutoConfigsProvider(
|
observed = ru.HadoopAutoConfigsProvider(
|
||||||
{}, [], cl)._get_recommended_node_configs(ng)
|
{}, [], cl, False)._get_recommended_node_configs(ng)
|
||||||
self.assertEqual({
|
self.assertEqual({
|
||||||
'mapreduce.reduce.memory.mb': 768,
|
'mapreduce.reduce.memory.mb': 768,
|
||||||
'mapreduce.map.java.opts': '-Xmx307m',
|
'mapreduce.map.java.opts': '-Xmx307m',
|
||||||
|
@ -68,7 +68,7 @@ class TestProvidingRecommendations(b.SaharaWithDbTestCase):
|
||||||
cl = FakeObject(cluster_configs=Configs({}))
|
cl = FakeObject(cluster_configs=Configs({}))
|
||||||
fake_flavor.return_value = FakeObject(ram=2048, vcpus=1)
|
fake_flavor.return_value = FakeObject(ram=2048, vcpus=1)
|
||||||
observed = ru.HadoopAutoConfigsProvider(
|
observed = ru.HadoopAutoConfigsProvider(
|
||||||
{'node_configs': {}, 'cluster_configs': {}}, [], cl
|
{'node_configs': {}, 'cluster_configs': {}}, [], cl, False,
|
||||||
)._get_recommended_node_configs(ng)
|
)._get_recommended_node_configs(ng)
|
||||||
self.assertEqual({
|
self.assertEqual({
|
||||||
'mapreduce.reduce.java.opts': '-Xmx409m',
|
'mapreduce.reduce.java.opts': '-Xmx409m',
|
||||||
|
@ -85,7 +85,7 @@ class TestProvidingRecommendations(b.SaharaWithDbTestCase):
|
||||||
}, observed)
|
}, observed)
|
||||||
|
|
||||||
def test_merge_configs(self):
|
def test_merge_configs(self):
|
||||||
provider = ru.HadoopAutoConfigsProvider({}, None, None)
|
provider = ru.HadoopAutoConfigsProvider({}, None, None, False)
|
||||||
initial_configs = {
|
initial_configs = {
|
||||||
'cat': {
|
'cat': {
|
||||||
'talk': 'meow',
|
'talk': 'meow',
|
||||||
|
@ -161,9 +161,10 @@ class TestProvidingRecommendations(b.SaharaWithDbTestCase):
|
||||||
}),
|
}),
|
||||||
node_groups=[fake_ng],
|
node_groups=[fake_ng],
|
||||||
use_autoconfig=True,
|
use_autoconfig=True,
|
||||||
|
extra=Configs({})
|
||||||
)
|
)
|
||||||
v = TestProvider(
|
v = TestProvider(
|
||||||
to_tune, fake_plugin_configs, fake_cluster)
|
to_tune, fake_plugin_configs, fake_cluster, False)
|
||||||
|
|
||||||
v.apply_recommended_configs()
|
v.apply_recommended_configs()
|
||||||
self.assertEqual([mock.call(context.ctx(), fake_cluster, {
|
self.assertEqual([mock.call(context.ctx(), fake_cluster, {
|
||||||
|
@ -175,7 +176,10 @@ class TestProvidingRecommendations(b.SaharaWithDbTestCase):
|
||||||
'replica': 2
|
'replica': 2
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})], cond_cluster.call_args_list)
|
}), mock.call(
|
||||||
|
context.ctx(), fake_cluster,
|
||||||
|
{'extra': {'auto-configured': True}})],
|
||||||
|
cond_cluster.call_args_list)
|
||||||
self.assertEqual([mock.call(context.ctx(), fake_ng, {
|
self.assertEqual([mock.call(context.ctx(), fake_ng, {
|
||||||
'node_configs': {
|
'node_configs': {
|
||||||
'bond': {
|
'bond': {
|
||||||
|
@ -224,12 +228,16 @@ class TestProvidingRecommendations(b.SaharaWithDbTestCase):
|
||||||
}),
|
}),
|
||||||
node_groups=[fake_ng],
|
node_groups=[fake_ng],
|
||||||
use_autoconfig=True,
|
use_autoconfig=True,
|
||||||
|
extra=Configs({})
|
||||||
)
|
)
|
||||||
v = ru.HadoopAutoConfigsProvider(
|
v = ru.HadoopAutoConfigsProvider(
|
||||||
to_tune, fake_plugin_configs, fake_cluster)
|
to_tune, fake_plugin_configs, fake_cluster, False)
|
||||||
v.apply_recommended_configs()
|
v.apply_recommended_configs()
|
||||||
self.assertEqual(0, cond_cluster.call_count)
|
|
||||||
self.assertEqual(0, cond_node_group.call_count)
|
self.assertEqual(0, cond_node_group.call_count)
|
||||||
|
self.assertEqual(
|
||||||
|
[mock.call(context.ctx(), fake_cluster,
|
||||||
|
{'extra': {'auto-configured': True}})],
|
||||||
|
cond_cluster.call_args_list)
|
||||||
|
|
||||||
def test_correct_use_autoconfig_value(self):
|
def test_correct_use_autoconfig_value(self):
|
||||||
ctx = context.ctx()
|
ctx = context.ctx()
|
||||||
|
@ -274,3 +282,11 @@ class TestProvidingRecommendations(b.SaharaWithDbTestCase):
|
||||||
self.assertTrue(ng.use_autoconfig)
|
self.assertTrue(ng.use_autoconfig)
|
||||||
else:
|
else:
|
||||||
self.assertFalse(ng.use_autoconfig)
|
self.assertFalse(ng.use_autoconfig)
|
||||||
|
|
||||||
|
@mock.patch('sahara.plugins.recommendations_utils.conductor.'
|
||||||
|
'cluster_update')
|
||||||
|
def test_not_autonconfigured(self, cluster_update):
|
||||||
|
fake_cluster = FakeObject(extra=Configs({}))
|
||||||
|
v = ru.HadoopAutoConfigsProvider({}, [], fake_cluster, True)
|
||||||
|
v.apply_recommended_configs()
|
||||||
|
self.assertEqual(0, cluster_update.call_count)
|
||||||
|
|
|
@ -54,7 +54,7 @@ class TestVersionHandler(testtools.TestCase):
|
||||||
'HadoopAutoConfigsProvider')
|
'HadoopAutoConfigsProvider')
|
||||||
def test_recommend_configs(self, provider):
|
def test_recommend_configs(self, provider):
|
||||||
f_cluster, f_configs = mock.Mock(), mock.Mock()
|
f_cluster, f_configs = mock.Mock(), mock.Mock()
|
||||||
ru.recommend_configs(f_cluster, f_configs)
|
ru.recommend_configs(f_cluster, f_configs, False)
|
||||||
self.assertEqual([
|
self.assertEqual([
|
||||||
mock.call(CONFIGURATION_SCHEMA, f_configs, f_cluster)
|
mock.call(CONFIGURATION_SCHEMA, f_configs, f_cluster, False)
|
||||||
], provider.call_args_list)
|
], provider.call_args_list)
|
||||||
|
|
|
@ -123,7 +123,7 @@ class FakePlugin(object):
|
||||||
def get_configs(self, version):
|
def get_configs(self, version):
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
def recommend_configs(self, cluster):
|
def recommend_configs(self, cluster, scaling=False):
|
||||||
self.calls_order.append('recommend_configs')
|
self.calls_order.append('recommend_configs')
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue