Implement poll util and plugin poll util

Implemented a polling utilities module. Also added a simple
usage of the plugin polling utility for decommissioning datanodes
in the Spark plugin.

partially implements bp: add-timeouts-for-polling

Change-Id: I62c4cd92925d0d09bd529cb1103519d3d84c2f1f
This commit is contained in:
Vitaly Gridnev 2015-02-19 16:46:44 +03:00
parent a0d6a938e2
commit 51eac34f2a
4 changed files with 205 additions and 33 deletions

View File

@ -284,14 +284,20 @@ class TimeoutException(SaharaException):
code = "TIMEOUT"
message = _("'%(operation)s' timed out after %(timeout)i second(s)")
def __init__(self, timeout, op_name=None):
def __init__(self, timeout, op_name=None, timeout_name=None):
if op_name:
op_name = _("Operation '%s'") % op_name
op_name = _("Operation with name '%s'") % op_name
else:
op_name = _("Operation")
self.message = self.message % {
'operation': op_name, 'timeout': timeout}
if timeout_name:
desc = _("%(message)s and following timeout was violated: "
"%(timeout_name)s")
self.message = desc % {
'message': self.message, 'timeout_name': timeout_name}
super(TimeoutException, self).__init__()

View File

@ -15,16 +15,15 @@
import os
from oslo_utils import timeutils
import six
from sahara import context
from sahara.i18n import _
from sahara.plugins import exceptions as ex
from sahara.plugins.spark import config_helper as c_helper
from sahara.plugins.spark import run_scripts as run
from sahara.plugins import utils
from sahara.utils import cluster_progress_ops as cpo
from sahara.utils import poll_utils
from sahara.utils import remote
@ -55,6 +54,17 @@ def decommission_sl(master, inst_to_be_deleted, survived_inst):
run.start_spark_master(r_master, sp_home)
def _is_decommissioned(r, inst_to_be_deleted):
    """Report whether every instance slated for deletion is decommissioned.

    Runs an HDFS dfsadmin report on the remote and returns False as soon
    as any datanode that belongs to one of the instances to be deleted is
    not yet in the 'Decommissioned' state; True otherwise.
    """
    cmd = r.execute_command("sudo -u hdfs hadoop dfsadmin -report")
    datanodes_info = parse_dfs_report(cmd[1])
    for inst in inst_to_be_deleted:
        for dn in datanodes_info:
            matches_instance = dn["Name"].startswith(inst.internal_ip)
            if matches_instance and (
                    dn["Decommission Status"] != "Decommissioned"):
                return False
    return True
@cpo.event_wrapper(True, step=_("Decommission %s") % "DataNodes")
def decommission_dn(nn, inst_to_be_deleted, survived_inst):
with remote.get_remote(nn) as r:
@ -64,36 +74,15 @@ def decommission_dn(nn, inst_to_be_deleted, survived_inst):
run.refresh_nodes(remote.get_remote(nn), "dfsadmin")
context.sleep(3)
timeout = c_helper.get_decommissioning_timeout(nn.cluster)
s_time = timeutils.utcnow()
all_found = False
poll_utils.plugin_option_poll(
nn.cluster, _is_decommissioned, c_helper.DECOMMISSIONING_TIMEOUT,
_("Decommission %s") % "DataNodes", 3, {
'r': r, 'inst_to_be_deleted': inst_to_be_deleted})
while timeutils.delta_seconds(s_time, timeutils.utcnow()) < timeout:
cmd = r.execute_command(
"sudo -u hdfs hadoop dfsadmin -report")
all_found = True
datanodes_info = parse_dfs_report(cmd[1])
for i in inst_to_be_deleted:
for dn in datanodes_info:
if (dn["Name"].startswith(i.internal_ip)) and (
dn["Decommission Status"] != "Decommissioned"):
all_found = False
break
if all_found:
r.write_files_to({'/etc/hadoop/dn.incl':
utils.
generate_fqdn_host_names(survived_inst),
'/etc/hadoop/dn.excl': "",
})
break
context.sleep(3)
if not all_found:
ex.DecommissionError(
_("Cannot finish decommission of cluster %(cluster)s in "
"%(seconds)d seconds") %
{"cluster": nn.cluster, "seconds": timeout})
r.write_files_to({
'/etc/hadoop/dn.incl': utils.
generate_fqdn_host_names(survived_inst),
'/etc/hadoop/dn.excl': ""})
def parse_dfs_report(cmd_output):

View File

