From d39332a6860954d8b99c1e355b8013eec2e9610a Mon Sep 17 00:00:00 2001 From: Andrey Pavlov Date: Thu, 25 Dec 2014 14:54:06 +0300 Subject: [PATCH] Adding cluster, instance, job_execution ids to logs implements bp: logs-improvement Change-Id: Ic4b64fa44e5a9a8bddcceef87fe5c706e31048d9 --- sahara/context.py | 26 +++++++ sahara/plugins/hdp/ambariplugin.py | 14 ++-- .../versions/version_1_3_2/versionhandler.py | 57 +++++++-------- .../hdp/versions/version_2_0_6/services.py | 11 +-- .../versions/version_2_0_6/versionhandler.py | 63 +++++++++-------- sahara/plugins/vanilla/hadoop2/config.py | 4 +- sahara/plugins/vanilla/hadoop2/run_scripts.py | 69 ++++++++++--------- .../plugins/vanilla/v1_2_1/versionhandler.py | 39 ++++++----- sahara/service/api.py | 3 + sahara/service/direct_engine.py | 36 ++++++---- sahara/service/edp/api.py | 3 + sahara/service/engine.py | 18 +++-- sahara/service/periodic.py | 1 + sahara/service/volumes.py | 25 ++++--- sahara/tests/unit/utils/test_ssh_remote.py | 11 +-- sahara/utils/ssh_remote.py | 4 +- 16 files changed, 226 insertions(+), 158 deletions(-) diff --git a/sahara/context.py b/sahara/context.py index c7cfef39..8f54b92e 100644 --- a/sahara/context.py +++ b/sahara/context.py @@ -301,3 +301,29 @@ class InstanceInfoManager(object): def __exit__(self, *args): current().current_instance_info = self.prev_instance_info + + +def set_current_cluster_id(cluster_id): + current().resource_uuid = 'none, cluster: %s' % cluster_id + + +def set_current_job_execution_id(je_id): + current().resource_uuid = 'none, job_execution: %s' % je_id + + +class SetCurrentInstanceId(object): + def __init__(self, instance_id): + ctx = current() + self.prev_uuid = ctx.resource_uuid + if ctx.resource_uuid: + ctx.resource_uuid = ctx.resource_uuid.replace('none', instance_id) + + def __enter__(self): + pass + + def __exit__(self, *ex): + current().resource_uuid = self.prev_uuid + + +def set_current_instance_id(instance_id): + return SetCurrentInstanceId(instance_id) diff --git a/sahara/plugins/hdp/ambariplugin.py b/sahara/plugins/hdp/ambariplugin.py index c8054350..b941d48a 100644 --- a/sahara/plugins/hdp/ambariplugin.py +++ b/sahara/plugins/hdp/ambariplugin.py @@ -187,9 +187,11 @@ class AmbariPlugin(p.ProvisioningPluginBase): _("Provision cluster via Ambari"), len(servers)) for server in servers: - self._spawn( - "hdp-provision-instance-%s" % server.instance.hostname(), - server.provision_ambari, ambari_info, cluster_spec) + with context.set_current_instance_id( + server.instance['instance_id']): + self._spawn( + "hdp-provision-instance-%s" % server.instance.hostname(), + server.provision_ambari, ambari_info, cluster_spec) handler = self.version_factory.get_version_handler(version) ambari_client = handler.get_ambari_client() @@ -326,8 +328,10 @@ class AmbariPlugin(p.ProvisioningPluginBase): cluster.id, _("Provision cluster via Ambari"), len(servers)) for server in servers: - self._spawn('Ambari provisioning thread', - server.provision_ambari, ambari_info, cluster_spec) + with context.set_current_instance_id( + server.instance['instance_id']): + self._spawn('Ambari provisioning thread', + server.provision_ambari, ambari_info, cluster_spec) ambari_client.configure_scaled_cluster_instances( cluster.name, cluster_spec, self._get_num_hosts(cluster), diff --git a/sahara/plugins/hdp/versions/version_1_3_2/versionhandler.py b/sahara/plugins/hdp/versions/version_1_3_2/versionhandler.py index 79627ac2..516f7e5a 100644 --- a/sahara/plugins/hdp/versions/version_1_3_2/versionhandler.py +++ b/sahara/plugins/hdp/versions/version_1_3_2/versionhandler.py @@ -124,7 +124,9 @@ class VersionHandler(avm.AbstractVersionHandler): len(servers)) for server in servers: - server.install_swift_integration() + with context.set_current_instance_id( + server.instance['instance_id']): + server.install_swift_integration() def get_services_processor(self): return services @@ -330,33 +332,34 @@ class AmbariClient(object): add_host_component_url = ('http://{0}/api/v1/clusters/{1}' '/hosts/{2}/host_components/{3}') for host in servers: - hostname = host.instance.fqdn().lower() - result = self._post( - add_host_url.format(ambari_info.get_address(), name, hostname), - ambari_info) - if result.status_code != 201: - LOG.error( - _LE('Create host command failed. {result}').format( - result=result.text)) - raise ex.HadoopProvisionError( - _('Failed to add host: %s') % result.text) + with context.set_current_instance_id(host.instance['instance_id']): + hostname = host.instance.fqdn().lower() + result = self._post( + add_host_url.format(ambari_info.get_address(), name, + hostname), ambari_info) + if result.status_code != 201: + LOG.error( + _LE('Create host command failed. {result}').format( + result=result.text)) + raise ex.HadoopProvisionError( + _('Failed to add host: %s') % result.text) - node_group_name = host.node_group.name - # TODO(jspeidel): ensure that node group exists - node_group = cluster_spec.node_groups[node_group_name] - for component in node_group.components: - # don't add any AMBARI components - if component.find('AMBARI') != 0: - result = self._post(add_host_component_url.format( - ambari_info.get_address(), name, hostname, component), - ambari_info) - if result.status_code != 201: - LOG.error( - _LE('Create host_component command failed. ' - '{result}').format(result=result.text)) - raise ex.HadoopProvisionError( - _('Failed to add host component: %s') - % result.text) + node_group_name = host.node_group.name + # TODO(jspeidel): ensure that node group exists + node_group = cluster_spec.node_groups[node_group_name] + for component in node_group.components: + # don't add any AMBARI components + if component.find('AMBARI') != 0: + result = self._post(add_host_component_url.format( + ambari_info.get_address(), name, hostname, + component), ambari_info) + if result.status_code != 201: + LOG.error( + _LE('Create host_component command failed. ' + '{result}').format(result=result.text)) + raise ex.HadoopProvisionError( + _('Failed to add host component: %s') + % result.text) @cpo.event_wrapper( True, step=_("Install services"), param=('ambari_info', 2)) diff --git a/sahara/plugins/hdp/versions/version_2_0_6/services.py b/sahara/plugins/hdp/versions/version_2_0_6/services.py index 8245c3e1..ec0f7b72 100644 --- a/sahara/plugins/hdp/versions/version_2_0_6/services.py +++ b/sahara/plugins/hdp/versions/version_2_0_6/services.py @@ -19,6 +19,7 @@ from oslo_config import cfg from oslo_log import log as logging import six +from sahara import context from sahara import exceptions as e from sahara.i18n import _ from sahara.i18n import _LI @@ -1268,7 +1269,9 @@ class HueService(Service): for ng in hue_ngs: if ng.instances: for instance in ng.instances: - HueService._handle_pre_service_start(instance, - cluster_spec, - hue_ini, - create_user) + with context.set_current_instance_id( + instance.instance_id): + HueService._handle_pre_service_start(instance, + cluster_spec, + hue_ini, + create_user) diff --git a/sahara/plugins/hdp/versions/version_2_0_6/versionhandler.py b/sahara/plugins/hdp/versions/version_2_0_6/versionhandler.py index c6fd94bc..43330204 100644 --- a/sahara/plugins/hdp/versions/version_2_0_6/versionhandler.py +++ b/sahara/plugins/hdp/versions/version_2_0_6/versionhandler.py @@ -114,7 +114,9 @@ class VersionHandler(avm.AbstractVersionHandler): len(servers)) for server in servers: - server.install_swift_integration() + with context.set_current_instance_id( + server.instance['instance_id']): + server.install_swift_integration() def get_services_processor(self): return services @@ -308,36 +310,37 @@ class AmbariClient(object): add_host_component_url = ('http://{0}/api/v1/clusters/{1}' '/hosts/{2}/host_components/{3}') for host in servers: - hostname = host.instance.fqdn().lower() - result = self._post( - add_host_url.format(ambari_info.get_address(), name, hostname), - ambari_info) - if result.status_code != 201: - LOG.error( - _LE('Create host command failed. {result}').format( - result=result.text)) - raise ex.HadoopProvisionError( - _('Failed to add host: %s') % result.text) + with context.set_current_instance_id(host.instance['instance_id']): + hostname = host.instance.fqdn().lower() + result = self._post( + add_host_url.format(ambari_info.get_address(), name, + hostname), ambari_info) + if result.status_code != 201: + LOG.error( + _LE('Create host command failed. {result}').format( + result=result.text)) + raise ex.HadoopProvisionError( + _('Failed to add host: %s') % result.text) - node_group_name = host.node_group.name - # TODO(jspeidel): ensure that node group exists - node_group = cluster_spec.node_groups[node_group_name] - for component in node_group.components: - # Don't add any AMBARI or HUE components - # TODO(rlevas): Pragmatically determine if component is - # managed by Ambari - if (component.find('AMBARI') != 0 - and component.find('HUE') != 0): - result = self._post(add_host_component_url.format( - ambari_info.get_address(), name, hostname, component), - ambari_info) - if result.status_code != 201: - LOG.error( - _LE('Create host_component command failed. ' - '{result}').format(result=result.text)) - raise ex.HadoopProvisionError( - _('Failed to add host component: %s') - % result.text) + node_group_name = host.node_group.name + # TODO(jspeidel): ensure that node group exists + node_group = cluster_spec.node_groups[node_group_name] + for component in node_group.components: + # Don't add any AMBARI or HUE components + # TODO(rlevas): Pragmatically determine if component is + # managed by Ambari + if (component.find('AMBARI') != 0 + and component.find('HUE') != 0): + result = self._post(add_host_component_url.format( + ambari_info.get_address(), name, hostname, + component), ambari_info) + if result.status_code != 201: + LOG.error( + _LE('Create host_component command failed. ' + '{result}').format(result=result.text)) + raise ex.HadoopProvisionError( + _('Failed to add host component: %s') + % result.text) @cpo.event_wrapper( True, step=_("Install services"), param=('ambari_info', 2)) diff --git a/sahara/plugins/vanilla/hadoop2/config.py b/sahara/plugins/vanilla/hadoop2/config.py index 554d011f..6b3fab99 100644 --- a/sahara/plugins/vanilla/hadoop2/config.py +++ b/sahara/plugins/vanilla/hadoop2/config.py @@ -17,6 +17,7 @@ from oslo_config import cfg from oslo_log import log as logging import six +from sahara import context from sahara.i18n import _ from sahara.i18n import _LW from sahara.plugins.vanilla.hadoop2 import config_helper as c_helper @@ -63,7 +64,8 @@ def configure_instances(pctx, instances): instances[0].cluster_id, _("Configure instances"), len(instances)) for instance in instances: - _configure_instance(pctx, instance) + with context.set_current_instance_id(instance.instance_id): + _configure_instance(pctx, instance) @cpo.event_wrapper(True) diff --git a/sahara/plugins/vanilla/hadoop2/run_scripts.py b/sahara/plugins/vanilla/hadoop2/run_scripts.py index 2a697311..687c0f2d 100644 --- a/sahara/plugins/vanilla/hadoop2/run_scripts.py +++ b/sahara/plugins/vanilla/hadoop2/run_scripts.py @@ -45,10 +45,11 @@ def start_dn_nm_processes(instances): with context.ThreadGroup() as tg: for instance in instances: - processes = set(instance.node_group.node_processes) - processes = processes.intersection(filternames) - tg.spawn('vanilla-start-processes-%s' % instance.instance_name, - _start_processes, instance, list(processes)) + with context.set_current_instance_id(instance.instance_id): + processes = set(instance.node_group.node_processes) + processes = processes.intersection(filternames) + tg.spawn('vanilla-start-processes-%s' % instance.instance_name, + _start_processes, instance, list(processes)) @cpo.event_wrapper(True) @@ -80,20 +81,21 @@ def start_historyserver(instance): @cpo.event_wrapper(True, step=pu.start_process_event_message("Oozie")) def start_oozie_process(pctx, instance): - with instance.remote() as r: - if c_helper.is_mysql_enabled(pctx, instance.cluster): - _start_mysql(r) - LOG.debug("Creating Oozie DB Schema") - sql_script = files.get_file_text( - 'plugins/vanilla/hadoop2/resources/create_oozie_db.sql') - script_location = "create_oozie_db.sql" - r.write_file_to(script_location, sql_script) - r.execute_command('mysql -u root < %(script_location)s && ' - 'rm %(script_location)s' % - {"script_location": script_location}) + with context.set_current_instance_id(instance.instance_id): + with instance.remote() as r: + if c_helper.is_mysql_enabled(pctx, instance.cluster): + _start_mysql(r) + LOG.debug("Creating Oozie DB Schema") + sql_script = files.get_file_text( + 'plugins/vanilla/hadoop2/resources/create_oozie_db.sql') + script_location = "create_oozie_db.sql" + r.write_file_to(script_location, sql_script) + r.execute_command('mysql -u root < %(script_location)s && ' + 'rm %(script_location)s' % + {"script_location": script_location}) - _oozie_share_lib(r) - _start_oozie(r) + _oozie_share_lib(r) + _start_oozie(r) def format_namenode(instance): @@ -208,22 +210,23 @@ def _hive_metastore_start(remote): @cpo.event_wrapper(True, step=pu.start_process_event_message("HiveServer")) def start_hiveserver_process(pctx, instance): - with instance.remote() as r: - _hive_create_warehouse_dir(r) - _hive_copy_shared_conf( - r, edp.get_hive_shared_conf_path('hadoop')) + with context.set_current_instance_id(instance.instance_id): + with instance.remote() as r: + _hive_create_warehouse_dir(r) + _hive_copy_shared_conf( + r, edp.get_hive_shared_conf_path('hadoop')) - if c_helper.is_mysql_enabled(pctx, instance.cluster): - oozie = vu.get_oozie(instance.node_group.cluster) - if not oozie or instance.hostname() != oozie.hostname(): - _start_mysql(r) + if c_helper.is_mysql_enabled(pctx, instance.cluster): + oozie = vu.get_oozie(instance.node_group.cluster) + if not oozie or instance.hostname() != oozie.hostname(): + _start_mysql(r) - sql_script = files.get_file_text( - 'plugins/vanilla/hadoop2/resources/create_hive_db.sql' - ) + sql_script = files.get_file_text( + 'plugins/vanilla/hadoop2/resources/create_hive_db.sql' + ) - r.write_file_to('/tmp/create_hive_db.sql', sql_script) - _hive_create_db(r) - _hive_metastore_start(r) - LOG.info(_LI("Hive Metastore server at {host} has been " - "started").format(host=instance.hostname())) + r.write_file_to('/tmp/create_hive_db.sql', sql_script) + _hive_create_db(r) + _hive_metastore_start(r) + LOG.info(_LI("Hive Metastore server at {host} has been " + "started").format(host=instance.hostname())) diff --git a/sahara/plugins/vanilla/v1_2_1/versionhandler.py b/sahara/plugins/vanilla/v1_2_1/versionhandler.py index 1dc3ba87..cbadef98 100644 --- a/sahara/plugins/vanilla/v1_2_1/versionhandler.py +++ b/sahara/plugins/vanilla/v1_2_1/versionhandler.py @@ -159,13 +159,15 @@ class VersionHandler(avm.AbstractVersionHandler): nn_instance = vu.get_namenode(cluster) with remote.get_remote(oozie) as r: - if c_helper.is_mysql_enable(cluster): - run.mysql_start(r, oozie) - run.oozie_create_db(r) - run.oozie_share_lib(r, nn_instance.hostname()) - run.start_oozie(r) - LOG.info(_LI("Oozie service at {host} has been started").format( - host=nn_instance.hostname())) + with context.set_current_instance_id(oozie.instance_id): + if c_helper.is_mysql_enable(cluster): + run.mysql_start(r, oozie) + run.oozie_create_db(r) + run.oozie_share_lib(r, nn_instance.hostname()) + run.start_oozie(r) + LOG.info( + _LI("Oozie service at {host} has been started").format( + host=nn_instance.hostname())) def start_hiveserver(self, cluster): hs = vu.get_hiveserver(cluster) @@ -178,18 +180,19 @@ class VersionHandler(avm.AbstractVersionHandler): oozie = vu.get_oozie(cluster) with remote.get_remote(hive_server) as r: - run.hive_create_warehouse_dir(r) - run.hive_copy_shared_conf( - r, edp.get_hive_shared_conf_path('hadoop')) + with context.set_current_instance_id(hive_server.instance_id): + run.hive_create_warehouse_dir(r) + run.hive_copy_shared_conf( + r, edp.get_hive_shared_conf_path('hadoop')) - if c_helper.is_mysql_enable(cluster): - if not oozie or hive_server.hostname() != oozie.hostname(): - run.mysql_start(r, hive_server) - run.hive_create_db(r, cluster.extra['hive_mysql_passwd']) - run.hive_metastore_start(r) - LOG.info(_LI("Hive Metastore server at {host} has been " - "started").format( - host=hive_server.hostname())) + if c_helper.is_mysql_enable(cluster): + if not oozie or hive_server.hostname() != oozie.hostname(): + run.mysql_start(r, hive_server) + run.hive_create_db(r, cluster.extra['hive_mysql_passwd']) + run.hive_metastore_start(r) + LOG.info(_LI("Hive Metastore server at {host} has been " + "started").format( + host=hive_server.hostname())) def start_cluster(self, cluster): self.start_namenode(cluster) diff --git a/sahara/service/api.py b/sahara/service/api.py index 7aca7f5d..cc7b8d2c 100644 --- a/sahara/service/api.py +++ b/sahara/service/api.py @@ -54,6 +54,7 @@ def get_cluster(id, show_progress=False): def scale_cluster(id, data): + context.set_current_cluster_id(id) ctx = context.ctx() cluster = conductor.cluster_get(ctx, id) @@ -96,6 +97,7 @@ def scale_cluster(id, data): def create_cluster(values): ctx = context.ctx() cluster = conductor.cluster_create(ctx, values) + context.set_current_cluster_id(cluster.id) sender.notify(ctx, cluster.id, cluster.name, "New", "create") plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name) @@ -124,6 +126,7 @@ def _add_ports_for_auto_sg(ctx, cluster, plugin): def terminate_cluster(id): + context.set_current_cluster_id(id) cluster = g.change_cluster_status(id, "Deleting") OPS.terminate_cluster(id) diff --git a/sahara/service/direct_engine.py b/sahara/service/direct_engine.py index c70c1d9b..6632fc37 100644 --- a/sahara/service/direct_engine.py +++ b/sahara/service/direct_engine.py @@ -282,7 +282,8 @@ class DirectEngine(e.Engine): cluster = g.change_cluster_status(cluster, "Deleting Instances") for instance in instances_to_delete: - self._shutdown_instance(instance) + with context.set_current_instance_id(instance.instance_id): + self._shutdown_instance(instance) self._await_deleted(cluster, instances_to_delete) for ng in cluster.node_groups: @@ -418,10 +419,11 @@ class DirectEngine(e.Engine): def _assign_floating_ips(self, instances): for instance in instances: - node_group = instance.node_group - if node_group.floating_ip_pool: - networks.assign_floating_ip(instance.instance_id, - node_group.floating_ip_pool) + with context.set_current_instance_id(instance.instance_id): + node_group = instance.node_group + if node_group.floating_ip_pool: + networks.assign_floating_ip(instance.instance_id, + node_group.floating_ip_pool) @poll_utils.poll_status( 'await_for_instances_active', @@ -431,9 +433,10 @@ class DirectEngine(e.Engine): return True for instance in instances: if instance.id not in active_ids: - if self._check_if_active(instance): - active_ids.add(instance.id) - cpo.add_successful_event(instance) + with context.set_current_instance_id(instance.instance_id): + if self._check_if_active(instance): + active_ids.add(instance.id) + cpo.add_successful_event(instance) return len(instances) == len(active_ids) def _await_active(self, cluster, instances): @@ -460,11 +463,12 @@ class DirectEngine(e.Engine): for instance in instances: if instance.id not in deleted_ids: - if self._check_if_deleted(instance): - LOG.debug("Instance {instance} is deleted".format( - instance=instance.instance_name)) - deleted_ids.add(instance.id) - cpo.add_successful_event(instance) + with context.set_current_instance_id(instance.instance_id): + if self._check_if_deleted(instance): + LOG.debug("Instance {instance} is deleted".format( + instance=instance.instance_name)) + deleted_ids.add(instance.id) + cpo.add_successful_event(instance) return len(deleted_ids) == len(instances) def _await_deleted(self, cluster, instances): @@ -503,7 +507,8 @@ class DirectEngine(e.Engine): """Attempt to rollback cluster scaling.""" for i in instances: - self._shutdown_instance(i) + with context.set_current_instance_id(i.instance_id): + self._shutdown_instance(i) cluster = conductor.cluster_get(context.ctx(), cluster) g.clean_cluster_from_empty_ng(cluster) @@ -511,7 +516,8 @@ class DirectEngine(e.Engine): def _shutdown_instances(self, cluster): for node_group in cluster.node_groups: for instance in node_group.instances: - self._shutdown_instance(instance) + with context.set_current_instance_id(instance.instance_id): + self._shutdown_instance(instance) self._await_deleted(cluster, node_group.instances) self._delete_auto_security_group(node_group) diff --git a/sahara/service/edp/api.py b/sahara/service/edp/api.py index e6f0218a..6279bd75 100644 --- a/sahara/service/edp/api.py +++ b/sahara/service/edp/api.py @@ -123,6 +123,7 @@ def execute_job(job_id, data): 'info': {'status': edp.JOB_STATUS_PENDING}, 'job_configs': configs, 'extra': {}} job_execution = conductor.job_execution_create(context.ctx(), job_ex_dict) + context.set_current_job_execution_id(job_execution.id) # check to use proxy user if p.job_execution_requires_proxy_user(job_execution): @@ -153,6 +154,7 @@ def get_job_execution(id): def cancel_job_execution(id): + context.set_current_job_execution_id(id) job_execution = conductor.job_execution_get(context.ctx(), id) OPS.cancel_job_execution(id) @@ -160,6 +162,7 @@ def cancel_job_execution(id): def delete_job_execution(id): + context.set_current_job_execution_id(id) OPS.delete_job_execution(id) diff --git a/sahara/service/engine.py b/sahara/service/engine.py index 2c09811e..ccfb09c2 100644 --- a/sahara/service/engine.py +++ b/sahara/service/engine.py @@ -72,9 +72,10 @@ class Engine(object): return True for instance in instances: if instance.id not in ips_assigned: - if networks.init_instances_ips(instance): - ips_assigned.add(instance.id) - cpo.add_successful_event(instance) + with context.set_current_instance_id(instance.instance_id): + if networks.init_instances_ips(instance): + ips_assigned.add(instance.id) + cpo.add_successful_event(instance) return len(ips_assigned) == len(instances) def _await_networks(self, cluster, instances): @@ -98,8 +99,9 @@ class Engine(object): with context.ThreadGroup() as tg: for instance in instances: - tg.spawn("wait-for-ssh-%s" % instance.instance_name, - self._wait_until_accessible, instance) + with context.set_current_instance_id(instance.instance_id): + tg.spawn("wait-for-ssh-%s" % instance.instance_name, + self._wait_until_accessible, instance) LOG.info(_LI("Cluster {cluster_id}: all instances are accessible") .format(cluster_id=cluster.id)) @@ -148,8 +150,10 @@ class Engine(object): with context.ThreadGroup() as tg: for node_group in cluster.node_groups: for instance in node_group.instances: - tg.spawn("configure-instance-%s" % instance.instance_name, - self._configure_instance, instance, hosts_file) + with context.set_current_instance_id(instance.instance_id): + tg.spawn( + "configure-instance-%s" % instance.instance_name, + self._configure_instance, instance, hosts_file) @cpo.event_wrapper(mark_successful_on_exit=True) def _configure_instance(self, instance, hosts_file): diff --git a/sahara/service/periodic.py b/sahara/service/periodic.py index e90c4340..4bdac85e 100644 --- a/sahara/service/periodic.py +++ b/sahara/service/periodic.py @@ -81,6 +81,7 @@ def get_time_since_last_update(cluster): def terminate_cluster(ctx, cluster, description): if CONF.use_identity_api_v3: trusts.use_os_admin_auth_token(cluster) + context.set_current_cluster_id(cluster.id) LOG.debug('Terminating {description} cluster {cluster} ' 'in "{status}" state with id {id}' diff --git a/sahara/service/volumes.py b/sahara/service/volumes.py index 3f3bf6b1..806ff475 100644 --- a/sahara/service/volumes.py +++ b/sahara/service/volumes.py @@ -59,9 +59,11 @@ def attach_to_instances(instances): with context.ThreadGroup() as tg: for instance in instances: if instance.node_group.volumes_per_node > 0: - tg.spawn( - 'attach-volumes-for-instance-%s' % instance.instance_name, - _attach_volumes_to_node, instance.node_group, instance) + with context.set_current_instance_id(instance.instance_id): + tg.spawn( + 'attach-volumes-for-instance-%s' + % instance.instance_name, _attach_volumes_to_node, + instance.node_group, instance) @poll_utils.poll_status( @@ -156,14 +158,15 @@ def mount_to_instances(instances): with context.ThreadGroup() as tg: for instance in instances: - devices = _find_instance_volume_devices(instance) - - # Since formatting can take several minutes (for large disks) and - # can be done in parallel, launch one thread per disk. - for idx in range(0, instance.node_group.volumes_per_node): - tg.spawn('mount-volume-%d-to-node-%s' % - (idx, instance.instance_name), - _mount_volume_to_node, instance, idx, devices[idx]) + with context.set_current_instance_id(instance.instance_id): + devices = _find_instance_volume_devices(instance) + # Since formating can take several minutes (for large disks) + # and can be done in parallel, launch one thread per disk. + for idx in range(0, instance.node_group.volumes_per_node): + tg.spawn( + 'mount-volume-%d-to-node-%s' % + (idx, instance.instance_name), + _mount_volume_to_node, instance, idx, devices[idx]) def _find_instance_volume_devices(instance): diff --git a/sahara/tests/unit/utils/test_ssh_remote.py b/sahara/tests/unit/utils/test_ssh_remote.py index de91bec0..87c33e0e 100644 --- a/sahara/tests/unit/utils/test_ssh_remote.py +++ b/sahara/tests/unit/utils/test_ssh_remote.py @@ -48,8 +48,9 @@ class FakeNodeGroup(object): class FakeInstance(object): - def __init__(self, inst_name, management_ip, user, priv_key): + def __init__(self, inst_name, inst_id, management_ip, user, priv_key): self.instance_name = inst_name + self.instance_id = inst_id self.management_ip = management_ip self.node_group = FakeNodeGroup(user, priv_key) @@ -107,7 +108,7 @@ class TestInstanceInteropHelper(base.SaharaTestCase): def test_use_floating_ips(self, p_adapter): self.override_config('use_floating_ips', True) - instance = FakeInstance('inst1', '10.0.0.1', 'user1', 'key1') + instance = FakeInstance('inst1', '123', '10.0.0.1', 'user1', 'key1') remote = ssh_remote.InstanceInteropHelper(instance) # Test SSH @@ -128,7 +129,7 @@ class TestInstanceInteropHelper(base.SaharaTestCase): self.override_config('use_floating_ips', False) self.override_config('use_namespaces', True) - instance = FakeInstance('inst2', '10.0.0.2', 'user2', 'key2') + instance = FakeInstance('inst2', '123', '10.0.0.2', 'user2', 'key2') remote = ssh_remote.InstanceInteropHelper(instance) # Test SSH @@ -152,7 +153,7 @@ class TestInstanceInteropHelper(base.SaharaTestCase): def test_proxy_command(self, p_adapter, p_simple_exec_func): self.override_config('proxy_command', 'ssh fakerelay nc {host} {port}') - instance = FakeInstance('inst3', '10.0.0.3', 'user3', 'key3') + instance = FakeInstance('inst3', '123', '10.0.0.3', 'user3', 'key3') remote = ssh_remote.InstanceInteropHelper(instance) # Test SSH @@ -171,7 +172,7 @@ class TestInstanceInteropHelper(base.SaharaTestCase): def test_proxy_command_bad(self): self.override_config('proxy_command', '{bad_kw} nc {host} {port}') - instance = FakeInstance('inst4', '10.0.0.4', 'user4', 'key4') + instance = FakeInstance('inst4', '123', '10.0.0.4', 'user4', 'key4') remote = ssh_remote.InstanceInteropHelper(instance) # Test SSH diff --git a/sahara/utils/ssh_remote.py b/sahara/utils/ssh_remote.py index 5382c31f..2d4204bd 100644 --- a/sahara/utils/ssh_remote.py +++ b/sahara/utils/ssh_remote.py @@ -763,8 +763,8 @@ class InstanceInteropHelper(remote.Remote): self._run_s(_execute_on_vm_interactive, timeout, cmd, matcher) def _log_command(self, str): - LOG.debug('[{instance}] {command}'.format( - instance=self.instance.instance_name, command=str)) + with context.set_current_instance_id(self.instance.instance_id): + LOG.debug(str) class BulkInstanceInteropHelper(InstanceInteropHelper):