[CDH] Provide ability to configure gateway configs

Current implementation of CDH plugin doesn't allow to
configure configs that doesn't correlate with name of
some process. This patch introduce that ability and
implements support of configuring gateway configs for
hdfs and yarn services.

Change-Id: I0d9b16cbb46ea4d5ebea8e0cc05cca621ca9c588
Closes-bug: 1460645
This commit is contained in:
Vitaly Gridnev 2015-06-15 17:28:11 +03:00
parent 00c4e3eda3
commit aab0cce7f5
13 changed files with 119 additions and 43 deletions

View File

@ -16,6 +16,7 @@
import functools
from oslo_log import log as logging
import six
from sahara import context
from sahara.i18n import _
@ -188,7 +189,7 @@ class ClouderaUtils(object):
cm.create_mgmt_service(setup_info)
cm.hosts_start_roles([hostname])
def get_service_by_role(self, process, cluster=None, instance=None):
def get_service_by_role(self, role, cluster=None, instance=None):
cm_cluster = None
if cluster:
cm_cluster = self.get_cloudera_cluster(cluster)
@ -197,26 +198,28 @@ class ClouderaUtils(object):
else:
raise ValueError(_("'cluster' or 'instance' argument missed"))
if process in ['NAMENODE', 'DATANODE', 'SECONDARYNAMENODE']:
if role in ['NAMENODE', 'DATANODE', 'SECONDARYNAMENODE',
'HDFS_GATEWAY']:
return cm_cluster.get_service(self.HDFS_SERVICE_NAME)
elif process in ['RESOURCEMANAGER', 'NODEMANAGER', 'JOBHISTORY']:
elif role in ['RESOURCEMANAGER', 'NODEMANAGER', 'JOBHISTORY',
'YARN_GATEWAY']:
return cm_cluster.get_service(self.YARN_SERVICE_NAME)
elif process in ['OOZIE_SERVER']:
elif role in ['OOZIE_SERVER']:
return cm_cluster.get_service(self.OOZIE_SERVICE_NAME)
elif process in ['HIVESERVER2', 'HIVEMETASTORE', 'WEBHCAT']:
elif role in ['HIVESERVER2', 'HIVEMETASTORE', 'WEBHCAT']:
return cm_cluster.get_service(self.HIVE_SERVICE_NAME)
elif process in ['HUE_SERVER']:
elif role in ['HUE_SERVER']:
return cm_cluster.get_service(self.HUE_SERVICE_NAME)
elif process in ['SPARK_YARN_HISTORY_SERVER']:
elif role in ['SPARK_YARN_HISTORY_SERVER']:
return cm_cluster.get_service(self.SPARK_SERVICE_NAME)
elif process in ['SERVER']:
elif role in ['SERVER']:
return cm_cluster.get_service(self.ZOOKEEPER_SERVICE_NAME)
elif process in ['MASTER', 'REGIONSERVER']:
elif role in ['MASTER', 'REGIONSERVER']:
return cm_cluster.get_service(self.HBASE_SERVICE_NAME)
else:
raise ValueError(
_("Process %(process)s is not supported by CDH plugin") %
{'process': process})
{'process': role})
def _agents_connected(self, instances, api):
hostnames = [i.fqdn() for i in instances]
@ -242,10 +245,33 @@ class ClouderaUtils(object):
for inst in instances:
self.configure_instance(inst, cluster)
def get_roles_list(self, node_processes):
current = set(node_processes)
extra_roles = {
'YARN_GATEWAY': ["YARN_NODEMANAGER"],
'HDFS_GATEWAY': ['HDFS_NAMENODE', 'HDFS_DATANODE',
"HDFS_SECONDARYNAMENODE"]
}
for extra_role in six.iterkeys(extra_roles):
valid_processes = extra_roles[extra_role]
for valid in valid_processes:
if valid in current:
current.add(extra_role)
break
return list(current)
def get_role_type(self, process):
mapper = {
'YARN_GATEWAY': 'GATEWAY',
'HDFS_GATEWAY': 'GATEWAY',
}
return mapper.get(process, process)
@cpo.event_wrapper(True)
def configure_instance(self, instance, cluster=None):
for process in instance.node_group.node_processes:
self._add_role(instance, process, cluster)
roles_list = self.get_roles_list(instance.node_group.node_processes)
for role in roles_list:
self._add_role(instance, role, cluster)
def _add_role(self, instance, process, cluster):
if process in ['CLOUDERA_MANAGER']:
@ -253,8 +279,9 @@ class ClouderaUtils(object):
process = self.pu.convert_role_showname(process)
service = self.get_service_by_role(process, instance=instance)
role_type = self.get_role_type(process)
role = service.create_role(self.pu.get_role_name(instance, process),
process, instance.fqdn())
role_type, instance.fqdn())
role.update_config(self._get_configs(process, cluster,
node_group=instance.node_group))

