diff --git a/sahara/plugins/vanilla/hadoop2/starting_scripts.py b/sahara/plugins/vanilla/hadoop2/starting_scripts.py new file mode 100644 index 00000000..c2922382 --- /dev/null +++ b/sahara/plugins/vanilla/hadoop2/starting_scripts.py @@ -0,0 +1,73 @@ +# Copyright (c) 2015 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from sahara.plugins import utils +from sahara.plugins.vanilla.hadoop2 import run_scripts as run +from sahara.plugins.vanilla import utils as vu +from sahara.utils import cluster_progress_ops as cpo + + +def start_namenode(cluster): + nn = vu.get_namenode(cluster) + _start_namenode(nn) + + +@cpo.event_wrapper( + True, step=utils.start_process_event_message('NameNode')) +def _start_namenode(nn): + run.format_namenode(nn) + run.start_hadoop_process(nn, 'namenode') + + +def start_secondarynamenode(cluster): + snn = vu.get_secondarynamenode(cluster) + if snn: + _start_secondarynamenode(snn) + + +@cpo.event_wrapper( + True, step=utils.start_process_event_message("SecondaryNameNodes")) +def _start_secondarynamenode(snn): + run.start_hadoop_process(snn, 'secondarynamenode') + + +def start_resourcemanager(cluster): + rm = vu.get_resourcemanager(cluster) + if rm: + _start_resourcemanager(rm) + + +@cpo.event_wrapper( + True, step=utils.start_process_event_message('ResourceManager')) +def _start_resourcemanager(snn): + run.start_yarn_process(snn, 'resourcemanager') + + +def start_historyserver(cluster): + hs = vu.get_historyserver(cluster) + if hs: + run.start_historyserver(hs) + + +def start_oozie(pctx, cluster): + oo = vu.get_oozie(cluster) + if oo: + run.start_oozie_process(pctx, oo) + + +def start_hiveserver(pctx, cluster): + hiveserver = vu.get_hiveserver(cluster) + if hiveserver: + run.start_hiveserver_process(pctx, hiveserver) diff --git a/sahara/plugins/vanilla/utils.py b/sahara/plugins/vanilla/utils.py index e52d0549..bd1ed367 100644 --- a/sahara/plugins/vanilla/utils.py +++ b/sahara/plugins/vanilla/utils.py @@ -50,8 +50,8 @@ def get_tasktrackers(cluster): return u.get_instances(cluster, 'tasktracker') -def get_secondarynamenodes(cluster): - return u.get_instances(cluster, 'secondarynamenode') +def get_secondarynamenode(cluster): + return u.get_instance(cluster, 'secondarynamenode') def get_historyserver(cluster): diff --git a/sahara/plugins/vanilla/v1_2_1/versionhandler.py b/sahara/plugins/vanilla/v1_2_1/versionhandler.py index a91a2d0d..8cf5805f 100644 --- a/sahara/plugins/vanilla/v1_2_1/versionhandler.py +++ b/sahara/plugins/vanilla/v1_2_1/versionhandler.py @@ -122,19 +122,15 @@ class VersionHandler(avm.AbstractVersionHandler): run.format_namenode(r) run.start_processes(r, "namenode") - def start_secondarynamenodes(self, cluster): - snns = vu.get_secondarynamenodes(cluster) - if len(snns) == 0: + def start_secondarynamenode(self, cluster): + snn = vu.get_secondarynamenode(cluster) + if snn is None: return - cpo.add_provisioning_step( - cluster.id, - utils.start_process_event_message("SecondaryNameNodes"), - len(snns)) - for snn in snns: - self._start_secondarynamenode(snn) + self._start_secondarynamenode(snn) - @cpo.event_wrapper(True) + @cpo.event_wrapper( + True, step=utils.start_process_event_message("SecondaryNameNode")) def _start_secondarynamenode(self, snn): run.start_processes(remote.get_remote(snn), "secondarynamenode") @@ -194,7 +190,7 @@ class VersionHandler(avm.AbstractVersionHandler): def start_cluster(self, cluster): self.start_namenode(cluster) - self.start_secondarynamenodes(cluster) + self.start_secondarynamenode(cluster) self.start_jobtracker(cluster) diff --git a/sahara/plugins/vanilla/v2_6_0/versionhandler.py b/sahara/plugins/vanilla/v2_6_0/versionhandler.py index dc6b8c60..b81356c7 100644 --- a/sahara/plugins/vanilla/v2_6_0/versionhandler.py +++ b/sahara/plugins/vanilla/v2_6_0/versionhandler.py @@ -24,11 +24,11 @@ from sahara.plugins.vanilla.hadoop2 import config as c from sahara.plugins.vanilla.hadoop2 import recommendations_utils as ru from sahara.plugins.vanilla.hadoop2 import run_scripts as run from sahara.plugins.vanilla.hadoop2 import scaling as sc +from sahara.plugins.vanilla.hadoop2 import starting_scripts as s_scripts from sahara.plugins.vanilla.hadoop2 import validation as vl from sahara.plugins.vanilla import utils as vu from sahara.plugins.vanilla.v2_6_0 import config_helper as c_helper from sahara.plugins.vanilla.v2_6_0 import edp_engine -from sahara.utils import cluster_progress_ops as cpo conductor = conductor.API @@ -65,72 +65,17 @@ class VersionHandler(avm.AbstractVersionHandler): def configure_cluster(self, cluster): c.configure_cluster(self.pctx, cluster) - def start_namenode(self, cluster): - nn = vu.get_namenode(cluster) - self._start_namenode(nn) - - @cpo.event_wrapper( - True, step=utils.start_process_event_message('NameNode')) - def _start_namenode(self, nn): - run.format_namenode(nn) - run.start_hadoop_process(nn, 'namenode') - - def start_secondarynamenodes(self, cluster): - snns = vu.get_secondarynamenodes(cluster) - if len(snns) == 0: - return - cpo.add_provisioning_step( - snns[0].cluster_id, utils.start_process_event_message( - "SecondaryNameNodes"), len(snns)) - - for snn in vu.get_secondarynamenodes(cluster): - self._start_secondarynamenode(snn) - - @cpo.event_wrapper(True) - def _start_secondarynamenode(self, snn): - run.start_hadoop_process(snn, 'secondarynamenode') - - def start_resourcemanager(self, cluster): - rm = vu.get_resourcemanager(cluster) - if rm: - self._start_resourcemanager(rm) - - @cpo.event_wrapper( - True, step=utils.start_process_event_message('ResourceManager')) - def _start_resourcemanager(self, snn): - run.start_yarn_process(snn, 'resourcemanager') - - def start_historyserver(self, cluster): - hs = vu.get_historyserver(cluster) - if hs: - run.start_historyserver(hs) - - def start_oozie(self, cluster): - oo = vu.get_oozie(cluster) - if oo: - run.start_oozie_process(self.pctx, oo) - - def start_hiveserver(self, cluster): - hiveserver = vu.get_hiveserver(cluster) - if hiveserver: - run.start_hiveserver_process(self.pctx, hiveserver) - def start_cluster(self, cluster): - self.start_namenode(cluster) - - self.start_secondarynamenodes(cluster) - - self.start_resourcemanager(cluster) + s_scripts.start_namenode(cluster) + s_scripts.start_secondarynamenode(cluster) + s_scripts.start_resourcemanager(cluster) run.start_dn_nm_processes(utils.get_instances(cluster)) - run.await_datanodes(cluster) - self.start_historyserver(cluster) - - self.start_oozie(cluster) - - self.start_hiveserver(cluster) + s_scripts.start_historyserver(cluster) + s_scripts.start_oozie(self.pctx, cluster) + s_scripts.start_hiveserver(self.pctx, cluster) self._set_cluster_info(cluster) diff --git a/sahara/tests/unit/plugins/vanilla/test_utils.py b/sahara/tests/unit/plugins/vanilla/test_utils.py index 60d6743c..7453f997 100644 --- a/sahara/tests/unit/plugins/vanilla/test_utils.py +++ b/sahara/tests/unit/plugins/vanilla/test_utils.py @@ -120,8 +120,8 @@ class TestUtils(base.SaharaWithDbTestCase): cl = tu.create_cluster('cl1', 't1', 'vanilla', '1.2.1', [self.ng_manager, self.ng_namenode, self.ng_secondarynamenode]) - self.assertEqual('snn1', u.get_secondarynamenodes(cl)[0].instance_id) + self.assertEqual('snn1', u.get_secondarynamenode(cl).instance_id) cl = tu.create_cluster('cl1', 't1', 'vanilla', '1.2.1', [self.ng_manager]) - self.assertEqual([], u.get_secondarynamenodes(cl)) + self.assertEqual(None, u.get_secondarynamenode(cl))