Adding cluster, instance, job_execution ids to logs

implements bp: logs-improvement

Change-Id: Ic4b64fa44e5a9a8bddcceef87fe5c706e31048d9
Andrey Pavlov 2014-12-25 14:54:06 +03:00
parent 477a72f3e4
commit d39332a686
16 changed files with 226 additions and 158 deletions
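Overview of the mechanism: the commit stores a human-readable resource marker in the request context's resource_uuid field so that log records emitted while Sahara works on a cluster, instance, or job execution carry the relevant ids. The sketch below is not part of the commit; it shows the marker values produced by the helpers added in the first hunk, and it assumes a request context is already active and that the configured oslo.log context formatter renders resource_uuid into each log line (this depends on the deployment's logging settings).

# Illustration only: marker values stored by the new context helpers.
from sahara import context

context.set_current_cluster_id('42')
# context.current().resource_uuid == 'none, cluster: 42'

with context.set_current_instance_id('inst-7'):
    # the 'none' placeholder is replaced while the block runs:
    # context.current().resource_uuid == 'inst-7, cluster: 42'
    pass

# on exit the previous value is restored: 'none, cluster: 42'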

View File

@ -301,3 +301,29 @@ class InstanceInfoManager(object):
def __exit__(self, *args):
current().current_instance_info = self.prev_instance_info
def set_current_cluster_id(cluster_id):
current().resource_uuid = 'none, cluster: %s' % cluster_id
def set_current_job_execution_id(je_id):
current().resource_uuid = 'none, job_execution: %s' % je_id
class SetCurrentInstanceId(object):
def __init__(self, instance_id):
ctx = current()
self.prev_uuid = ctx.resource_uuid
if ctx.resource_uuid:
ctx.resource_uuid = ctx.resource_uuid.replace('none', instance_id)
def __enter__(self):
pass
def __exit__(self, *ex):
current().resource_uuid = self.prev_uuid
def set_current_instance_id(instance_id):
return SetCurrentInstanceId(instance_id)
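The remaining hunks apply one pattern: cluster and job-execution ids are set once at the service API entry points, and per-instance work is wrapped in the new context manager. A minimal sketch of that pattern, using illustrative placeholder names rather than the real Sahara signatures:

# Sketch of the call pattern used throughout the rest of the diff.
from sahara import context

def scale_cluster(cluster_id):
    context.set_current_cluster_id(cluster_id)
    # every log record emitted from here on is tagged with the cluster id

def configure_instances(instances):
    for instance in instances:
        with context.set_current_instance_id(instance.instance_id):
            # per-instance work goes here; logs emitted inside the block
            # carry this instance's id as well
            pass

def execute_job(job_execution):
    context.set_current_job_execution_id(job_execution.id)
    # job-execution logs are tagged from this point onward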

View File

@ -187,9 +187,11 @@ class AmbariPlugin(p.ProvisioningPluginBase):
_("Provision cluster via Ambari"), len(servers))
for server in servers:
self._spawn(
"hdp-provision-instance-%s" % server.instance.hostname(),
server.provision_ambari, ambari_info, cluster_spec)
with context.set_current_instance_id(
server.instance['instance_id']):
self._spawn(
"hdp-provision-instance-%s" % server.instance.hostname(),
server.provision_ambari, ambari_info, cluster_spec)
handler = self.version_factory.get_version_handler(version)
ambari_client = handler.get_ambari_client()
@ -326,8 +328,10 @@ class AmbariPlugin(p.ProvisioningPluginBase):
cluster.id, _("Provision cluster via Ambari"), len(servers))
for server in servers:
self._spawn('Ambari provisioning thread',
server.provision_ambari, ambari_info, cluster_spec)
with context.set_current_instance_id(
server.instance['instance_id']):
self._spawn('Ambari provisioning thread',
server.provision_ambari, ambari_info, cluster_spec)
ambari_client.configure_scaled_cluster_instances(
cluster.name, cluster_spec, self._get_num_hosts(cluster),

View File

@ -124,7 +124,9 @@ class VersionHandler(avm.AbstractVersionHandler):
len(servers))
for server in servers:
server.install_swift_integration()
with context.set_current_instance_id(
server.instance['instance_id']):
server.install_swift_integration()
def get_services_processor(self):
return services
@ -330,33 +332,34 @@ class AmbariClient(object):
add_host_component_url = ('http://{0}/api/v1/clusters/{1}'
'/hosts/{2}/host_components/{3}')
for host in servers:
hostname = host.instance.fqdn().lower()
result = self._post(
add_host_url.format(ambari_info.get_address(), name, hostname),
ambari_info)
if result.status_code != 201:
LOG.error(
_LE('Create host command failed. {result}').format(
result=result.text))
raise ex.HadoopProvisionError(
_('Failed to add host: %s') % result.text)
with context.set_current_instance_id(host.instance['instance_id']):
hostname = host.instance.fqdn().lower()
result = self._post(
add_host_url.format(ambari_info.get_address(), name,
hostname), ambari_info)
if result.status_code != 201:
LOG.error(
_LE('Create host command failed. {result}').format(
result=result.text))
raise ex.HadoopProvisionError(
_('Failed to add host: %s') % result.text)
node_group_name = host.node_group.name
# TODO(jspeidel): ensure that node group exists
node_group = cluster_spec.node_groups[node_group_name]
for component in node_group.components:
# don't add any AMBARI components
if component.find('AMBARI') != 0:
result = self._post(add_host_component_url.format(
ambari_info.get_address(), name, hostname, component),
ambari_info)
if result.status_code != 201:
LOG.error(
_LE('Create host_component command failed. '
'{result}').format(result=result.text))
raise ex.HadoopProvisionError(
_('Failed to add host component: %s')
% result.text)
node_group_name = host.node_group.name
# TODO(jspeidel): ensure that node group exists
node_group = cluster_spec.node_groups[node_group_name]
for component in node_group.components:
# don't add any AMBARI components
if component.find('AMBARI') != 0:
result = self._post(add_host_component_url.format(
ambari_info.get_address(), name, hostname,
component), ambari_info)
if result.status_code != 201:
LOG.error(
_LE('Create host_component command failed. '
'{result}').format(result=result.text))
raise ex.HadoopProvisionError(
_('Failed to add host component: %s')
% result.text)
@cpo.event_wrapper(
True, step=_("Install services"), param=('ambari_info', 2))

View File

@ -19,6 +19,7 @@ from oslo_config import cfg
from oslo_log import log as logging
import six
from sahara import context
from sahara import exceptions as e
from sahara.i18n import _
from sahara.i18n import _LI
@ -1268,7 +1269,9 @@ class HueService(Service):
for ng in hue_ngs:
if ng.instances:
for instance in ng.instances:
HueService._handle_pre_service_start(instance,
cluster_spec,
hue_ini,
create_user)
with context.set_current_instance_id(
instance.instance_id):
HueService._handle_pre_service_start(instance,
cluster_spec,
hue_ini,
create_user)

View File

@ -114,7 +114,9 @@ class VersionHandler(avm.AbstractVersionHandler):
len(servers))
for server in servers:
server.install_swift_integration()
with context.set_current_instance_id(
server.instance['instance_id']):
server.install_swift_integration()
def get_services_processor(self):
return services
@ -308,36 +310,37 @@ class AmbariClient(object):
add_host_component_url = ('http://{0}/api/v1/clusters/{1}'
'/hosts/{2}/host_components/{3}')
for host in servers:
hostname = host.instance.fqdn().lower()
result = self._post(
add_host_url.format(ambari_info.get_address(), name, hostname),
ambari_info)
if result.status_code != 201:
LOG.error(
_LE('Create host command failed. {result}').format(
result=result.text))
raise ex.HadoopProvisionError(
_('Failed to add host: %s') % result.text)
with context.set_current_instance_id(host.instance['instance_id']):
hostname = host.instance.fqdn().lower()
result = self._post(
add_host_url.format(ambari_info.get_address(), name,
hostname), ambari_info)
if result.status_code != 201:
LOG.error(
_LE('Create host command failed. {result}').format(
result=result.text))
raise ex.HadoopProvisionError(
_('Failed to add host: %s') % result.text)
node_group_name = host.node_group.name
# TODO(jspeidel): ensure that node group exists
node_group = cluster_spec.node_groups[node_group_name]
for component in node_group.components:
# Don't add any AMBARI or HUE components
# TODO(rlevas): Pragmatically determine if component is
# managed by Ambari
if (component.find('AMBARI') != 0
and component.find('HUE') != 0):
result = self._post(add_host_component_url.format(
ambari_info.get_address(), name, hostname, component),
ambari_info)
if result.status_code != 201:
LOG.error(
_LE('Create host_component command failed. '
'{result}').format(result=result.text))
raise ex.HadoopProvisionError(
_('Failed to add host component: %s')
% result.text)
node_group_name = host.node_group.name
# TODO(jspeidel): ensure that node group exists
node_group = cluster_spec.node_groups[node_group_name]
for component in node_group.components:
# Don't add any AMBARI or HUE components
# TODO(rlevas): Pragmatically determine if component is
# managed by Ambari
if (component.find('AMBARI') != 0
and component.find('HUE') != 0):
result = self._post(add_host_component_url.format(
ambari_info.get_address(), name, hostname,
component), ambari_info)
if result.status_code != 201:
LOG.error(
_LE('Create host_component command failed. '
'{result}').format(result=result.text))
raise ex.HadoopProvisionError(
_('Failed to add host component: %s')
% result.text)
@cpo.event_wrapper(
True, step=_("Install services"), param=('ambari_info', 2))

View File

@ -17,6 +17,7 @@ from oslo_config import cfg
from oslo_log import log as logging
import six
from sahara import context
from sahara.i18n import _
from sahara.i18n import _LW
from sahara.plugins.vanilla.hadoop2 import config_helper as c_helper
@ -63,7 +64,8 @@ def configure_instances(pctx, instances):
instances[0].cluster_id, _("Configure instances"), len(instances))
for instance in instances:
_configure_instance(pctx, instance)
with context.set_current_instance_id(instance.instance_id):
_configure_instance(pctx, instance)
@cpo.event_wrapper(True)

View File

@ -45,10 +45,11 @@ def start_dn_nm_processes(instances):
with context.ThreadGroup() as tg:
for instance in instances:
processes = set(instance.node_group.node_processes)
processes = processes.intersection(filternames)
tg.spawn('vanilla-start-processes-%s' % instance.instance_name,
_start_processes, instance, list(processes))
with context.set_current_instance_id(instance.instance_id):
processes = set(instance.node_group.node_processes)
processes = processes.intersection(filternames)
tg.spawn('vanilla-start-processes-%s' % instance.instance_name,
_start_processes, instance, list(processes))
@cpo.event_wrapper(True)
@ -80,20 +81,21 @@ def start_historyserver(instance):
@cpo.event_wrapper(True, step=pu.start_process_event_message("Oozie"))
def start_oozie_process(pctx, instance):
with instance.remote() as r:
if c_helper.is_mysql_enabled(pctx, instance.cluster):
_start_mysql(r)
LOG.debug("Creating Oozie DB Schema")
sql_script = files.get_file_text(
'plugins/vanilla/hadoop2/resources/create_oozie_db.sql')
script_location = "create_oozie_db.sql"
r.write_file_to(script_location, sql_script)
r.execute_command('mysql -u root < %(script_location)s && '
'rm %(script_location)s' %
{"script_location": script_location})
with context.set_current_instance_id(instance.instance_id):
with instance.remote() as r:
if c_helper.is_mysql_enabled(pctx, instance.cluster):
_start_mysql(r)
LOG.debug("Creating Oozie DB Schema")
sql_script = files.get_file_text(
'plugins/vanilla/hadoop2/resources/create_oozie_db.sql')
script_location = "create_oozie_db.sql"
r.write_file_to(script_location, sql_script)
r.execute_command('mysql -u root < %(script_location)s && '
'rm %(script_location)s' %
{"script_location": script_location})
_oozie_share_lib(r)
_start_oozie(r)
_oozie_share_lib(r)
_start_oozie(r)
def format_namenode(instance):
@ -208,22 +210,23 @@ def _hive_metastore_start(remote):
@cpo.event_wrapper(True, step=pu.start_process_event_message("HiveServer"))
def start_hiveserver_process(pctx, instance):
with instance.remote() as r:
_hive_create_warehouse_dir(r)
_hive_copy_shared_conf(
r, edp.get_hive_shared_conf_path('hadoop'))
with context.set_current_instance_id(instance.instance_id):
with instance.remote() as r:
_hive_create_warehouse_dir(r)
_hive_copy_shared_conf(
r, edp.get_hive_shared_conf_path('hadoop'))
if c_helper.is_mysql_enabled(pctx, instance.cluster):
oozie = vu.get_oozie(instance.node_group.cluster)
if not oozie or instance.hostname() != oozie.hostname():
_start_mysql(r)
if c_helper.is_mysql_enabled(pctx, instance.cluster):
oozie = vu.get_oozie(instance.node_group.cluster)
if not oozie or instance.hostname() != oozie.hostname():
_start_mysql(r)
sql_script = files.get_file_text(
'plugins/vanilla/hadoop2/resources/create_hive_db.sql'
)
sql_script = files.get_file_text(
'plugins/vanilla/hadoop2/resources/create_hive_db.sql'
)
r.write_file_to('/tmp/create_hive_db.sql', sql_script)
_hive_create_db(r)
_hive_metastore_start(r)
LOG.info(_LI("Hive Metastore server at {host} has been "
"started").format(host=instance.hostname()))
r.write_file_to('/tmp/create_hive_db.sql', sql_script)
_hive_create_db(r)
_hive_metastore_start(r)
LOG.info(_LI("Hive Metastore server at {host} has been "
"started").format(host=instance.hostname()))

View File

@ -159,13 +159,15 @@ class VersionHandler(avm.AbstractVersionHandler):
nn_instance = vu.get_namenode(cluster)
with remote.get_remote(oozie) as r:
if c_helper.is_mysql_enable(cluster):
run.mysql_start(r, oozie)
run.oozie_create_db(r)
run.oozie_share_lib(r, nn_instance.hostname())
run.start_oozie(r)
LOG.info(_LI("Oozie service at {host} has been started").format(
host=nn_instance.hostname()))
with context.set_current_instance_id(oozie.instance_id):
if c_helper.is_mysql_enable(cluster):
run.mysql_start(r, oozie)
run.oozie_create_db(r)
run.oozie_share_lib(r, nn_instance.hostname())
run.start_oozie(r)
LOG.info(
_LI("Oozie service at {host} has been started").format(
host=nn_instance.hostname()))
def start_hiveserver(self, cluster):
hs = vu.get_hiveserver(cluster)
@ -178,18 +180,19 @@ class VersionHandler(avm.AbstractVersionHandler):
oozie = vu.get_oozie(cluster)
with remote.get_remote(hive_server) as r:
run.hive_create_warehouse_dir(r)
run.hive_copy_shared_conf(
r, edp.get_hive_shared_conf_path('hadoop'))
with context.set_current_instance_id(hive_server.instance_id):
run.hive_create_warehouse_dir(r)
run.hive_copy_shared_conf(
r, edp.get_hive_shared_conf_path('hadoop'))
if c_helper.is_mysql_enable(cluster):
if not oozie or hive_server.hostname() != oozie.hostname():
run.mysql_start(r, hive_server)
run.hive_create_db(r, cluster.extra['hive_mysql_passwd'])
run.hive_metastore_start(r)
LOG.info(_LI("Hive Metastore server at {host} has been "
"started").format(
host=hive_server.hostname()))
if c_helper.is_mysql_enable(cluster):
if not oozie or hive_server.hostname() != oozie.hostname():
run.mysql_start(r, hive_server)
run.hive_create_db(r, cluster.extra['hive_mysql_passwd'])
run.hive_metastore_start(r)
LOG.info(_LI("Hive Metastore server at {host} has been "
"started").format(
host=hive_server.hostname()))
def start_cluster(self, cluster):
self.start_namenode(cluster)

View File

@ -54,6 +54,7 @@ def get_cluster(id, show_progress=False):
def scale_cluster(id, data):
context.set_current_cluster_id(id)
ctx = context.ctx()
cluster = conductor.cluster_get(ctx, id)
@ -96,6 +97,7 @@ def scale_cluster(id, data):
def create_cluster(values):
ctx = context.ctx()
cluster = conductor.cluster_create(ctx, values)
context.set_current_cluster_id(cluster.id)
sender.notify(ctx, cluster.id, cluster.name, "New",
"create")
plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
@ -124,6 +126,7 @@ def _add_ports_for_auto_sg(ctx, cluster, plugin):
def terminate_cluster(id):
context.set_current_cluster_id(id)
cluster = g.change_cluster_status(id, "Deleting")
OPS.terminate_cluster(id)

View File

@ -282,7 +282,8 @@ class DirectEngine(e.Engine):
cluster = g.change_cluster_status(cluster, "Deleting Instances")
for instance in instances_to_delete:
self._shutdown_instance(instance)
with context.set_current_instance_id(instance.instance_id):
self._shutdown_instance(instance)
self._await_deleted(cluster, instances_to_delete)
for ng in cluster.node_groups:
@ -418,10 +419,11 @@ class DirectEngine(e.Engine):
def _assign_floating_ips(self, instances):
for instance in instances:
node_group = instance.node_group
if node_group.floating_ip_pool:
networks.assign_floating_ip(instance.instance_id,
node_group.floating_ip_pool)
with context.set_current_instance_id(instance.instance_id):
node_group = instance.node_group
if node_group.floating_ip_pool:
networks.assign_floating_ip(instance.instance_id,
node_group.floating_ip_pool)
@poll_utils.poll_status(
'await_for_instances_active',
@ -431,9 +433,10 @@ class DirectEngine(e.Engine):
return True
for instance in instances:
if instance.id not in active_ids:
if self._check_if_active(instance):
active_ids.add(instance.id)
cpo.add_successful_event(instance)
with context.set_current_instance_id(instance.instance_id):
if self._check_if_active(instance):
active_ids.add(instance.id)
cpo.add_successful_event(instance)
return len(instances) == len(active_ids)
def _await_active(self, cluster, instances):
@ -460,11 +463,12 @@ class DirectEngine(e.Engine):
for instance in instances:
if instance.id not in deleted_ids:
if self._check_if_deleted(instance):
LOG.debug("Instance {instance} is deleted".format(
instance=instance.instance_name))
deleted_ids.add(instance.id)
cpo.add_successful_event(instance)
with context.set_current_instance_id(instance.instance_id):
if self._check_if_deleted(instance):
LOG.debug("Instance {instance} is deleted".format(
instance=instance.instance_name))
deleted_ids.add(instance.id)
cpo.add_successful_event(instance)
return len(deleted_ids) == len(instances)
def _await_deleted(self, cluster, instances):
@ -503,7 +507,8 @@ class DirectEngine(e.Engine):
"""Attempt to rollback cluster scaling."""
for i in instances:
self._shutdown_instance(i)
with context.set_current_instance_id(i.instance_id):
self._shutdown_instance(i)
cluster = conductor.cluster_get(context.ctx(), cluster)
g.clean_cluster_from_empty_ng(cluster)
@ -511,7 +516,8 @@ class DirectEngine(e.Engine):
def _shutdown_instances(self, cluster):
for node_group in cluster.node_groups:
for instance in node_group.instances:
self._shutdown_instance(instance)
with context.set_current_instance_id(instance.instance_id):
self._shutdown_instance(instance)
self._await_deleted(cluster, node_group.instances)
self._delete_auto_security_group(node_group)

View File

@ -123,6 +123,7 @@ def execute_job(job_id, data):
'info': {'status': edp.JOB_STATUS_PENDING},
'job_configs': configs, 'extra': {}}
job_execution = conductor.job_execution_create(context.ctx(), job_ex_dict)
context.set_current_job_execution_id(job_execution.id)
# check to use proxy user
if p.job_execution_requires_proxy_user(job_execution):
@ -153,6 +154,7 @@ def get_job_execution(id):
def cancel_job_execution(id):
context.set_current_job_execution_id(id)
job_execution = conductor.job_execution_get(context.ctx(), id)
OPS.cancel_job_execution(id)
@ -160,6 +162,7 @@ def cancel_job_execution(id):
def delete_job_execution(id):
context.set_current_job_execution_id(id)
OPS.delete_job_execution(id)

View File

@ -72,9 +72,10 @@ class Engine(object):
return True
for instance in instances:
if instance.id not in ips_assigned:
if networks.init_instances_ips(instance):
ips_assigned.add(instance.id)
cpo.add_successful_event(instance)
with context.set_current_instance_id(instance.instance_id):
if networks.init_instances_ips(instance):
ips_assigned.add(instance.id)
cpo.add_successful_event(instance)
return len(ips_assigned) == len(instances)
def _await_networks(self, cluster, instances):
@ -98,8 +99,9 @@ class Engine(object):
with context.ThreadGroup() as tg:
for instance in instances:
tg.spawn("wait-for-ssh-%s" % instance.instance_name,
self._wait_until_accessible, instance)
with context.set_current_instance_id(instance.instance_id):
tg.spawn("wait-for-ssh-%s" % instance.instance_name,
self._wait_until_accessible, instance)
LOG.info(_LI("Cluster {cluster_id}: all instances are accessible")
.format(cluster_id=cluster.id))
@ -148,8 +150,10 @@ class Engine(object):
with context.ThreadGroup() as tg:
for node_group in cluster.node_groups:
for instance in node_group.instances:
tg.spawn("configure-instance-%s" % instance.instance_name,
self._configure_instance, instance, hosts_file)
with context.set_current_instance_id(instance.instance_id):
tg.spawn(
"configure-instance-%s" % instance.instance_name,
self._configure_instance, instance, hosts_file)
@cpo.event_wrapper(mark_successful_on_exit=True)
def _configure_instance(self, instance, hosts_file):

View File

@ -81,6 +81,7 @@ def get_time_since_last_update(cluster):
def terminate_cluster(ctx, cluster, description):
if CONF.use_identity_api_v3:
trusts.use_os_admin_auth_token(cluster)
context.set_current_cluster_id(cluster.id)
LOG.debug('Terminating {description} cluster {cluster} '
'in "{status}" state with id {id}'

View File

@ -59,9 +59,11 @@ def attach_to_instances(instances):
with context.ThreadGroup() as tg:
for instance in instances:
if instance.node_group.volumes_per_node > 0:
tg.spawn(
'attach-volumes-for-instance-%s' % instance.instance_name,
_attach_volumes_to_node, instance.node_group, instance)
with context.set_current_instance_id(instance.instance_id):
tg.spawn(
'attach-volumes-for-instance-%s'
% instance.instance_name, _attach_volumes_to_node,
instance.node_group, instance)
@poll_utils.poll_status(
@ -156,14 +158,15 @@ def mount_to_instances(instances):
with context.ThreadGroup() as tg:
for instance in instances:
devices = _find_instance_volume_devices(instance)
# Since formatting can take several minutes (for large disks) and
# can be done in parallel, launch one thread per disk.
for idx in range(0, instance.node_group.volumes_per_node):
tg.spawn('mount-volume-%d-to-node-%s' %
(idx, instance.instance_name),
_mount_volume_to_node, instance, idx, devices[idx])
with context.set_current_instance_id(instance.instance_id):
devices = _find_instance_volume_devices(instance)
# Since formatting can take several minutes (for large disks)
# and can be done in parallel, launch one thread per disk.
for idx in range(0, instance.node_group.volumes_per_node):
tg.spawn(
'mount-volume-%d-to-node-%s' %
(idx, instance.instance_name),
_mount_volume_to_node, instance, idx, devices[idx])
def _find_instance_volume_devices(instance):

View File

@ -48,8 +48,9 @@ class FakeNodeGroup(object):
class FakeInstance(object):
def __init__(self, inst_name, management_ip, user, priv_key):
def __init__(self, inst_name, inst_id, management_ip, user, priv_key):
self.instance_name = inst_name
self.instance_id = inst_id
self.management_ip = management_ip
self.node_group = FakeNodeGroup(user, priv_key)
@ -107,7 +108,7 @@ class TestInstanceInteropHelper(base.SaharaTestCase):
def test_use_floating_ips(self, p_adapter):
self.override_config('use_floating_ips', True)
instance = FakeInstance('inst1', '10.0.0.1', 'user1', 'key1')
instance = FakeInstance('inst1', '123', '10.0.0.1', 'user1', 'key1')
remote = ssh_remote.InstanceInteropHelper(instance)
# Test SSH
@ -128,7 +129,7 @@ class TestInstanceInteropHelper(base.SaharaTestCase):
self.override_config('use_floating_ips', False)
self.override_config('use_namespaces', True)
instance = FakeInstance('inst2', '10.0.0.2', 'user2', 'key2')
instance = FakeInstance('inst2', '123', '10.0.0.2', 'user2', 'key2')
remote = ssh_remote.InstanceInteropHelper(instance)
# Test SSH
@ -152,7 +153,7 @@ class TestInstanceInteropHelper(base.SaharaTestCase):
def test_proxy_command(self, p_adapter, p_simple_exec_func):
self.override_config('proxy_command', 'ssh fakerelay nc {host} {port}')
instance = FakeInstance('inst3', '10.0.0.3', 'user3', 'key3')
instance = FakeInstance('inst3', '123', '10.0.0.3', 'user3', 'key3')
remote = ssh_remote.InstanceInteropHelper(instance)
# Test SSH
@ -171,7 +172,7 @@ class TestInstanceInteropHelper(base.SaharaTestCase):
def test_proxy_command_bad(self):
self.override_config('proxy_command', '{bad_kw} nc {host} {port}')
instance = FakeInstance('inst4', '10.0.0.4', 'user4', 'key4')
instance = FakeInstance('inst4', '123', '10.0.0.4', 'user4', 'key4')
remote = ssh_remote.InstanceInteropHelper(instance)
# Test SSH

View File

@ -763,8 +763,8 @@ class InstanceInteropHelper(remote.Remote):
self._run_s(_execute_on_vm_interactive, timeout, cmd, matcher)
def _log_command(self, str):
LOG.debug('[{instance}] {command}'.format(
instance=self.instance.instance_name, command=str))
with context.set_current_instance_id(self.instance.instance_id):
LOG.debug(str)
class BulkInstanceInteropHelper(InstanceInteropHelper):