Apply event-log feature for Vanilla plugins

Applying the event log feature for the Vanilla 1.2.1 and Vanilla 2.4.1
plugins. We don't apply this feature to Vanilla 2.3.0, because
it's deprecated.

Change-Id: I0e9a097f389803a899fe238ccfe4889d32b8d9fe
Partially implements: blueprint event-log
Vitaly Gridnev 2015-01-26 13:29:34 +03:00
parent efe15324d9
commit 2afb33d394
8 changed files with 181 additions and 26 deletions
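
The pattern is the same across all eight files: an operation that touches
many instances first registers a provisioning step sized to the instance
count with cpo.add_provisioning_step, and the per-instance helper is wrapped
in cpo.event_wrapper(True) so each call is logged against that step; one-shot
cluster operations instead declare their step inline via the decorator's
step= and param= arguments. Below is a minimal sketch of both variants,
using the calls as they appear in this diff (the step text and helper bodies
are illustrative placeholders, not the plugin's real work):

from sahara.utils import cluster_progress_ops as cpo


def configure_instances(pctx, instances):
    if len(instances) == 0:
        return

    # One step for the whole batch; len(instances) tells the event log
    # how many per-instance events to expect for progress reporting.
    cpo.add_provisioning_step(
        instances[0].cluster_id, "Configure instances", len(instances))

    for instance in instances:
        _configure_instance(pctx, instance)


# event_wrapper(True) records a success/failure event for each call
# against the provisioning step registered above.
@cpo.event_wrapper(True)
def _configure_instance(pctx, instance):
    pass  # illustrative: the real helper pushes configs to the instance


# Cluster-wide one-shot operations declare the step inline; param points
# the wrapper at the argument carrying the cluster (name, position).
@cpo.event_wrapper(
    True, step="Refresh HDFS nodes", param=('cluster', 0))
def refresh_hadoop_nodes(cluster):
    pass  # illustrative: the real helper runs "hdfs dfsadmin -refreshNodes"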

View File

@@ -62,6 +62,13 @@ def get_port_from_address(address):
return netutils.parse_host_port(address)[1]
def instances_with_services(instances, node_processes):
node_processes = set(node_processes)
return filter(
lambda x: node_processes.intersection(
x.node_group.node_processes), instances)
def start_process_event_message(process):
return _("Start the following process(es): {process}").format(
process=process)

View File

@@ -17,12 +17,14 @@ from oslo_config import cfg
from oslo_log import log as logging
import six
from sahara.i18n import _
from sahara.i18n import _LI
from sahara.plugins.vanilla.hadoop2 import config_helper as c_helper
from sahara.plugins.vanilla.hadoop2 import oozie_helper as o_helper
from sahara.plugins.vanilla import utils as vu
from sahara.swift import swift_helper as swift
from sahara.topology import topology_helper as th
from sahara.utils import cluster_progress_ops as cpo
from sahara.utils import files as f
from sahara.utils import proxy
from sahara.utils import xmlutils as x
@@ -54,9 +56,20 @@ def configure_cluster(pctx, cluster):
def configure_instances(pctx, instances):
if len(instances) == 0:
return
cpo.add_provisioning_step(
instances[0].cluster_id, _("Configure instances"), len(instances))
for instance in instances:
-    _provisioning_configs(pctx, instance)
-    _post_configuration(pctx, instance)
+    _configure_instance(pctx, instance)
@cpo.event_wrapper(True)
def _configure_instance(pctx, instance):
_provisioning_configs(pctx, instance)
_post_configuration(pctx, instance)
def _provisioning_configs(pctx, instance):
@@ -335,6 +348,8 @@ def _merge_configs(a, b):
return res
@cpo.event_wrapper(
True, step=_("Configure topology data"), param=('cluster', 1))
def configure_topology_data(pctx, cluster):
if c_helper.is_data_locality_enabled(pctx, cluster):
LOG.info(_LI("Node group awareness is not implemented in YARN yet "

View File

@@ -21,8 +21,10 @@ from sahara import context
from sahara.i18n import _
from sahara.i18n import _LI
from sahara.plugins import exceptions as ex
from sahara.plugins import utils as pu
from sahara.plugins.vanilla.hadoop2 import config_helper as c_helper
from sahara.plugins.vanilla import utils as vu
from sahara.utils import cluster_progress_ops as cpo
from sahara.utils import edp
from sahara.utils import files
from sahara.utils import general as g
@@ -31,19 +33,28 @@ LOG = logging.getLogger(__name__)
def start_all_processes(instances, filternames):
if filternames:
instances = pu.instances_with_services(instances, filternames)
if len(instances) == 0:
return
name = pu.start_process_event_message(", ".join(filternames))
cpo.add_provisioning_step(instances[0].cluster_id, name, len(instances))
with context.ThreadGroup() as tg:
for instance in instances:
processes = set(instance.node_group.node_processes)
-    procs = processes
if filternames:
-    procs = processes.intersection(filternames)
-    if procs:
+    processes = processes.intersection(filternames)
+    if processes:
tg.spawn('vanilla-start-processes-%s' %
instance.instance_name,
_start_processes,
-    instance, list(procs))
+    instance, list(processes))
@cpo.event_wrapper(True)
def _start_processes(instance, processes):
with instance.remote() as r:
for process in processes:
@@ -69,11 +80,13 @@ def start_yarn_process(instance, process):
'sudo su - -c "yarn-daemon.sh start %s" hadoop' % process)
@cpo.event_wrapper(True, step=pu.start_process_event_message("HistoryServer"))
def start_historyserver(instance):
instance.remote().execute_command(
'sudo su - -c "mr-jobhistory-daemon.sh start historyserver" hadoop')
@cpo.event_wrapper(True, step=pu.start_process_event_message("Oozie"))
def start_oozie_process(pctx, instance):
with instance.remote() as r:
if c_helper.is_mysql_enabled(pctx, instance.cluster):
@@ -96,12 +109,16 @@ def format_namenode(instance):
'sudo su - -c "hdfs namenode -format" hadoop')
@cpo.event_wrapper(
True, step=_("Refresh %s nodes") % "HDFS", param=('cluster', 0))
def refresh_hadoop_nodes(cluster):
nn = vu.get_namenode(cluster)
nn.remote().execute_command(
'sudo su - -c "hdfs dfsadmin -refreshNodes" hadoop')
@cpo.event_wrapper(
True, step=_("Refresh %s nodes") % "YARN", param=('cluster', 0))
def refresh_yarn_nodes(cluster):
rm = vu.get_resourcemanager(cluster)
rm.remote().execute_command(
@@ -142,6 +159,8 @@ def _start_oozie(remote):
'sudo su - -c "/opt/oozie/bin/oozied.sh start" hadoop')
@cpo.event_wrapper(
True, step=_("Await %s start up") % "DataNodes", param=('cluster', 0))
def await_datanodes(cluster):
datanodes_count = len(vu.get_datanodes(cluster))
if datanodes_count < 1:
@@ -205,6 +224,7 @@ def _hive_metastore_start(remote):
" --service metastore > /dev/null &' hadoop")
@cpo.event_wrapper(True, step=pu.start_process_event_message("HiveServer"))
def start_hiveserver_process(pctx, instance):
with instance.remote() as r:
_hive_create_warehouse_dir(r)

View File

@@ -23,6 +23,7 @@ from sahara.plugins.vanilla.hadoop2 import config
from sahara.plugins.vanilla.hadoop2 import run_scripts as run
from sahara.plugins.vanilla.hadoop2 import utils as pu
from sahara.plugins.vanilla import utils as vu
from sahara.utils import cluster_progress_ops as cpo
HADOOP_CONF_DIR = config.HADOOP_CONF_DIR
@@ -48,6 +49,7 @@ def _get_instances_with_service(instances, service):
return ret
@cpo.event_wrapper(True, step=_("Update include files"), param=('cluster', 0))
def _update_include_files(cluster, dec_instances=None):
dec_instances = dec_instances or []
dec_instances_ids = [instance.id for instance in dec_instances]
@@ -134,9 +136,13 @@ def _check_decommission(cluster, instances, check_func, timeout):
{"cluster": cluster, "seconds": timeout})
@cpo.event_wrapper(
True, step=_("Decommission %s") % "NodeManagers", param=('cluster', 0))
def _check_nodemanagers_decommission(cluster, instances):
_check_decommission(cluster, instances, pu.get_nodemanagers_status, 300)
@cpo.event_wrapper(
True, step=_("Decommission %s") % "DataNodes", param=('cluster', 0))
def _check_datanodes_decommission(cluster, instances):
_check_decommission(cluster, instances, pu.get_datanodes_status, 3600 * 4)

View File

@@ -24,9 +24,11 @@ from sahara.plugins import exceptions as ex
from sahara.plugins import utils
from sahara.plugins.vanilla.v1_2_1 import config_helper
from sahara.plugins.vanilla.v1_2_1 import run_scripts as run
from sahara.utils import cluster_progress_ops as cpo
from sahara.utils import remote
@cpo.event_wrapper(True, step=_("Decommission %s") % "TaskTrackers")
def decommission_tt(jt, inst_to_be_deleted, survived_inst):
with remote.get_remote(jt) as r:
r.write_file_to('/etc/hadoop/tt.excl',
@@ -40,6 +42,7 @@ def decommission_tt(jt, inst_to_be_deleted, survived_inst):
})
@cpo.event_wrapper(True, step=_("Decommission %s") % "DataNodes")
def decommission_dn(nn, inst_to_be_deleted, survived_inst):
with remote.get_remote(nn) as r:
r.write_file_to('/etc/hadoop/dn.excl',

View File

@@ -32,6 +32,7 @@ from sahara.plugins.vanilla.v1_2_1 import edp_engine
from sahara.plugins.vanilla.v1_2_1 import run_scripts as run
from sahara.plugins.vanilla.v1_2_1 import scaling as sc
from sahara.topology import topology_helper as th
from sahara.utils import cluster_progress_ops as cpo
from sahara.utils import edp
from sahara.utils import files as f
from sahara.utils import general as g
@@ -100,25 +101,32 @@ class VersionHandler(avm.AbstractVersionHandler):
def configure_cluster(self, cluster):
instances = utils.get_instances(cluster)
self._setup_instances(cluster, instances)
def start_namenode(self, cluster):
nn = vu.get_namenode(cluster)
self._start_namenode(nn)
@cpo.event_wrapper(
True, step=utils.start_process_event_message("NameNode"))
def _start_namenode(self, nn_instance):
with remote.get_remote(nn_instance) as r:
run.format_namenode(r)
run.start_processes(r, "namenode")
def start_secondarynamenodes(self, cluster):
-    if vu.get_secondarynamenodes(cluster) == 0:
+    snns = vu.get_secondarynamenodes(cluster)
+    if len(snns) == 0:
return
cpo.add_provisioning_step(
cluster.id,
utils.start_process_event_message("SecondaryNameNodes"),
len(snns))
-    for snn in vu.get_secondarynamenodes(cluster):
+    for snn in snns:
self._start_secondarynamenode(snn)
@cpo.event_wrapper(True)
def _start_secondarynamenode(self, snn):
run.start_processes(remote.get_remote(snn), "secondarynamenode")
@@ -127,6 +135,8 @@ class VersionHandler(avm.AbstractVersionHandler):
if jt:
self._start_jobtracker(jt)
@cpo.event_wrapper(
True, step=utils.start_process_event_message("JobTracker"))
def _start_jobtracker(self, jt_instance):
run.start_processes(remote.get_remote(jt_instance), "jobtracker")
@@ -135,6 +145,8 @@ class VersionHandler(avm.AbstractVersionHandler):
if oozie:
self._start_oozie(cluster, oozie)
@cpo.event_wrapper(
True, step=utils.start_process_event_message("Oozie"))
def _start_oozie(self, cluster, oozie):
nn_instance = vu.get_namenode(cluster)
@@ -152,6 +164,8 @@ class VersionHandler(avm.AbstractVersionHandler):
if hs:
self._start_hiveserver(cluster, hs)
@cpo.event_wrapper(
True, step=utils.start_process_event_message("HiveServer"))
def _start_hiveserver(self, cluster, hive_server):
oozie = vu.get_oozie(cluster)
@@ -190,6 +204,8 @@ class VersionHandler(avm.AbstractVersionHandler):
LOG.info(_LI('Cluster %s has been started successfully'), cluster.name)
self._set_cluster_info(cluster)
@cpo.event_wrapper(
True, step=_("Await %s start up") % "DataNodes", param=('cluster', 1))
def _await_datanodes(self, cluster):
datanodes_count = len(vu.get_datanodes(cluster))
if datanodes_count < 1:
@@ -291,19 +307,30 @@ class VersionHandler(avm.AbstractVersionHandler):
def _start_tt_dn_processes(self, instances):
tt_dn_names = ["datanode", "tasktracker"]
instances = utils.instances_with_services(instances, tt_dn_names)
if not instances:
return
cpo.add_provisioning_step(
instances[0].cluster_id,
utils.start_process_event_message("DataNodes, TaskTrackers"),
len(instances))
with context.ThreadGroup() as tg:
for i in instances:
processes = set(i.node_group.node_processes)
tt_dn_procs = processes.intersection(tt_dn_names)
-    tg.spawn('vanilla-start-tt-dn-%s' % i.instance_name,
-             self._start_tt_dn, i, list(tt_dn_procs))
+    if tt_dn_procs:
+        tg.spawn('vanilla-start-tt-dn-%s' % i.instance_name,
+                 self._start_tt_dn, i, list(tt_dn_procs))
@cpo.event_wrapper(True)
def _start_tt_dn(self, instance, tt_dn_procs):
with instance.remote() as r:
run.start_processes(r, *tt_dn_procs)
@cpo.event_wrapper(True, step=_("Setup instances and push configs"),
param=('cluster', 1))
def _setup_instances(self, cluster, instances):
if (CONF.use_identity_api_v3 and CONF.use_domain_for_proxy_users and
vu.get_hiveserver(cluster) and

View File

@@ -27,6 +27,7 @@ from sahara.plugins.vanilla.hadoop2 import scaling as sc
from sahara.plugins.vanilla.hadoop2 import validation as vl
from sahara.plugins.vanilla import utils as vu
from sahara.plugins.vanilla.v2_4_1 import config_helper as c_helper
from sahara.utils import cluster_progress_ops as cpo
conductor = conductor.API
@@ -63,34 +64,73 @@ class VersionHandler(avm.AbstractVersionHandler):
def configure_cluster(self, cluster):
c.configure_cluster(self.pctx, cluster)
-    def start_cluster(self, cluster):
+    def start_namenode(self, cluster):
nn = vu.get_namenode(cluster)
self._start_namenode(nn)
@cpo.event_wrapper(
True, step=utils.start_process_event_message('NameNode'))
def _start_namenode(self, nn):
run.format_namenode(nn)
run.start_hadoop_process(nn, 'namenode')
-    for snn in vu.get_secondarynamenodes(cluster):
-        run.start_hadoop_process(snn, 'secondarynamenode')
def start_secondarynamenodes(self, cluster):
snns = vu.get_secondarynamenodes(cluster)
if len(snns) == 0:
return
cpo.add_provisioning_step(
snns[0].cluster_id, utils.start_process_event_message(
"SecondaryNameNodes"), len(snns))
for snn in vu.get_secondarynamenodes(cluster):
self._start_secondarynamenode(snn)
@cpo.event_wrapper(True)
def _start_secondarynamenode(self, snn):
run.start_hadoop_process(snn, 'secondarynamenode')
def start_resourcemanager(self, cluster):
rm = vu.get_resourcemanager(cluster)
if rm:
-    run.start_yarn_process(rm, 'resourcemanager')
+    self._start_resourcemanager(rm)
@cpo.event_wrapper(
True, step=utils.start_process_event_message('ResourceManager'))
def _start_resourcemanager(self, rm):
run.start_yarn_process(rm, 'resourcemanager')
def start_historyserver(self, cluster):
hs = vu.get_historyserver(cluster)
if hs:
run.start_historyserver(hs)
def start_oozie(self, cluster):
oo = vu.get_oozie(cluster)
if oo:
run.start_oozie_process(self.pctx, oo)
def start_hiveserver(self, cluster):
hiveserver = vu.get_hiveserver(cluster)
if hiveserver:
run.start_hiveserver_process(self.pctx, hiveserver)
def start_cluster(self, cluster):
self.start_namenode(cluster)
self.start_secondarynamenodes(cluster)
self.start_resourcemanager(cluster)
run.start_all_processes(utils.get_instances(cluster),
['datanode', 'nodemanager'])
run.await_datanodes(cluster)
-    hs = vu.get_historyserver(cluster)
-    if hs:
-        run.start_historyserver(hs)
+    self.start_historyserver(cluster)
-    oo = vu.get_oozie(cluster)
-    if oo:
-        run.start_oozie_process(self.pctx, oo)
+    self.start_oozie(cluster)
-    hiveserver = vu.get_hiveserver(cluster)
-    if hiveserver:
-        run.start_hiveserver_process(self.pctx, hiveserver)
+    self.start_hiveserver(cluster)
self._set_cluster_info(cluster)

View File

@@ -0,0 +1,37 @@
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara.plugins import utils as pu
from sahara.tests.unit import base as b
class FakeInstance(object):
def __init__(self, node_processes):
self.node_processes = node_processes
@property
def node_group(self):
return self
class TestPluginUtils(b.SaharaTestCase):
def test_instances_with_services(self):
inst = [FakeInstance(["1", "2", "3"]), FakeInstance(["1", "3"]),
FakeInstance(["1"]), FakeInstance(["3"])]
self.assertEqual(4, len(pu.instances_with_services(inst, ["1", "3"])))
self.assertEqual(1, len(pu.instances_with_services(inst, ["2"])))
self.assertEqual(3, len(pu.instances_with_services(inst, ["3"])))
self.assertEqual(0, len(pu.instances_with_services(inst, ["5"])))