View File

@ -129,7 +129,9 @@ class AbstractPluginUtils(object):
"SPARK_ON_YARN": ['SPARK_YARN_HISTORY_SERVER'],
"ZOOKEEPER": ['SERVER'],
"MASTER": ['MASTER'],
"REGIONSERVER": ['REGIONSERVER']
"REGIONSERVER": ['REGIONSERVER'],
'YARN_GATEWAY': ['YARN_GATEWAY'],
'HDFS_GATEWAY': ['HDFS_GATEWAY']
}
if isinstance(configs, res.Resource):
configs = configs.to_dict()

View File

@ -123,12 +123,13 @@ hdfs_confs = _load_json(path_to_config + 'hdfs-service.json')
namenode_confs = _load_json(path_to_config + 'hdfs-namenode.json')
datanode_confs = _load_json(path_to_config + 'hdfs-datanode.json')
secnamenode_confs = _load_json(path_to_config + 'hdfs-secondarynamenode.json')
hdfs_gateway_confs = _load_json(path_to_config + 'hdfs-gateway.json')
yarn_confs = _load_json(path_to_config + 'yarn-service.json')
resourcemanager_confs = _load_json(
path_to_config + 'yarn-resourcemanager.json')
mapred_confs = _load_json(path_to_config + 'yarn-gateway.json')
nodemanager_confs = _load_json(path_to_config + 'yarn-nodemanager.json')
jobhistory_confs = _load_json(path_to_config + 'yarn-jobhistory.json')
yarn_gateway = _load_json(path_to_config + 'yarn-gateway.json')
oozie_service_confs = _load_json(path_to_config + 'oozie-service.json')
oozie_role_confs = _load_json(path_to_config + 'oozie-oozie.json')
hive_service_confs = _load_json(path_to_config + 'hive-service.json')
@ -173,8 +174,9 @@ def _get_ng_plugin_configs():
cfg += _init_configs(namenode_confs, 'NAMENODE', 'node')
cfg += _init_configs(datanode_confs, 'DATANODE', 'node')
cfg += _init_configs(secnamenode_confs, 'SECONDARYNAMENODE', 'node')
cfg += _init_configs(hdfs_gateway_confs, 'HDFS_GATEWAY', 'node')
cfg += _init_configs(yarn_confs, 'YARN', 'cluster')
cfg += _init_configs(mapred_confs, 'MAPREDUCE', 'node')
cfg += _init_configs(yarn_gateway, 'YARN_GATEWAY', 'node')
cfg += _init_configs(resourcemanager_confs, 'RESOURCEMANAGER', 'node')
cfg += _init_configs(nodemanager_confs, 'NODEMANAGER', 'node')
cfg += _init_configs(jobhistory_confs, 'JOBHISTORY', 'node')

View File

@ -55,6 +55,8 @@ class VersionHandler(avm.AbstractVersionHandler):
"HBASE": [],
"MASTER": ['HBASE_MASTER'],
"REGIONSERVER": ['HBASE_REGIONSERVER'],
"YARN_GATEWAY": [],
"HDFS_GATEWAY": []
}
def validate(self, cluster):

View File

