Merge "[CDH] Add validation check about dfs_replication"
This commit is contained in:
commit
4c048161e0
@ -55,6 +55,7 @@ cloudera plugin versions:
|
||||
+ Cluster must contain exactly one manager.
|
||||
+ Cluster must contain exactly one namenode.
|
||||
+ Cluster must contain exactly one secondarynamenode.
|
||||
+ Cluster must contain at least ``dfs_replication`` datanodes.
|
||||
+ Cluster can contain at most one resourcemanager and this process is also
|
||||
required by nodemanager.
|
||||
+ Cluster can contain at most one jobhistory and this process is also
|
||||
|
@ -22,6 +22,7 @@ from oslo_log import log as logging
|
||||
|
||||
from sahara.conductor import resource as res
|
||||
from sahara import context
|
||||
from sahara import exceptions as exc
|
||||
from sahara.i18n import _
|
||||
from sahara.plugins.cdh import commands as cmd
|
||||
from sahara.plugins import utils as u
|
||||
@ -323,3 +324,15 @@ class AbstractPluginUtils(object):
|
||||
cmd.write_centos_repository(r, cdh5_repo_content, 'cdh')
|
||||
cmd.write_centos_repository(r, cm5_repo_content, 'cm')
|
||||
cmd.update_repository(r)
|
||||
|
||||
def _get_config_value(self, service, name, configs, cluster=None):
|
||||
conf = cluster.cluster_configs
|
||||
if cluster:
|
||||
if service in conf and name in conf[service]:
|
||||
return conf[service][name]
|
||||
for config in configs:
|
||||
if config.applicable_target == service and config.name == name:
|
||||
return config.default_value
|
||||
raise exc.InvalidDataException(
|
||||
_("Unable to find config: {applicable_target: %(target)s, name: "
|
||||
"%(name)s").format(target=service, name=name))
|
||||
|
@ -57,3 +57,7 @@ class PluginUtilsV5(pu.AbstractPluginUtils):
|
||||
def start_cloudera_manager(self, cluster):
|
||||
self._start_cloudera_manager(
|
||||
cluster, c_helper.AWAIT_MANAGER_STARTING_TIMEOUT)
|
||||
|
||||
def get_config_value(self, service, name, cluster=None):
|
||||
configs = c_helper.get_plugin_configs()
|
||||
return self._get_config_value(service, name, configs, cluster)
|
||||
|
@ -36,6 +36,12 @@ def validate_cluster_creating(cluster):
|
||||
if snn_count != 1:
|
||||
raise ex.InvalidComponentCountException('HDFS_SECONDARYNAMENODE', 1,
|
||||
snn_count)
|
||||
dn_count = _get_inst_count(cluster, 'HDFS_DATANODE')
|
||||
replicas = PU.get_config_value('HDFS', 'dfs_replication', cluster)
|
||||
if dn_count < replicas:
|
||||
raise ex.InvalidComponentCountException(
|
||||
'HDFS_DATANODE', replicas, dn_count,
|
||||
_('Number of datanodes must be not less than dfs_replication.'))
|
||||
|
||||
rm_count = _get_inst_count(cluster, 'YARN_RESOURCEMANAGER')
|
||||
if rm_count > 1:
|
||||
@ -58,7 +64,6 @@ def validate_cluster_creating(cluster):
|
||||
'YARN_RESOURCEMANAGER', required_by='YARN_NODEMANAGER')
|
||||
|
||||
oo_count = _get_inst_count(cluster, 'OOZIE_SERVER')
|
||||
dn_count = _get_inst_count(cluster, 'HDFS_DATANODE')
|
||||
if oo_count > 1:
|
||||
raise ex.InvalidComponentCountException('OOZIE_SERVER', _('0 or 1'),
|
||||
oo_count)
|
||||
@ -156,7 +161,8 @@ def validate_existing_ng_scaling(cluster, existing):
|
||||
dn_to_delete = 0
|
||||
for ng in cluster.node_groups:
|
||||
if ng.id in existing:
|
||||
if ng.count > existing[ng.id] and "datanode" in ng.node_processes:
|
||||
if (ng.count > existing[ng.id] and
|
||||
"HDFS_DATANODE" in ng.node_processes):
|
||||
dn_to_delete += ng.count - existing[ng.id]
|
||||
|
||||
if not set(ng.node_processes).issubset(scalable_processes):
|
||||
@ -165,6 +171,13 @@ def validate_existing_ng_scaling(cluster, existing):
|
||||
raise ex.NodeGroupCannotBeScaled(
|
||||
ng.name, msg % {'processes': ' '.join(ng.node_processes)})
|
||||
|
||||
dn_count = _get_inst_count(cluster, 'HDFS_DATANODE') - dn_to_delete
|
||||
replicas = PU.get_config_value('HDFS', 'dfs_replication', cluster)
|
||||
if dn_count < replicas:
|
||||
raise ex.ClusterCannotBeScaled(
|
||||
cluster,
|
||||
_('Number of datanodes must be not less than dfs_replication.'))
|
||||
|
||||
|
||||
def _get_scalable_processes():
|
||||
return ['HDFS_DATANODE', 'YARN_NODEMANAGER']
|
||||
|
@ -127,3 +127,7 @@ class PluginUtilsV530(pu.AbstractPluginUtils):
|
||||
def start_cloudera_manager(self, cluster):
|
||||
self._start_cloudera_manager(
|
||||
cluster, c_helper.AWAIT_MANAGER_STARTING_TIMEOUT)
|
||||
|
||||
def get_config_value(self, service, name, cluster=None):
|
||||
configs = c_helper.get_plugin_configs()
|
||||
return self._get_config_value(service, name, configs, cluster)
|
||||
|
@ -37,6 +37,13 @@ def validate_cluster_creating(cluster):
|
||||
raise ex.InvalidComponentCountException('HDFS_SECONDARYNAMENODE', 1,
|
||||
snn_count)
|
||||
|
||||
dn_count = _get_inst_count(cluster, 'HDFS_DATANODE')
|
||||
replicas = PU.get_config_value('HDFS', 'dfs_replication', cluster)
|
||||
if dn_count < replicas:
|
||||
raise ex.InvalidComponentCountException(
|
||||
'HDFS_DATANODE', replicas, dn_count,
|
||||
_('Number of datanodes must be not less than dfs_replication.'))
|
||||
|
||||
rm_count = _get_inst_count(cluster, 'YARN_RESOURCEMANAGER')
|
||||
if rm_count > 1:
|
||||
raise ex.InvalidComponentCountException('YARN_RESOURCEMANAGER',
|
||||
@ -58,7 +65,6 @@ def validate_cluster_creating(cluster):
|
||||
'YARN_RESOURCEMANAGER', required_by='YARN_NODEMANAGER')
|
||||
|
||||
oo_count = _get_inst_count(cluster, 'OOZIE_SERVER')
|
||||
dn_count = _get_inst_count(cluster, 'HDFS_DATANODE')
|
||||
if oo_count > 1:
|
||||
raise ex.InvalidComponentCountException('OOZIE_SERVER', _('0 or 1'),
|
||||
oo_count)
|
||||
@ -236,7 +242,8 @@ def validate_existing_ng_scaling(cluster, existing):
|
||||
dn_to_delete = 0
|
||||
for ng in cluster.node_groups:
|
||||
if ng.id in existing:
|
||||
if ng.count > existing[ng.id] and "datanode" in ng.node_processes:
|
||||
if (ng.count > existing[ng.id] and
|
||||
'HDFS_DATANODE' in ng.node_processes):
|
||||
dn_to_delete += ng.count - existing[ng.id]
|
||||
|
||||
if not set(ng.node_processes).issubset(scalable_processes):
|
||||
@ -245,6 +252,13 @@ def validate_existing_ng_scaling(cluster, existing):
|
||||
raise ex.NodeGroupCannotBeScaled(
|
||||
ng.name, msg % {'processes': ' '.join(ng.node_processes)})
|
||||
|
||||
dn_count = _get_inst_count(cluster, 'HDFS_DATANODE') - dn_to_delete
|
||||
replicas = PU.get_config_value('HDFS', 'dfs_replication', cluster)
|
||||
if dn_count < replicas:
|
||||
raise ex.ClusterCannotBeScaled(
|
||||
cluster,
|
||||
_('Number of datanodes must be not less than dfs_replication.'))
|
||||
|
||||
|
||||
def _get_scalable_processes():
|
||||
return ['HDFS_DATANODE', 'YARN_NODEMANAGER']
|
||||
|
Loading…
Reference in New Issue
Block a user