From 299bc2f343a53fef4316c67a59430cf9a1d329c7 Mon Sep 17 00:00:00 2001
From: Vitaly Gridnev
Date: Tue, 3 Mar 2015 15:23:52 +0300
Subject: [PATCH] Add usages of plugin poll - part 1

Adding plugin poll support to the plugins requires big changes, so we
will make them in a series of small changesets. This patch introduces
support in the vanilla 1 plugin and in HDP.

partially implements bp: add-timeouts-for-polling

Change-Id: I46e277beabe00f8d18b2370065dc07c39a63f1bd
---
 sahara/plugins/hdp/configprovider.py | 27 ++++-
 .../versions/version_1_3_2/versionhandler.py | 29 +++--
 .../versions/version_2_0_6/versionhandler.py | 110 +++++++++---------
 .../plugins/vanilla/v1_2_1/config_helper.py | 6 +
 sahara/plugins/vanilla/v1_2_1/scaling.py | 52 ++++-----
 .../plugins/vanilla/v1_2_1/versionhandler.py | 24 ++--
 sahara/utils/general.py | 47 --------
 7 files changed, 129 insertions(+), 166 deletions(-)

diff --git a/sahara/plugins/hdp/configprovider.py b/sahara/plugins/hdp/configprovider.py
index 25c16554..a5fe926f 100644
--- a/sahara/plugins/hdp/configprovider.py
+++ b/sahara/plugins/hdp/configprovider.py
@@ -13,16 +13,32 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import copy
+
 from sahara import exceptions
 from sahara.i18n import _
 from sahara.plugins import provisioning as p
 
 
+HOST_REGISTRATIONS_TIMEOUT = p.Config(
+    'Host registrations timeout', 'general',
+    'cluster', config_type='int', priority=1,
+    default_value=3600, is_optional=True,
+    description='Timeout for host registrations, in seconds')
+
+DECOMMISSIONING_TIMEOUT = p.Config(
+    'Timeout for decommissioning nodes', 'general',
+    'cluster', config_type='int', priority=1,
+    default_value=1000, is_optional=True,
+    description='Timeout for decommissioning nodes, in seconds')
+
+
 class ConfigurationProvider(object):
-    def __init__(self, config):
+    def __init__(self, config, hadoop_version):
         self.config = config
         self.config_mapper = {}
         self.config_items = []
+        self.hadoop_version = hadoop_version
         self._initialize(config)
 
     def get_config_items(self):
@@ -67,3 +83,12 @@ class ConfigurationProvider(object):
             self.config_mapper[service_property['name']] = (
                 self._get_target(
                     service_property['applicable_target']))
+        host_reg_timeout = copy.copy(HOST_REGISTRATIONS_TIMEOUT)
+        setattr(host_reg_timeout, 'tag', 'global')
+        self.config_items.append(host_reg_timeout)
+        self.config_mapper[host_reg_timeout.name] = 'global'
+        if self.hadoop_version == '2.0.6':
+            dec_timeout = copy.copy(DECOMMISSIONING_TIMEOUT)
+            setattr(dec_timeout, 'tag', 'global')
+            self.config_items.append(dec_timeout)
+            self.config_mapper[dec_timeout.name] = 'global'
diff --git a/sahara/plugins/hdp/versions/version_1_3_2/versionhandler.py b/sahara/plugins/hdp/versions/version_1_3_2/versionhandler.py
index f8980500..c0d47a37 100644
--- a/sahara/plugins/hdp/versions/version_1_3_2/versionhandler.py
+++ b/sahara/plugins/hdp/versions/version_1_3_2/versionhandler.py
@@ -32,7 +32,7 @@ from sahara.plugins.hdp.versions import abstractversionhandler as avm
 from sahara.plugins.hdp.versions.version_1_3_2 import edp_engine
 from sahara.plugins.hdp.versions.version_1_3_2 import services
 from sahara.utils import cluster_progress_ops as cpo
-from sahara.utils import general as g
+from sahara.utils import poll_utils
 from sahara import version
 
 
@@ -40,14 +40,6 @@ LOG = logging.getLogger(__name__)
 CONF = cfg.CONF
 
 
-def _check_ambari(obj):
-    try:
-        obj.is_ambari_info()
-        return obj.get_cluster()
-    except AttributeError:
-        return None
-
-
 class VersionHandler(avm.AbstractVersionHandler):
     config_provider = None
     version = None
@@ -62,7 +54,8 @@ class VersionHandler(avm.AbstractVersionHandler):
                 json.load(pkg.resource_stream(
                     version.version_info.package,
                     'plugins/hdp/versions/version_1_3_2/resources/'
-                    'ambari-config-resource.json')))
+                    'ambari-config-resource.json')),
+                hadoop_version='1.3.2')
 
         return self.config_provider
 
@@ -575,11 +568,7 @@ class AmbariClient(object):
                 'components in scaled instances. status'
                 ' code returned = {0}').format(result.status))
 
-    @cpo.event_wrapper(True, step=_("Wait for all Ambari agents to register"),
-                       param=('ambari_info', 2))
-    @g.await_process(
-        3600, 5, _("Ambari agents registering with server"), _check_ambari)
-    def wait_for_host_registrations(self, num_hosts, ambari_info):
+    def _check_host_registrations(self, num_hosts, ambari_info):
         url = 'http://{0}/api/v1/hosts'.format(ambari_info.get_address())
         try:
             result = self._get(url, ambari_info)
@@ -597,6 +586,16 @@ class AmbariClient(object):
             LOG.debug('Waiting to connect to ambari server')
             return False
 
+    @cpo.event_wrapper(True, step=_("Wait for all Ambari agents to register"),
+                       param=('ambari_info', 2))
+    def wait_for_host_registrations(self, num_hosts, ambari_info):
+        cluster = ambari_info.get_cluster()
+        poll_utils.plugin_option_poll(
+            cluster, self._check_host_registrations,
+            cfgprov.HOST_REGISTRATIONS_TIMEOUT,
+            _("Wait for host registrations"), 5, {
+                'num_hosts': num_hosts, 'ambari_info': ambari_info})
+
     def update_ambari_admin_user(self, password, ambari_info):
         old_pwd = ambari_info.password
         user_url = 'http://{0}/api/v1/users/admin'.format(
diff --git a/sahara/plugins/hdp/versions/version_2_0_6/versionhandler.py b/sahara/plugins/hdp/versions/version_2_0_6/versionhandler.py
index 6c3b15b7..32dce78c 100644
--- a/sahara/plugins/hdp/versions/version_2_0_6/versionhandler.py
+++ b/sahara/plugins/hdp/versions/version_2_0_6/versionhandler.py
@@ -33,7 +33,7 @@ from sahara.plugins.hdp.versions import abstractversionhandler as avm
 from sahara.plugins.hdp.versions.version_2_0_6 import edp_engine
 from sahara.plugins.hdp.versions.version_2_0_6 import services
 from sahara.utils import cluster_progress_ops as cpo
-from sahara.utils import general as g
+from sahara.utils import poll_utils
 from sahara import version
 
 
@@ -41,14 +41,6 @@ LOG = logging.getLogger(__name__)
 CONF = cfg.CONF
 
 
-def _check_ambari(obj):
-    try:
-        obj.is_ambari_info()
-        return obj.get_cluster()
-    except AttributeError:
-        return None
-
-
 class VersionHandler(avm.AbstractVersionHandler):
     config_provider = None
     version = None
@@ -63,7 +55,8 @@ class VersionHandler(avm.AbstractVersionHandler):
                 json.load(pkg.resource_stream(
                     version.version_info.package,
                     'plugins/hdp/versions/version_2_0_6/resources/'
-                    'ambari-config-resource.json')))
+                    'ambari-config-resource.json')),
+                hadoop_version='2.0.6')
 
         return self.config_provider
 
@@ -559,20 +552,16 @@ class AmbariClient(object):
                 'components in scaled instances. status'
                 ' code returned = {0}').format(result.status))
 
-    @cpo.event_wrapper(True, step=_("Wait for all Ambari agents to register"),
-                       param=('ambari_info', 2))
-    @g.await_process(
-        3600, 5, _("Ambari agents registering with server"), _check_ambari)
-    def wait_for_host_registrations(self, num_hosts, ambari_info):
+    def _check_host_registrations(self, num_hosts, ambari_info):
         url = 'http://{0}/api/v1/hosts'.format(ambari_info.get_address())
         try:
             result = self._get(url, ambari_info)
             json_result = json.loads(result.text)
 
-            LOG.info(_LI('Registered Hosts: {current_number} '
-                         'of {final_number}').format(
-                     current_number=len(json_result['items']),
-                     final_number=num_hosts))
+            LOG.debug('Registered Hosts: {current_number} '
+                      'of {final_number}'.format(
+                          current_number=len(json_result['items']),
+                          final_number=num_hosts))
             for hosts in json_result['items']:
                 LOG.debug('Registered Host: {host}'.format(
                     host=hosts['Hosts']['host_name']))
@@ -581,6 +570,16 @@ class AmbariClient(object):
             LOG.debug('Waiting to connect to ambari server')
             return False
 
+    @cpo.event_wrapper(True, step=_("Wait for all Ambari agents to register"),
+                       param=('ambari_info', 2))
+    def wait_for_host_registrations(self, num_hosts, ambari_info):
+        cluster = ambari_info.get_cluster()
+        poll_utils.plugin_option_poll(
+            cluster, self._check_host_registrations,
+            cfgprov.HOST_REGISTRATIONS_TIMEOUT,
+            _("Wait for host registrations"), 5, {
+                'num_hosts': num_hosts, 'ambari_info': ambari_info})
+
     def update_ambari_admin_user(self, password, ambari_info):
         old_pwd = ambari_info.password
         user_url = 'http://{0}/api/v1/users/admin'.format(
@@ -716,45 +715,46 @@ class AmbariClient(object):
         LOG.debug('AmbariClient: about to make decommission status request,'
                   'uri = {uri}'.format(uri=status_request))
 
-        count = 0
-        while count < 100 and len(hosts_to_decommission) > 0:
-            LOG.debug('AmbariClient: number of hosts waiting for '
-                      'decommissioning to complete = {count}'.format(
-                          count=str(len(hosts_to_decommission))))
+        poll_utils.plugin_option_poll(
+            ambari_info.get_cluster(),
+            self.process_decommission,
+            cfgprov.DECOMMISSIONING_TIMEOUT, _("Decommission nodes"), 5,
+            {'status_request': status_request, 'ambari_info': ambari_info,
+             'hosts_to_decommission': hosts_to_decommission})
 
-            result = self._get(status_request, ambari_info)
-            if result.status_code != 200:
-                LOG.error(_LE('AmbariClient: error in making decommission '
-                              'status request, error = {result}').format(
-                    result=result.text))
-            else:
-                LOG.info(_LI('AmbariClient: decommission status request ok, '
-                             'result = {result}').format(result=result.text))
-                json_result = json.loads(result.text)
-                live_nodes = (
-                    json_result['metrics']['dfs']['namenode']['LiveNodes'])
-                # parse out the map of live hosts associated with the NameNode
-                json_result_nodes = json.loads(live_nodes)
-                for node, val in six.iteritems(json_result_nodes):
-                    admin_state = val['adminState']
-                    if admin_state == 'Decommissioned':
-                        LOG.debug('AmbariClient: node = {node} is '
-                                  'now in adminState = {admin_state}'.format(
-                                      node=node, admin_state=admin_state))
-                        # remove from list, to track which nodes
-                        # are now in Decommissioned state
-                        hosts_to_decommission.remove(node)
+    def process_decommission(self, status_request, ambari_info,
+                             hosts_to_decommission):
+        if len(hosts_to_decommission) == 0:
+            # Nothing for decommissioning
+            return True
 
-            LOG.debug('AmbariClient: sleeping for 5 seconds')
-            context.sleep(5)
+        LOG.debug('AmbariClient: number of hosts waiting for '
+                  'decommissioning to complete = {count}'.format(
+                      count=str(len(hosts_to_decommission))))
 
-            # increment loop counter
-            count += 1
-
-        if len(hosts_to_decommission) > 0:
-            LOG.error(_LE('AmbariClient: decommissioning process timed-out '
-                          'waiting for nodes to enter "Decommissioned" '
-                          'status.'))
+        result = self._get(status_request, ambari_info)
+        if result.status_code != 200:
+            LOG.error(_LE('AmbariClient: error in making decommission '
+                          'status request, error = {result}').format(
+                result=result.text))
+        else:
+            LOG.info(_LI('AmbariClient: decommission status request ok, '
+                         'result = {result}').format(result=result.text))
+            json_result = json.loads(result.text)
+            live_nodes = (
+                json_result['metrics']['dfs']['namenode']['LiveNodes'])
+            # parse out the map of live hosts associated with the NameNode
+            json_result_nodes = json.loads(live_nodes)
+            for node, val in six.iteritems(json_result_nodes):
+                admin_state = val['adminState']
+                if admin_state == 'Decommissioned':
+                    LOG.debug('AmbariClient: node = {node} is '
+                              'now in adminState = {admin_state}'.format(
+                                  node=node, admin_state=admin_state))
+                    # remove from list, to track which nodes
+                    # are now in Decommissioned state
+                    hosts_to_decommission.remove(node)
+        return False
 
     def provision_cluster(self, cluster_spec, servers, ambari_info, name):
         self._add_cluster(ambari_info, name)
diff --git a/sahara/plugins/vanilla/v1_2_1/config_helper.py b/sahara/plugins/vanilla/v1_2_1/config_helper.py
index fdd4349a..0a38e63b 100644
--- a/sahara/plugins/vanilla/v1_2_1/config_helper.py
+++ b/sahara/plugins/vanilla/v1_2_1/config_helper.py
@@ -97,6 +97,11 @@ DECOMMISSIONING_TIMEOUT = p.Config('Decommissioning Timeout', 'general',
                                    ' decommissioning operation'
                                    ' during scaling, in seconds')
 
+DATANODES_STARTUP_TIMEOUT = p.Config(
+    'Datanodes startup timeout', 'general', 'cluster', config_type='int',
+    priority=1, default_value=10800, is_optional=True,
+    description='Timeout for datanodes startup, in seconds')
+
 HIDDEN_CONFS = ['fs.default.name', 'dfs.name.dir', 'dfs.data.dir',
                 'mapred.job.tracker', 'mapred.system.dir',
                 'mapred.local.dir',
@@ -159,6 +164,7 @@ def _initialise_configs():
     configs.append(ENABLE_SWIFT)
     configs.append(ENABLE_MYSQL)
     configs.append(DECOMMISSIONING_TIMEOUT)
+    configs.append(DATANODES_STARTUP_TIMEOUT)
 
     if CONF.enable_data_locality:
         configs.append(ENABLE_DATA_LOCALITY)
diff --git a/sahara/plugins/vanilla/v1_2_1/scaling.py b/sahara/plugins/vanilla/v1_2_1/scaling.py
index 64258c3d..f7614165 100644
--- a/sahara/plugins/vanilla/v1_2_1/scaling.py
+++ b/sahara/plugins/vanilla/v1_2_1/scaling.py
@@ -15,16 +15,15 @@
 
 import os
 
-from oslo_utils import timeutils
 import six
 
 from sahara import context
 from sahara.i18n import _
-from sahara.plugins import exceptions as ex
 from sahara.plugins import utils
 from sahara.plugins.vanilla.v1_2_1 import config_helper
 from sahara.plugins.vanilla.v1_2_1 import run_scripts as run
 from sahara.utils import cluster_progress_ops as cpo
+from sahara.utils import poll_utils
 from sahara.utils import remote
 
 
@@ -42,6 +41,17 @@ def decommission_tt(jt, inst_to_be_deleted, survived_inst):
         })
 
 
+def is_decommissioned(r, inst_to_be_deleted):
+    cmd = r.execute_command("sudo su -c 'hadoop dfsadmin -report' hadoop")
+    datanodes_info = parse_dfs_report(cmd[1])
+    for inst in inst_to_be_deleted:
+        for dn in datanodes_info:
+            if (dn["Name"].startswith(inst.internal_ip)) and (
+                    dn["Decommission Status"] != "Decommissioned"):
+                return False
+    return True
+
+
 @cpo.event_wrapper(True, step=_("Decommission %s") % "DataNodes")
 def decommission_dn(nn, inst_to_be_deleted, survived_inst):
     with remote.get_remote(nn) as r:
@@ -51,37 +61,15 @@ def decommission_dn(nn, inst_to_be_deleted, survived_inst):
         run.refresh_nodes(remote.get_remote(nn), "dfsadmin")
         context.sleep(3)
 
-        timeout = config_helper.get_decommissioning_timeout(
-            nn.cluster)
-        s_time = timeutils.utcnow()
-        all_found = False
+        poll_utils.plugin_option_poll(
+            nn.cluster, is_decommissioned,
+            config_helper.DECOMMISSIONING_TIMEOUT,
+            _("Decommission %s") % "DataNodes", 3,
+            {'r': r, 'inst_to_be_deleted': inst_to_be_deleted})
 
-        while timeutils.delta_seconds(s_time, timeutils.utcnow()) < timeout:
-            cmd = r.execute_command(
-                "sudo su -c 'hadoop dfsadmin -report' hadoop")
-            all_found = True
-            datanodes_info = parse_dfs_report(cmd[1])
-            for i in inst_to_be_deleted:
-                for dn in datanodes_info:
-                    if (dn["Name"].startswith(i.internal_ip)) and (
-                            dn["Decommission Status"] != "Decommissioned"):
-                        all_found = False
-                        break
-
-            if all_found:
-                r.write_files_to({'/etc/hadoop/dn.incl':
-                                  utils.
-                                  generate_fqdn_host_names(survived_inst),
-                                  '/etc/hadoop/dn.excl': "",
-                                  })
-                break
-            context.sleep(3)
-
-        if not all_found:
-            ex.DecommissionError(
-                _("Cannot finish decommission of cluster %(cluster)s in "
-                  "%(seconds)d seconds") %
-                {"cluster": nn.cluster, "seconds": timeout})
+        r.write_files_to({'/etc/hadoop/dn.incl':
+                          utils.generate_fqdn_host_names(survived_inst),
+                          '/etc/hadoop/dn.excl': ""})
 
 
 def parse_dfs_report(cmd_output):
diff --git a/sahara/plugins/vanilla/v1_2_1/versionhandler.py b/sahara/plugins/vanilla/v1_2_1/versionhandler.py
index 22441750..47bfe530 100644
--- a/sahara/plugins/vanilla/v1_2_1/versionhandler.py
+++ b/sahara/plugins/vanilla/v1_2_1/versionhandler.py
@@ -36,6 +36,7 @@ from sahara.utils import cluster_progress_ops as cpo
 from sahara.utils import edp
 from sahara.utils import files as f
 from sahara.utils import general as g
+from sahara.utils import poll_utils
 from sahara.utils import proxy
 from sahara.utils import remote
 
@@ -212,23 +213,14 @@ class VersionHandler(avm.AbstractVersionHandler):
         if datanodes_count < 1:
             return
 
-        LOG.debug("Waiting {count} datanodes to start up".format(
-            count=datanodes_count))
+        l_message = _("Waiting on %s datanodes to start up") % datanodes_count
+        LOG.info(l_message)
         with remote.get_remote(vu.get_namenode(cluster)) as r:
-            while True:
-                if run.check_datanodes_count(r, datanodes_count):
-                    LOG.info(
-                        _LI('Datanodes on cluster {cluster} have been started')
-                        .format(cluster=cluster.name))
-                    return
-
-                context.sleep(1)
-
-                if not g.check_cluster_exists(cluster):
-                    LOG.debug('Stop waiting for datanodes on cluster {cluster}'
-                              ' since it has been deleted'.format(
-                                  cluster=cluster.name))
-                    return
+            poll_utils.plugin_option_poll(
+                cluster, run.check_datanodes_count,
+                c_helper.DATANODES_STARTUP_TIMEOUT, l_message, 1, {
+                    'remote': r,
+                    'count': datanodes_count})
 
     def _generate_hive_mysql_password(self, cluster):
         extra = cluster.extra.to_dict() if cluster.extra else {}
diff --git a/sahara/utils/general.py b/sahara/utils/general.py
index c45f7961..c0782b3b 100644
--- a/sahara/utils/general.py
+++ b/sahara/utils/general.py
@@ -13,16 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import functools
 import re
 
 from oslo_log import log as logging
-from oslo_utils import timeutils
 import six
 
 from sahara import conductor as c
 from sahara import context
-from sahara import exceptions as e
 from sahara.i18n import _LI
 from sahara.utils.notification import sender
 
@@ -165,47 +162,3 @@ def generate_auto_security_group_name(node_group):
 
 def generate_aa_group_name(cluster_name):
     return ("%s-aa-group" % cluster_name).lower()
-
-
-def _get_consumed(start_time):
-    return timeutils.delta_seconds(start_time, timeutils.utcnow())
-
-
-def get_obj_in_args(check_obj, *args, **kwargs):
-    for arg in args:
-        val = check_obj(arg)
-        if val is not None:
-            return val
-
-    for arg in kwargs.values():
-        val = check_obj(arg)
-        if val is not None:
-            return val
-    return None
-
-
-def await_process(timeout, sleeping_time, op_name, check_object):
-    """"Awaiting something in cluster."""
-    def decorator(func):
-        @functools.wraps(func)
-        def handler(*args, **kwargs):
-            start_time = timeutils.utcnow()
-            cluster = get_obj_in_args(check_object, *args, **kwargs)
-
-            while _get_consumed(start_time) < timeout:
-                consumed = _get_consumed(start_time)
-                if func(*args, **kwargs):
-                    LOG.info(
-                        _LI("Operation {op_name} was successfully executed "
-                            "in seconds: {sec}").format(op_name=op_name,
-                                                        sec=consumed))
-                    return
-
-                if not check_cluster_exists(cluster):
-                    return
-
-                context.sleep(sleeping_time)
-
-            raise e.TimeoutException(timeout, op_name)
-        return handler
-    return decorator
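
Note for reviewers: the poll_utils helper itself is not part of this patch.
Judging only from the call sites above, plugin_option_poll takes a cluster, a
status function that returns True once the awaited operation has finished, a
provisioning Config object holding the timeout in seconds, a translated
operation name, a sleep interval, and the kwargs for the status function. A
minimal sketch of that contract follows; the config lookup is an assumption,
and the real implementation in sahara/utils/poll_utils.py may differ.

# Illustrative sketch only: this approximates the contract implied by the
# call sites in this patch, not the actual sahara.utils.poll_utils code.
from oslo_utils import timeutils

from sahara import context
from sahara import exceptions as ex


def plugin_option_poll(cluster, get_status, option, operation_name,
                       sleeping_time, kwargs):
    # Assumption: the timeout is read from the cluster's configured value
    # for the plugin option (applicable target 'general' in the Configs
    # above), falling back to the option's default_value.
    configs = cluster.cluster_configs.get(option.applicable_target, {})
    timeout = int(configs.get(option.name, option.default_value))

    start_time = timeutils.utcnow()
    while timeutils.delta_seconds(start_time, timeutils.utcnow()) < timeout:
        # get_status is e.g. _check_host_registrations or is_decommissioned;
        # it returns True when the awaited condition holds.
        if get_status(**kwargs):
            return
        context.sleep(sleeping_time)

    # Mirrors the error raised by the removed general.await_process helper.
    raise ex.TimeoutException(timeout, operation_name)

With this shape, each plugin replaces its hand-rolled wait loop with one call
and a small status predicate, and the timeout becomes user-configurable per
cluster instead of being hard-coded (3600, 100 iterations, etc.) as before.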