@ -69,7 +69,7 @@ class ClouderaUtilsV530(cu.ClouderaUtils):
cu.ClouderaUtils.__init__(self)
self.pu = pu.PluginUtilsV530()
def get_service_by_role(self, process, cluster=None, instance=None):
def get_service_by_role(self, role, cluster=None, instance=None):
cm_cluster = None
if cluster:
cm_cluster = self.get_cloudera_cluster(cluster)
@ -78,21 +78,21 @@ class ClouderaUtilsV530(cu.ClouderaUtils):
else:
raise ValueError(_("'cluster' or 'instance' argument missed"))
if process in ['AGENT']:
if role in ['AGENT']:
return cm_cluster.get_service(self.FLUME_SERVICE_NAME)
elif process in ['SENTRY_SERVER']:
elif role in ['SENTRY_SERVER']:
return cm_cluster.get_service(self.SENTRY_SERVICE_NAME)
elif process in ['SQOOP_SERVER']:
elif role in ['SQOOP_SERVER']:
return cm_cluster.get_service(self.SQOOP_SERVICE_NAME)
elif process in ['SOLR_SERVER']:
elif role in ['SOLR_SERVER']:
return cm_cluster.get_service(self.SOLR_SERVICE_NAME)
elif process in ['HBASE_INDEXER']:
elif role in ['HBASE_INDEXER']:
return cm_cluster.get_service(self.KS_INDEXER_SERVICE_NAME)
elif process in ['CATALOGSERVER', 'STATESTORE', 'IMPALAD', 'LLAMA']:
elif role in ['CATALOGSERVER', 'STATESTORE', 'IMPALAD', 'LLAMA']:
return cm_cluster.get_service(self.IMPALA_SERVICE_NAME)
else:
return super(ClouderaUtilsV530, self).get_service_by_role(
process, cluster, instance)
role, cluster, instance)
@cpo.event_wrapper(
True, step=_("First run cluster"), param=('cluster', 1))

View File

@ -190,12 +190,13 @@ hdfs_confs = _load_json(path_to_config + 'hdfs-service.json')
namenode_confs = _load_json(path_to_config + 'hdfs-namenode.json')
datanode_confs = _load_json(path_to_config + 'hdfs-datanode.json')
secnamenode_confs = _load_json(path_to_config + 'hdfs-secondarynamenode.json')
hdfs_gateway_confs = _load_json(path_to_config + "hdfs-gateway.json")
yarn_confs = _load_json(path_to_config + 'yarn-service.json')
resourcemanager_confs = _load_json(
path_to_config + 'yarn-resourcemanager.json')
mapred_confs = _load_json(path_to_config + 'yarn-gateway.json')
nodemanager_confs = _load_json(path_to_config + 'yarn-nodemanager.json')
jobhistory_confs = _load_json(path_to_config + 'yarn-jobhistory.json')
yarn_gateway = _load_json(path_to_config + "yarn-gateway.json")
oozie_service_confs = _load_json(path_to_config + 'oozie-service.json')
oozie_role_confs = _load_json(path_to_config + 'oozie-oozie_server.json')
hive_service_confs = _load_json(path_to_config + 'hive-service.json')
@ -263,10 +264,11 @@ def _get_ng_plugin_configs():
cfg += _init_configs(hdfs_confs, 'HDFS', 'cluster')
cfg += _init_configs(namenode_confs, 'NAMENODE', 'node')
cfg += _init_configs(datanode_confs, 'DATANODE', 'node')
cfg += _init_configs(hdfs_gateway_confs, 'HDFS_GATEWAY', 'node')
cfg += _init_configs(secnamenode_confs, 'SECONDARYNAMENODE', 'node')
cfg += _init_configs(yarn_confs, 'YARN', 'cluster')
cfg += _init_configs(resourcemanager_confs, 'RESOURCEMANAGER', 'node')
cfg += _init_configs(mapred_confs, 'MAPREDUCE', 'node')
cfg += _init_configs(yarn_gateway, 'YARN_GATEWAY', 'node')
cfg += _init_configs(nodemanager_confs, 'NODEMANAGER', 'node')
cfg += _init_configs(jobhistory_confs, 'JOBHISTORY', 'node')
cfg += _init_configs(oozie_service_confs, 'OOZIE', 'cluster')

View File

@ -53,7 +53,9 @@ class PluginUtilsV530(pu.AbstractPluginUtils):
'SPARK_YARN_HISTORY_SERVER': 'SHS',
'SQOOP_SERVER': 'S2S',
'STATESTORE': 'ISS',
'WEBHCAT': 'WHC'
'WEBHCAT': 'WHC',
'HDFS_GATEWAY': 'HG',
'YARN_GATEWAY': 'YG'
}
return '%s_%s' % (shortcuts.get(service, service),
instance.hostname().replace('-', '_'))
@ -107,7 +109,9 @@ class PluginUtilsV530(pu.AbstractPluginUtils):
"KS_INDEXER": ['HBASE_INDEXER'],
"SENTRY": ['SENTRY_SERVER'],
"SOLR": ['SOLR_SERVER'],
"SQOOP": ['SQOOP_SERVER']
"SQOOP": ['SQOOP_SERVER'],
'YARN_GATEWAY': ['YARN_GATEWAY'],
'HDFS_GATEWAY': ['HDFS_GATEWAY']
}
if isinstance(configs, res.Resource):
configs = configs.to_dict()

