Make starting scripts module for vanilla 2 plugin
To avoid code duplication in new vanilla 2 plugin, we can make one common module with starting sripts in hadoop2 directory. Also in Sahara it's restricted to use at most one secondarynamenode, so remove methods that can start several secondarynamenodes. Related blueprint: support-vanilla-2-7-1 Change-Id: I0a9194d5e06d222d6df094bb0107c02bf01cd359
This commit is contained in:
parent
f342382ab5
commit
c1348a40ae
73
sahara/plugins/vanilla/hadoop2/starting_scripts.py
Normal file
73
sahara/plugins/vanilla/hadoop2/starting_scripts.py
Normal file
@ -0,0 +1,73 @@
|
|||||||
|
# Copyright (c) 2015 Mirantis Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from sahara.plugins import utils
|
||||||
|
from sahara.plugins.vanilla.hadoop2 import run_scripts as run
|
||||||
|
from sahara.plugins.vanilla import utils as vu
|
||||||
|
from sahara.utils import cluster_progress_ops as cpo
|
||||||
|
|
||||||
|
|
||||||
|
def start_namenode(cluster):
|
||||||
|
nn = vu.get_namenode(cluster)
|
||||||
|
_start_namenode(nn)
|
||||||
|
|
||||||
|
|
||||||
|
@cpo.event_wrapper(
|
||||||
|
True, step=utils.start_process_event_message('NameNode'))
|
||||||
|
def _start_namenode(nn):
|
||||||
|
run.format_namenode(nn)
|
||||||
|
run.start_hadoop_process(nn, 'namenode')
|
||||||
|
|
||||||
|
|
||||||
|
def start_secondarynamenode(cluster):
|
||||||
|
snn = vu.get_secondarynamenode(cluster)
|
||||||
|
if snn:
|
||||||
|
_start_secondarynamenode(snn)
|
||||||
|
|
||||||
|
|
||||||
|
@cpo.event_wrapper(
|
||||||
|
True, step=utils.start_process_event_message("SecondaryNameNodes"))
|
||||||
|
def _start_secondarynamenode(snn):
|
||||||
|
run.start_hadoop_process(snn, 'secondarynamenode')
|
||||||
|
|
||||||
|
|
||||||
|
def start_resourcemanager(cluster):
|
||||||
|
rm = vu.get_resourcemanager(cluster)
|
||||||
|
if rm:
|
||||||
|
_start_resourcemanager(rm)
|
||||||
|
|
||||||
|
|
||||||
|
@cpo.event_wrapper(
|
||||||
|
True, step=utils.start_process_event_message('ResourceManager'))
|
||||||
|
def _start_resourcemanager(snn):
|
||||||
|
run.start_yarn_process(snn, 'resourcemanager')
|
||||||
|
|
||||||
|
|
||||||
|
def start_historyserver(cluster):
|
||||||
|
hs = vu.get_historyserver(cluster)
|
||||||
|
if hs:
|
||||||
|
run.start_historyserver(hs)
|
||||||
|
|
||||||
|
|
||||||
|
def start_oozie(pctx, cluster):
|
||||||
|
oo = vu.get_oozie(cluster)
|
||||||
|
if oo:
|
||||||
|
run.start_oozie_process(pctx, oo)
|
||||||
|
|
||||||
|
|
||||||
|
def start_hiveserver(pctx, cluster):
|
||||||
|
hiveserver = vu.get_hiveserver(cluster)
|
||||||
|
if hiveserver:
|
||||||
|
run.start_hiveserver_process(pctx, hiveserver)
|
@ -50,8 +50,8 @@ def get_tasktrackers(cluster):
|
|||||||
return u.get_instances(cluster, 'tasktracker')
|
return u.get_instances(cluster, 'tasktracker')
|
||||||
|
|
||||||
|
|
||||||
def get_secondarynamenodes(cluster):
|
def get_secondarynamenode(cluster):
|
||||||
return u.get_instances(cluster, 'secondarynamenode')
|
return u.get_instance(cluster, 'secondarynamenode')
|
||||||
|
|
||||||
|
|
||||||
def get_historyserver(cluster):
|
def get_historyserver(cluster):
|
||||||
|
@ -122,19 +122,15 @@ class VersionHandler(avm.AbstractVersionHandler):
|
|||||||
run.format_namenode(r)
|
run.format_namenode(r)
|
||||||
run.start_processes(r, "namenode")
|
run.start_processes(r, "namenode")
|
||||||
|
|
||||||
def start_secondarynamenodes(self, cluster):
|
def start_secondarynamenode(self, cluster):
|
||||||
snns = vu.get_secondarynamenodes(cluster)
|
snn = vu.get_secondarynamenode(cluster)
|
||||||
if len(snns) == 0:
|
if snn is None:
|
||||||
return
|
return
|
||||||
cpo.add_provisioning_step(
|
|
||||||
cluster.id,
|
|
||||||
utils.start_process_event_message("SecondaryNameNodes"),
|
|
||||||
len(snns))
|
|
||||||
|
|
||||||
for snn in snns:
|
|
||||||
self._start_secondarynamenode(snn)
|
self._start_secondarynamenode(snn)
|
||||||
|
|
||||||
@cpo.event_wrapper(True)
|
@cpo.event_wrapper(
|
||||||
|
True, step=utils.start_process_event_message("SecondaryNameNode"))
|
||||||
def _start_secondarynamenode(self, snn):
|
def _start_secondarynamenode(self, snn):
|
||||||
run.start_processes(remote.get_remote(snn), "secondarynamenode")
|
run.start_processes(remote.get_remote(snn), "secondarynamenode")
|
||||||
|
|
||||||
@ -194,7 +190,7 @@ class VersionHandler(avm.AbstractVersionHandler):
|
|||||||
def start_cluster(self, cluster):
|
def start_cluster(self, cluster):
|
||||||
self.start_namenode(cluster)
|
self.start_namenode(cluster)
|
||||||
|
|
||||||
self.start_secondarynamenodes(cluster)
|
self.start_secondarynamenode(cluster)
|
||||||
|
|
||||||
self.start_jobtracker(cluster)
|
self.start_jobtracker(cluster)
|
||||||
|
|
||||||
|
@ -24,11 +24,11 @@ from sahara.plugins.vanilla.hadoop2 import config as c
|
|||||||
from sahara.plugins.vanilla.hadoop2 import recommendations_utils as ru
|
from sahara.plugins.vanilla.hadoop2 import recommendations_utils as ru
|
||||||
from sahara.plugins.vanilla.hadoop2 import run_scripts as run
|
from sahara.plugins.vanilla.hadoop2 import run_scripts as run
|
||||||
from sahara.plugins.vanilla.hadoop2 import scaling as sc
|
from sahara.plugins.vanilla.hadoop2 import scaling as sc
|
||||||
|
from sahara.plugins.vanilla.hadoop2 import starting_scripts as s_scripts
|
||||||
from sahara.plugins.vanilla.hadoop2 import validation as vl
|
from sahara.plugins.vanilla.hadoop2 import validation as vl
|
||||||
from sahara.plugins.vanilla import utils as vu
|
from sahara.plugins.vanilla import utils as vu
|
||||||
from sahara.plugins.vanilla.v2_6_0 import config_helper as c_helper
|
from sahara.plugins.vanilla.v2_6_0 import config_helper as c_helper
|
||||||
from sahara.plugins.vanilla.v2_6_0 import edp_engine
|
from sahara.plugins.vanilla.v2_6_0 import edp_engine
|
||||||
from sahara.utils import cluster_progress_ops as cpo
|
|
||||||
|
|
||||||
|
|
||||||
conductor = conductor.API
|
conductor = conductor.API
|
||||||
@ -65,72 +65,17 @@ class VersionHandler(avm.AbstractVersionHandler):
|
|||||||
def configure_cluster(self, cluster):
|
def configure_cluster(self, cluster):
|
||||||
c.configure_cluster(self.pctx, cluster)
|
c.configure_cluster(self.pctx, cluster)
|
||||||
|
|
||||||
def start_namenode(self, cluster):
|
|
||||||
nn = vu.get_namenode(cluster)
|
|
||||||
self._start_namenode(nn)
|
|
||||||
|
|
||||||
@cpo.event_wrapper(
|
|
||||||
True, step=utils.start_process_event_message('NameNode'))
|
|
||||||
def _start_namenode(self, nn):
|
|
||||||
run.format_namenode(nn)
|
|
||||||
run.start_hadoop_process(nn, 'namenode')
|
|
||||||
|
|
||||||
def start_secondarynamenodes(self, cluster):
|
|
||||||
snns = vu.get_secondarynamenodes(cluster)
|
|
||||||
if len(snns) == 0:
|
|
||||||
return
|
|
||||||
cpo.add_provisioning_step(
|
|
||||||
snns[0].cluster_id, utils.start_process_event_message(
|
|
||||||
"SecondaryNameNodes"), len(snns))
|
|
||||||
|
|
||||||
for snn in vu.get_secondarynamenodes(cluster):
|
|
||||||
self._start_secondarynamenode(snn)
|
|
||||||
|
|
||||||
@cpo.event_wrapper(True)
|
|
||||||
def _start_secondarynamenode(self, snn):
|
|
||||||
run.start_hadoop_process(snn, 'secondarynamenode')
|
|
||||||
|
|
||||||
def start_resourcemanager(self, cluster):
|
|
||||||
rm = vu.get_resourcemanager(cluster)
|
|
||||||
if rm:
|
|
||||||
self._start_resourcemanager(rm)
|
|
||||||
|
|
||||||
@cpo.event_wrapper(
|
|
||||||
True, step=utils.start_process_event_message('ResourceManager'))
|
|
||||||
def _start_resourcemanager(self, snn):
|
|
||||||
run.start_yarn_process(snn, 'resourcemanager')
|
|
||||||
|
|
||||||
def start_historyserver(self, cluster):
|
|
||||||
hs = vu.get_historyserver(cluster)
|
|
||||||
if hs:
|
|
||||||
run.start_historyserver(hs)
|
|
||||||
|
|
||||||
def start_oozie(self, cluster):
|
|
||||||
oo = vu.get_oozie(cluster)
|
|
||||||
if oo:
|
|
||||||
run.start_oozie_process(self.pctx, oo)
|
|
||||||
|
|
||||||
def start_hiveserver(self, cluster):
|
|
||||||
hiveserver = vu.get_hiveserver(cluster)
|
|
||||||
if hiveserver:
|
|
||||||
run.start_hiveserver_process(self.pctx, hiveserver)
|
|
||||||
|
|
||||||
def start_cluster(self, cluster):
|
def start_cluster(self, cluster):
|
||||||
self.start_namenode(cluster)
|
s_scripts.start_namenode(cluster)
|
||||||
|
s_scripts.start_secondarynamenode(cluster)
|
||||||
self.start_secondarynamenodes(cluster)
|
s_scripts.start_resourcemanager(cluster)
|
||||||
|
|
||||||
self.start_resourcemanager(cluster)
|
|
||||||
|
|
||||||
run.start_dn_nm_processes(utils.get_instances(cluster))
|
run.start_dn_nm_processes(utils.get_instances(cluster))
|
||||||
|
|
||||||
run.await_datanodes(cluster)
|
run.await_datanodes(cluster)
|
||||||
|
|
||||||
self.start_historyserver(cluster)
|
s_scripts.start_historyserver(cluster)
|
||||||
|
s_scripts.start_oozie(self.pctx, cluster)
|
||||||
self.start_oozie(cluster)
|
s_scripts.start_hiveserver(self.pctx, cluster)
|
||||||
|
|
||||||
self.start_hiveserver(cluster)
|
|
||||||
|
|
||||||
self._set_cluster_info(cluster)
|
self._set_cluster_info(cluster)
|
||||||
|
|
||||||
|
@ -120,8 +120,8 @@ class TestUtils(base.SaharaWithDbTestCase):
|
|||||||
cl = tu.create_cluster('cl1', 't1', 'vanilla', '1.2.1',
|
cl = tu.create_cluster('cl1', 't1', 'vanilla', '1.2.1',
|
||||||
[self.ng_manager, self.ng_namenode,
|
[self.ng_manager, self.ng_namenode,
|
||||||
self.ng_secondarynamenode])
|
self.ng_secondarynamenode])
|
||||||
self.assertEqual('snn1', u.get_secondarynamenodes(cl)[0].instance_id)
|
self.assertEqual('snn1', u.get_secondarynamenode(cl).instance_id)
|
||||||
|
|
||||||
cl = tu.create_cluster('cl1', 't1', 'vanilla', '1.2.1',
|
cl = tu.create_cluster('cl1', 't1', 'vanilla', '1.2.1',
|
||||||
[self.ng_manager])
|
[self.ng_manager])
|
||||||
self.assertEqual([], u.get_secondarynamenodes(cl))
|
self.assertEqual(None, u.get_secondarynamenode(cl))
|
||||||
|
Loading…
Reference in New Issue
Block a user