Merge "Applying event log feature for CDH - part 3"
commit bcd1c76651
@@ -78,6 +78,7 @@ class ClouderaUtils(object):
         cm_cluster = self.get_cloudera_cluster(cluster)
         yield cm_cluster.start()
 
+    @cpo.event_wrapper(True, step=_("Delete instances"), param=('cluster', 1))
     def delete_instances(self, cluster, instances):
         api = self.get_api_client(cluster)
         cm_cluster = self.get_cloudera_cluster(cluster)
@@ -88,14 +89,26 @@ class ClouderaUtils(object):
                 cm_cluster.remove_host(host.hostId)
                 api.delete_host(host.hostId)
 
+    @cpo.event_wrapper(
+        True, step=_("Decommission nodes"), param=('cluster', 1))
     def decommission_nodes(self, cluster, process, role_names):
         service = self.get_service_by_role(process, cluster)
         service.decommission(*role_names).wait()
         for role_name in role_names:
             service.delete_role(role_name)
 
+    @cpo.event_wrapper(
+        True, step=_("Refresh DataNodes"), param=('cluster', 1))
+    def refresh_datanodes(self, cluster):
+        self._refresh_nodes(cluster, 'DATANODE', self.HDFS_SERVICE_NAME)
+
+    @cpo.event_wrapper(
+        True, step=_("Refresh YARNNodes"), param=('cluster', 1))
+    def refresh_yarn_nodes(self, cluster):
+        self._refresh_nodes(cluster, 'NODEMANAGER', self.YARN_SERVICE_NAME)
+
     @cloudera_cmd
-    def refresh_nodes(self, cluster, process, service_name):
+    def _refresh_nodes(self, cluster, process, service_name):
         cm_cluster = self.get_cloudera_cluster(cluster)
         service = cm_cluster.get_service(service_name)
         nds = [n.name for n in service.get_roles_by_type(process)]
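Note: `cpo` here is sahara.utils.cluster_progress_ops, whose `event_wrapper` records the decorated call as a named step in the cluster's provisioning event log. The first argument says whether to mark the step successful on normal return, `step` is the translated title, and `param` is a (name, position) pair locating the cluster argument: position 1 for methods (after `self`), position 0 for module-level functions. A minimal illustrative sketch of such a decorator follows; it is not sahara's actual implementation, and `start_step`/`finish_step` are stand-ins for the real event-log helpers:

import functools


def start_step(cluster, step):
    # Stand-in for the real event-log write.
    print("step started: %s" % step)


def finish_step(cluster, step, successful):
    print("step finished: %s (successful=%s)" % (step, successful))


def event_wrapper(mark_successful_on_exit, step=None, param=None):
    def decorator(func):
        @functools.wraps(func)
        def handler(*args, **kwargs):
            # Locate the cluster via the (name, position) hint.
            name, position = param
            cluster = kwargs.get(name) or args[position]
            start_step(cluster, step)
            try:
                result = func(*args, **kwargs)
            except Exception:
                finish_step(cluster, step, successful=False)
                raise
            if mark_successful_on_exit:
                finish_step(cluster, step, successful=True)
            return result
        return handler
    return decorator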
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from sahara.i18n import _
 from sahara.plugins.cdh import commands as cmd
 from sahara.plugins.cdh.v5 import cloudera_utils as cu
 from sahara.plugins import utils as gu
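Note: the `_` import is needed because the new step titles are marked for translation. `sahara.i18n` follows the standard oslo.i18n integration pattern, roughly:

# Roughly what sahara/i18n.py provides (standard oslo.i18n pattern);
# _() returns the message translated for the active locale at call time.
import oslo_i18n

_translators = oslo_i18n.TranslatorFactory(domain='sahara')
_ = _translators.primary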
@@ -73,6 +74,19 @@ def configure_cluster(cluster):
     CU.pu.configure_swift(cluster)
 
 
+@cpo.event_wrapper(
+    True, step=_("Start roles: NODEMANAGER, DATANODE"), param=('cluster', 0))
+def _start_roles(cluster, instances):
+    for instance in instances:
+        if 'HDFS_DATANODE' in instance.node_group.node_processes:
+            hdfs = CU.get_service_by_role('DATANODE', instance=instance)
+            CU.start_roles(hdfs, CU.pu.get_role_name(instance, 'DATANODE'))
+
+        if 'YARN_NODEMANAGER' in instance.node_group.node_processes:
+            yarn = CU.get_service_by_role('NODEMANAGER', instance=instance)
+            CU.start_roles(yarn, CU.pu.get_role_name(instance, 'NODEMANAGER'))
+
+
 def scale_cluster(cluster, instances):
     if not instances:
         return
@@ -86,18 +100,8 @@ def scale_cluster(cluster, instances):
     CU.configure_instances(instances)
     CU.pu.configure_swift(cluster, instances)
     CU.update_configs(instances)
-
-    for instance in instances:
-        if 'HDFS_DATANODE' in instance.node_group.node_processes:
-            CU.refresh_nodes(cluster, 'DATANODE', CU.HDFS_SERVICE_NAME)
-
-        if 'HDFS_DATANODE' in instance.node_group.node_processes:
-            hdfs = CU.get_service_by_role('DATANODE', instance=instance)
-            CU.start_roles(hdfs, CU.pu.get_role_name(instance, 'DATANODE'))
-
-        if 'YARN_NODEMANAGER' in instance.node_group.node_processes:
-            yarn = CU.get_service_by_role('NODEMANAGER', instance=instance)
-            CU.start_roles(yarn, CU.pu.get_role_name(instance, 'NODEMANAGER'))
+    CU.refresh_datanodes(cluster)
+    _start_roles(cluster, instances)
 
 
 def decommission_cluster(cluster, instances):
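Net effect on scaling: the inline per-instance loop (which re-ran the HDFS refresh once per new DataNode) is replaced by a single `CU.refresh_datanodes(cluster)` call plus the `_start_roles` helper added above, so each phase now surfaces as its own event-log step. Abridged, the patched function reads:

def scale_cluster(cluster, instances):
    if not instances:
        return

    # ... unchanged provisioning of the new instances ...

    CU.configure_instances(instances)
    CU.pu.configure_swift(cluster, instances)
    CU.update_configs(instances)
    CU.refresh_datanodes(cluster)     # "Refresh DataNodes" step, once per scale
    _start_roles(cluster, instances)  # "Start roles: NODEMANAGER, DATANODE" step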
@@ -117,8 +121,8 @@ def decommission_cluster(cluster, instances):
 
     CU.delete_instances(cluster, instances)
 
-    CU.refresh_nodes(cluster, 'DATANODE', CU.HDFS_SERVICE_NAME)
-    CU.refresh_nodes(cluster, 'NODEMANAGER', CU.YARN_SERVICE_NAME)
+    CU.refresh_datanodes(cluster)
+    CU.refresh_yarn_nodes(cluster)
 
 
 @cpo.event_wrapper(True, **_step_description("Zookeeper"))
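With the decorators above in place, scaling down a cluster now emits one named step per phase. Reading the patched code, the sequence is roughly:

def decommission_cluster(cluster, instances):
    # ... CU.decommission_nodes(...) calls -> "Decommission nodes" steps ...

    CU.delete_instances(cluster, instances)  # "Delete instances" step

    CU.refresh_datanodes(cluster)   # "Refresh DataNodes" step
    CU.refresh_yarn_nodes(cluster)  # "Refresh YARNNodes" step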
@@ -96,6 +96,8 @@ class ClouderaUtilsV530(cu.ClouderaUtils):
         return super(ClouderaUtilsV530, self).get_service_by_role(
             process, cluster, instance)
 
+    @cpo.event_wrapper(
+        True, step=_("First run cluster"), param=('cluster', 1))
     @cu.cloudera_cmd
     def first_run(self, cluster):
         cm_cluster = self.get_cloudera_cluster(cluster)
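Note on decorator order in `first_run`: decorators apply bottom-up, so `@cu.cloudera_cmd` first converts the generator (which yields Cloudera Manager command objects) into a call that submits the command and blocks until it completes, and `@cpo.event_wrapper` then brackets that entire blocking call as the "First run cluster" step. A plain-Python illustration of the ordering rule:

def outer(func):  # applied second, wraps the already-decorated function
    def handler(*args, **kwargs):
        print("outer: before")
        result = func(*args, **kwargs)
        print("outer: after")
        return result
    return handler


def inner(func):  # applied first, sits closest to the function body
    def handler(*args, **kwargs):
        print("inner: before")
        return func(*args, **kwargs)
    return handler


@outer
@inner
def job():
    print("job body")


job()
# prints: outer: before / inner: before / job body / outer: after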
@@ -13,9 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from sahara.i18n import _
 from sahara.plugins.cdh import commands as cmd
 from sahara.plugins.cdh.v5_3_0 import cloudera_utils as cu
 from sahara.plugins import utils as gu
+from sahara.utils import cluster_progress_ops as cpo
 
 
 PACKAGES = [
@@ -76,6 +78,19 @@ def configure_cluster(cluster):
     CU.deploy_configs(cluster)
 
 
+@cpo.event_wrapper(
+    True, step=_("Start roles: NODEMANAGER, DATANODE"), param=('cluster', 0))
+def _start_roles(cluster, instances):
+    for instance in instances:
+        if 'HDFS_DATANODE' in instance.node_group.node_processes:
+            hdfs = CU.get_service_by_role('DATANODE', instance=instance)
+            CU.start_roles(hdfs, CU.pu.get_role_name(instance, 'DATANODE'))
+
+        if 'YARN_NODEMANAGER' in instance.node_group.node_processes:
+            yarn = CU.get_service_by_role('NODEMANAGER', instance=instance)
+            CU.start_roles(yarn, CU.pu.get_role_name(instance, 'NODEMANAGER'))
+
+
 def scale_cluster(cluster, instances):
     if not instances:
         return
@@ -89,18 +104,8 @@ def scale_cluster(cluster, instances):
     CU.configure_instances(instances, cluster)
     CU.update_configs(instances)
     CU.pu.configure_swift(cluster, instances)
-
-    for instance in instances:
-        if 'HDFS_DATANODE' in instance.node_group.node_processes:
-            CU.refresh_nodes(cluster, 'DATANODE', CU.HDFS_SERVICE_NAME)
-
-        if 'HDFS_DATANODE' in instance.node_group.node_processes:
-            hdfs = CU.get_service_by_role('DATANODE', instance=instance)
-            CU.start_roles(hdfs, CU.pu.get_role_name(instance, 'DATANODE'))
-
-        if 'YARN_NODEMANAGER' in instance.node_group.node_processes:
-            yarn = CU.get_service_by_role('NODEMANAGER', instance=instance)
-            CU.start_roles(yarn, CU.pu.get_role_name(instance, 'NODEMANAGER'))
+    CU.refresh_datanodes(cluster)
+    _start_roles(cluster, instances)
 
 
 def decommission_cluster(cluster, instances):
@@ -120,11 +125,12 @@ def decommission_cluster(cluster, instances):
 
     CU.delete_instances(cluster, instances)
 
-    CU.refresh_nodes(cluster, 'DATANODE', CU.HDFS_SERVICE_NAME)
-    CU.refresh_nodes(cluster, 'NODEMANAGER', CU.YARN_SERVICE_NAME)
+    CU.refresh_datanodes(cluster)
+    CU.refresh_yarn_nodes(cluster)
 
 
-def start_cluster(cluster):
+@cpo.event_wrapper(True, step=_("Prepare cluster"), param=('cluster', 0))
+def _prepare_cluster(cluster):
     if CU.pu.get_oozie(cluster):
         CU.pu.install_extjs(cluster)
 
@@ -134,10 +140,10 @@ def start_cluster(cluster):
     if CU.pu.get_sentry(cluster):
         CU.pu.configure_sentry(cluster)
 
-    CU.first_run(cluster)
-
-    CU.pu.configure_swift(cluster)
 
+@cpo.event_wrapper(
+    True, step=_("Finish cluster starting"), param=('cluster', 0))
+def _finish_cluster_starting(cluster):
     if CU.pu.get_hive_metastore(cluster):
         CU.pu.put_hive_hdfs_xml(cluster)
 
@@ -146,6 +152,16 @@ def start_cluster(cluster):
         CU.start_service(flume)
 
 
+def start_cluster(cluster):
+    _prepare_cluster(cluster)
+
+    CU.first_run(cluster)
+
+    CU.pu.configure_swift(cluster)
+
+    _finish_cluster_starting(cluster)
+
+
 def get_open_ports(node_group):
     ports = [9000]  # for CM agent
 
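After this refactor the v5_3_0 `start_cluster` is a thin sequence of event-logged phases; the heavy lifting lives in `_prepare_cluster` and `_finish_cluster_starting` (and in `ClouderaUtilsV530.first_run`, wrapped above). Annotated:

def start_cluster(cluster):
    _prepare_cluster(cluster)          # "Prepare cluster" step

    CU.first_run(cluster)              # "First run cluster" step

    CU.pu.configure_swift(cluster)

    _finish_cluster_starting(cluster)  # "Finish cluster starting" step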