Merge "Add ability to get cluster_id directly from instance"
This commit is contained in:
commit
1fdf5b1dd7
@ -179,6 +179,14 @@ class NodeGroupTemplateResource(Resource, objects.NodeGroupTemplate):
|
||||
class InstanceResource(Resource, objects.Instance):
|
||||
_filter_fields = ['tenant_id', 'node_group_id']
|
||||
|
||||
@property
|
||||
def cluster_id(self):
|
||||
return self.node_group.cluster_id
|
||||
|
||||
@property
|
||||
def cluster(self):
|
||||
return self.node_group.cluster
|
||||
|
||||
|
||||
class NodeGroupResource(Resource, objects.NodeGroup):
|
||||
_children = {
|
||||
|
@ -168,7 +168,7 @@ class ClouderaUtils(object):
|
||||
if cluster:
|
||||
cm_cluster = self.get_cloudera_cluster(cluster)
|
||||
elif instance:
|
||||
cm_cluster = self.get_cloudera_cluster(instance.node_group.cluster)
|
||||
cm_cluster = self.get_cloudera_cluster(instance.cluster)
|
||||
else:
|
||||
raise ValueError(_("'cluster' or 'instance' argument missed"))
|
||||
|
||||
@ -194,7 +194,7 @@ class ClouderaUtils(object):
|
||||
{'process': process})
|
||||
|
||||
def await_agents(self, instances):
|
||||
api = self.get_api_client(instances[0].node_group.cluster)
|
||||
api = self.get_api_client(instances[0].cluster)
|
||||
timeout = 300
|
||||
LOG.debug("Waiting %(timeout)s seconds for agent connected to manager"
|
||||
% {'timeout': timeout})
|
||||
|
@ -186,7 +186,7 @@ class AbstractPluginUtils(object):
|
||||
self.start_cloudera_agent, i)
|
||||
|
||||
def start_cloudera_agent(self, instance):
|
||||
mng_hostname = self.get_manager(instance.node_group.cluster).hostname()
|
||||
mng_hostname = self.get_manager(instance.cluster).hostname()
|
||||
with instance.remote() as r:
|
||||
cmd.start_ntp(r)
|
||||
cmd.configure_agent(r, mng_hostname)
|
||||
@ -201,7 +201,7 @@ class AbstractPluginUtils(object):
|
||||
self.configure_swift_to_inst, i)
|
||||
|
||||
def configure_swift_to_inst(self, instance):
|
||||
cluster = instance.node_group.cluster
|
||||
cluster = instance.cluster
|
||||
with instance.remote() as r:
|
||||
r.execute_command('sudo curl %s -o %s/hadoop-openstack.jar' % (
|
||||
self.c_helper.get_swift_lib_url(cluster), HADOOP_LIB_DIR))
|
||||
@ -282,7 +282,7 @@ class AbstractPluginUtils(object):
|
||||
def _configure_repo_from_inst(self, instance):
|
||||
LOG.debug("Configure repos from instance '%(instance)s'" % {
|
||||
'instance': instance.instance_name})
|
||||
cluster = instance.node_group.cluster
|
||||
cluster = instance.cluster
|
||||
|
||||
cdh5_key = self.c_helper.get_cdh5_key_url(cluster)
|
||||
cm5_key = self.c_helper.get_cm5_key_url(cluster)
|
||||
|
@ -90,7 +90,7 @@ class ClouderaUtilsV530(cu.ClouderaUtils):
|
||||
if cluster:
|
||||
cm_cluster = self.get_cloudera_cluster(cluster)
|
||||
elif instance:
|
||||
cm_cluster = self.get_cloudera_cluster(instance.node_group.cluster)
|
||||
cm_cluster = self.get_cloudera_cluster(instance.cluster)
|
||||
else:
|
||||
raise ValueError(_("'cluster' or 'instance' argument missed"))
|
||||
|
||||
|
@ -411,5 +411,4 @@ class AmbariInfo(object):
|
||||
|
||||
def get_cluster(self):
|
||||
sahara_instance = self.host.sahara_instance
|
||||
cluster_id = sahara_instance.node_group.cluster_id
|
||||
return conductor.cluster_get(context.ctx(), cluster_id)
|
||||
return sahara_instance.cluster
|
||||
|
@ -36,7 +36,7 @@ def decommission_sl(master, inst_to_be_deleted, survived_inst):
|
||||
else:
|
||||
slaves_content = "\n"
|
||||
|
||||
cluster = master.node_group.cluster
|
||||
cluster = master.cluster
|
||||
sp_home = c_helper.get_config_value("Spark", "Spark home", cluster)
|
||||
r_master = remote.get_remote(master)
|
||||
run.stop_spark(r_master, sp_home)
|
||||
@ -61,8 +61,7 @@ def decommission_dn(nn, inst_to_be_deleted, survived_inst):
|
||||
run.refresh_nodes(remote.get_remote(nn), "dfsadmin")
|
||||
context.sleep(3)
|
||||
|
||||
timeout = c_helper.get_decommissioning_timeout(
|
||||
nn.node_group.cluster)
|
||||
timeout = c_helper.get_decommissioning_timeout(nn.cluster)
|
||||
s_time = timeutils.utcnow()
|
||||
all_found = False
|
||||
|
||||
@ -91,8 +90,7 @@ def decommission_dn(nn, inst_to_be_deleted, survived_inst):
|
||||
ex.DecommissionError(
|
||||
_("Cannot finish decommission of cluster %(cluster)s in "
|
||||
"%(seconds)d seconds") %
|
||||
{"cluster": nn.node_group.cluster,
|
||||
"seconds": timeout})
|
||||
{"cluster": nn.cluster, "seconds": timeout})
|
||||
|
||||
|
||||
def parse_dfs_report(cmd_output):
|
||||
|
@ -291,7 +291,7 @@ def _post_configuration(pctx, instance):
|
||||
r.execute_command('sudo /tmp/post_conf.sh')
|
||||
|
||||
if c_helper.is_data_locality_enabled(pctx,
|
||||
instance.node_group.cluster):
|
||||
instance.cluster):
|
||||
t_script = HADOOP_CONF_DIR + '/topology.sh'
|
||||
r.write_file_to(t_script, f.get_file_text(
|
||||
'plugins/vanilla/hadoop2/resources/topology.sh'),
|
||||
|
@ -76,7 +76,7 @@ def start_historyserver(instance):
|
||||
|
||||
def start_oozie_process(pctx, instance):
|
||||
with instance.remote() as r:
|
||||
if c_helper.is_mysql_enabled(pctx, instance.node_group.cluster):
|
||||
if c_helper.is_mysql_enabled(pctx, instance.cluster):
|
||||
_start_mysql(r)
|
||||
LOG.debug("Creating Oozie DB Schema...")
|
||||
sql_script = files.get_file_text(
|
||||
@ -211,7 +211,7 @@ def start_hiveserver_process(pctx, instance):
|
||||
_hive_copy_shared_conf(
|
||||
r, edp.get_hive_shared_conf_path('hadoop'))
|
||||
|
||||
if c_helper.is_mysql_enabled(pctx, instance.node_group.cluster):
|
||||
if c_helper.is_mysql_enabled(pctx, instance.cluster):
|
||||
oozie = vu.get_oozie(instance.node_group.cluster)
|
||||
if not oozie or instance.hostname() != oozie.hostname():
|
||||
_start_mysql(r)
|
||||
|
@ -49,7 +49,7 @@ def decommission_dn(nn, inst_to_be_deleted, survived_inst):
|
||||
context.sleep(3)
|
||||
|
||||
timeout = config_helper.get_decommissioning_timeout(
|
||||
nn.node_group.cluster)
|
||||
nn.cluster)
|
||||
s_time = timeutils.utcnow()
|
||||
all_found = False
|
||||
|
||||
@ -78,7 +78,7 @@ def decommission_dn(nn, inst_to_be_deleted, survived_inst):
|
||||
ex.DecommissionError(
|
||||
_("Cannot finish decommission of cluster %(cluster)s in "
|
||||
"%(seconds)d seconds") %
|
||||
{"cluster": nn.node_group.cluster, "seconds": timeout})
|
||||
{"cluster": nn.cluster, "seconds": timeout})
|
||||
|
||||
|
||||
def parse_dfs_report(cmd_output):
|
||||
|
@ -118,7 +118,7 @@ class Engine(object):
|
||||
|
||||
context.sleep(5)
|
||||
|
||||
if not g.check_cluster_exists(instance.node_group.cluster):
|
||||
if not g.check_cluster_exists(instance.cluster):
|
||||
return
|
||||
|
||||
def _configure_instances(self, cluster):
|
||||
|
@ -48,7 +48,7 @@ def init_instances_ips(instance):
|
||||
else:
|
||||
management_ip = management_ip or address['addr']
|
||||
|
||||
cluster = instance.node_group.cluster
|
||||
cluster = instance.cluster
|
||||
if (not CONF.use_floating_ips or
|
||||
(cluster.has_proxy_gateway() and
|
||||
not instance.node_group.is_proxy_gateway)):
|
||||
|
@ -56,17 +56,13 @@ def _count_volumes_to_attach(instances):
|
||||
return sum([inst.node_group.volumes_per_node for inst in instances])
|
||||
|
||||
|
||||
def _get_cluster_id(instance):
|
||||
return instance.node_group.cluster_id
|
||||
|
||||
|
||||
def attach_to_instances(instances):
|
||||
instances_to_attach = _count_instances_to_attach(instances)
|
||||
if instances_to_attach == 0:
|
||||
return
|
||||
|
||||
cpo.add_provisioning_step(
|
||||
_get_cluster_id(instances[0]), _("Attach volumes to instances"),
|
||||
instances[0].cluster_id, _("Attach volumes to instances"),
|
||||
instances_to_attach)
|
||||
|
||||
with context.ThreadGroup() as tg:
|
||||
@ -157,7 +153,7 @@ def mount_to_instances(instances):
|
||||
return
|
||||
|
||||
cpo.add_provisioning_step(
|
||||
_get_cluster_id(instances[0]),
|
||||
instances[0].cluster_id,
|
||||
_("Mount volumes to instances"), _count_volumes_to_attach(instances))
|
||||
|
||||
with context.ThreadGroup() as tg:
|
||||
|
@ -53,6 +53,10 @@ class FakeInstance(object):
|
||||
self.management_ip = management_ip
|
||||
self.node_group = FakeNodeGroup(user, priv_key)
|
||||
|
||||
@property
|
||||
def cluster(self):
|
||||
return self.node_group.cluster
|
||||
|
||||
|
||||
class TestInstanceInteropHelper(base.SaharaTestCase):
|
||||
def setUp(self):
|
||||
|
@ -26,16 +26,8 @@ from sahara import context
|
||||
conductor = c.API
|
||||
|
||||
|
||||
def _get_cluster_id(instance):
|
||||
# If instance is InstanceInfo from context, then get cluster_id directly
|
||||
if hasattr(instance, 'node_group'):
|
||||
return instance.node_group.cluster_id
|
||||
else:
|
||||
return instance.cluster_id
|
||||
|
||||
|
||||
def add_successful_event(instance):
|
||||
cluster_id = _get_cluster_id(instance)
|
||||
cluster_id = instance.cluster_id
|
||||
step_id = get_current_provisioning_step(cluster_id)
|
||||
if step_id:
|
||||
conductor.cluster_event_add(context.ctx(), step_id, {
|
||||
@ -48,7 +40,7 @@ def add_successful_event(instance):
|
||||
|
||||
|
||||
def add_fail_event(instance, exception):
|
||||
cluster_id = _get_cluster_id(instance)
|
||||
cluster_id = instance.cluster_id
|
||||
step_id = get_current_provisioning_step(cluster_id)
|
||||
event_info = six.text_type(exception)
|
||||
|
||||
@ -162,7 +154,7 @@ def event_wrapper(mark_successful_on_exit, **spec):
|
||||
|
||||
if step_name:
|
||||
# It's single process, let's add provisioning step here
|
||||
cluster_id = _get_cluster_id(instance)
|
||||
cluster_id = instance.cluster_id
|
||||
add_provisioning_step(cluster_id, step_name, 1)
|
||||
|
||||
try:
|
||||
|
@ -538,8 +538,7 @@ class InstanceInteropHelper(remote.Remote):
|
||||
if not instance:
|
||||
instance = self.instance
|
||||
neutron_info = h.HashableDict()
|
||||
neutron_info['network'] = (
|
||||
instance.node_group.cluster.neutron_management_network)
|
||||
neutron_info['network'] = instance.cluster.neutron_management_network
|
||||
ctx = context.current()
|
||||
neutron_info['uri'] = base.url_for(ctx.service_catalog, 'network')
|
||||
neutron_info['token'] = ctx.auth_token
|
||||
|
Loading…
Reference in New Issue
Block a user