[CDH] Add validation check for dfs_replication

This patch adds a validation rule for dfs_replication to both CDH plugin
versions: cluster creation and scaling now fail when the number of HDFS
datanodes is less than the configured dfs_replication.

Change-Id: Iac26782f22c35bbc3cd1a78ec241d7e995c76021
Closes-bug: 1458862
Vitaly Gridnev 2015-05-26 16:00:53 +03:00
parent 2c2864a348
commit 362ab7c285
6 changed files with 53 additions and 4 deletions

View File

@@ -55,6 +55,7 @@ cloudera plugin versions:
 + Cluster must contain exactly one manager.
 + Cluster must contain exactly one namenode.
 + Cluster must contain exactly one secondarynamenode.
++ Cluster must contain at least ``dfs_replication`` datanodes.
 + Cluster can contain at most one resourcemanager and this process is also
   required by nodemanager.
 + Cluster can contain at most one jobhistory and this process is also
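
The documented constraint is plain arithmetic: the cluster cannot offer fewer datanodes than HDFS needs replicas of each block. A minimal standalone sketch of that rule (an illustrative helper, not part of the plugin):

# Illustrative only: the documented rule, reduced to a standalone check.
def enough_datanodes(dn_count, dfs_replication):
    """True when every HDFS block can reach its configured replica count."""
    return dn_count >= dfs_replication


# With the common default of dfs_replication=3, a two-datanode layout is
# rejected, while three or more datanodes pass.
assert not enough_datanodes(2, 3)
assert enough_datanodes(3, 3)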

View File

@@ -22,6 +22,7 @@ from oslo_log import log as logging

 from sahara.conductor import resource as res
 from sahara import context
+from sahara import exceptions as exc
 from sahara.i18n import _
 from sahara.plugins.cdh import commands as cmd
 from sahara.plugins import utils as u
@@ -323,3 +324,15 @@ class AbstractPluginUtils(object):
         cmd.write_centos_repository(r, cdh5_repo_content, 'cdh')
         cmd.write_centos_repository(r, cm5_repo_content, 'cm')
         cmd.update_repository(r)
+
+    def _get_config_value(self, service, name, configs, cluster=None):
+        if cluster:
+            conf = cluster.cluster_configs
+            if service in conf and name in conf[service]:
+                return conf[service][name]
+        for config in configs:
+            if config.applicable_target == service and config.name == name:
+                return config.default_value
+        raise exc.InvalidDataException(
+            _("Unable to find config: {applicable_target: %(target)s, "
+              "name: %(name)s}") % {'target': service, 'name': name})

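The helper above resolves a config in two steps: a value set on the cluster (cluster.cluster_configs) wins, otherwise the plugin's declared default is used, and an unknown key raises. A toy reproduction of that lookup order with plain dicts and namedtuples (stand-in names, not Sahara's real objects):

from collections import namedtuple

# Stand-in for a plugin-declared config and its default value.
Config = namedtuple('Config', ['applicable_target', 'name', 'default_value'])


def resolve(service, name, plugin_configs, cluster_configs=None):
    # 1. A value set explicitly on the cluster overrides everything.
    if cluster_configs and name in cluster_configs.get(service, {}):
        return cluster_configs[service][name]
    # 2. Otherwise fall back to the plugin's declared default.
    for cfg in plugin_configs:
        if cfg.applicable_target == service and cfg.name == name:
            return cfg.default_value
    # 3. A key nobody declared is an error, mirroring InvalidDataException.
    raise KeyError('%s/%s is not a known config' % (service, name))


defaults = [Config('HDFS', 'dfs_replication', 3)]
print(resolve('HDFS', 'dfs_replication', defaults))          # plugin default: 3
print(resolve('HDFS', 'dfs_replication', defaults,
              {'HDFS': {'dfs_replication': 2}}))             # cluster override: 2
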
View File

@@ -57,3 +57,7 @@ class PluginUtilsV5(pu.AbstractPluginUtils):
     def start_cloudera_manager(self, cluster):
         self._start_cloudera_manager(
             cluster, c_helper.AWAIT_MANAGER_STARTING_TIMEOUT)
+
+    def get_config_value(self, service, name, cluster=None):
+        configs = c_helper.get_plugin_configs()
+        return self._get_config_value(service, name, configs, cluster)

View File

@@ -36,6 +36,12 @@ def validate_cluster_creating(cluster):
     if snn_count != 1:
         raise ex.InvalidComponentCountException('HDFS_SECONDARYNAMENODE', 1,
                                                 snn_count)
+    dn_count = _get_inst_count(cluster, 'HDFS_DATANODE')
+    replicas = PU.get_config_value('HDFS', 'dfs_replication', cluster)
+    if dn_count < replicas:
+        raise ex.InvalidComponentCountException(
+            'HDFS_DATANODE', replicas, dn_count,
+            _('Number of datanodes must not be less than dfs_replication.'))

     rm_count = _get_inst_count(cluster, 'YARN_RESOURCEMANAGER')
     if rm_count > 1:
@@ -58,7 +64,6 @@ def validate_cluster_creating(cluster):
                 'YARN_RESOURCEMANAGER', required_by='YARN_NODEMANAGER')

     oo_count = _get_inst_count(cluster, 'OOZIE_SERVER')
-    dn_count = _get_inst_count(cluster, 'HDFS_DATANODE')
     if oo_count > 1:
         raise ex.InvalidComponentCountException('OOZIE_SERVER', _('0 or 1'),
                                                 oo_count)
@@ -156,7 +161,8 @@ def validate_existing_ng_scaling(cluster, existing):
     dn_to_delete = 0
     for ng in cluster.node_groups:
         if ng.id in existing:
-            if ng.count > existing[ng.id] and "datanode" in ng.node_processes:
+            if (ng.count > existing[ng.id] and
+                    "HDFS_DATANODE" in ng.node_processes):
                 dn_to_delete += ng.count - existing[ng.id]

             if not set(ng.node_processes).issubset(scalable_processes):
@@ -165,6 +171,13 @@ def validate_existing_ng_scaling(cluster, existing):
                 raise ex.NodeGroupCannotBeScaled(
                     ng.name, msg % {'processes': ' '.join(ng.node_processes)})

+    dn_count = _get_inst_count(cluster, 'HDFS_DATANODE') - dn_to_delete
+    replicas = PU.get_config_value('HDFS', 'dfs_replication', cluster)
+    if dn_count < replicas:
+        raise ex.ClusterCannotBeScaled(
+            cluster,
+            _('Number of datanodes must not be less than dfs_replication.'))
+

 def _get_scalable_processes():
     return ['HDFS_DATANODE', 'YARN_NODEMANAGER']
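
Both checks above compare the effective datanode count against the replication factor; the scaling path additionally subtracts the datanodes that the resize request would remove. A rough standalone sketch of that arithmetic, using plain tuples instead of Sahara node groups (names are illustrative):

# Illustrative sketch: node_groups is a list of (ng_id, count, processes)
# tuples and 'existing' maps a node-group id to its requested new size.
def datanodes_after_scaling(node_groups, existing):
    dn_count = sum(count for _, count, procs in node_groups
                   if 'HDFS_DATANODE' in procs)
    for ng_id, count, procs in node_groups:
        if ng_id in existing and 'HDFS_DATANODE' in procs:
            if count > existing[ng_id]:              # group is shrinking
                dn_count -= count - existing[ng_id]
    return dn_count


groups = [('workers', 4, ['HDFS_DATANODE', 'YARN_NODEMANAGER'])]
# Shrinking the group from 4 to 2 leaves 2 datanodes, so with the common
# dfs_replication=3 this scaling request would be refused.
assert datanodes_after_scaling(groups, {'workers': 2}) == 2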

View File

@@ -127,3 +127,7 @@ class PluginUtilsV530(pu.AbstractPluginUtils):
     def start_cloudera_manager(self, cluster):
         self._start_cloudera_manager(
             cluster, c_helper.AWAIT_MANAGER_STARTING_TIMEOUT)
+
+    def get_config_value(self, service, name, cluster=None):
+        configs = c_helper.get_plugin_configs()
+        return self._get_config_value(service, name, configs, cluster)

View File

@@ -37,6 +37,13 @@ def validate_cluster_creating(cluster):
         raise ex.InvalidComponentCountException('HDFS_SECONDARYNAMENODE', 1,
                                                 snn_count)

+    dn_count = _get_inst_count(cluster, 'HDFS_DATANODE')
+    replicas = PU.get_config_value('HDFS', 'dfs_replication', cluster)
+    if dn_count < replicas:
+        raise ex.InvalidComponentCountException(
+            'HDFS_DATANODE', replicas, dn_count,
+            _('Number of datanodes must not be less than dfs_replication.'))
+
     rm_count = _get_inst_count(cluster, 'YARN_RESOURCEMANAGER')
     if rm_count > 1:
         raise ex.InvalidComponentCountException('YARN_RESOURCEMANAGER',
@@ -58,7 +65,6 @@ def validate_cluster_creating(cluster):
                 'YARN_RESOURCEMANAGER', required_by='YARN_NODEMANAGER')

     oo_count = _get_inst_count(cluster, 'OOZIE_SERVER')
-    dn_count = _get_inst_count(cluster, 'HDFS_DATANODE')
     if oo_count > 1:
         raise ex.InvalidComponentCountException('OOZIE_SERVER', _('0 or 1'),
                                                 oo_count)
@@ -236,7 +242,8 @@ def validate_existing_ng_scaling(cluster, existing):
     dn_to_delete = 0
     for ng in cluster.node_groups:
         if ng.id in existing:
-            if ng.count > existing[ng.id] and "datanode" in ng.node_processes:
+            if (ng.count > existing[ng.id] and
+                    'HDFS_DATANODE' in ng.node_processes):
                 dn_to_delete += ng.count - existing[ng.id]

             if not set(ng.node_processes).issubset(scalable_processes):
@@ -245,6 +252,13 @@ def validate_existing_ng_scaling(cluster, existing):
                 raise ex.NodeGroupCannotBeScaled(
                     ng.name, msg % {'processes': ' '.join(ng.node_processes)})

+    dn_count = _get_inst_count(cluster, 'HDFS_DATANODE') - dn_to_delete
+    replicas = PU.get_config_value('HDFS', 'dfs_replication', cluster)
+    if dn_count < replicas:
+        raise ex.ClusterCannotBeScaled(
+            cluster,
+            _('Number of datanodes must not be less than dfs_replication.'))
+

 def _get_scalable_processes():
     return ['HDFS_DATANODE', 'YARN_NODEMANAGER']
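
Both plugin versions follow the same shape: the lookup algorithm lives once in AbstractPluginUtils, and each versioned utils class only supplies its own plugin configs before delegating. A compressed sketch of that delegation (toy classes, not the plugin's real API):

# Toy classes illustrating the delegation used above, not Sahara's real API.
class BaseUtils(object):
    def _get_config_value(self, service, name, configs, cluster_configs=None):
        if cluster_configs and name in cluster_configs.get(service, {}):
            return cluster_configs[service][name]
        return configs[service][name]


class UtilsV5(BaseUtils):
    PLUGIN_CONFIGS = {'HDFS': {'dfs_replication': 3}}

    def get_config_value(self, service, name, cluster_configs=None):
        return self._get_config_value(service, name,
                                      self.PLUGIN_CONFIGS, cluster_configs)


class UtilsV530(UtilsV5):
    # Same delegation; a real version could declare different defaults.
    pass


print(UtilsV5().get_config_value('HDFS', 'dfs_replication'))    # 3
print(UtilsV530().get_config_value('HDFS', 'dfs_replication'))  # 3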