Merge "CDH plugin validation mudule refactoring"

Jenkins (2016-02-02 19:53:18 +00:00), committed by Gerrit Code Review
commit af26773b50
13 changed files with 575 additions and 714 deletions
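
Editor's note — the shape of the refactoring, distilled from the diffs below: per-version validation modules that exposed free functions are collapsed into a class hierarchy rooted at sahara.plugins.cdh.validation.Validator, and each version keeps only a thin subclass. A minimal, self-contained sketch; the bodies are simplified stand-ins, not the real implementations:

    class PluginUtilsV5(object):
        """Stand-in for sahara.plugins.cdh.v5.plugin_utils.PluginUtilsV5."""
        pass

    class Validator(object):
        """Shared CDH checks; versions override only what differs."""
        PU = None  # each version binds its own plugin utils here

        @classmethod
        def _get_inst_count(cls, cluster, process):
            # simplified: the real method sums ng.count over the node
            # groups that run `process`
            return sum(ng.count for ng in cluster.node_groups
                       if process in ng.node_processes)

    class ValidatorV5(Validator):
        PU = PluginUtilsV5()

    # Call sites hold the class itself and dispatch through classmethods:
    #   self.validator = ValidatorV5
    #   zk_count = self.validator._get_inst_count(cluster, 'ZOOKEEPER_SERVER')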

View File

@@ -17,7 +17,7 @@ from sahara.i18n import _
from sahara.plugins.cdh import cloudera_utils as cu
from sahara.plugins.cdh.v5 import config_helper as c_helper
from sahara.plugins.cdh.v5 import plugin_utils as pu
from sahara.plugins.cdh.v5 import validation as v
from sahara.plugins.cdh.v5 import validation
from sahara.swift import swift_helper
from sahara.utils import cluster_progress_ops as cpo
from sahara.utils import configs as s_cfg
@@ -35,9 +35,11 @@ HBASE_SERVICE_TYPE = 'HBASE'
class ClouderaUtilsV5(cu.ClouderaUtils):
def __init__(self):
cu.ClouderaUtils.__init__(self)
self.pu = pu.PluginUtilsV5()
self.validator = validation.ValidatorV5
@cu.cloudera_cmd
def format_namenode(self, hdfs_service):
@@ -151,7 +153,8 @@ class ClouderaUtilsV5(cu.ClouderaUtils):
all_confs = {}
if cluster:
zk_count = v._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
zk_count = self.validator._get_inst_count(cluster,
'ZOOKEEPER_SERVER')
core_site_safety_valve = ''
if self.pu.c_helper.is_swift_enabled(cluster):
configs = swift_helper.get_swift_configs()

View File

@@ -13,175 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara.i18n import _
from sahara.plugins.cdh.v5 import plugin_utils as pu
from sahara.plugins import exceptions as ex
from sahara.plugins import utils as u
from sahara.utils import general as gu
PU = pu.PluginUtilsV5()
from sahara.plugins.cdh import validation
def validate_cluster_creating(cluster):
mng_count = _get_inst_count(cluster, 'CLOUDERA_MANAGER')
if mng_count != 1:
raise ex.InvalidComponentCountException('CLOUDERA_MANAGER',
1, mng_count)
nn_count = _get_inst_count(cluster, 'HDFS_NAMENODE')
if nn_count != 1:
raise ex.InvalidComponentCountException('HDFS_NAMENODE', 1, nn_count)
snn_count = _get_inst_count(cluster, 'HDFS_SECONDARYNAMENODE')
if snn_count != 1:
raise ex.InvalidComponentCountException('HDFS_SECONDARYNAMENODE', 1,
snn_count)
dn_count = _get_inst_count(cluster, 'HDFS_DATANODE')
replicas = PU.get_config_value('HDFS', 'dfs_replication', cluster)
if dn_count < replicas:
raise ex.InvalidComponentCountException(
'HDFS_DATANODE', replicas, dn_count,
_('Number of datanodes must be not less than dfs_replication.'))
rm_count = _get_inst_count(cluster, 'YARN_RESOURCEMANAGER')
if rm_count > 1:
raise ex.InvalidComponentCountException('YARN_RESOURCEMANAGER',
_('0 or 1'), rm_count)
hs_count = _get_inst_count(cluster, 'YARN_JOBHISTORY')
if hs_count > 1:
raise ex.InvalidComponentCountException('YARN_JOBHISTORY', _('0 or 1'),
hs_count)
if rm_count > 0 and hs_count < 1:
raise ex.RequiredServiceMissingException(
'YARN_JOBHISTORY', required_by='YARN_RESOURCEMANAGER')
nm_count = _get_inst_count(cluster, 'YARN_NODEMANAGER')
if rm_count == 0:
if nm_count > 0:
raise ex.RequiredServiceMissingException(
'YARN_RESOURCEMANAGER', required_by='YARN_NODEMANAGER')
oo_count = _get_inst_count(cluster, 'OOZIE_SERVER')
if oo_count > 1:
raise ex.InvalidComponentCountException('OOZIE_SERVER', _('0 or 1'),
oo_count)
if oo_count == 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='OOZIE_SERVER')
if nm_count < 1:
raise ex.RequiredServiceMissingException(
'YARN_NODEMANAGER', required_by='OOZIE_SERVER')
if hs_count != 1:
raise ex.RequiredServiceMissingException(
'YARN_JOBHISTORY', required_by='OOZIE_SERVER')
hms_count = _get_inst_count(cluster, 'HIVE_METASTORE')
hvs_count = _get_inst_count(cluster, 'HIVE_SERVER2')
whc_count = _get_inst_count(cluster, 'HIVE_WEBHCAT')
if hms_count and rm_count < 1:
raise ex.RequiredServiceMissingException(
'YARN_RESOURCEMANAGER', required_by='HIVE_METASTORE')
if hms_count and not hvs_count:
raise ex.RequiredServiceMissingException(
'HIVE_SERVER2', required_by='HIVE_METASTORE')
if hvs_count and not hms_count:
raise ex.RequiredServiceMissingException(
'HIVE_METASTORE', required_by='HIVE_SERVER2')
if whc_count and not hms_count:
raise ex.RequiredServiceMissingException(
'HIVE_METASTORE', required_by='WEBHCAT')
hue_count = _get_inst_count(cluster, 'HUE_SERVER')
if hue_count > 1:
raise ex.InvalidComponentCountException('HUE_SERVER', _('0 or 1'),
hue_count)
shs_count = _get_inst_count(cluster, 'SPARK_YARN_HISTORY_SERVER')
if shs_count > 1:
raise ex.InvalidComponentCountException('SPARK_YARN_HISTORY_SERVER',
_('0 or 1'), shs_count)
if shs_count and not rm_count:
raise ex.RequiredServiceMissingException(
'YARN_RESOURCEMANAGER', required_by='SPARK_YARN_HISTORY_SERVER')
if oo_count < 1 and hue_count:
raise ex.RequiredServiceMissingException(
'OOZIE_SERVER', required_by='HUE_SERVER')
if hms_count < 1 and hue_count:
raise ex.RequiredServiceMissingException(
'HIVE_METASTORE', required_by='HUE_SERVER')
hbm_count = _get_inst_count(cluster, 'HBASE_MASTER')
hbr_count = _get_inst_count(cluster, 'HBASE_REGIONSERVER')
zk_count = _get_inst_count(cluster, 'ZOOKEEPER_SERVER')
if hbm_count >= 1:
if zk_count < 1:
raise ex.RequiredServiceMissingException('ZOOKEEPER',
required_by='HBASE')
if hbr_count < 1:
raise ex.InvalidComponentCountException('HBASE_REGIONSERVER',
_('at least 1'), hbr_count)
elif hbr_count >= 1:
raise ex.InvalidComponentCountException('HBASE_MASTER',
_('at least 1'), hbm_count)
def validate_additional_ng_scaling(cluster, additional):
rm = PU.get_resourcemanager(cluster)
scalable_processes = _get_scalable_processes()
for ng_id in additional:
ng = gu.get_by_id(cluster.node_groups, ng_id)
if not set(ng.node_processes).issubset(scalable_processes):
msg = _("CDH plugin cannot scale nodegroup with processes: "
"%(processes)s")
raise ex.NodeGroupCannotBeScaled(
ng.name, msg % {'processes': ' '.join(ng.node_processes)})
if not rm and 'YARN_NODEMANAGER' in ng.node_processes:
msg = _("CDH plugin cannot scale node group with processes "
"which have no master-processes run in cluster")
raise ex.NodeGroupCannotBeScaled(ng.name, msg)
def validate_existing_ng_scaling(cluster, existing):
scalable_processes = _get_scalable_processes()
dn_to_delete = 0
for ng in cluster.node_groups:
if ng.id in existing:
if (ng.count > existing[ng.id] and
"HDFS_DATANODE" in ng.node_processes):
dn_to_delete += ng.count - existing[ng.id]
if not set(ng.node_processes).issubset(scalable_processes):
msg = _("CDH plugin cannot scale nodegroup with processes: "
"%(processes)s")
raise ex.NodeGroupCannotBeScaled(
ng.name, msg % {'processes': ' '.join(ng.node_processes)})
dn_count = _get_inst_count(cluster, 'HDFS_DATANODE') - dn_to_delete
replicas = PU.get_config_value('HDFS', 'dfs_replication', cluster)
if dn_count < replicas:
raise ex.ClusterCannotBeScaled(
cluster,
_('Number of datanodes must be not less than dfs_replication.'))
def _get_scalable_processes():
return ['HDFS_DATANODE', 'YARN_NODEMANAGER']
def _get_inst_count(cluster, process):
return sum([ng.count for ng in u.get_node_groups(cluster, process)])
class ValidatorV5(validation.Validator):
PU = pu.PluginUtilsV5()
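
Editor's note — the net effect for v5: roughly 170 lines of free functions reduce to the two-line subclass above, and the public entry points survive as inherited classmethods. A hedged usage sketch; `cluster` is assumed to be a Sahara cluster object, as elsewhere in the plugin:

    from sahara.plugins.cdh.v5 import validation

    validator = validation.ValidatorV5            # a class, not an instance
    # `cluster` is assumed to be a Sahara cluster object
    validator.validate_cluster_creating(cluster)
    validator.validate_additional_ng_scaling(cluster, additional={})
    validator.validate_existing_ng_scaling(cluster, existing={})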

View File

@@ -21,12 +21,13 @@ from sahara.plugins.cdh.v5 import config_helper as c_helper
from sahara.plugins.cdh.v5 import deploy as dp
from sahara.plugins.cdh.v5 import edp_engine
from sahara.plugins.cdh.v5 import plugin_utils as pu
from sahara.plugins.cdh.v5 import validation as vl
from sahara.plugins.cdh.v5 import validation
conductor = conductor.API
CU = cu.ClouderaUtilsV5()
PU = pu.PluginUtilsV5()
vl = validation.ValidatorV5
class VersionHandler(avm.AbstractVersionHandler):

View File

@@ -17,7 +17,7 @@ from sahara.i18n import _
from sahara.plugins.cdh import cloudera_utils as cu
from sahara.plugins.cdh.v5_3_0 import config_helper as c_helper
from sahara.plugins.cdh.v5_3_0 import plugin_utils as pu
from sahara.plugins.cdh.v5_3_0 import validation as v
from sahara.plugins.cdh.v5_3_0 import validation
from sahara.swift import swift_helper
from sahara.utils import cluster_progress_ops as cpo
from sahara.utils import configs as s_cfg
@@ -52,6 +52,7 @@ class ClouderaUtilsV530(cu.ClouderaUtils):
def __init__(self):
cu.ClouderaUtils.__init__(self)
self.pu = pu.PluginUtilsV530()
self.validator = validation.ValidatorV530
def get_service_by_role(self, role, cluster=None, instance=None):
cm_cluster = None
@@ -141,7 +142,7 @@ class ClouderaUtilsV530(cu.ClouderaUtils):
if len(self.pu.get_zookeepers(cluster)) > 0:
zookeeper = cm_cluster.get_service(self.ZOOKEEPER_SERVICE_NAME)
zookeeper.update_config(self._get_configs(ZOOKEEPER_SERVICE_TYPE,
cluster=cluster))
cluster=cluster))
hdfs = cm_cluster.get_service(self.HDFS_SERVICE_NAME)
hdfs.update_config(self._get_configs(HDFS_SERVICE_TYPE,
@@ -211,14 +212,20 @@ class ClouderaUtilsV530(cu.ClouderaUtils):
all_confs = {}
if cluster:
zk_count = v._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
hbm_count = v._get_inst_count(cluster, 'HBASE_MASTER')
snt_count = v._get_inst_count(cluster, 'SENTRY_SERVER')
ks_count = v._get_inst_count(cluster, 'KEY_VALUE_STORE_INDEXER')
imp_count = v._get_inst_count(cluster, 'IMPALA_CATALOGSERVER')
hive_count = v._get_inst_count(cluster, 'HIVE_METASTORE')
slr_count = v._get_inst_count(cluster, 'SOLR_SERVER')
sqp_count = v._get_inst_count(cluster, 'SQOOP_SERVER')
zk_count = self.validator._get_inst_count(cluster,
'ZOOKEEPER_SERVER')
hbm_count = self.validator._get_inst_count(cluster, 'HBASE_MASTER')
snt_count = self.validator._get_inst_count(cluster,
'SENTRY_SERVER')
ks_count =\
self.validator._get_inst_count(cluster,
'KEY_VALUE_STORE_INDEXER')
imp_count = self.validator._get_inst_count(cluster,
'IMPALA_CATALOGSERVER')
hive_count = self.validator._get_inst_count(cluster,
'HIVE_METASTORE')
slr_count = self.validator._get_inst_count(cluster, 'SOLR_SERVER')
sqp_count = self.validator._get_inst_count(cluster, 'SQOOP_SERVER')
core_site_safety_valve = ''
if self.pu.c_helper.is_swift_enabled(cluster):
configs = swift_helper.get_swift_configs()

View File

@@ -15,263 +15,144 @@
from sahara.i18n import _
from sahara.plugins.cdh.v5_3_0 import plugin_utils as pu
from sahara.plugins.cdh import validation
from sahara.plugins import exceptions as ex
from sahara.plugins import utils as u
from sahara.utils import general as gu
PU = pu.PluginUtilsV530()
def validate_cluster_creating(cluster):
mng_count = _get_inst_count(cluster, 'CLOUDERA_MANAGER')
if mng_count != 1:
raise ex.InvalidComponentCountException('CLOUDERA_MANAGER',
1, mng_count)
class ValidatorV530(validation.Validator):
PU = pu.PluginUtilsV530()
nn_count = _get_inst_count(cluster, 'HDFS_NAMENODE')
if nn_count != 1:
raise ex.InvalidComponentCountException('HDFS_NAMENODE', 1, nn_count)
@classmethod
def validate_cluster_creating(cls, cluster):
super(ValidatorV530, cls).validate_cluster_creating(cluster)
cls._flume_validation(cluster)
cls._sentry_validation(cluster)
cls._solr_validation(cluster)
cls._sqoop_validation(cluster)
cls._hbase_indexer_validation(cluster)
cls._impala_validation(cluster)
snn_count = _get_inst_count(cluster, 'HDFS_SECONDARYNAMENODE')
if snn_count != 1:
raise ex.InvalidComponentCountException('HDFS_SECONDARYNAMENODE', 1,
snn_count)
@classmethod
def _flume_validation(cls, cluster):
a_count = cls._get_inst_count(cluster, 'FLUME_AGENT')
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
dn_count = _get_inst_count(cluster, 'HDFS_DATANODE')
replicas = PU.get_config_value('HDFS', 'dfs_replication', cluster)
if dn_count < replicas:
raise ex.InvalidComponentCountException(
'HDFS_DATANODE', replicas, dn_count,
_('Number of datanodes must be not less than dfs_replication.'))
if a_count >= 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='FLUME_AGENT')
rm_count = _get_inst_count(cluster, 'YARN_RESOURCEMANAGER')
if rm_count > 1:
raise ex.InvalidComponentCountException('YARN_RESOURCEMANAGER',
_('0 or 1'), rm_count)
@classmethod
def _sentry_validation(cls, cluster):
hs_count = _get_inst_count(cluster, 'YARN_JOBHISTORY')
if hs_count > 1:
raise ex.InvalidComponentCountException('YARN_JOBHISTORY',
_('0 or 1'), hs_count)
snt_count = cls._get_inst_count(cluster, 'SENTRY_SERVER')
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
if rm_count > 0 and hs_count < 1:
raise ex.RequiredServiceMissingException(
'YARN_JOBHISTORY', required_by='YARN_RESOURCEMANAGER')
nm_count = _get_inst_count(cluster, 'YARN_NODEMANAGER')
if rm_count == 0:
if nm_count > 0:
raise ex.RequiredServiceMissingException(
'YARN_RESOURCEMANAGER', required_by='YARN_NODEMANAGER')
oo_count = _get_inst_count(cluster, 'OOZIE_SERVER')
if oo_count > 1:
raise ex.InvalidComponentCountException('OOZIE_SERVER', _('0 or 1'),
oo_count)
if oo_count == 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='OOZIE_SERVER')
if nm_count < 1:
raise ex.RequiredServiceMissingException(
'YARN_NODEMANAGER', required_by='OOZIE_SERVER')
if hs_count != 1:
raise ex.RequiredServiceMissingException(
'YARN_JOBHISTORY', required_by='OOZIE_SERVER')
hms_count = _get_inst_count(cluster, 'HIVE_METASTORE')
hvs_count = _get_inst_count(cluster, 'HIVE_SERVER2')
whc_count = _get_inst_count(cluster, 'HIVE_WEBHCAT')
if hms_count and rm_count < 1:
raise ex.RequiredServiceMissingException(
'YARN_RESOURCEMANAGER', required_by='HIVE_METASTORE')
if hms_count and not hvs_count:
raise ex.RequiredServiceMissingException(
'HIVE_SERVER2', required_by='HIVE_METASTORE')
if hvs_count and not hms_count:
raise ex.RequiredServiceMissingException(
'HIVE_METASTORE', required_by='HIVE_SERVER2')
if whc_count and not hms_count:
raise ex.RequiredServiceMissingException(
'HIVE_METASTORE', required_by='HIVE_WEBHCAT')
hue_count = _get_inst_count(cluster, 'HUE_SERVER')
if hue_count > 1:
raise ex.InvalidComponentCountException('HUE_SERVER', _('0 or 1'),
hue_count)
shs_count = _get_inst_count(cluster, 'SPARK_YARN_HISTORY_SERVER')
if shs_count > 1:
raise ex.InvalidComponentCountException('SPARK_YARN_HISTORY_SERVER',
_('0 or 1'), shs_count)
if shs_count and not rm_count:
raise ex.RequiredServiceMissingException(
'YARN_RESOURCEMANAGER', required_by='SPARK_YARN_HISTORY_SERVER')
if oo_count < 1 and hue_count:
raise ex.RequiredServiceMissingException(
'OOZIE_SERVER', required_by='HUE_SERVER')
if hms_count < 1 and hue_count:
raise ex.RequiredServiceMissingException(
'HIVE_METASTORE', required_by='HUE_SERVER')
hbm_count = _get_inst_count(cluster, 'HBASE_MASTER')
hbr_count = _get_inst_count(cluster, 'HBASE_REGIONSERVER')
zk_count = _get_inst_count(cluster, 'ZOOKEEPER_SERVER')
if hbm_count >= 1:
if zk_count < 1:
raise ex.RequiredServiceMissingException('ZOOKEEPER',
required_by='HBASE')
if hbr_count < 1:
if snt_count > 1:
raise ex.InvalidComponentCountException(
'HBASE_REGIONSERVER', _('at least 1'), hbr_count)
elif hbr_count >= 1:
raise ex.InvalidComponentCountException('HBASE_MASTER',
_('at least 1'), hbm_count)
'SENTRY_SERVER', _('0 or 1'), snt_count)
if snt_count == 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='SENTRY_SERVER')
if zk_count < 1:
raise ex.RequiredServiceMissingException(
'ZOOKEEPER', required_by='SENTRY_SERVER')
a_count = _get_inst_count(cluster, 'FLUME_AGENT')
if a_count >= 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='FLUME_AGENT')
@classmethod
def _solr_validation(cls, cluster):
slr_count = cls._get_inst_count(cluster, 'SOLR_SERVER')
zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
snt_count = _get_inst_count(cluster, 'SENTRY_SERVER')
if snt_count > 1:
raise ex.InvalidComponentCountException('SENTRY_SERVER', _('0 or 1'),
snt_count)
if snt_count == 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='SENTRY_SERVER')
if zk_count < 1:
raise ex.RequiredServiceMissingException(
'ZOOKEEPER', required_by='SENTRY_SERVER')
if slr_count >= 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='SOLR_SERVER')
if zk_count < 1:
raise ex.RequiredServiceMissingException(
'ZOOKEEPER', required_by='SOLR_SERVER')
slr_count = _get_inst_count(cluster, 'SOLR_SERVER')
if slr_count >= 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='SOLR_SERVER')
if zk_count < 1:
raise ex.RequiredServiceMissingException(
'ZOOKEEPER', required_by='SOLR_SERVER')
@classmethod
def _sqoop_validation(cls, cluster):
s2s_count = _get_inst_count(cluster, 'SQOOP_SERVER')
if s2s_count > 1:
raise ex.InvalidComponentCountException('SQOOP_SERVER', _('0 or 1'),
s2s_count)
if s2s_count == 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='SQOOP_SERVER')
if nm_count < 1:
raise ex.RequiredServiceMissingException(
'YARN_NODEMANAGER', required_by='SQOOP_SERVER')
if hs_count != 1:
raise ex.RequiredServiceMissingException(
'YARN_JOBHISTORY', required_by='SQOOP_SERVER')
s2s_count = cls._get_inst_count(cluster, 'SQOOP_SERVER')
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
hs_count = cls._get_inst_count(cluster, 'YARN_JOBHISTORY')
nm_count = cls._get_inst_count(cluster, 'YARN_NODEMANAGER')
lhbi_count = _get_inst_count(cluster, 'HBASE_INDEXER')
if lhbi_count >= 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='HBASE_INDEXER')
if zk_count < 1:
raise ex.RequiredServiceMissingException(
'ZOOKEEPER', required_by='HBASE_INDEXER')
if slr_count < 1:
raise ex.RequiredServiceMissingException(
'SOLR_SERVER', required_by='HBASE_INDEXER')
if hbm_count < 1:
raise ex.RequiredServiceMissingException(
'HBASE_MASTER', required_by='HBASE_INDEXER')
if s2s_count > 1:
raise ex.InvalidComponentCountException(
'SQOOP_SERVER', _('0 or 1'), s2s_count)
if s2s_count == 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='SQOOP_SERVER')
if nm_count < 1:
raise ex.RequiredServiceMissingException(
'YARN_NODEMANAGER', required_by='SQOOP_SERVER')
if hs_count != 1:
raise ex.RequiredServiceMissingException(
'YARN_JOBHISTORY', required_by='SQOOP_SERVER')
ics_count = _get_inst_count(cluster, 'IMPALA_CATALOGSERVER')
iss_count = _get_inst_count(cluster, 'IMPALA_STATESTORE')
id_count = _get_inst_count(cluster, 'IMPALAD')
if ics_count > 1:
raise ex.InvalidComponentCountException('IMPALA_CATALOGSERVER',
_('0 or 1'), ics_count)
if iss_count > 1:
raise ex.InvalidComponentCountException('IMPALA_STATESTORE',
_('0 or 1'), iss_count)
if ics_count == 1:
datanode_ng = u.get_node_groups(cluster, "HDFS_DATANODE")
impalad_ng = u.get_node_groups(cluster, "IMPALAD")
datanodes = set(ng.id for ng in datanode_ng)
impalads = set(ng.id for ng in impalad_ng)
@classmethod
def _hbase_indexer_validation(cls, cluster):
if datanodes != impalads:
raise ex.InvalidClusterTopology(
_("IMPALAD must be installed on every HDFS_DATANODE"))
lhbi_count = cls._get_inst_count(cluster, 'HBASE_INDEXER')
zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
slr_count = cls._get_inst_count(cluster, 'SOLR_SERVER')
hbm_count = cls._get_inst_count(cluster, 'HBASE_MASTER')
if iss_count != 1:
raise ex.RequiredServiceMissingException(
'IMPALA_STATESTORE', required_by='IMPALA')
if id_count < 1:
raise ex.RequiredServiceMissingException(
'IMPALAD', required_by='IMPALA')
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='IMPALA')
if hms_count < 1:
raise ex.RequiredServiceMissingException(
'HIVE_METASTORE', required_by='IMPALA')
if lhbi_count >= 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='HBASE_INDEXER')
if zk_count < 1:
raise ex.RequiredServiceMissingException(
'ZOOKEEPER', required_by='HBASE_INDEXER')
if slr_count < 1:
raise ex.RequiredServiceMissingException(
'SOLR_SERVER', required_by='HBASE_INDEXER')
if hbm_count < 1:
raise ex.RequiredServiceMissingException(
'HBASE_MASTER', required_by='HBASE_INDEXER')
@classmethod
def _impala_validation(cls, cluster):
ics_count = cls._get_inst_count(cluster, 'IMPALA_CATALOGSERVER')
iss_count = cls._get_inst_count(cluster, 'IMPALA_STATESTORE')
id_count = cls._get_inst_count(cluster, 'IMPALAD')
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
hms_count = cls._get_inst_count(cluster, 'HIVE_METASTORE')
def validate_additional_ng_scaling(cluster, additional):
rm = PU.get_resourcemanager(cluster)
scalable_processes = _get_scalable_processes()
if ics_count > 1:
raise ex.InvalidComponentCountException('IMPALA_CATALOGSERVER',
_('0 or 1'), ics_count)
if iss_count > 1:
raise ex.InvalidComponentCountException('IMPALA_STATESTORE',
_('0 or 1'), iss_count)
if ics_count == 1:
datanode_ng = u.get_node_groups(cluster, "HDFS_DATANODE")
impalad_ng = u.get_node_groups(cluster, "IMPALAD")
datanodes = set(ng.id for ng in datanode_ng)
impalads = set(ng.id for ng in impalad_ng)
for ng_id in additional:
ng = gu.get_by_id(cluster.node_groups, ng_id)
if not set(ng.node_processes).issubset(scalable_processes):
msg = _("CDH plugin cannot scale nodegroup with processes: "
"%(processes)s")
raise ex.NodeGroupCannotBeScaled(
ng.name, msg % {'processes': ' '.join(ng.node_processes)})
if datanodes != impalads:
raise ex.InvalidClusterTopology(
_("IMPALAD must be installed on every HDFS_DATANODE"))
if not rm and 'YARN_NODEMANAGER' in ng.node_processes:
msg = _("CDH plugin cannot scale node group with processes "
"which have no master-processes run in cluster")
raise ex.NodeGroupCannotBeScaled(ng.name, msg)
def validate_existing_ng_scaling(cluster, existing):
scalable_processes = _get_scalable_processes()
dn_to_delete = 0
for ng in cluster.node_groups:
if ng.id in existing:
if (ng.count > existing[ng.id] and
'HDFS_DATANODE' in ng.node_processes):
dn_to_delete += ng.count - existing[ng.id]
if not set(ng.node_processes).issubset(scalable_processes):
msg = _("CDH plugin cannot scale nodegroup with processes: "
"%(processes)s")
raise ex.NodeGroupCannotBeScaled(
ng.name, msg % {'processes': ' '.join(ng.node_processes)})
dn_count = _get_inst_count(cluster, 'HDFS_DATANODE') - dn_to_delete
replicas = PU.get_config_value('HDFS', 'dfs_replication', cluster)
if dn_count < replicas:
raise ex.ClusterCannotBeScaled(
cluster,
_('Number of datanodes must be not less than dfs_replication.'))
def _get_scalable_processes():
return ['HDFS_DATANODE', 'YARN_NODEMANAGER']
def _get_inst_count(cluster, process):
return sum([ng.count for ng in u.get_node_groups(cluster, process)])
if iss_count != 1:
raise ex.RequiredServiceMissingException(
'IMPALA_STATESTORE', required_by='IMPALA')
if id_count < 1:
raise ex.RequiredServiceMissingException(
'IMPALAD', required_by='IMPALA')
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='IMPALA')
if hms_count < 1:
raise ex.RequiredServiceMissingException(
'HIVE_METASTORE', required_by='IMPALA')
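
Editor's note — ValidatorV530 shows the intended extension pattern: override validate_cluster_creating, chain up to the base class, then run one classmethod per additional service. A sketch of adding a check the same way; the subclass, service name, and rule here are hypothetical:

    from sahara.i18n import _
    from sahara.plugins import exceptions as ex
    from sahara.plugins.cdh.v5_3_0 import validation

    class ValidatorWithFoo(validation.ValidatorV530):   # hypothetical subclass
        @classmethod
        def validate_cluster_creating(cls, cluster):
            super(ValidatorWithFoo, cls).validate_cluster_creating(cluster)
            cls._foo_validation(cluster)

        @classmethod
        def _foo_validation(cls, cluster):
            # hypothetical rule, mirroring _sqoop_validation and friends
            foo_count = cls._get_inst_count(cluster, 'FOO_SERVER')
            if foo_count > 1:
                raise ex.InvalidComponentCountException(
                    'FOO_SERVER', _('0 or 1'), foo_count)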

View File

@@ -22,11 +22,12 @@ from sahara.plugins.cdh.v5_3_0 import config_helper as c_helper
from sahara.plugins.cdh.v5_3_0 import deploy as dp
from sahara.plugins.cdh.v5_3_0 import edp_engine
from sahara.plugins.cdh.v5_3_0 import plugin_utils as pu
from sahara.plugins.cdh.v5_3_0 import validation as vl
from sahara.plugins.cdh.v5_3_0 import validation
conductor = conductor.API
CU = cu.ClouderaUtilsV530()
PU = pu.PluginUtilsV530()
vl = validation.ValidatorV530
class VersionHandler(avm.AbstractVersionHandler):

View File

@@ -17,7 +17,7 @@ from sahara.i18n import _
from sahara.plugins.cdh import cloudera_utils as cu
from sahara.plugins.cdh.v5_4_0 import config_helper as c_helper
from sahara.plugins.cdh.v5_4_0 import plugin_utils as pu
from sahara.plugins.cdh.v5_4_0 import validation as v
from sahara.plugins.cdh.v5_4_0 import validation
from sahara.swift import swift_helper
from sahara.utils import cluster_progress_ops as cpo
from sahara.utils import configs as s_cfg
@@ -55,6 +55,7 @@ class ClouderaUtilsV540(cu.ClouderaUtils):
def __init__(self):
cu.ClouderaUtils.__init__(self)
self.pu = pu.PluginUtilsV540()
self.validator = validation.ValidatorV540
def get_service_by_role(self, role, cluster=None, instance=None):
cm_cluster = None
@@ -153,7 +154,7 @@ class ClouderaUtilsV540(cu.ClouderaUtils):
if len(self.pu.get_zookeepers(cluster)) > 0:
zookeeper = cm_cluster.get_service(self.ZOOKEEPER_SERVICE_NAME)
zookeeper.update_config(self._get_configs(ZOOKEEPER_SERVICE_TYPE,
cluster=cluster))
cluster=cluster))
hdfs = cm_cluster.get_service(self.HDFS_SERVICE_NAME)
hdfs.update_config(self._get_configs(HDFS_SERVICE_TYPE,
@@ -228,15 +229,22 @@ class ClouderaUtilsV540(cu.ClouderaUtils):
all_confs = {}
if cluster:
zk_count = v._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
hbm_count = v._get_inst_count(cluster, 'HBASE_MASTER')
snt_count = v._get_inst_count(cluster, 'SENTRY_SERVER')
ks_count = v._get_inst_count(cluster, 'KEY_VALUE_STORE_INDEXER')
kms_count = v._get_inst_count(cluster, 'KMS')
imp_count = v._get_inst_count(cluster, 'IMPALA_CATALOGSERVER')
hive_count = v._get_inst_count(cluster, 'HIVE_METASTORE')
slr_count = v._get_inst_count(cluster, 'SOLR_SERVER')
sqp_count = v._get_inst_count(cluster, 'SQOOP_SERVER')
zk_count = self.validator._get_inst_count(cluster,
'ZOOKEEPER_SERVER')
hbm_count = self.validator._get_inst_count(cluster, 'HBASE_MASTER')
snt_count = self.validator._get_inst_count(cluster,
'SENTRY_SERVER')
ks_count =\
self.validator._get_inst_count(cluster,
'KEY_VALUE_STORE_INDEXER')
kms_count = self.validator._get_inst_count(cluster, 'KMS')
imp_count =\
self.validator._get_inst_count(cluster,
'IMPALA_CATALOGSERVER')
hive_count = self.validator._get_inst_count(cluster,
'HIVE_METASTORE')
slr_count = self.validator._get_inst_count(cluster, 'SOLR_SERVER')
sqp_count = self.validator._get_inst_count(cluster, 'SQOOP_SERVER')
core_site_safety_valve = ''
if self.pu.c_helper.is_swift_enabled(cluster):
configs = swift_helper.get_swift_configs()

View File

@@ -15,314 +15,218 @@
from sahara.i18n import _
from sahara.plugins.cdh.v5_4_0 import plugin_utils as pu
from sahara.plugins.cdh import validation
from sahara.plugins import exceptions as ex
from sahara.plugins import utils as u
from sahara.utils import general as gu
PU = pu.PluginUtilsV540()
def validate_cluster_creating(cluster):
mng_count = _get_inst_count(cluster, 'CLOUDERA_MANAGER')
if mng_count != 1:
raise ex.InvalidComponentCountException('CLOUDERA_MANAGER',
1, mng_count)
class ValidatorV540(validation.Validator):
PU = pu.PluginUtilsV540()
zk_count = _get_inst_count(cluster, 'ZOOKEEPER_SERVER')
nn_count = _get_inst_count(cluster, 'HDFS_NAMENODE')
if nn_count != 1:
raise ex.InvalidComponentCountException('HDFS_NAMENODE', 1, nn_count)
@classmethod
def validate_cluster_creating(cls, cluster):
super(ValidatorV540, cls).validate_cluster_creating(cluster)
cls._hdfs_ha_validation(cluster)
cls._yarn_ha_validation(cluster)
cls._flume_validation(cluster)
cls._sentry_validation(cluster)
cls._solr_validation(cluster)
cls._sqoop_validation(cluster)
cls._hbase_indexer_validation(cluster)
cls._impala_validation(cluster)
cls._kms_validation(cluster)
snn_count = _get_inst_count(cluster, 'HDFS_SECONDARYNAMENODE')
if snn_count != 1:
raise ex.InvalidComponentCountException('HDFS_SECONDARYNAMENODE', 1,
snn_count)
@classmethod
def _hdfs_ha_validation(cls, cluster):
jn_count = cls._get_inst_count(cluster, 'HDFS_JOURNALNODE')
zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
dn_count = _get_inst_count(cluster, 'HDFS_DATANODE')
replicas = PU.get_config_value('HDFS', 'dfs_replication', cluster)
if dn_count < replicas:
raise ex.InvalidComponentCountException(
'HDFS_DATANODE', replicas, dn_count,
_('Number of datanodes must be not less than dfs_replication.'))
require_anti_affinity = cls.PU.c_helper.get_required_anti_affinity(
cluster)
jn_count = _get_inst_count(cluster, 'HDFS_JOURNALNODE')
require_anti_affinity = PU.c_helper.get_required_anti_affinity(cluster)
if jn_count > 0:
if jn_count < 3:
raise ex.InvalidComponentCountException('HDFS_JOURNALNODE',
_('not less than 3'),
jn_count)
if not jn_count % 2:
raise ex.InvalidComponentCountException('HDFS_JOURNALNODE',
_('be odd'), jn_count)
if zk_count < 1:
raise ex.RequiredServiceMissingException('ZOOKEEPER',
required_by='HDFS HA')
if require_anti_affinity:
if 'HDFS_SECONDARYNAMENODE' not in _get_anti_affinity(cluster):
raise ex.NameNodeHAConfigurationError(
_('HDFS_SECONDARYNAMENODE should be enabled '
'in anti_affinity.'))
if 'HDFS_NAMENODE' not in _get_anti_affinity(cluster):
raise ex.NameNodeHAConfigurationError(
_('HDFS_NAMENODE should be enabled in anti_affinity.'))
if jn_count > 0:
if jn_count < 3:
raise ex.InvalidComponentCountException('HDFS_JOURNALNODE',
_('not less than 3'),
jn_count)
if not jn_count % 2:
raise ex.InvalidComponentCountException('HDFS_JOURNALNODE',
_('be odd'), jn_count)
if zk_count < 1:
raise ex.RequiredServiceMissingException('ZOOKEEPER',
required_by='HDFS HA')
if require_anti_affinity:
if 'HDFS_SECONDARYNAMENODE' not in\
cls._get_anti_affinity(cluster):
raise ex.NameNodeHAConfigurationError(
_('HDFS_SECONDARYNAMENODE should be enabled '
'in anti_affinity.'))
if 'HDFS_NAMENODE' not in cls._get_anti_affinity(cluster):
raise ex.NameNodeHAConfigurationError(
_('HDFS_NAMENODE should be enabled in anti_affinity.'))
rm_count = _get_inst_count(cluster, 'YARN_RESOURCEMANAGER')
if rm_count > 1:
raise ex.InvalidComponentCountException('YARN_RESOURCEMANAGER',
_('0 or 1'), rm_count)
@classmethod
def _yarn_ha_validation(cls, cluster):
rm_count = cls._get_inst_count(cluster, 'YARN_RESOURCEMANAGER')
zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
stdb_rm_count = cls._get_inst_count(cluster, 'YARN_STANDBYRM')
stdb_rm_count = _get_inst_count(cluster, 'YARN_STANDBYRM')
if stdb_rm_count > 1:
raise ex.InvalidComponentCountException('YARN_STANDBYRM',
_('0 or 1'), stdb_rm_count)
if stdb_rm_count > 0:
if rm_count < 1:
raise ex.RequiredServiceMissingException('YARN_RESOURCEMANAGER',
required_by='RM HA')
if zk_count < 1:
raise ex.RequiredServiceMissingException('ZOOKEEPER',
required_by='RM HA')
if require_anti_affinity:
if 'YARN_RESOURCEMANAGER' not in _get_anti_affinity(cluster):
raise ex.ResourceManagerHAConfigurationError(
_('YARN_RESOURCEMANAGER should be enabled in '
'anti_affinity.'))
if 'YARN_STANDBYRM' not in _get_anti_affinity(cluster):
raise ex.ResourceManagerHAConfigurationError(
_('YARN_STANDBYRM should be enabled in anti_affinity.'))
require_anti_affinity = cls.PU.c_helper.get_required_anti_affinity(
cluster)
hs_count = _get_inst_count(cluster, 'YARN_JOBHISTORY')
if hs_count > 1:
raise ex.InvalidComponentCountException('YARN_JOBHISTORY',
_('0 or 1'), hs_count)
if rm_count > 0 and hs_count < 1:
raise ex.RequiredServiceMissingException(
'YARN_JOBHISTORY', required_by='YARN_RESOURCEMANAGER')
nm_count = _get_inst_count(cluster, 'YARN_NODEMANAGER')
if rm_count == 0:
if nm_count > 0:
raise ex.RequiredServiceMissingException(
'YARN_RESOURCEMANAGER', required_by='YARN_NODEMANAGER')
oo_count = _get_inst_count(cluster, 'OOZIE_SERVER')
if oo_count > 1:
raise ex.InvalidComponentCountException('OOZIE_SERVER', _('0 or 1'),
oo_count)
if oo_count == 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='OOZIE_SERVER')
if nm_count < 1:
raise ex.RequiredServiceMissingException(
'YARN_NODEMANAGER', required_by='OOZIE_SERVER')
if hs_count != 1:
raise ex.RequiredServiceMissingException(
'YARN_JOBHISTORY', required_by='OOZIE_SERVER')
hms_count = _get_inst_count(cluster, 'HIVE_METASTORE')
hvs_count = _get_inst_count(cluster, 'HIVE_SERVER2')
whc_count = _get_inst_count(cluster, 'HIVE_WEBHCAT')
if hms_count and rm_count < 1:
raise ex.RequiredServiceMissingException(
'YARN_RESOURCEMANAGER', required_by='HIVE_METASTORE')
if hms_count and not hvs_count:
raise ex.RequiredServiceMissingException(
'HIVE_SERVER2', required_by='HIVE_METASTORE')
if hvs_count and not hms_count:
raise ex.RequiredServiceMissingException(
'HIVE_METASTORE', required_by='HIVE_SERVER2')
if whc_count and not hms_count:
raise ex.RequiredServiceMissingException(
'HIVE_METASTORE', required_by='HIVE_WEBHCAT')
hue_count = _get_inst_count(cluster, 'HUE_SERVER')
if hue_count > 1:
raise ex.InvalidComponentCountException('HUE_SERVER', _('0 or 1'),
hue_count)
shs_count = _get_inst_count(cluster, 'SPARK_YARN_HISTORY_SERVER')
if shs_count > 1:
raise ex.InvalidComponentCountException('SPARK_YARN_HISTORY_SERVER',
_('0 or 1'), shs_count)
if shs_count and not rm_count:
raise ex.RequiredServiceMissingException(
'YARN_RESOURCEMANAGER', required_by='SPARK_YARN_HISTORY_SERVER')
if oo_count < 1 and hue_count:
raise ex.RequiredServiceMissingException(
'OOZIE_SERVER', required_by='HUE_SERVER')
if hms_count < 1 and hue_count:
raise ex.RequiredServiceMissingException(
'HIVE_METASTORE', required_by='HUE_SERVER')
hbm_count = _get_inst_count(cluster, 'HBASE_MASTER')
hbr_count = _get_inst_count(cluster, 'HBASE_REGIONSERVER')
if hbm_count >= 1:
if zk_count < 1:
raise ex.RequiredServiceMissingException('ZOOKEEPER',
required_by='HBASE')
if hbr_count < 1:
if stdb_rm_count > 1:
raise ex.InvalidComponentCountException(
'HBASE_REGIONSERVER', _('at least 1'), hbr_count)
elif hbr_count >= 1:
raise ex.InvalidComponentCountException('HBASE_MASTER',
_('at least 1'), hbm_count)
'YARN_STANDBYRM', _('0 or 1'), stdb_rm_count)
if stdb_rm_count > 0:
if rm_count < 1:
raise ex.RequiredServiceMissingException(
'YARN_RESOURCEMANAGER', required_by='RM HA')
if zk_count < 1:
raise ex.RequiredServiceMissingException(
'ZOOKEEPER', required_by='RM HA')
if require_anti_affinity:
if 'YARN_RESOURCEMANAGER' not in\
cls._get_anti_affinity(cluster):
raise ex.ResourceManagerHAConfigurationError(
_('YARN_RESOURCEMANAGER should be enabled in '
'anti_affinity.'))
if 'YARN_STANDBYRM' not in cls._get_anti_affinity(cluster):
raise ex.ResourceManagerHAConfigurationError(
_('YARN_STANDBYRM should be'
' enabled in anti_affinity.'))
a_count = _get_inst_count(cluster, 'FLUME_AGENT')
if a_count >= 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='FLUME_AGENT')
@classmethod
def _flume_validation(cls, cluster):
a_count = cls._get_inst_count(cluster, 'FLUME_AGENT')
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
snt_count = _get_inst_count(cluster, 'SENTRY_SERVER')
if snt_count > 1:
raise ex.InvalidComponentCountException('SENTRY_SERVER', _('0 or 1'),
snt_count)
if snt_count == 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='SENTRY_SERVER')
if zk_count < 1:
raise ex.RequiredServiceMissingException(
'ZOOKEEPER', required_by='SENTRY_SERVER')
if a_count >= 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='FLUME_AGENT')
slr_count = _get_inst_count(cluster, 'SOLR_SERVER')
if slr_count >= 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='SOLR_SERVER')
if zk_count < 1:
raise ex.RequiredServiceMissingException(
'ZOOKEEPER', required_by='SOLR_SERVER')
@classmethod
def _sentry_validation(cls, cluster):
s2s_count = _get_inst_count(cluster, 'SQOOP_SERVER')
if s2s_count > 1:
raise ex.InvalidComponentCountException('SQOOP_SERVER', _('0 or 1'),
s2s_count)
if s2s_count == 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='SQOOP_SERVER')
if nm_count < 1:
raise ex.RequiredServiceMissingException(
'YARN_NODEMANAGER', required_by='SQOOP_SERVER')
if hs_count != 1:
raise ex.RequiredServiceMissingException(
'YARN_JOBHISTORY', required_by='SQOOP_SERVER')
snt_count = cls._get_inst_count(cluster, 'SENTRY_SERVER')
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
lhbi_count = _get_inst_count(cluster, 'HBASE_INDEXER')
if lhbi_count >= 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='HBASE_INDEXER')
if zk_count < 1:
raise ex.RequiredServiceMissingException(
'ZOOKEEPER', required_by='HBASE_INDEXER')
if slr_count < 1:
raise ex.RequiredServiceMissingException(
'SOLR_SERVER', required_by='HBASE_INDEXER')
if hbm_count < 1:
raise ex.RequiredServiceMissingException(
'HBASE_MASTER', required_by='HBASE_INDEXER')
if snt_count > 1:
raise ex.InvalidComponentCountException(
'SENTRY_SERVER', _('0 or 1'), snt_count)
if snt_count == 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='SENTRY_SERVER')
if zk_count < 1:
raise ex.RequiredServiceMissingException(
'ZOOKEEPER', required_by='SENTRY_SERVER')
ics_count = _get_inst_count(cluster, 'IMPALA_CATALOGSERVER')
iss_count = _get_inst_count(cluster, 'IMPALA_STATESTORE')
id_count = _get_inst_count(cluster, 'IMPALAD')
if ics_count > 1:
raise ex.InvalidComponentCountException('IMPALA_CATALOGSERVER',
_('0 or 1'), ics_count)
if iss_count > 1:
raise ex.InvalidComponentCountException('IMPALA_STATESTORE',
_('0 or 1'), iss_count)
if ics_count == 1:
datanode_ng = u.get_node_groups(cluster, "HDFS_DATANODE")
impalad_ng = u.get_node_groups(cluster, "IMPALAD")
datanodes = set(ng.id for ng in datanode_ng)
impalads = set(ng.id for ng in impalad_ng)
@classmethod
def _solr_validation(cls, cluster):
slr_count = cls._get_inst_count(cluster, 'SOLR_SERVER')
zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
if datanodes != impalads:
raise ex.InvalidClusterTopology(
_("IMPALAD must be installed on every HDFS_DATANODE"))
if slr_count >= 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='SOLR_SERVER')
if zk_count < 1:
raise ex.RequiredServiceMissingException(
'ZOOKEEPER', required_by='SOLR_SERVER')
if iss_count != 1:
raise ex.RequiredServiceMissingException(
'IMPALA_STATESTORE', required_by='IMPALA')
if id_count < 1:
raise ex.RequiredServiceMissingException(
'IMPALAD', required_by='IMPALA')
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='IMPALA')
if hms_count < 1:
raise ex.RequiredServiceMissingException(
'HIVE_METASTORE', required_by='IMPALA')
@classmethod
def _sqoop_validation(cls, cluster):
kms_count = _get_inst_count(cluster, 'KMS')
if kms_count > 1:
raise ex.InvalidComponentCountException('KMS',
_('0 or 1'), kms_count)
s2s_count = cls._get_inst_count(cluster, 'SQOOP_SERVER')
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
hs_count = cls._get_inst_count(cluster, 'YARN_JOBHISTORY')
nm_count = cls._get_inst_count(cluster, 'YARN_NODEMANAGER')
if s2s_count > 1:
raise ex.InvalidComponentCountException(
'SQOOP_SERVER', _('0 or 1'), s2s_count)
if s2s_count == 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='SQOOP_SERVER')
if nm_count < 1:
raise ex.RequiredServiceMissingException(
'YARN_NODEMANAGER', required_by='SQOOP_SERVER')
if hs_count != 1:
raise ex.RequiredServiceMissingException(
'YARN_JOBHISTORY', required_by='SQOOP_SERVER')
def validate_additional_ng_scaling(cluster, additional):
rm = PU.get_resourcemanager(cluster)
scalable_processes = _get_scalable_processes()
@classmethod
def _hbase_indexer_validation(cls, cluster):
for ng_id in additional:
ng = gu.get_by_id(cluster.node_groups, ng_id)
if not set(ng.node_processes).issubset(scalable_processes):
msg = _("CDH plugin cannot scale nodegroup with processes: "
"%(processes)s")
raise ex.NodeGroupCannotBeScaled(
ng.name, msg % {'processes': ' '.join(ng.node_processes)})
lhbi_count = cls._get_inst_count(cluster, 'HBASE_INDEXER')
zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
slr_count = cls._get_inst_count(cluster, 'SOLR_SERVER')
hbm_count = cls._get_inst_count(cluster, 'HBASE_MASTER')
if not rm and 'YARN_NODEMANAGER' in ng.node_processes:
msg = _("CDH plugin cannot scale node group with processes "
"which have no master-processes run in cluster")
raise ex.NodeGroupCannotBeScaled(ng.name, msg)
if lhbi_count >= 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='HBASE_INDEXER')
if zk_count < 1:
raise ex.RequiredServiceMissingException(
'ZOOKEEPER', required_by='HBASE_INDEXER')
if slr_count < 1:
raise ex.RequiredServiceMissingException(
'SOLR_SERVER', required_by='HBASE_INDEXER')
if hbm_count < 1:
raise ex.RequiredServiceMissingException(
'HBASE_MASTER', required_by='HBASE_INDEXER')
@classmethod
def _impala_validation(cls, cluster):
ics_count = cls._get_inst_count(cluster, 'IMPALA_CATALOGSERVER')
iss_count = cls._get_inst_count(cluster, 'IMPALA_STATESTORE')
id_count = cls._get_inst_count(cluster, 'IMPALAD')
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
hms_count = cls._get_inst_count(cluster, 'HIVE_METASTORE')
def validate_existing_ng_scaling(cluster, existing):
scalable_processes = _get_scalable_processes()
dn_to_delete = 0
for ng in cluster.node_groups:
if ng.id in existing:
if (ng.count > existing[ng.id] and
'HDFS_DATANODE' in ng.node_processes):
dn_to_delete += ng.count - existing[ng.id]
if ics_count > 1:
raise ex.InvalidComponentCountException('IMPALA_CATALOGSERVER',
_('0 or 1'), ics_count)
if iss_count > 1:
raise ex.InvalidComponentCountException('IMPALA_STATESTORE',
_('0 or 1'), iss_count)
if ics_count == 1:
datanode_ng = u.get_node_groups(cluster, "HDFS_DATANODE")
impalad_ng = u.get_node_groups(cluster, "IMPALAD")
datanodes = set(ng.id for ng in datanode_ng)
impalads = set(ng.id for ng in impalad_ng)
if not set(ng.node_processes).issubset(scalable_processes):
msg = _("CDH plugin cannot scale nodegroup with processes: "
"%(processes)s")
raise ex.NodeGroupCannotBeScaled(
ng.name, msg % {'processes': ' '.join(ng.node_processes)})
if datanodes != impalads:
raise ex.InvalidClusterTopology(
_("IMPALAD must be installed on every HDFS_DATANODE"))
dn_count = _get_inst_count(cluster, 'HDFS_DATANODE') - dn_to_delete
replicas = PU.get_config_value('HDFS', 'dfs_replication', cluster)
if dn_count < replicas:
raise ex.ClusterCannotBeScaled(
cluster,
_('Number of datanodes must be not less than dfs_replication.'))
if iss_count != 1:
raise ex.RequiredServiceMissingException(
'IMPALA_STATESTORE', required_by='IMPALA')
if id_count < 1:
raise ex.RequiredServiceMissingException(
'IMPALAD', required_by='IMPALA')
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='IMPALA')
if hms_count < 1:
raise ex.RequiredServiceMissingException(
'HIVE_METASTORE', required_by='IMPALA')
@classmethod
def _kms_validation(cls, cluster):
def _get_scalable_processes():
return ['HDFS_DATANODE', 'YARN_NODEMANAGER']
kms_count = cls._get_inst_count(cluster, 'KMS')
if kms_count > 1:
raise ex.InvalidComponentCountException('KMS',
_('0 or 1'), kms_count)
def _get_inst_count(cluster, process):
return sum([ng.count for ng in u.get_node_groups(cluster, process)])
def _get_anti_affinity(cluster):
return cluster.anti_affinity
@classmethod
def _get_anti_affinity(cls, cluster):
return cluster.anti_affinity
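
Editor's note — among the V540-only checks, the HDFS HA rule encodes two numeric constraints on HDFS_JOURNALNODE: at least three, and an odd count (the `not jn_count % 2` test above rejects even counts). A tiny self-check of that logic, restated outside the class:

    def jn_count_ok(jn_count):
        # 0 means HA was not requested, so there is nothing to check
        if jn_count == 0:
            return True
        return jn_count >= 3 and jn_count % 2 == 1

    assert [n for n in range(8) if jn_count_ok(n)] == [0, 3, 5, 7]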

View File

@@ -22,12 +22,13 @@ from sahara.plugins.cdh.v5_4_0 import config_helper as c_helper
from sahara.plugins.cdh.v5_4_0 import deploy as dp
from sahara.plugins.cdh.v5_4_0 import edp_engine
from sahara.plugins.cdh.v5_4_0 import plugin_utils as pu
from sahara.plugins.cdh.v5_4_0 import validation as vl
from sahara.plugins.cdh.v5_4_0 import validation
conductor = conductor.API
CU = cu.ClouderaUtilsV540()
PU = pu.PluginUtilsV540()
vl = validation.ValidatorV540
class VersionHandler(avm.AbstractVersionHandler):

View File

@@ -0,0 +1,221 @@
# Copyright (c) 2014 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara.i18n import _
from sahara.plugins import exceptions as ex
from sahara.plugins import utils as u
from sahara.utils import general as gu
class Validator(object):
PU = None
@classmethod
def validate_cluster_creating(cls, cluster):
cls._basic_validation(cluster)
cls._oozie_validation(cluster)
cls._hive_validation(cluster)
cls._hue_validation(cluster)
cls._hbase_validation(cluster)
@classmethod
def _basic_validation(cls, cluster):
mng_count = cls._get_inst_count(cluster, 'CLOUDERA_MANAGER')
if mng_count != 1:
raise ex.InvalidComponentCountException('CLOUDERA_MANAGER',
1, mng_count)
nn_count = cls._get_inst_count(cluster, 'HDFS_NAMENODE')
if nn_count != 1:
raise ex.InvalidComponentCountException(
'HDFS_NAMENODE', 1, nn_count)
snn_count = cls._get_inst_count(cluster, 'HDFS_SECONDARYNAMENODE')
if snn_count != 1:
raise ex.InvalidComponentCountException('HDFS_SECONDARYNAMENODE',
1, snn_count)
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
replicas = cls.PU.get_config_value('HDFS', 'dfs_replication', cluster)
if dn_count < replicas:
raise ex.InvalidComponentCountException(
'HDFS_DATANODE', replicas, dn_count,
_('Number of datanodes must be not'
' less than dfs_replication.'))
rm_count = cls._get_inst_count(cluster, 'YARN_RESOURCEMANAGER')
if rm_count > 1:
raise ex.InvalidComponentCountException('YARN_RESOURCEMANAGER',
_('0 or 1'), rm_count)
hs_count = cls._get_inst_count(cluster, 'YARN_JOBHISTORY')
if hs_count > 1:
raise ex.InvalidComponentCountException('YARN_JOBHISTORY',
_('0 or 1'),
hs_count)
if rm_count > 0 and hs_count < 1:
raise ex.RequiredServiceMissingException(
'YARN_JOBHISTORY', required_by='YARN_RESOURCEMANAGER')
nm_count = cls._get_inst_count(cluster, 'YARN_NODEMANAGER')
if rm_count == 0:
if nm_count > 0:
raise ex.RequiredServiceMissingException(
'YARN_RESOURCEMANAGER', required_by='YARN_NODEMANAGER')
@classmethod
def _oozie_validation(cls, cluster):
oo_count = cls._get_inst_count(cluster, 'OOZIE_SERVER')
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
nm_count = cls._get_inst_count(cluster, 'YARN_NODEMANAGER')
hs_count = cls._get_inst_count(cluster, 'YARN_JOBHISTORY')
if oo_count > 1:
raise ex.InvalidComponentCountException(
'OOZIE_SERVER', _('0 or 1'), oo_count)
if oo_count == 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='OOZIE_SERVER')
if nm_count < 1:
raise ex.RequiredServiceMissingException(
'YARN_NODEMANAGER', required_by='OOZIE_SERVER')
if hs_count != 1:
raise ex.RequiredServiceMissingException(
'YARN_JOBHISTORY', required_by='OOZIE_SERVER')
@classmethod
def _hive_validation(cls, cluster):
hms_count = cls._get_inst_count(cluster, 'HIVE_METASTORE')
hvs_count = cls._get_inst_count(cluster, 'HIVE_SERVER2')
whc_count = cls._get_inst_count(cluster, 'HIVE_WEBHCAT')
rm_count = cls._get_inst_count(cluster, 'YARN_RESOURCEMANAGER')
if hms_count and rm_count < 1:
raise ex.RequiredServiceMissingException(
'YARN_RESOURCEMANAGER', required_by='HIVE_METASTORE')
if hms_count and not hvs_count:
raise ex.RequiredServiceMissingException(
'HIVE_SERVER2', required_by='HIVE_METASTORE')
if hvs_count and not hms_count:
raise ex.RequiredServiceMissingException(
'HIVE_METASTORE', required_by='HIVE_SERVER2')
if whc_count and not hms_count:
raise ex.RequiredServiceMissingException(
'HIVE_METASTORE', required_by='WEBHCAT')
@classmethod
def _hue_validation(cls, cluster):
hue_count = cls._get_inst_count(cluster, 'HUE_SERVER')
if hue_count > 1:
raise ex.InvalidComponentCountException(
'HUE_SERVER', _('0 or 1'), hue_count)
shs_count = cls._get_inst_count(cluster, 'SPARK_YARN_HISTORY_SERVER')
hms_count = cls._get_inst_count(cluster, 'HIVE_METASTORE')
oo_count = cls._get_inst_count(cluster, 'OOZIE_SERVER')
rm_count = cls._get_inst_count(cluster, 'YARN_RESOURCEMANAGER')
if shs_count > 1:
raise ex.InvalidComponentCountException(
'SPARK_YARN_HISTORY_SERVER',
_('0 or 1'), shs_count)
if shs_count and not rm_count:
raise ex.RequiredServiceMissingException(
'YARN_RESOURCEMANAGER',
required_by='SPARK_YARN_HISTORY_SERVER')
if oo_count < 1 and hue_count:
raise ex.RequiredServiceMissingException(
'OOZIE_SERVER', required_by='HUE_SERVER')
if hms_count < 1 and hue_count:
raise ex.RequiredServiceMissingException(
'HIVE_METASTORE', required_by='HUE_SERVER')
@classmethod
def _hbase_validation(cls, cluster):
hbm_count = cls._get_inst_count(cluster, 'HBASE_MASTER')
hbr_count = cls._get_inst_count(cluster, 'HBASE_REGIONSERVER')
zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
if hbm_count >= 1:
if zk_count < 1:
raise ex.RequiredServiceMissingException(
'ZOOKEEPER', required_by='HBASE')
if hbr_count < 1:
raise ex.InvalidComponentCountException(
'HBASE_REGIONSERVER', _('at least 1'), hbr_count)
elif hbr_count >= 1:
raise ex.InvalidComponentCountException('HBASE_MASTER',
_('at least 1'), hbm_count)
@classmethod
def validate_additional_ng_scaling(cls, cluster, additional):
rm = cls.PU.get_resourcemanager(cluster)
scalable_processes = cls._get_scalable_processes()
for ng_id in additional:
ng = gu.get_by_id(cluster.node_groups, ng_id)
if not set(ng.node_processes).issubset(scalable_processes):
msg = _("CDH plugin cannot scale nodegroup with processes: "
"%(processes)s")
raise ex.NodeGroupCannotBeScaled(
ng.name, msg % {'processes': ' '.join(ng.node_processes)})
if not rm and 'YARN_NODEMANAGER' in ng.node_processes:
msg = _("CDH plugin cannot scale node group with processes "
"which have no master-processes run in cluster")
raise ex.NodeGroupCannotBeScaled(ng.name, msg)
@classmethod
def validate_existing_ng_scaling(cls, cluster, existing):
scalable_processes = cls._get_scalable_processes()
dn_to_delete = 0
for ng in cluster.node_groups:
if ng.id in existing:
if (ng.count > existing[ng.id] and
"HDFS_DATANODE" in ng.node_processes):
dn_to_delete += ng.count - existing[ng.id]
if not set(ng.node_processes).issubset(scalable_processes):
msg = _("CDH plugin cannot scale nodegroup"
" with processes: %(processes)s")
raise ex.NodeGroupCannotBeScaled(
ng.name,
msg % {'processes': ' '.join(ng.node_processes)})
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE') - dn_to_delete
replicas = cls.PU.get_config_value('HDFS', 'dfs_replication', cluster)
if dn_count < replicas:
raise ex.ClusterCannotBeScaled(
cluster, _('Number of datanodes must be not'
' less than dfs_replication.'))
@classmethod
def _get_scalable_processes(cls):
return ['HDFS_DATANODE', 'YARN_NODEMANAGER']
@classmethod
def _get_inst_count(cls, cluster, process):
return sum([ng.count for ng in u.get_node_groups(cluster, process)])
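
Editor's note — keeping every check a classmethod is what lets the shared logic above pick up version-specific behavior through cls.PU without instantiating anything. A minimal, runnable demonstration of that dispatch; all names are illustrative:

    class FakePU(object):
        def get_config_value(self, service, name, cluster):
            return 3                     # stand-in for a dfs_replication lookup

    class Base(object):
        PU = None

        @classmethod
        def replicas(cls, cluster):
            # shared logic reads version-specific config through cls.PU
            return cls.PU.get_config_value('HDFS', 'dfs_replication', cluster)

    class V5(Base):
        PU = FakePU()

    print(V5.replicas(cluster=None))     # -> 3, resolved via V5's own PU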

View File

@@ -20,4 +20,4 @@ from sahara.tests.unit.plugins.cdh import base_validation_tests as bvt
class ValidationTestCase(bvt.BaseValidationTestCase):
def setUp(self):
super(ValidationTestCase, self).setUp()
self.module = validation
self.module = validation.ValidatorV5

View File

@@ -26,7 +26,7 @@ class ValidationTestCase(bvt.BaseValidationTestCase):
def setUp(self):
super(ValidationTestCase, self).setUp()
self.module = validation
self.module = validation.ValidatorV530
def _get_test_cases(self):
cases = super(ValidationTestCase, self)._get_test_cases()

View File

@@ -28,7 +28,7 @@ class ValidationTestCase(bvt.BaseValidationTestCase):
def setUp(self):
super(ValidationTestCase, self).setUp()
self.module = validation
self.module = validation.ValidatorV540
def _get_test_cases(self):
cases = super(ValidationTestCase, self)._get_test_cases()
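
Editor's note — the one-line change in each test module works because BaseValidationTestCase only looks up names on self.module; a class whose classmethods match the old functions' names and signatures is a drop-in replacement. A hypothetical illustration of that duck typing:

    class OldStyleModule(object):            # how self.module used to behave
        @staticmethod
        def validate_cluster_creating(cluster):
            pass

    class NewStyleValidator(object):         # what self.module points at now
        @classmethod
        def validate_cluster_creating(cls, cluster):
            pass

    # The test harness cannot tell the difference:
    for module in (OldStyleModule, NewStyleValidator):
        module.validate_cluster_creating(cluster=None)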