@ -0,0 +1,66 @@
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import six
from sahara.tests.unit import base
from sahara.utils import poll_utils
class TestPollUtils(base.SaharaTestCase):
    """Unit tests for sahara.utils.poll_utils.poll."""

    @staticmethod
    def _failure_message(**poll_kwargs):
        # Run poll expecting it to time out; return the first line of the
        # raised exception's message (or "" if nothing was raised).
        try:
            poll_utils.poll(**poll_kwargs)
        except Exception as e:
            msg = six.text_type(e)
            if 'Error ID' in msg:
                msg = msg.split("\n")[0]
            return msg
        return ""

    @mock.patch('sahara.utils.poll_utils.LOG.debug')
    def test_poll_success(self, logger):
        # An immediately-True status should log exactly one success
        # message and never raise.
        poll_utils.poll(get_status=lambda: True, kwargs={},
                        timeout=5, sleep=3)
        expected_call = mock.call(
            'Operation was executed successfully in timeout 5')
        self.assertEqual(1, logger.call_count)
        self.assertEqual([expected_call], logger.call_args_list)

    @mock.patch('sahara.context.sleep', return_value=None)
    @mock.patch('sahara.utils.poll_utils._get_consumed', return_value=0)
    def test_poll_failed_first_scenario(self, p_1, p_2):
        message = self._failure_message(
            get_status=lambda: False, kwargs={}, timeout=0, sleep=3)
        expected_message = "'Operation' timed out after 0 second(s)"
        self.assertEqual(expected_message, message)

    @mock.patch('sahara.context.sleep', return_value=None)
    @mock.patch('sahara.utils.poll_utils._get_consumed', return_value=0)
    def test_poll_failed_second_scenario(self, p_1, p_2):
        message = self._failure_message(
            get_status=lambda: False, kwargs={}, timeout=0, sleep=3,
            timeout_name="some timeout")
        expected_message = ("'Operation' timed out after 0 second(s) and "
                            "following timeout was violated: some timeout")
        self.assertEqual(expected_message, message)

111
sahara/utils/poll_utils.py Normal file
View File

@ -0,0 +1,111 @@
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
from oslo_utils import timeutils
from sahara import context
from sahara import exceptions as ex
from sahara.utils import general
LOG = logging.getLogger(__name__)
# set 3 hours timeout by default
DEFAULT_TIMEOUT = 10800
DEFAULT_SLEEP_TIME = 5
def _get_consumed(started_at):
    """Return the number of seconds elapsed since *started_at*."""
    now = timeutils.utcnow()
    return timeutils.delta_seconds(started_at, now)
def _get_current_value(cluster, option):
option_target = option.applicable_target
conf = cluster.cluster_configs
if option_target in conf and option.name in conf[option_target]:
return conf[option_target][option.name]
return option.default_value
def poll(get_status, kwargs, operation_name=None, timeout_name=None,
         timeout=DEFAULT_TIMEOUT, sleep=DEFAULT_SLEEP_TIME,
         exception_strategy='raise'):
    """Periodically evaluate get_status until it reports success.

    :param get_status: callable returning the current polled status
        as a boolean
    :param kwargs: keyword arguments passed to get_status
    :param operation_name: human-readable name of the polled operation
    :param timeout_name: name of the violated timeout option, used in the
        timeout error message
    :param timeout: maximum polling time in seconds (defaults to 3 hours)
    :param sleep: pause between consecutive get_status calls, in seconds
    :param exception_strategy: one of 'raise', 'mark_as_true',
        'mark_as_false'. Controls what happens when get_status raises:
        re-raise the exception, treat the status as True, or treat it
        as False, respectively. Defaults to 'raise'.
    """
    started = timeutils.utcnow()
    # Run get_status at least once so that a zero (or mis-specified)
    # timeout still succeeds when the status is already fine.
    first_run = True
    while first_run or _get_consumed(started) < timeout:
        first_run = False
        try:
            ok = get_status(**kwargs)
        except BaseException:
            if exception_strategy == 'raise':
                raise
            # Map the failure onto a status per the chosen strategy.
            ok = (exception_strategy == 'mark_as_true')
        if ok:
            if operation_name:
                operation = "Operation with name {op_name}".format(
                    op_name=operation_name)
            else:
                operation = "Operation"
            LOG.debug(
                '{operation_desc} was executed successfully in timeout '
                '{timeout}'
                .format(operation_desc=operation, timeout=timeout))
            return
        context.sleep(sleep)
    raise ex.TimeoutException(timeout, operation_name, timeout_name)
def plugin_option_poll(cluster, get_status, option, operation_name, sleep_time,
                       kwargs):
    """Poll get_status with a timeout taken from a plugin config option.

    The effective timeout is the value of *option* configured on
    *cluster* (falling back to the option's default). If the cluster
    disappears while polling, the wait is treated as finished.
    """
    def _status_or_gone(n_cluster, n_kwargs):
        # A vanished cluster means there is nothing left to wait for.
        if not general.check_cluster_exists(n_cluster):
            return True
        return get_status(**n_kwargs)

    poll(get_status=_status_or_gone,
         kwargs={'n_cluster': cluster, 'n_kwargs': kwargs},
         timeout=_get_current_value(cluster, option),
         operation_name=operation_name,
         sleep=sleep_time,
         timeout_name=option.name)