View File

@ -64,7 +64,9 @@ class VersionHandler(avm.AbstractVersionHandler):
"KS_INDEXER": ['KEY_VALUE_STORE_INDEXER'],
"SOLR": ['SOLR_SERVER'],
"SQOOP": ['SQOOP_SERVER'],
"SENTRY": ['SENTRY_SERVER']
"SENTRY": ['SENTRY_SERVER'],
"YARN_GATEWAY": [],
"HDFS_GATEWAY": []
}
def validate(self, cluster):

View File

@ -71,7 +71,7 @@ class ClouderaUtilsV540(cu.ClouderaUtils):
cu.ClouderaUtils.__init__(self)
self.pu = pu.PluginUtilsV540()
def get_service_by_role(self, process, cluster=None, instance=None):
def get_service_by_role(self, role, cluster=None, instance=None):
cm_cluster = None
if cluster:
cm_cluster = self.get_cloudera_cluster(cluster)
@ -80,23 +80,23 @@ class ClouderaUtilsV540(cu.ClouderaUtils):
else:
raise ValueError(_("'cluster' or 'instance' argument missed"))
if process in ['AGENT']:
if role in ['AGENT']:
return cm_cluster.get_service(self.FLUME_SERVICE_NAME)
elif process in ['SENTRY_SERVER']:
elif role in ['SENTRY_SERVER']:
return cm_cluster.get_service(self.SENTRY_SERVICE_NAME)
elif process in ['SQOOP_SERVER']:
elif role in ['SQOOP_SERVER']:
return cm_cluster.get_service(self.SQOOP_SERVICE_NAME)
elif process in ['SOLR_SERVER']:
elif role in ['SOLR_SERVER']:
return cm_cluster.get_service(self.SOLR_SERVICE_NAME)
elif process in ['HBASE_INDEXER']:
elif role in ['HBASE_INDEXER']:
return cm_cluster.get_service(self.KS_INDEXER_SERVICE_NAME)
elif process in ['CATALOGSERVER', 'STATESTORE', 'IMPALAD', 'LLAMA']:
elif role in ['CATALOGSERVER', 'STATESTORE', 'IMPALAD', 'LLAMA']:
return cm_cluster.get_service(self.IMPALA_SERVICE_NAME)
elif process in ['KMS']:
elif role in ['KMS']:
return cm_cluster.get_service(self.KMS_SERVICE_NAME)
else:
return super(ClouderaUtilsV540, self).get_service_by_role(
process, cluster, instance)
role, cluster, instance)
@cpo.event_wrapper(
True, step=_("First run cluster"), param=('cluster', 1))

View File

@ -213,10 +213,11 @@ hdfs_confs = _load_json(path_to_config + 'hdfs-service.json')
namenode_confs = _load_json(path_to_config + 'hdfs-namenode.json')
datanode_confs = _load_json(path_to_config + 'hdfs-datanode.json')
secnamenode_confs = _load_json(path_to_config + 'hdfs-secondarynamenode.json')
hdfs_gateway_confs = _load_json(path_to_config + "hdfs-gateway.json")
yarn_confs = _load_json(path_to_config + 'yarn-service.json')
resourcemanager_confs = _load_json(
path_to_config + 'yarn-resourcemanager.json')
mapred_confs = _load_json(path_to_config + 'yarn-gateway.json')
yarn_gateway_confs = _load_json(path_to_config + "yarn-gateway.json")
nodemanager_confs = _load_json(path_to_config + 'yarn-nodemanager.json')
jobhistory_confs = _load_json(path_to_config + 'yarn-jobhistory.json')
oozie_service_confs = _load_json(path_to_config + 'oozie-service.json')
@ -288,10 +289,11 @@ def _get_ng_plugin_configs():
cfg += _init_configs(hdfs_confs, 'HDFS', 'cluster')
cfg += _init_configs(namenode_confs, 'NAMENODE', 'node')
cfg += _init_configs(datanode_confs, 'DATANODE', 'node')
cfg += _init_configs(hdfs_gateway_confs, 'HDFS_GATEWAY', 'node')
cfg += _init_configs(secnamenode_confs, 'SECONDARYNAMENODE', 'node')
cfg += _init_configs(yarn_confs, 'YARN', 'cluster')
cfg += _init_configs(resourcemanager_confs, 'RESOURCEMANAGER', 'node')
cfg += _init_configs(mapred_confs, 'MAPREDUCE', 'node')
cfg += _init_configs(yarn_gateway_confs, 'YARN_GATEWAY', 'node')
cfg += _init_configs(nodemanager_confs, 'NODEMANAGER', 'node')
cfg += _init_configs(jobhistory_confs, 'JOBHISTORY', 'node')
cfg += _init_configs(oozie_service_confs, 'OOZIE', 'cluster')

