diff --git a/sahara/conductor/resource.py b/sahara/conductor/resource.py
index a35491b2..644cae43 100644
--- a/sahara/conductor/resource.py
+++ b/sahara/conductor/resource.py
@@ -179,6 +179,14 @@ class NodeGroupTemplateResource(Resource, objects.NodeGroupTemplate):
 class InstanceResource(Resource, objects.Instance):
     _filter_fields = ['tenant_id', 'node_group_id']
 
+    @property
+    def cluster_id(self):
+        return self.node_group.cluster_id
+
+    @property
+    def cluster(self):
+        return self.node_group.cluster
+
 
 class NodeGroupResource(Resource, objects.NodeGroup):
     _children = {
diff --git a/sahara/plugins/cdh/cloudera_utils.py b/sahara/plugins/cdh/cloudera_utils.py
index 9f975f1b..3c30590c 100644
--- a/sahara/plugins/cdh/cloudera_utils.py
+++ b/sahara/plugins/cdh/cloudera_utils.py
@@ -168,7 +168,7 @@ class ClouderaUtils(object):
         if cluster:
             cm_cluster = self.get_cloudera_cluster(cluster)
         elif instance:
-            cm_cluster = self.get_cloudera_cluster(instance.node_group.cluster)
+            cm_cluster = self.get_cloudera_cluster(instance.cluster)
         else:
             raise ValueError(_("'cluster' or 'instance' argument missed"))
 
@@ -194,7 +194,7 @@ class ClouderaUtils(object):
                       {'process': process})
 
     def await_agents(self, instances):
-        api = self.get_api_client(instances[0].node_group.cluster)
+        api = self.get_api_client(instances[0].cluster)
         timeout = 300
         LOG.debug("Waiting %(timeout)s seconds for agent connected to manager"
                   % {'timeout': timeout})
diff --git a/sahara/plugins/cdh/plugin_utils.py b/sahara/plugins/cdh/plugin_utils.py
index 3db9016a..1c148ae2 100644
--- a/sahara/plugins/cdh/plugin_utils.py
+++ b/sahara/plugins/cdh/plugin_utils.py
@@ -186,7 +186,7 @@ class AbstractPluginUtils(object):
                          self.start_cloudera_agent, i)
 
     def start_cloudera_agent(self, instance):
-        mng_hostname = self.get_manager(instance.node_group.cluster).hostname()
+        mng_hostname = self.get_manager(instance.cluster).hostname()
         with instance.remote() as r:
             cmd.start_ntp(r)
             cmd.configure_agent(r, mng_hostname)
@@ -201,7 +201,7 @@ class AbstractPluginUtils(object):
                          self.configure_swift_to_inst, i)
 
     def configure_swift_to_inst(self, instance):
-        cluster = instance.node_group.cluster
+        cluster = instance.cluster
         with instance.remote() as r:
             r.execute_command('sudo curl %s -o %s/hadoop-openstack.jar' % (
                 self.c_helper.get_swift_lib_url(cluster), HADOOP_LIB_DIR))
@@ -282,7 +282,7 @@ class AbstractPluginUtils(object):
     def _configure_repo_from_inst(self, instance):
         LOG.debug("Configure repos from instance '%(instance)s'" % {
                   'instance': instance.instance_name})
-        cluster = instance.node_group.cluster
+        cluster = instance.cluster
 
         cdh5_key = self.c_helper.get_cdh5_key_url(cluster)
         cm5_key = self.c_helper.get_cm5_key_url(cluster)
diff --git a/sahara/plugins/cdh/v5_3_0/cloudera_utils.py b/sahara/plugins/cdh/v5_3_0/cloudera_utils.py
index 2305be06..602c6c93 100644
--- a/sahara/plugins/cdh/v5_3_0/cloudera_utils.py
+++ b/sahara/plugins/cdh/v5_3_0/cloudera_utils.py
@@ -90,7 +90,7 @@ class ClouderaUtilsV530(cu.ClouderaUtils):
         if cluster:
             cm_cluster = self.get_cloudera_cluster(cluster)
         elif instance:
-            cm_cluster = self.get_cloudera_cluster(instance.node_group.cluster)
+            cm_cluster = self.get_cloudera_cluster(instance.cluster)
         else:
             raise ValueError(_("'cluster' or 'instance' argument missed"))
diff --git a/sahara/plugins/hdp/ambariplugin.py b/sahara/plugins/hdp/ambariplugin.py
index f0c259cf..4f4cd026 100644
--- a/sahara/plugins/hdp/ambariplugin.py
+++ b/sahara/plugins/hdp/ambariplugin.py
@@ -411,5 +411,4 @@ class AmbariInfo(object):
 
     def get_cluster(self):
         sahara_instance = self.host.sahara_instance
-        cluster_id = sahara_instance.node_group.cluster_id
-        return conductor.cluster_get(context.ctx(), cluster_id)
+        return sahara_instance.cluster
diff --git a/sahara/plugins/spark/scaling.py b/sahara/plugins/spark/scaling.py
index 68d6977f..5819bb29 100644
--- a/sahara/plugins/spark/scaling.py
+++ b/sahara/plugins/spark/scaling.py
@@ -36,7 +36,7 @@ def decommission_sl(master, inst_to_be_deleted, survived_inst):
     else:
         slaves_content = "\n"
 
-    cluster = master.node_group.cluster
+    cluster = master.cluster
     sp_home = c_helper.get_config_value("Spark", "Spark home", cluster)
     r_master = remote.get_remote(master)
     run.stop_spark(r_master, sp_home)
@@ -61,8 +61,7 @@ def decommission_dn(nn, inst_to_be_deleted, survived_inst):
     run.refresh_nodes(remote.get_remote(nn), "dfsadmin")
     context.sleep(3)
 
-    timeout = c_helper.get_decommissioning_timeout(
-        nn.node_group.cluster)
+    timeout = c_helper.get_decommissioning_timeout(nn.cluster)
     s_time = timeutils.utcnow()
     all_found = False
 
@@ -91,8 +90,7 @@ def decommission_dn(nn, inst_to_be_deleted, survived_inst):
             ex.DecommissionError(
                 _("Cannot finish decommission of cluster %(cluster)s in "
                   "%(seconds)d seconds") %
-                {"cluster": nn.node_group.cluster,
-                 "seconds": timeout})
+                {"cluster": nn.cluster, "seconds": timeout})
 
 
 def parse_dfs_report(cmd_output):
diff --git a/sahara/plugins/vanilla/hadoop2/config.py b/sahara/plugins/vanilla/hadoop2/config.py
index 42f743e8..fe9653f4 100644
--- a/sahara/plugins/vanilla/hadoop2/config.py
+++ b/sahara/plugins/vanilla/hadoop2/config.py
@@ -291,7 +291,7 @@ def _post_configuration(pctx, instance):
         r.execute_command('sudo /tmp/post_conf.sh')
 
         if c_helper.is_data_locality_enabled(pctx,
-                                             instance.node_group.cluster):
+                                             instance.cluster):
             t_script = HADOOP_CONF_DIR + '/topology.sh'
             r.write_file_to(t_script, f.get_file_text(
                 'plugins/vanilla/hadoop2/resources/topology.sh'),
diff --git a/sahara/plugins/vanilla/hadoop2/run_scripts.py b/sahara/plugins/vanilla/hadoop2/run_scripts.py
index f56ffcda..dc6d00f3 100644
--- a/sahara/plugins/vanilla/hadoop2/run_scripts.py
+++ b/sahara/plugins/vanilla/hadoop2/run_scripts.py
@@ -76,7 +76,7 @@ def start_historyserver(instance):
 
 def start_oozie_process(pctx, instance):
     with instance.remote() as r:
-        if c_helper.is_mysql_enabled(pctx, instance.node_group.cluster):
+        if c_helper.is_mysql_enabled(pctx, instance.cluster):
             _start_mysql(r)
             LOG.debug("Creating Oozie DB Schema...")
             sql_script = files.get_file_text(
@@ -211,7 +211,7 @@ def start_hiveserver_process(pctx, instance):
             _hive_copy_shared_conf(
                 r, edp.get_hive_shared_conf_path('hadoop'))
 
-        if c_helper.is_mysql_enabled(pctx, instance.node_group.cluster):
+        if c_helper.is_mysql_enabled(pctx, instance.cluster):
             oozie = vu.get_oozie(instance.node_group.cluster)
             if not oozie or instance.hostname() != oozie.hostname():
                 _start_mysql(r)
diff --git a/sahara/plugins/vanilla/v1_2_1/scaling.py b/sahara/plugins/vanilla/v1_2_1/scaling.py
index 031d881a..1b428a23 100644
--- a/sahara/plugins/vanilla/v1_2_1/scaling.py
+++ b/sahara/plugins/vanilla/v1_2_1/scaling.py
@@ -49,7 +49,7 @@ def decommission_dn(nn, inst_to_be_deleted, survived_inst):
     context.sleep(3)
 
     timeout = config_helper.get_decommissioning_timeout(
-        nn.node_group.cluster)
+        nn.cluster)
     s_time = timeutils.utcnow()
     all_found = False
 
@@ -78,7 +78,7 @@ def decommission_dn(nn, inst_to_be_deleted, survived_inst):
             ex.DecommissionError(
                 _("Cannot finish decommission of cluster %(cluster)s in "
                   "%(seconds)d seconds") %
-                {"cluster": nn.node_group.cluster, "seconds": timeout})
+                {"cluster": nn.cluster, "seconds": timeout})
 
 
 def parse_dfs_report(cmd_output):
diff --git a/sahara/service/engine.py b/sahara/service/engine.py
index 6ed29851..9005ea80 100644
--- a/sahara/service/engine.py
+++ b/sahara/service/engine.py
@@ -118,7 +118,7 @@ class Engine(object):
 
             context.sleep(5)
 
-            if not g.check_cluster_exists(instance.node_group.cluster):
+            if not g.check_cluster_exists(instance.cluster):
                 return
 
     def _configure_instances(self, cluster):
diff --git a/sahara/service/networks.py b/sahara/service/networks.py
index 49595961..0db103a7 100644
--- a/sahara/service/networks.py
+++ b/sahara/service/networks.py
@@ -48,7 +48,7 @@ def init_instances_ips(instance):
         else:
             management_ip = management_ip or address['addr']
 
-    cluster = instance.node_group.cluster
+    cluster = instance.cluster
     if (not CONF.use_floating_ips or
             (cluster.has_proxy_gateway() and
              not instance.node_group.is_proxy_gateway)):
diff --git a/sahara/service/volumes.py b/sahara/service/volumes.py
index ce815990..120dd8f5 100644
--- a/sahara/service/volumes.py
+++ b/sahara/service/volumes.py
@@ -56,17 +56,13 @@ def _count_volumes_to_attach(instances):
     return sum([inst.node_group.volumes_per_node for inst in instances])
 
 
-def _get_cluster_id(instance):
-    return instance.node_group.cluster_id
-
-
 def attach_to_instances(instances):
     instances_to_attach = _count_instances_to_attach(instances)
     if instances_to_attach == 0:
         return
 
     cpo.add_provisioning_step(
-        _get_cluster_id(instances[0]), _("Attach volumes to instances"),
+        instances[0].cluster_id, _("Attach volumes to instances"),
         instances_to_attach)
 
     with context.ThreadGroup() as tg:
@@ -157,7 +153,7 @@ def mount_to_instances(instances):
         return
 
     cpo.add_provisioning_step(
-        _get_cluster_id(instances[0]),
+        instances[0].cluster_id,
         _("Mount volumes to instances"), _count_volumes_to_attach(instances))
 
     with context.ThreadGroup() as tg:
diff --git a/sahara/tests/unit/utils/test_ssh_remote.py b/sahara/tests/unit/utils/test_ssh_remote.py
index 7a9cb716..563eea3a 100644
--- a/sahara/tests/unit/utils/test_ssh_remote.py
+++ b/sahara/tests/unit/utils/test_ssh_remote.py
@@ -53,6 +53,10 @@ class FakeInstance(object):
         self.management_ip = management_ip
         self.node_group = FakeNodeGroup(user, priv_key)
 
+    @property
+    def cluster(self):
+        return self.node_group.cluster
+
 
 class TestInstanceInteropHelper(base.SaharaTestCase):
     def setUp(self):
diff --git a/sahara/utils/cluster_progress_ops.py b/sahara/utils/cluster_progress_ops.py
index 649216c1..c1590e15 100644
--- a/sahara/utils/cluster_progress_ops.py
+++ b/sahara/utils/cluster_progress_ops.py
@@ -26,16 +26,8 @@ from sahara import context
 conductor = c.API
 
 
-def _get_cluster_id(instance):
-    # If instance is InstanceInfo from context, then get cluster_id directly
-    if hasattr(instance, 'node_group'):
-        return instance.node_group.cluster_id
-    else:
-        return instance.cluster_id
-
-
 def add_successful_event(instance):
-    cluster_id = _get_cluster_id(instance)
+    cluster_id = instance.cluster_id
     step_id = get_current_provisioning_step(cluster_id)
     if step_id:
         conductor.cluster_event_add(context.ctx(), step_id, {
@@ -48,7 +40,7 @@ def add_successful_event(instance):
 
 
 def add_fail_event(instance, exception):
-    cluster_id = _get_cluster_id(instance)
+    cluster_id = instance.cluster_id
     step_id = get_current_provisioning_step(cluster_id)
     event_info = six.text_type(exception)
 
@@ -162,7 +154,7 @@ def event_wrapper(mark_successful_on_exit, **spec):
 
             if step_name:
                 # It's single process, let's add provisioning step here
-                cluster_id = _get_cluster_id(instance)
+                cluster_id = instance.cluster_id
                 add_provisioning_step(cluster_id, step_name, 1)
 
             try:
diff --git a/sahara/utils/ssh_remote.py b/sahara/utils/ssh_remote.py
index 09d40fe5..7a223628 100644
--- a/sahara/utils/ssh_remote.py
+++ b/sahara/utils/ssh_remote.py
@@ -538,8 +538,7 @@ class InstanceInteropHelper(remote.Remote):
         if not instance:
            instance = self.instance
         neutron_info = h.HashableDict()
-        neutron_info['network'] = (
-            instance.node_group.cluster.neutron_management_network)
+        neutron_info['network'] = instance.cluster.neutron_management_network
         ctx = context.current()
         neutron_info['uri'] = base.url_for(ctx.service_catalog, 'network')
         neutron_info['token'] = ctx.auth_token
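Taken together, the patch adds `cluster` and `cluster_id` properties to `InstanceResource` and switches every `instance.node_group.cluster` / `_get_cluster_id(instance)` call site to them. Below is a minimal, self-contained sketch of the delegation pattern those properties implement; the `Fake*` classes are illustrative stand-ins, not Sahara's real conductor resources:

```python
class FakeCluster(object):
    def __init__(self, cluster_id):
        self.id = cluster_id


class FakeNodeGroup(object):
    def __init__(self, cluster):
        self.cluster = cluster
        self.cluster_id = cluster.id


class FakeInstance(object):
    def __init__(self, node_group):
        self.node_group = node_group

    @property
    def cluster_id(self):
        # Delegate to the owning node group, mirroring InstanceResource.
        return self.node_group.cluster_id

    @property
    def cluster(self):
        return self.node_group.cluster


if __name__ == '__main__':
    instance = FakeInstance(FakeNodeGroup(FakeCluster('42')))
    # Old spelling at call sites: instance.node_group.cluster_id
    # New spelling enabled by the patch:
    assert instance.cluster_id == '42'
    assert instance.cluster.id == '42'
```

Because the properties simply delegate, callers that still reach through `node_group` (e.g. the untouched `vu.get_oozie(instance.node_group.cluster)` context line in run_scripts.py) keep working unchanged.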