Moved information about process names to plugins

Process names are highly plugin-specific, so each plugin should
handle its own process names.

Partially implements: blueprint edp-plugin-communication
Closes-Bug: #1251750

Change-Id: If1f84ce7672763034aadc28871b77fa0ba419adc
Andrew Lazarev
2014-04-04 13:29:27 -07:00
parent e565c40ea4
commit 7362acea7c
12 changed files with 247 additions and 103 deletions

View File

@@ -38,51 +38,6 @@ def get_instance(cluster, node_process):
     return instances[0] if instances else None


-def get_namenode(cluster):
-    return get_instance(cluster, "namenode")
-
-
-#TODO(jmaron): name change?
-def get_jobtracker(cluster):
-    instance = get_instance(cluster, "jobtracker")
-    if not instance:
-        instance = get_resourcemanager(cluster)
-    return instance
-
-
-def get_resourcemanager(cluster):
-    return get_instance(cluster, 'resourcemanager')
-
-
-def get_nodemanagers(cluster):
-    return get_instances(cluster, 'nodemanager')
-
-
-def get_oozie(cluster):
-    return get_instance(cluster, "oozie")
-
-
-def get_hiveserver(cluster):
-    return get_instance(cluster, "hiveserver")
-
-
-def get_datanodes(cluster):
-    return get_instances(cluster, 'datanode')
-
-
-def get_tasktrackers(cluster):
-    return get_instances(cluster, 'tasktracker')
-
-
-def get_secondarynamenodes(cluster):
-    return get_instances(cluster, 'secondarynamenode')
-
-
-def get_historyserver(cluster):
-    return get_instance(cluster, 'historyserver')
-
-
 def generate_host_names(nodes):
     return "\n".join([n.hostname() for n in nodes])
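Note: the generic helpers that remain in sahara.plugins.general.utils resolve instances by process name; the removed plugin-level helpers delegated to them and continue to do so from their new home. A rough sketch of those lookups, reconstructed from the hunk header and the call sites in this change (the exact bodies are not part of this diff):

def get_instances(cluster, node_process=None):
    # Collect the instances of every node group that runs the given
    # process; with no process given, return all instances.
    instances = []
    for node_group in cluster.node_groups:
        if node_process is None or node_process in node_group.node_processes:
            instances.extend(node_group.instances)
    return instances


def get_instance(cluster, node_process):
    # For processes expected on at most one node; None when absent.
    instances = get_instances(cluster, node_process)
    return instances[0] if instances else None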

View File

@@ -0,0 +1,58 @@
+# Copyright (c) 2014 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from sahara.plugins.general import utils as u
+
+
+def get_namenode(cluster):
+    return u.get_instance(cluster, "namenode")
+
+
+def get_jobtracker(cluster):
+    instance = u.get_instance(cluster, "jobtracker")
+    return instance
+
+
+def get_resourcemanager(cluster):
+    return u.get_instance(cluster, 'resourcemanager')
+
+
+def get_nodemanagers(cluster):
+    return u.get_instances(cluster, 'nodemanager')
+
+
+def get_oozie(cluster):
+    return u.get_instance(cluster, "oozie")
+
+
+def get_hiveserver(cluster):
+    return u.get_instance(cluster, "hiveserver")
+
+
+def get_datanodes(cluster):
+    return u.get_instances(cluster, 'datanode')
+
+
+def get_tasktrackers(cluster):
+    return u.get_instances(cluster, 'tasktracker')
+
+
+def get_secondarynamenodes(cluster):
+    return u.get_instances(cluster, 'secondarynamenode')
+
+
+def get_historyserver(cluster):
+    return u.get_instance(cluster, 'historyserver')
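The new module keeps the same contract as the generic helpers: a single-instance lookup returns the instance or None, and a multi-instance lookup returns a possibly empty list, so callers guard before use (the behaviour the new unit tests below pin down). For example, with a hypothetical caller:

from sahara.plugins.vanilla import utils as vu


def print_oozie_host(cluster):
    # get_oozie() returns None when no node group runs the "oozie"
    # process, so the caller must check before dereferencing.
    oozie = vu.get_oozie(cluster)
    if oozie:
        print("Oozie runs on %s" % oozie.hostname())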

View File

@@ -20,6 +20,7 @@ from sahara import context
 from sahara.openstack.common import log as logging
 from sahara.plugins.general import utils
 from sahara.plugins import provisioning as p
+from sahara.plugins.vanilla import utils as vu
 from sahara.plugins.vanilla.v1_2_1 import mysql_helper as m_h
 from sahara.plugins.vanilla.v1_2_1 import oozie_helper as o_h
 from sahara.swift import swift_helper as swift
@@ -241,10 +242,10 @@ def get_hadoop_ssh_keys(cluster):

 def generate_sahara_configs(cluster, node_group=None):
-    nn_hostname = _get_hostname(utils.get_namenode(cluster))
-    jt_hostname = _get_hostname(utils.get_jobtracker(cluster))
-    oozie_hostname = _get_hostname(utils.get_oozie(cluster))
-    hive_hostname = _get_hostname(utils.get_hiveserver(cluster))
+    nn_hostname = _get_hostname(vu.get_namenode(cluster))
+    jt_hostname = _get_hostname(vu.get_jobtracker(cluster))
+    oozie_hostname = _get_hostname(vu.get_oozie(cluster))
+    hive_hostname = _get_hostname(vu.get_hiveserver(cluster))

     storage_path = node_group.storage_paths() if node_group else None
@@ -296,8 +297,8 @@ def generate_sahara_configs(cluster, node_group=None):

 def generate_xml_configs(cluster, node_group, hive_mysql_passwd):
-    oozie_hostname = _get_hostname(utils.get_oozie(cluster))
-    hive_hostname = _get_hostname(utils.get_hiveserver(cluster))
+    oozie_hostname = _get_hostname(vu.get_oozie(cluster))
+    hive_hostname = _get_hostname(vu.get_hiveserver(cluster))

     ng_configs = node_group.configuration()
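_get_hostname itself is unchanged by this diff; since optional services such as Oozie and Hive may be absent, it presumably tolerates a None instance. A plausible one-line reconstruction:

def _get_hostname(instance):
    # Assumed behaviour: the instance is None when the corresponding
    # process is not deployed, and the hostname is then also None.
    return instance.hostname() if instance else None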

View File

@@ -24,6 +24,7 @@ from sahara.openstack.common import log as logging
 from sahara.plugins.general import exceptions as ex
 from sahara.plugins.general import utils
 from sahara.plugins.vanilla import abstractversionhandler as avm
+from sahara.plugins.vanilla import utils as vu
 from sahara.plugins.vanilla.v1_2_1 import config_helper as c_helper
 from sahara.plugins.vanilla.v1_2_1 import run_scripts as run
 from sahara.plugins.vanilla.v1_2_1 import scaling as sc
@@ -55,7 +56,7 @@ class VersionHandler(avm.AbstractVersionHandler):
         return cluster['info']['MapReduce']['JobTracker']

     def get_oozie_server(self, cluster):
-        return utils.get_oozie(cluster)
+        return vu.get_oozie(cluster)

     def validate(self, cluster):
         nn_count = sum([ng.count for ng
@@ -105,15 +106,15 @@ class VersionHandler(avm.AbstractVersionHandler):
         self._setup_instances(cluster, instances)

     def start_cluster(self, cluster):
-        nn_instance = utils.get_namenode(cluster)
+        nn_instance = vu.get_namenode(cluster)
         with remote.get_remote(nn_instance) as r:
             run.format_namenode(r)
             run.start_processes(r, "namenode")

-        for snn in utils.get_secondarynamenodes(cluster):
+        for snn in vu.get_secondarynamenodes(cluster):
             run.start_processes(remote.get_remote(snn), "secondarynamenode")

-        jt_instance = utils.get_jobtracker(cluster)
+        jt_instance = vu.get_jobtracker(cluster)
         if jt_instance:
             run.start_processes(remote.get_remote(jt_instance), "jobtracker")
@@ -124,7 +125,7 @@ class VersionHandler(avm.AbstractVersionHandler):
         LOG.info("Hadoop services in cluster %s have been started" %
                  cluster.name)

-        oozie = utils.get_oozie(cluster)
+        oozie = vu.get_oozie(cluster)
         if oozie:
             with remote.get_remote(oozie) as r:
                 if c_helper.is_mysql_enable(cluster):
@@ -135,7 +136,7 @@ class VersionHandler(avm.AbstractVersionHandler):
             LOG.info("Oozie service at '%s' has been started",
                      nn_instance.hostname())

-        hive_server = utils.get_hiveserver(cluster)
+        hive_server = vu.get_hiveserver(cluster)
         if hive_server:
             with remote.get_remote(hive_server) as r:
                 run.hive_create_warehouse_dir(r)
@@ -154,12 +155,12 @@ class VersionHandler(avm.AbstractVersionHandler):
         self._set_cluster_info(cluster)

     def _await_datanodes(self, cluster):
-        datanodes_count = len(utils.get_datanodes(cluster))
+        datanodes_count = len(vu.get_datanodes(cluster))
         if datanodes_count < 1:
             return

         LOG.info("Waiting %s datanodes to start up" % datanodes_count)
-        with remote.get_remote(utils.get_namenode(cluster)) as r:
+        with remote.get_remote(vu.get_namenode(cluster)) as r:
             while True:
                 if run.check_datanodes_count(r, datanodes_count):
                     LOG.info(
@@ -176,8 +177,8 @@ class VersionHandler(avm.AbstractVersionHandler):
                 return

     def _extract_configs_to_extra(self, cluster):
-        oozie = utils.get_oozie(cluster)
-        hive = utils.get_hiveserver(cluster)
+        oozie = vu.get_oozie(cluster)
+        hive = vu.get_hiveserver(cluster)

         extra = dict()
@@ -205,8 +206,8 @@ class VersionHandler(avm.AbstractVersionHandler):
         return extra

     def decommission_nodes(self, cluster, instances):
-        tts = utils.get_tasktrackers(cluster)
-        dns = utils.get_datanodes(cluster)
+        tts = vu.get_tasktrackers(cluster)
+        dns = vu.get_datanodes(cluster)
         decommission_dns = False
         decommission_tts = False
@@ -218,8 +219,8 @@ class VersionHandler(avm.AbstractVersionHandler):
                 tts.remove(i)
                 decommission_tts = True

-        nn = utils.get_namenode(cluster)
-        jt = utils.get_jobtracker(cluster)
+        nn = vu.get_namenode(cluster)
+        jt = vu.get_jobtracker(cluster)

         if decommission_tts:
             sc.decommission_tt(jt, instances, tts)
@@ -234,8 +235,8 @@ class VersionHandler(avm.AbstractVersionHandler):
         self._setup_instances(cluster, instances)

         run.refresh_nodes(remote.get_remote(
-            utils.get_namenode(cluster)), "dfsadmin")
-        jt = utils.get_jobtracker(cluster)
+            vu.get_namenode(cluster)), "dfsadmin")
+        jt = vu.get_jobtracker(cluster)
         if jt:
             run.refresh_nodes(remote.get_remote(jt), "mradmin")
@@ -362,12 +363,12 @@ class VersionHandler(avm.AbstractVersionHandler):
     def _push_namenode_configs(self, cluster, r):
         r.write_file_to('/etc/hadoop/dn.incl',
                         utils.generate_fqdn_host_names(
-                            utils.get_datanodes(cluster)))
+                            vu.get_datanodes(cluster)))

     def _push_jobtracker_configs(self, cluster, r):
         r.write_file_to('/etc/hadoop/tt.incl',
                         utils.generate_fqdn_host_names(
-                            utils.get_tasktrackers(cluster)))
+                            vu.get_tasktrackers(cluster)))

     def _push_oozie_configs(self, cluster, ng_extra, r):
         r.write_file_to('/opt/oozie/conf/oozie-site.xml',
@@ -396,9 +397,9 @@ class VersionHandler(avm.AbstractVersionHandler):
             r.write_files_to(files)

     def _set_cluster_info(self, cluster):
-        nn = utils.get_namenode(cluster)
-        jt = utils.get_jobtracker(cluster)
-        oozie = utils.get_oozie(cluster)
+        nn = vu.get_namenode(cluster)
+        jt = vu.get_jobtracker(cluster)
+        oozie = vu.get_oozie(cluster)
         info = {}

         if jt:
@@ -443,7 +444,7 @@ class VersionHandler(avm.AbstractVersionHandler):
         return None

     def _validate_additional_ng_scaling(self, cluster, additional):
-        jt = utils.get_jobtracker(cluster)
+        jt = vu.get_jobtracker(cluster)
         scalable_processes = self._get_scalable_processes()

         for ng_id in additional:
@@ -473,7 +474,7 @@ class VersionHandler(avm.AbstractVersionHandler):
                     " with processes: " +
                     ' '.join(ng.node_processes))

-        dn_amount = len(utils.get_datanodes(cluster))
+        dn_amount = len(vu.get_datanodes(cluster))
         rep_factor = c_helper.get_config_value('HDFS', 'dfs.replication',
                                                cluster)

View File

@@ -16,7 +16,7 @@
 import six

 from sahara.openstack.common import log as logging
-from sahara.plugins.general import utils
+from sahara.plugins.vanilla import utils as vu
 from sahara.plugins.vanilla.v2_3_0 import config_helper as c_helper
 from sahara.plugins.vanilla.v2_3_0 import oozie_helper as o_helper
 from sahara.swift import swift_helper as swift
@@ -67,8 +67,8 @@ def _generate_configs(node_group):

 def _get_hadoop_configs(node_group):
     cluster = node_group.cluster
-    nn_hostname = utils.get_namenode(cluster).hostname()
-    res_hostname = utils.get_resourcemanager(cluster).hostname()
+    nn_hostname = vu.get_namenode(cluster).hostname()
+    res_hostname = vu.get_resourcemanager(cluster).hostname()
     dirs = _get_hadoop_dirs(node_group)
     confs = {
         'Hadoop': {
@@ -93,7 +93,7 @@ def _get_hadoop_configs(node_group):
         },
     }

-    oozie = utils.get_oozie(cluster)
+    oozie = vu.get_oozie(cluster)
     if oozie:
         hadoop_cfg = {
             'hadoop.proxyuser.hadoop.hosts': '*',

View File

@@ -15,7 +15,7 @@
 from sahara import context
 from sahara.openstack.common import log as logging
-from sahara.plugins.general import utils as u
+from sahara.plugins.vanilla import utils as vu
 from sahara.plugins.vanilla.v2_3_0 import config_helper as c_helper
 from sahara.utils import files
 from sahara.utils import general as g
@@ -68,13 +68,13 @@ def format_namenode(instance):

 def refresh_hadoop_nodes(cluster):
-    nn = u.get_namenode(cluster)
+    nn = vu.get_namenode(cluster)
     nn.remote().execute_command(
         'sudo su - -c "hdfs dfsadmin -refreshNodes" hadoop')


 def refresh_yarn_nodes(cluster):
-    rm = u.get_resourcemanager(cluster)
+    rm = vu.get_resourcemanager(cluster)
     rm.remote().execute_command(
         'sudo su - -c "yarn rmadmin -refreshNodes" hadoop')
@@ -119,12 +119,12 @@ def _start_oozie(remote):

 def await_datanodes(cluster):
-    datanodes_count = len(u.get_datanodes(cluster))
+    datanodes_count = len(vu.get_datanodes(cluster))
     if datanodes_count < 1:
         return

     LOG.info("Waiting %s datanodes to start up" % datanodes_count)
-    with u.get_namenode(cluster).remote() as r:
+    with vu.get_namenode(cluster).remote() as r:
         while True:
             if _check_datanodes_count(r, datanodes_count):
                 LOG.info(
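_check_datanodes_count is referenced here but not part of this diff. A hedged sketch of what such a check might do, assuming it compares the expected count against the live-datanode figure reported by 'hdfs dfsadmin -report' (the command and parsing details are assumptions, not taken from this change):

def _check_datanodes_count(remote, count):
    # Assumed implementation: ask HDFS how many datanodes have
    # registered and compare with the number we are waiting for.
    if count < 1:
        return True
    exit_code, stdout = remote.execute_command(
        'sudo su - -c "hdfs dfsadmin -report" hadoop | '
        'grep "Datanodes available:" | awk \'{print $3}\'')
    return exit_code == 0 and stdout and int(stdout) == count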

View File

@@ -17,6 +17,7 @@ from sahara import context
 from sahara.openstack.common import timeutils
 from sahara.plugins.general import exceptions as ex
 from sahara.plugins.general import utils as u
+from sahara.plugins.vanilla import utils as vu
 from sahara.plugins.vanilla.v2_3_0 import config
 from sahara.plugins.vanilla.v2_3_0 import run_scripts as run
 from sahara.plugins.vanilla.v2_3_0 import utils as pu
@@ -46,8 +47,8 @@ def _get_instances_with_service(instances, service):

 def _update_include_files(cluster):
     instances = u.get_instances(cluster)
-    datanodes = u.get_datanodes(cluster)
-    nodemanagers = u.get_nodemanagers(cluster)
+    datanodes = vu.get_datanodes(cluster)
+    nodemanagers = vu.get_nodemanagers(cluster)
     dn_hosts = u.generate_fqdn_host_names(datanodes)
     nm_hosts = u.generate_fqdn_host_names(nodemanagers)
     for instance in instances:
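u.generate_fqdn_host_names stays in the generic utils (only the process-name lookups moved); it presumably mirrors the generate_host_names helper visible in the first hunk, joining fully qualified names instead:

def generate_fqdn_host_names(instances):
    # Presumed counterpart of generate_host_names() shown earlier,
    # emitting one FQDN per line for the include files.
    return "\n".join([i.fqdn() for i in instances])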

View File

@@ -15,7 +15,7 @@
 import re

-from sahara.plugins.general import utils as u
+from sahara.plugins.vanilla import utils as u


 def get_datanodes_status(cluster):

View File

@@ -15,6 +15,7 @@

 from sahara.plugins.general import exceptions as ex
 from sahara.plugins.general import utils as u
+from sahara.plugins.vanilla import utils as vu
 from sahara.plugins.vanilla.v2_3_0 import config_helper as c_helper
 from sahara.utils import general as gu
@@ -65,7 +66,7 @@ def validate_cluster_creating(cluster):

 def validate_additional_ng_scaling(cluster, additional):
-    rm = u.get_resourcemanager(cluster)
+    rm = vu.get_resourcemanager(cluster)
     scalable_processes = _get_scalable_processes()

     for ng_id in additional:
@@ -95,7 +96,7 @@ def validate_existing_ng_scaling(cluster, existing):
             raise ex.NodeGroupCannotBeScaled(
                 ng.name, msg % ' '.join(ng.node_processes))

-    dn_amount = len(u.get_datanodes(cluster))
+    dn_amount = len(vu.get_datanodes(cluster))
     rep_factor = c_helper.get_config_value('HDFS', 'dfs.replication', cluster)
     if dn_to_delete > 0 and dn_amount - dn_to_delete < rep_factor:
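The last guard keeps at least dfs.replication datanodes in the cluster: with 5 datanodes, a replication factor of 3, and 3 datanodes marked for deletion, 5 - 3 = 2 < 3 and scaling is rejected, since HDFS could no longer place every block replica. The predicate, condensed into a standalone sketch (the function name is hypothetical):

def datanode_scaling_allowed(dn_amount, dn_to_delete, rep_factor):
    # Removing datanodes must leave at least rep_factor of them alive.
    return dn_to_delete <= 0 or dn_amount - dn_to_delete >= rep_factor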

View File

@@ -18,8 +18,8 @@ from oslo.config import cfg
 from sahara import conductor
 from sahara import context
 from sahara.openstack.common import log as logging
-from sahara.plugins.general import utils
 from sahara.plugins.vanilla import abstractversionhandler as avm
+from sahara.plugins.vanilla import utils as vu
 from sahara.plugins.vanilla.v2_3_0 import config as c
 from sahara.plugins.vanilla.v2_3_0 import config_helper as c_helper
 from sahara.plugins.vanilla.v2_3_0 import run_scripts as run
@@ -54,29 +54,29 @@ class VersionHandler(avm.AbstractVersionHandler):
         c.configure_cluster(cluster)

     def start_cluster(self, cluster):
-        nn = utils.get_namenode(cluster)
+        nn = vu.get_namenode(cluster)
         run.format_namenode(nn)
         run.start_hadoop_process(nn, 'namenode')

-        for snn in utils.get_secondarynamenodes(cluster):
+        for snn in vu.get_secondarynamenodes(cluster):
             run.start_hadoop_process(snn, 'secondarynamenode')

-        rm = utils.get_resourcemanager(cluster)
+        rm = vu.get_resourcemanager(cluster)
         run.start_yarn_process(rm, 'resourcemanager')

-        for dn in utils.get_datanodes(cluster):
+        for dn in vu.get_datanodes(cluster):
             run.start_hadoop_process(dn, 'datanode')
         run.await_datanodes(cluster)

-        for nm in utils.get_nodemanagers(cluster):
+        for nm in vu.get_nodemanagers(cluster):
             run.start_yarn_process(nm, 'nodemanager')

-        hs = utils.get_historyserver(cluster)
+        hs = vu.get_historyserver(cluster)
         if hs:
             run.start_historyserver(hs)

-        oo = utils.get_oozie(cluster)
+        oo = vu.get_oozie(cluster)
         if oo:
             run.start_oozie_process(oo)
@@ -93,10 +93,10 @@ class VersionHandler(avm.AbstractVersionHandler):
         sc.scale_cluster(cluster, instances)

     def _set_cluster_info(self, cluster):
-        nn = utils.get_namenode(cluster)
-        rm = utils.get_resourcemanager(cluster)
-        hs = utils.get_historyserver(cluster)
-        oo = utils.get_oozie(cluster)
+        nn = vu.get_namenode(cluster)
+        rm = vu.get_resourcemanager(cluster)
+        hs = vu.get_historyserver(cluster)
+        oo = vu.get_oozie(cluster)

         info = {}
@@ -126,7 +126,7 @@ class VersionHandler(avm.AbstractVersionHandler):
         conductor.cluster_update(ctx, cluster, {'info': info})

     def get_oozie_server(self, cluster):
-        return utils.get_oozie(cluster)
+        return vu.get_oozie(cluster)

     def get_resource_manager_uri(self, cluster):
         return cluster['info']['YARN']['ResourceManager']

View File

@@ -0,0 +1,127 @@
+# Copyright (c) 2014 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from sahara.plugins.vanilla import plugin as p
+from sahara.plugins.vanilla import utils as u
+from sahara.tests.unit import base
+from sahara.tests.unit import testutils as tu
+
+
+class TestUtils(base.SaharaWithDbTestCase):
+    def setUp(self):
+        super(TestUtils, self).setUp()
+        self.plugin = p.VanillaProvider()
+
+        self.ng_manager = tu.make_ng_dict(
+            'mng', 'f1', ['manager'], 1,
+            [tu.make_inst_dict('mng1', 'manager')])
+        self.ng_namenode = tu.make_ng_dict(
+            'nn', 'f1', ['namenode'], 1,
+            [tu.make_inst_dict('nn1', 'namenode')])
+        self.ng_jobtracker = tu.make_ng_dict(
+            'jt', 'f1', ['jobtracker'], 1,
+            [tu.make_inst_dict('jt1', 'jobtracker')])
+        self.ng_datanode = tu.make_ng_dict(
+            'dn', 'f1', ['datanode'], 2,
+            [tu.make_inst_dict('dn1', 'datanode-1'),
+             tu.make_inst_dict('dn2', 'datanode-2')])
+        self.ng_tasktracker = tu.make_ng_dict(
+            'tt', 'f1', ['tasktracker'], 2,
+            [tu.make_inst_dict('tt1', 'tasktracker-1'),
+             tu.make_inst_dict('tt2', 'tasktracker-2')])
+        self.ng_oozie = tu.make_ng_dict(
+            'ooz1', 'f1', ['oozie'], 1,
+            [tu.make_inst_dict('ooz1', 'oozie')])
+        self.ng_hiveserver = tu.make_ng_dict(
+            'hs', 'f1', ['hiveserver'], 1,
+            [tu.make_inst_dict('hs1', 'hiveserver')])
+        self.ng_secondarynamenode = tu.make_ng_dict(
+            'snn', 'f1', ['secondarynamenode'], 1,
+            [tu.make_inst_dict('snn1', 'secondarynamenode')])
+
+    def test_get_namenode(self):
+        cl = tu.create_cluster('cl1', 't1', 'vanilla', '1.2.1',
+                               [self.ng_manager, self.ng_namenode])
+        self.assertEqual('nn1', u.get_namenode(cl).instance_id)
+
+        cl = tu.create_cluster('cl1', 't1', 'vanilla', '1.2.1',
+                               [self.ng_manager])
+        self.assertIsNone(u.get_namenode(cl))
+
+    def test_get_jobtracker(self):
+        cl = tu.create_cluster('cl1', 't1', 'vanilla', '1.2.1',
+                               [self.ng_manager, self.ng_jobtracker])
+        self.assertEqual('jt1', u.get_jobtracker(cl).instance_id)
+
+        cl = tu.create_cluster('cl1', 't1', 'vanilla', '1.2.1',
+                               [self.ng_manager])
+        self.assertIsNone(u.get_jobtracker(cl))
+
+    def test_get_oozie(self):
+        cl = tu.create_cluster('cl1', 't1', 'vanilla', '1.2.1',
+                               [self.ng_manager, self.ng_oozie])
+        self.assertEqual('ooz1', u.get_oozie(cl).instance_id)
+
+        cl = tu.create_cluster('cl1', 't1', 'vanilla', '1.2.1',
+                               [self.ng_manager])
+        self.assertIsNone(u.get_oozie(cl))
+
+    def test_get_hiveserver(self):
+        cl = tu.create_cluster('cl1', 't1', 'vanilla', '1.2.1',
+                               [self.ng_manager, self.ng_hiveserver])
+        self.assertEqual('hs1', u.get_hiveserver(cl).instance_id)
+
+        cl = tu.create_cluster('cl1', 't1', 'vanilla', '1.2.1',
+                               [self.ng_manager])
+        self.assertIsNone(u.get_hiveserver(cl))
+
+    def test_get_datanodes(self):
+        cl = tu.create_cluster('cl1', 't1', 'vanilla', '1.2.1',
+                               [self.ng_manager, self.ng_namenode,
+                                self.ng_datanode])
+        datanodes = u.get_datanodes(cl)
+        self.assertEqual(2, len(datanodes))
+        self.assertEqual(set(['dn1', 'dn2']),
+                         set([datanodes[0].instance_id,
+                              datanodes[1].instance_id]))
+
+        cl = tu.create_cluster('cl1', 't1', 'vanilla', '1.2.1',
+                               [self.ng_manager])
+        self.assertEqual([], u.get_datanodes(cl))
+
+    def test_get_tasktrackers(self):
+        cl = tu.create_cluster('cl1', 't1', 'vanilla', '1.2.1',
+                               [self.ng_manager, self.ng_jobtracker,
+                                self.ng_tasktracker])
+        tasktrackers = u.get_tasktrackers(cl)
+        self.assertEqual(2, len(tasktrackers))
+        self.assertEqual(set(['tt1', 'tt2']),
+                         set([tasktrackers[0].instance_id,
+                              tasktrackers[1].instance_id]))
+
+        cl = tu.create_cluster('cl1', 't1', 'vanilla', '1.2.1',
+                               [self.ng_manager])
+        self.assertEqual([], u.get_tasktrackers(cl))
+
+    def test_get_secondarynamenodes(self):
+        cl = tu.create_cluster('cl1', 't1', 'vanilla', '1.2.1',
+                               [self.ng_manager, self.ng_namenode,
+                                self.ng_secondarynamenode])
+        self.assertEqual('snn1', u.get_secondarynamenodes(cl)[0].instance_id)
+
+        cl = tu.create_cluster('cl1', 't1', 'vanilla', '1.2.1',
+                               [self.ng_manager])
+        self.assertEqual([], u.get_secondarynamenodes(cl))

View File

@@ -21,7 +21,7 @@ from sahara.utils import files


 class UtilsTestCase(base.SaharaTestCase):
-    @mock.patch('sahara.plugins.general.utils.get_namenode')
+    @mock.patch('sahara.plugins.vanilla.utils.get_namenode')
     def test_datanodes_status(self, nn):
         report = files.get_file_text(
             'tests/unit/plugins/vanilla/v2_3_0/resources/dfs-report.txt')
@@ -38,7 +38,7 @@ class UtilsTestCase(base.SaharaTestCase):

         self.assertDictEqual(statuses, expected)

-    @mock.patch('sahara.plugins.general.utils.get_resourcemanager')
+    @mock.patch('sahara.plugins.vanilla.utils.get_resourcemanager')
     def test_nodemanagers_status(self, rm):
         report = files.get_file_text(
             'tests/unit/plugins/vanilla/v2_3_0/resources/yarn-report.txt')
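Note on the mock targets above: mock.patch has to point at the module whose attribute is looked up at call time. The code under test imports the module object (from sahara.plugins.vanilla import utils as u) and calls u.get_namenode(...), so patching sahara.plugins.vanilla.utils.get_namenode is what it observes; patching the old general.utils path would no longer intercept anything. A minimal standalone illustration (the demo function is hypothetical):

import mock


@mock.patch('sahara.plugins.vanilla.utils.get_namenode')
def _demo(get_namenode):
    # The attribute is resolved on the module object at call time,
    # so the patched function is what callers of u.get_namenode see.
    get_namenode.return_value = 'fake-namenode'
    from sahara.plugins.vanilla import utils as u
    assert u.get_namenode(None) == 'fake-namenode'


_demo()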