View File

@ -60,7 +60,9 @@ class PluginUtilsV540(pu.AbstractPluginUtils):
'SPARK_YARN_HISTORY_SERVER': 'SHS',
'SQOOP_SERVER': 'S2S',
'STATESTORE': 'ISS',
'WEBHCAT': 'WHC'
'WEBHCAT': 'WHC',
'HDFS_GATEWAY': 'HG',
'YARN_GATEWAY': 'YG'
}
return '%s_%s' % (shortcuts.get(service, service),
instance.hostname().replace('-', '_'))
@ -118,7 +120,9 @@ class PluginUtilsV540(pu.AbstractPluginUtils):
"SENTRY": ['SENTRY_SERVER'],
"SOLR": ['SOLR_SERVER'],
"SQOOP": ['SQOOP_SERVER'],
"KMS": ['KMS']
"KMS": ['KMS'],
'YARN_GATEWAY': ['YARN_GATEWAY'],
'HDFS_GATEWAY': ['HDFS_GATEWAY']
}
if isinstance(configs, res.Resource):
configs = configs.to_dict()

View File

@ -65,7 +65,9 @@ class VersionHandler(avm.AbstractVersionHandler):
"SOLR": ['SOLR_SERVER'],
"SQOOP": ['SQOOP_SERVER'],
"SENTRY": ['SENTRY_SERVER'],
"KMS": ['KMS']
"KMS": ['KMS'],
"YARN_GATEWAY": [],
"HDFS_GATEWAY": []
}
def validate(self, cluster):

View File

@ -14,6 +14,7 @@
# limitations under the License.
import mock
import testtools
from sahara.plugins.cdh.v5_3_0 import cloudera_utils as cu
from sahara.tests.unit import base
@ -64,3 +65,29 @@ class ClouderaUtilsTestCase(base.SaharaTestCase):
self.assertEqual('eggs_spam_host',
CU.pu.get_role_name(inst_mock, 'eggs'))
@mock.patch('sahara.plugins.cdh.cloudera_utils.ClouderaUtils.'
'get_cloudera_cluster')
def test_get_service_by_role(self, get_cloudera_cluster):
class Ob(object):
def get_service(self, x):
return x
get_cloudera_cluster.return_value = Ob()
roles = ['NAMENODE', 'DATANODE', 'SECONDARYNAMENODE', 'HDFS_GATEWAY',
'RESOURCEMANAGER', 'NODEMANAGER', 'JOBHISTORY',
'YARN_GATEWAY', 'OOZIE_SERVER', 'HIVESERVER2',
'HIVEMETASTORE', 'WEBHCAT', 'HUE_SERVER',
'SPARK_YARN_HISTORY_SERVER', 'SERVER', 'MASTER',
'REGIONSERVER']
resps = ['hdfs01', 'hdfs01', 'hdfs01', 'hdfs01', 'yarn01', 'yarn01',
'yarn01', 'yarn01', 'oozie01', 'hive01', 'hive01', 'hive01',
'hue01', 'spark_on_yarn01', 'zookeeper01', 'hbase01',
'hbase01']
provider = cu.ClouderaUtilsV530()
cluster = mock.Mock()
for (role, resp) in zip(roles, resps):
self.assertEqual(
resp, provider.get_service_by_role(role, cluster=cluster))
with testtools.ExpectedException(ValueError):
provider.get_service_by_role('cat', cluster=cluster)