From 094084b42a90f0f9d53943654feead9ae078f5ed Mon Sep 17 00:00:00 2001 From: Vitaly Gridnev Date: Tue, 24 Feb 2015 11:19:04 +0400 Subject: [PATCH] Applying event log feature for CDH - part 3 It would be nice to apply the event log feature for CDH. Because it requires big changes, it will be done in a series of small CRs. This change finishes applying the event-log feature for CDH. partially implements bp event-log Change-Id: I793305d516a78920eaef97476111b71374b0fed2 --- sahara/plugins/cdh/cloudera_utils.py | 15 +++++- sahara/plugins/cdh/v5/deploy.py | 32 +++++++------ sahara/plugins/cdh/v5_3_0/cloudera_utils.py | 2 + sahara/plugins/cdh/v5_3_0/deploy.py | 52 ++++++++++++++------- 4 files changed, 68 insertions(+), 33 deletions(-) diff --git a/sahara/plugins/cdh/cloudera_utils.py b/sahara/plugins/cdh/cloudera_utils.py index 4cd222da..b54237ec 100644 --- a/sahara/plugins/cdh/cloudera_utils.py +++ b/sahara/plugins/cdh/cloudera_utils.py @@ -78,6 +78,7 @@ class ClouderaUtils(object): cm_cluster = self.get_cloudera_cluster(cluster) yield cm_cluster.start() + @cpo.event_wrapper(True, step=_("Delete instances"), param=('cluster', 1)) def delete_instances(self, cluster, instances): api = self.get_api_client(cluster) cm_cluster = self.get_cloudera_cluster(cluster) @@ -88,14 +89,26 @@ class ClouderaUtils(object): cm_cluster.remove_host(host.hostId) api.delete_host(host.hostId) + @cpo.event_wrapper( + True, step=_("Decommission nodes"), param=('cluster', 1)) def decommission_nodes(self, cluster, process, role_names): service = self.get_service_by_role(process, cluster) service.decommission(*role_names).wait() for role_name in role_names: service.delete_role(role_name) + @cpo.event_wrapper( + True, step=_("Refresh DataNodes"), param=('cluster', 1)) + def refresh_datanodes(self, cluster): + self._refresh_nodes(cluster, 'DATANODE', self.HDFS_SERVICE_NAME) + + @cpo.event_wrapper( + True, step=_("Refresh YARNNodes"), param=('cluster', 1)) + def refresh_yarn_nodes(self, cluster): + 
self._refresh_nodes(cluster, 'NODEMANAGER', self.YARN_SERVICE_NAME) + @cloudera_cmd - def refresh_nodes(self, cluster, process, service_name): + def _refresh_nodes(self, cluster, process, service_name): cm_cluster = self.get_cloudera_cluster(cluster) service = cm_cluster.get_service(service_name) nds = [n.name for n in service.get_roles_by_type(process)] diff --git a/sahara/plugins/cdh/v5/deploy.py b/sahara/plugins/cdh/v5/deploy.py index 9b304150..dc5b012e 100644 --- a/sahara/plugins/cdh/v5/deploy.py +++ b/sahara/plugins/cdh/v5/deploy.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from sahara.i18n import _ from sahara.plugins.cdh import commands as cmd from sahara.plugins.cdh.v5 import cloudera_utils as cu from sahara.plugins import utils as gu @@ -73,6 +74,19 @@ def configure_cluster(cluster): CU.pu.configure_swift(cluster) +@cpo.event_wrapper( + True, step=_("Start roles: NODEMANAGER, DATANODE"), param=('cluster', 0)) +def _start_roles(cluster, instances): + for instance in instances: + if 'HDFS_DATANODE' in instance.node_group.node_processes: + hdfs = CU.get_service_by_role('DATANODE', instance=instance) + CU.start_roles(hdfs, CU.pu.get_role_name(instance, 'DATANODE')) + + if 'YARN_NODEMANAGER' in instance.node_group.node_processes: + yarn = CU.get_service_by_role('NODEMANAGER', instance=instance) + CU.start_roles(yarn, CU.pu.get_role_name(instance, 'NODEMANAGER')) + + def scale_cluster(cluster, instances): if not instances: return @@ -86,18 +100,8 @@ def scale_cluster(cluster, instances): CU.configure_instances(instances) CU.pu.configure_swift(cluster, instances) CU.update_configs(instances) - - for instance in instances: - if 'HDFS_DATANODE' in instance.node_group.node_processes: - CU.refresh_nodes(cluster, 'DATANODE', CU.HDFS_SERVICE_NAME) - - if 'HDFS_DATANODE' in instance.node_group.node_processes: - hdfs = CU.get_service_by_role('DATANODE', instance=instance) - 
CU.start_roles(hdfs, CU.pu.get_role_name(instance, 'DATANODE')) - - if 'YARN_NODEMANAGER' in instance.node_group.node_processes: - yarn = CU.get_service_by_role('NODEMANAGER', instance=instance) - CU.start_roles(yarn, CU.pu.get_role_name(instance, 'NODEMANAGER')) + CU.refresh_datanodes(cluster) + _start_roles(cluster, instances) def decommission_cluster(cluster, instances): @@ -117,8 +121,8 @@ def decommission_cluster(cluster, instances): CU.delete_instances(cluster, instances) - CU.refresh_nodes(cluster, 'DATANODE', CU.HDFS_SERVICE_NAME) - CU.refresh_nodes(cluster, 'NODEMANAGER', CU.YARN_SERVICE_NAME) + CU.refresh_datanodes(cluster) + CU.refresh_yarn_nodes(cluster) @cpo.event_wrapper(True, **_step_description("Zookeeper")) diff --git a/sahara/plugins/cdh/v5_3_0/cloudera_utils.py b/sahara/plugins/cdh/v5_3_0/cloudera_utils.py index 8f34df29..83ee2f18 100644 --- a/sahara/plugins/cdh/v5_3_0/cloudera_utils.py +++ b/sahara/plugins/cdh/v5_3_0/cloudera_utils.py @@ -96,6 +96,8 @@ class ClouderaUtilsV530(cu.ClouderaUtils): return super(ClouderaUtilsV530, self).get_service_by_role( process, cluster, instance) + @cpo.event_wrapper( + True, step=_("First run cluster"), param=('cluster', 1)) @cu.cloudera_cmd def first_run(self, cluster): cm_cluster = self.get_cloudera_cluster(cluster) diff --git a/sahara/plugins/cdh/v5_3_0/deploy.py b/sahara/plugins/cdh/v5_3_0/deploy.py index 977ece73..65b90cb3 100644 --- a/sahara/plugins/cdh/v5_3_0/deploy.py +++ b/sahara/plugins/cdh/v5_3_0/deploy.py @@ -13,9 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+from sahara.i18n import _ from sahara.plugins.cdh import commands as cmd from sahara.plugins.cdh.v5_3_0 import cloudera_utils as cu from sahara.plugins import utils as gu +from sahara.utils import cluster_progress_ops as cpo PACKAGES = [ @@ -76,6 +78,19 @@ def configure_cluster(cluster): CU.deploy_configs(cluster) +@cpo.event_wrapper( + True, step=_("Start roles: NODEMANAGER, DATANODE"), param=('cluster', 0)) +def _start_roles(cluster, instances): + for instance in instances: + if 'HDFS_DATANODE' in instance.node_group.node_processes: + hdfs = CU.get_service_by_role('DATANODE', instance=instance) + CU.start_roles(hdfs, CU.pu.get_role_name(instance, 'DATANODE')) + + if 'YARN_NODEMANAGER' in instance.node_group.node_processes: + yarn = CU.get_service_by_role('NODEMANAGER', instance=instance) + CU.start_roles(yarn, CU.pu.get_role_name(instance, 'NODEMANAGER')) + + def scale_cluster(cluster, instances): if not instances: return @@ -89,18 +104,8 @@ def scale_cluster(cluster, instances): CU.configure_instances(instances, cluster) CU.update_configs(instances) CU.pu.configure_swift(cluster, instances) - - for instance in instances: - if 'HDFS_DATANODE' in instance.node_group.node_processes: - CU.refresh_nodes(cluster, 'DATANODE', CU.HDFS_SERVICE_NAME) - - if 'HDFS_DATANODE' in instance.node_group.node_processes: - hdfs = CU.get_service_by_role('DATANODE', instance=instance) - CU.start_roles(hdfs, CU.pu.get_role_name(instance, 'DATANODE')) - - if 'YARN_NODEMANAGER' in instance.node_group.node_processes: - yarn = CU.get_service_by_role('NODEMANAGER', instance=instance) - CU.start_roles(yarn, CU.pu.get_role_name(instance, 'NODEMANAGER')) + CU.refresh_datanodes(cluster) + _start_roles(cluster, instances) def decommission_cluster(cluster, instances): @@ -120,11 +125,12 @@ def decommission_cluster(cluster, instances): CU.delete_instances(cluster, instances) - CU.refresh_nodes(cluster, 'DATANODE', CU.HDFS_SERVICE_NAME) - CU.refresh_nodes(cluster, 'NODEMANAGER', 
CU.YARN_SERVICE_NAME) + CU.refresh_datanodes(cluster) + CU.refresh_yarn_nodes(cluster) -def start_cluster(cluster): +@cpo.event_wrapper(True, step=_("Prepare cluster"), param=('cluster', 0)) +def _prepare_cluster(cluster): if CU.pu.get_oozie(cluster): CU.pu.install_extjs(cluster) @@ -134,10 +140,10 @@ def start_cluster(cluster): if CU.pu.get_sentry(cluster): CU.pu.configure_sentry(cluster) - CU.first_run(cluster) - - CU.pu.configure_swift(cluster) +@cpo.event_wrapper( + True, step=_("Finish cluster starting"), param=('cluster', 0)) +def _finish_cluster_starting(cluster): if CU.pu.get_hive_metastore(cluster): CU.pu.put_hive_hdfs_xml(cluster) @@ -146,6 +152,16 @@ def start_cluster(cluster): CU.start_service(flume) +def start_cluster(cluster): + _prepare_cluster(cluster) + + CU.first_run(cluster) + + CU.pu.configure_swift(cluster) + + _finish_cluster_starting(cluster) + + def get_open_ports(node_group): ports = [9000] # for CM agent