diff --git a/sahara/exceptions.py b/sahara/exceptions.py index d4eaec831e..c295a50df6 100644 --- a/sahara/exceptions.py +++ b/sahara/exceptions.py @@ -284,14 +284,20 @@ class TimeoutException(SaharaException): code = "TIMEOUT" message = _("'%(operation)s' timed out after %(timeout)i second(s)") - def __init__(self, timeout, op_name=None): + def __init__(self, timeout, op_name=None, timeout_name=None): if op_name: - op_name = _("Operation '%s'") % op_name + op_name = _("Operation with name '%s'") % op_name else: op_name = _("Operation") self.message = self.message % { 'operation': op_name, 'timeout': timeout} + if timeout_name: + desc = _("%(message)s and following timeout was violated: " + "%(timeout_name)s") + self.message = desc % { + 'message': self.message, 'timeout_name': timeout_name} + super(TimeoutException, self).__init__() diff --git a/sahara/plugins/spark/scaling.py b/sahara/plugins/spark/scaling.py index 267dfb062d..21742e7ec7 100644 --- a/sahara/plugins/spark/scaling.py +++ b/sahara/plugins/spark/scaling.py @@ -15,16 +15,15 @@ import os -from oslo_utils import timeutils import six from sahara import context from sahara.i18n import _ -from sahara.plugins import exceptions as ex from sahara.plugins.spark import config_helper as c_helper from sahara.plugins.spark import run_scripts as run from sahara.plugins import utils from sahara.utils import cluster_progress_ops as cpo +from sahara.utils import poll_utils from sahara.utils import remote @@ -55,6 +54,17 @@ def decommission_sl(master, inst_to_be_deleted, survived_inst): run.start_spark_master(r_master, sp_home) +def _is_decommissioned(r, inst_to_be_deleted): + cmd = r.execute_command("sudo -u hdfs hadoop dfsadmin -report") + datanodes_info = parse_dfs_report(cmd[1]) + for i in inst_to_be_deleted: + for dn in datanodes_info: + if (dn["Name"].startswith(i.internal_ip)) and ( + dn["Decommission Status"] != "Decommissioned"): + return False + return True + + @cpo.event_wrapper(True, step=_("Decommission 
%s") % "DataNodes") def decommission_dn(nn, inst_to_be_deleted, survived_inst): with remote.get_remote(nn) as r: @@ -64,36 +74,15 @@ def decommission_dn(nn, inst_to_be_deleted, survived_inst): run.refresh_nodes(remote.get_remote(nn), "dfsadmin") context.sleep(3) - timeout = c_helper.get_decommissioning_timeout(nn.cluster) - s_time = timeutils.utcnow() - all_found = False + poll_utils.plugin_option_poll( + nn.cluster, _is_decommissioned, c_helper.DECOMMISSIONING_TIMEOUT, + _("Decommission %s") % "DataNodes", 3, { + 'r': r, 'inst_to_be_deleted': inst_to_be_deleted}) - while timeutils.delta_seconds(s_time, timeutils.utcnow()) < timeout: - cmd = r.execute_command( - "sudo -u hdfs hadoop dfsadmin -report") - all_found = True - datanodes_info = parse_dfs_report(cmd[1]) - for i in inst_to_be_deleted: - for dn in datanodes_info: - if (dn["Name"].startswith(i.internal_ip)) and ( - dn["Decommission Status"] != "Decommissioned"): - all_found = False - break - - if all_found: - r.write_files_to({'/etc/hadoop/dn.incl': - utils. - generate_fqdn_host_names(survived_inst), - '/etc/hadoop/dn.excl': "", - }) - break - context.sleep(3) - - if not all_found: - ex.DecommissionError( - _("Cannot finish decommission of cluster %(cluster)s in " - "%(seconds)d seconds") % - {"cluster": nn.cluster, "seconds": timeout}) + r.write_files_to({ + '/etc/hadoop/dn.incl': utils. + generate_fqdn_host_names(survived_inst), + '/etc/hadoop/dn.excl': ""}) def parse_dfs_report(cmd_output): diff --git a/sahara/tests/unit/utils/test_poll_utils.py b/sahara/tests/unit/utils/test_poll_utils.py new file mode 100644 index 0000000000..8ae46a33e0 --- /dev/null +++ b/sahara/tests/unit/utils/test_poll_utils.py @@ -0,0 +1,66 @@ +# Copyright (c) 2015 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock +import six + +from sahara.tests.unit import base +from sahara.utils import poll_utils + + +class TestPollUtils(base.SaharaTestCase): + @mock.patch('sahara.utils.poll_utils.LOG.debug') + def test_poll_success(self, logger): + poll_utils.poll(**{'get_status': lambda: True, + 'kwargs': {}, 'timeout': 5, 'sleep': 3}) + expected_call = mock.call( + 'Operation was executed successfully in timeout 5') + self.assertEqual(1, logger.call_count) + self.assertEqual([expected_call], logger.call_args_list) + + @mock.patch('sahara.context.sleep', return_value=None) + @mock.patch('sahara.utils.poll_utils._get_consumed', return_value=0) + def test_poll_failed_first_scenario(self, p_1, p_2): + message = "" + try: + poll_utils.poll( + **{'get_status': lambda: False, 'kwargs': {}, + 'timeout': 0, 'sleep': 3}) + except Exception as e: + message = six.text_type(e) + + if message.find('Error ID') != -1: + message = message.split("\n")[0] + expected_message = "'Operation' timed out after 0 second(s)" + + self.assertEqual(expected_message, message) + + @mock.patch('sahara.context.sleep', return_value=None) + @mock.patch('sahara.utils.poll_utils._get_consumed', return_value=0) + def test_poll_failed_second_scenario(self, p_1, p_2): + message = "" + try: + poll_utils.poll( + **{'get_status': lambda: False, 'kwargs': {}, + 'timeout': 0, 'sleep': 3, 'timeout_name': "some timeout"}) + except Exception as e: + message = six.text_type(e) + + if message.find('Error ID') != -1: + message = message.split("\n")[0] + expected_message = ("'Operation' timed out after 0 
second(s) and " + "following timeout was violated: some timeout") + + self.assertEqual(expected_message, message) diff --git a/sahara/utils/poll_utils.py b/sahara/utils/poll_utils.py new file mode 100644 index 0000000000..c88cec3825 --- /dev/null +++ b/sahara/utils/poll_utils.py @@ -0,0 +1,111 @@ +# Copyright (c) 2015 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as logging +from oslo_utils import timeutils + +from sahara import context +from sahara import exceptions as ex +from sahara.utils import general + +LOG = logging.getLogger(__name__) + +# set 3 hours timeout by default +DEFAULT_TIMEOUT = 10800 +DEFAULT_SLEEP_TIME = 5 + + +def _get_consumed(started_at): + return timeutils.delta_seconds(started_at, timeutils.utcnow()) + + +def _get_current_value(cluster, option): + option_target = option.applicable_target + conf = cluster.cluster_configs + if option_target in conf and option.name in conf[option_target]: + return conf[option_target][option.name] + return option.default_value + + +def poll(get_status, kwargs, operation_name=None, timeout_name=None, + timeout=DEFAULT_TIMEOUT, sleep=DEFAULT_SLEEP_TIME, + exception_strategy='raise'): + """This util polls the status of an object during some timeout. 
+ + :param get_status: function, which return current status of polling + as Boolean + :param kwargs: keyword arguments of function get_status + :param operation_name: name of polling process + :param timeout_name: name of timeout option + :param timeout: value of timeout in seconds. By default, it equals to + 3 hours + :param sleep: duration between two consecutive executions of + get_status function + :param exception_strategy: possible values ('raise', 'mark_as_true', + 'mark_as_false'). If exception_strategy is 'raise' exception would be + raised. If exception_strategy is 'mark_as_true', return value of + get_status would marked as True, and in case of 'mark_as_false' - False. + By default it's 'raise'. + """ + start_time = timeutils.utcnow() + # We shouldn't raise TimeoutException if incorrect timeout specified and + # status is ok now. In such way we should execute get_status at least once. + at_least_once = True + + while at_least_once or _get_consumed(start_time) < timeout: + at_least_once = False + try: + status = get_status(**kwargs) + except BaseException: + if exception_strategy == 'raise': + raise + elif exception_strategy == 'mark_as_true': + status = True + else: + status = False + + if status: + operation = "Operation" + if operation_name: + operation = "Operation with name {op_name}".format( + op_name=operation_name) + LOG.debug( + '{operation_desc} was executed successfully in timeout ' + '{timeout}' + .format(operation_desc=operation, timeout=timeout)) + return + + context.sleep(sleep) + raise ex.TimeoutException(timeout, operation_name, timeout_name) + + +def plugin_option_poll(cluster, get_status, option, operation_name, sleep_time, + kwargs): + + def _get(n_cluster, n_kwargs): + if not general.check_cluster_exists(n_cluster): + return True + return get_status(**n_kwargs) + + poll_description = { + 'get_status': _get, + 'kwargs': {'n_cluster': cluster, 'n_kwargs': kwargs}, + 'timeout': _get_current_value(cluster, option), + 'operation_name': 
operation_name, + 'sleep': sleep_time, + 'timeout_name': option.name + } + + poll(**poll_description)