Add usages of plugin poll - part 1

Support for plugin poll has to be added to the
plugins, which requires big changes. We will do it
as a series of small change requests (CRs).

This patch introduces support in the vanilla 1 and hdp plugins.

partially implements bp: add-timeouts-for-polling

Change-Id: I46e277beabe00f8d18b2370065dc07c39a63f1bd
This commit is contained in:
Vitaly Gridnev 2015-03-03 15:23:52 +03:00
parent 9f709c912a
commit 299bc2f343
7 changed files with 129 additions and 166 deletions

View File

@ -13,16 +13,32 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import copy
from sahara import exceptions from sahara import exceptions
from sahara.i18n import _ from sahara.i18n import _
from sahara.plugins import provisioning as p from sahara.plugins import provisioning as p
HOST_REGISTRATIONS_TIMEOUT = p.Config(
'Host registrations timeout', 'general',
'cluster', config_type='int', priority=1,
default_value=3600, is_optional=True,
description='Timeout for host registrations, in seconds')
DECOMMISSIONING_TIMEOUT = p.Config(
'Timeout for decommissioning nodes', 'general',
'cluster', config_type='int', priority=1,
default_value=1000, is_optional=True,
description='Timeout for decommissioning nodes, in seconds')
class ConfigurationProvider(object): class ConfigurationProvider(object):
def __init__(self, config): def __init__(self, config, hadoop_version):
self.config = config self.config = config
self.config_mapper = {} self.config_mapper = {}
self.config_items = [] self.config_items = []
self.hadoop_version = hadoop_version
self._initialize(config) self._initialize(config)
def get_config_items(self): def get_config_items(self):
@ -67,3 +83,12 @@ class ConfigurationProvider(object):
self.config_mapper[service_property['name']] = ( self.config_mapper[service_property['name']] = (
self._get_target( self._get_target(
service_property['applicable_target'])) service_property['applicable_target']))
host_reg_timeout = copy.copy(HOST_REGISTRATIONS_TIMEOUT)
setattr(host_reg_timeout, 'tag', 'global')
self.config_items.append(host_reg_timeout)
self.config_mapper[host_reg_timeout.name] = 'global'
if self.hadoop_version == '2.0.6':
dec_timeout = copy.copy(DECOMMISSIONING_TIMEOUT)
setattr(dec_timeout, 'tag', 'global')
self.config_items.append(dec_timeout)
self.config_mapper[dec_timeout.name] = 'global'

View File

@ -32,7 +32,7 @@ from sahara.plugins.hdp.versions import abstractversionhandler as avm
from sahara.plugins.hdp.versions.version_1_3_2 import edp_engine from sahara.plugins.hdp.versions.version_1_3_2 import edp_engine
from sahara.plugins.hdp.versions.version_1_3_2 import services from sahara.plugins.hdp.versions.version_1_3_2 import services
from sahara.utils import cluster_progress_ops as cpo from sahara.utils import cluster_progress_ops as cpo
from sahara.utils import general as g from sahara.utils import poll_utils
from sahara import version from sahara import version
@ -40,14 +40,6 @@ LOG = logging.getLogger(__name__)
CONF = cfg.CONF CONF = cfg.CONF
def _check_ambari(obj):
try:
obj.is_ambari_info()
return obj.get_cluster()
except AttributeError:
return None
class VersionHandler(avm.AbstractVersionHandler): class VersionHandler(avm.AbstractVersionHandler):
config_provider = None config_provider = None
version = None version = None
@ -62,7 +54,8 @@ class VersionHandler(avm.AbstractVersionHandler):
json.load(pkg.resource_stream( json.load(pkg.resource_stream(
version.version_info.package, version.version_info.package,
'plugins/hdp/versions/version_1_3_2/resources/' 'plugins/hdp/versions/version_1_3_2/resources/'
'ambari-config-resource.json'))) 'ambari-config-resource.json')),
hadoop_version='1.3.2')
return self.config_provider return self.config_provider
@ -575,11 +568,7 @@ class AmbariClient(object):
'components in scaled instances. status' 'components in scaled instances. status'
' code returned = {0}').format(result.status)) ' code returned = {0}').format(result.status))
@cpo.event_wrapper(True, step=_("Wait for all Ambari agents to register"), def _check_host_registrations(self, num_hosts, ambari_info):
param=('ambari_info', 2))
@g.await_process(
3600, 5, _("Ambari agents registering with server"), _check_ambari)
def wait_for_host_registrations(self, num_hosts, ambari_info):
url = 'http://{0}/api/v1/hosts'.format(ambari_info.get_address()) url = 'http://{0}/api/v1/hosts'.format(ambari_info.get_address())
try: try:
result = self._get(url, ambari_info) result = self._get(url, ambari_info)
@ -597,6 +586,16 @@ class AmbariClient(object):
LOG.debug('Waiting to connect to ambari server') LOG.debug('Waiting to connect to ambari server')
return False return False
@cpo.event_wrapper(True, step=_("Wait for all Ambari agents to register"),
param=('ambari_info', 2))
def wait_for_host_registrations(self, num_hosts, ambari_info):
cluster = ambari_info.get_cluster()
poll_utils.plugin_option_poll(
cluster, self._check_host_registrations,
cfgprov.HOST_REGISTRATIONS_TIMEOUT,
_("Wait for host registrations"), 5, {
'num_hosts': num_hosts, 'ambari_info': ambari_info})
def update_ambari_admin_user(self, password, ambari_info): def update_ambari_admin_user(self, password, ambari_info):
old_pwd = ambari_info.password old_pwd = ambari_info.password
user_url = 'http://{0}/api/v1/users/admin'.format( user_url = 'http://{0}/api/v1/users/admin'.format(

View File

@ -33,7 +33,7 @@ from sahara.plugins.hdp.versions import abstractversionhandler as avm
from sahara.plugins.hdp.versions.version_2_0_6 import edp_engine from sahara.plugins.hdp.versions.version_2_0_6 import edp_engine
from sahara.plugins.hdp.versions.version_2_0_6 import services from sahara.plugins.hdp.versions.version_2_0_6 import services
from sahara.utils import cluster_progress_ops as cpo from sahara.utils import cluster_progress_ops as cpo
from sahara.utils import general as g from sahara.utils import poll_utils
from sahara import version from sahara import version
@ -41,14 +41,6 @@ LOG = logging.getLogger(__name__)
CONF = cfg.CONF CONF = cfg.CONF
def _check_ambari(obj):
try:
obj.is_ambari_info()
return obj.get_cluster()
except AttributeError:
return None
class VersionHandler(avm.AbstractVersionHandler): class VersionHandler(avm.AbstractVersionHandler):
config_provider = None config_provider = None
version = None version = None
@ -63,7 +55,8 @@ class VersionHandler(avm.AbstractVersionHandler):
json.load(pkg.resource_stream( json.load(pkg.resource_stream(
version.version_info.package, version.version_info.package,
'plugins/hdp/versions/version_2_0_6/resources/' 'plugins/hdp/versions/version_2_0_6/resources/'
'ambari-config-resource.json'))) 'ambari-config-resource.json')),
hadoop_version='2.0.6')
return self.config_provider return self.config_provider
@ -559,18 +552,14 @@ class AmbariClient(object):
'components in scaled instances. status' 'components in scaled instances. status'
' code returned = {0}').format(result.status)) ' code returned = {0}').format(result.status))
@cpo.event_wrapper(True, step=_("Wait for all Ambari agents to register"), def _check_host_registrations(self, num_hosts, ambari_info):
param=('ambari_info', 2))
@g.await_process(
3600, 5, _("Ambari agents registering with server"), _check_ambari)
def wait_for_host_registrations(self, num_hosts, ambari_info):
url = 'http://{0}/api/v1/hosts'.format(ambari_info.get_address()) url = 'http://{0}/api/v1/hosts'.format(ambari_info.get_address())
try: try:
result = self._get(url, ambari_info) result = self._get(url, ambari_info)
json_result = json.loads(result.text) json_result = json.loads(result.text)
LOG.info(_LI('Registered Hosts: {current_number} ' LOG.debug('Registered Hosts: {current_number} '
'of {final_number}').format( 'of {final_number}'.format(
current_number=len(json_result['items']), current_number=len(json_result['items']),
final_number=num_hosts)) final_number=num_hosts))
for hosts in json_result['items']: for hosts in json_result['items']:
@ -581,6 +570,16 @@ class AmbariClient(object):
LOG.debug('Waiting to connect to ambari server') LOG.debug('Waiting to connect to ambari server')
return False return False
@cpo.event_wrapper(True, step=_("Wait for all Ambari agents to register"),
param=('ambari_info', 2))
def wait_for_host_registrations(self, num_hosts, ambari_info):
cluster = ambari_info.get_cluster()
poll_utils.plugin_option_poll(
cluster, self._check_host_registrations,
cfgprov.HOST_REGISTRATIONS_TIMEOUT,
_("Wait for host registrations"), 5, {
'num_hosts': num_hosts, 'ambari_info': ambari_info})
def update_ambari_admin_user(self, password, ambari_info): def update_ambari_admin_user(self, password, ambari_info):
old_pwd = ambari_info.password old_pwd = ambari_info.password
user_url = 'http://{0}/api/v1/users/admin'.format( user_url = 'http://{0}/api/v1/users/admin'.format(
@ -716,8 +715,19 @@ class AmbariClient(object):
LOG.debug('AmbariClient: about to make decommission status request,' LOG.debug('AmbariClient: about to make decommission status request,'
'uri = {uri}'.format(uri=status_request)) 'uri = {uri}'.format(uri=status_request))
count = 0 poll_utils.plugin_option_poll(
while count < 100 and len(hosts_to_decommission) > 0: ambari_info.get_cluster(),
self.process_decommission,
cfgprov.DECOMMISSIONING_TIMEOUT, _("Decommission nodes"), 5,
{'status_request': status_request, 'ambari_info': ambari_info,
'hosts_to_decommission': hosts_to_decommission})
def process_decommission(self, status_request, ambari_info,
hosts_to_decommission):
if len(hosts_to_decommission) == 0:
# Nothing for decommissioning
return True
LOG.debug('AmbariClient: number of hosts waiting for ' LOG.debug('AmbariClient: number of hosts waiting for '
'decommissioning to complete = {count}'.format( 'decommissioning to complete = {count}'.format(
count=str(len(hosts_to_decommission)))) count=str(len(hosts_to_decommission))))
@ -744,17 +754,7 @@ class AmbariClient(object):
# remove from list, to track which nodes # remove from list, to track which nodes
# are now in Decommissioned state # are now in Decommissioned state
hosts_to_decommission.remove(node) hosts_to_decommission.remove(node)
return False
LOG.debug('AmbariClient: sleeping for 5 seconds')
context.sleep(5)
# increment loop counter
count += 1
if len(hosts_to_decommission) > 0:
LOG.error(_LE('AmbariClient: decommissioning process timed-out '
'waiting for nodes to enter "Decommissioned" '
'status.'))
def provision_cluster(self, cluster_spec, servers, ambari_info, name): def provision_cluster(self, cluster_spec, servers, ambari_info, name):
self._add_cluster(ambari_info, name) self._add_cluster(ambari_info, name)

View File

@ -97,6 +97,11 @@ DECOMMISSIONING_TIMEOUT = p.Config('Decommissioning Timeout', 'general',
' decommissioning operation' ' decommissioning operation'
' during scaling, in seconds') ' during scaling, in seconds')
DATANODES_STARTUP_TIMEOUT = p.Config(
'Datanodes startup timeout', 'general', 'cluster', config_type='int',
priority=1, default_value=10800, is_optional=True,
description='Timeout for datanodes startup, in seconds')
HIDDEN_CONFS = ['fs.default.name', 'dfs.name.dir', 'dfs.data.dir', HIDDEN_CONFS = ['fs.default.name', 'dfs.name.dir', 'dfs.data.dir',
'mapred.job.tracker', 'mapred.system.dir', 'mapred.local.dir', 'mapred.job.tracker', 'mapred.system.dir', 'mapred.local.dir',
@ -159,6 +164,7 @@ def _initialise_configs():
configs.append(ENABLE_SWIFT) configs.append(ENABLE_SWIFT)
configs.append(ENABLE_MYSQL) configs.append(ENABLE_MYSQL)
configs.append(DECOMMISSIONING_TIMEOUT) configs.append(DECOMMISSIONING_TIMEOUT)
configs.append(DATANODES_STARTUP_TIMEOUT)
if CONF.enable_data_locality: if CONF.enable_data_locality:
configs.append(ENABLE_DATA_LOCALITY) configs.append(ENABLE_DATA_LOCALITY)

View File

@ -15,16 +15,15 @@
import os import os
from oslo_utils import timeutils
import six import six
from sahara import context from sahara import context
from sahara.i18n import _ from sahara.i18n import _
from sahara.plugins import exceptions as ex
from sahara.plugins import utils from sahara.plugins import utils
from sahara.plugins.vanilla.v1_2_1 import config_helper from sahara.plugins.vanilla.v1_2_1 import config_helper
from sahara.plugins.vanilla.v1_2_1 import run_scripts as run from sahara.plugins.vanilla.v1_2_1 import run_scripts as run
from sahara.utils import cluster_progress_ops as cpo from sahara.utils import cluster_progress_ops as cpo
from sahara.utils import poll_utils
from sahara.utils import remote from sahara.utils import remote
@ -42,6 +41,17 @@ def decommission_tt(jt, inst_to_be_deleted, survived_inst):
}) })
def is_decommissioned(r, inst_to_be_deleted):
cmd = r.execute_command("sudo su -c 'hadoop dfsadmin -report' hadoop")
datanodes_info = parse_dfs_report(cmd[1])
for inst in inst_to_be_deleted:
for dn in datanodes_info:
if (dn["Name"].startswith(inst.internal_ip)) and (
dn["Decommission Status"] != "Decommissioned"):
return False
return True
@cpo.event_wrapper(True, step=_("Decommission %s") % "DataNodes") @cpo.event_wrapper(True, step=_("Decommission %s") % "DataNodes")
def decommission_dn(nn, inst_to_be_deleted, survived_inst): def decommission_dn(nn, inst_to_be_deleted, survived_inst):
with remote.get_remote(nn) as r: with remote.get_remote(nn) as r:
@ -51,37 +61,15 @@ def decommission_dn(nn, inst_to_be_deleted, survived_inst):
run.refresh_nodes(remote.get_remote(nn), "dfsadmin") run.refresh_nodes(remote.get_remote(nn), "dfsadmin")
context.sleep(3) context.sleep(3)
timeout = config_helper.get_decommissioning_timeout( poll_utils.plugin_option_poll(
nn.cluster) nn.cluster, is_decommissioned,
s_time = timeutils.utcnow() config_helper.DECOMMISSIONING_TIMEOUT,
all_found = False _("Decommission %s") % "DataNodes", 3,
{'r': r, 'inst_to_be_deleted': inst_to_be_deleted})
while timeutils.delta_seconds(s_time, timeutils.utcnow()) < timeout:
cmd = r.execute_command(
"sudo su -c 'hadoop dfsadmin -report' hadoop")
all_found = True
datanodes_info = parse_dfs_report(cmd[1])
for i in inst_to_be_deleted:
for dn in datanodes_info:
if (dn["Name"].startswith(i.internal_ip)) and (
dn["Decommission Status"] != "Decommissioned"):
all_found = False
break
if all_found:
r.write_files_to({'/etc/hadoop/dn.incl': r.write_files_to({'/etc/hadoop/dn.incl':
utils. utils.generate_fqdn_host_names(survived_inst),
generate_fqdn_host_names(survived_inst), '/etc/hadoop/dn.excl': ""})
'/etc/hadoop/dn.excl': "",
})
break
context.sleep(3)
if not all_found:
ex.DecommissionError(
_("Cannot finish decommission of cluster %(cluster)s in "
"%(seconds)d seconds") %
{"cluster": nn.cluster, "seconds": timeout})
def parse_dfs_report(cmd_output): def parse_dfs_report(cmd_output):

View File

@ -36,6 +36,7 @@ from sahara.utils import cluster_progress_ops as cpo
from sahara.utils import edp from sahara.utils import edp
from sahara.utils import files as f from sahara.utils import files as f
from sahara.utils import general as g from sahara.utils import general as g
from sahara.utils import poll_utils
from sahara.utils import proxy from sahara.utils import proxy
from sahara.utils import remote from sahara.utils import remote
@ -212,23 +213,14 @@ class VersionHandler(avm.AbstractVersionHandler):
if datanodes_count < 1: if datanodes_count < 1:
return return
LOG.debug("Waiting {count} datanodes to start up".format( l_message = _("Waiting on %s datanodes to start up") % datanodes_count
count=datanodes_count)) LOG.info(l_message)
with remote.get_remote(vu.get_namenode(cluster)) as r: with remote.get_remote(vu.get_namenode(cluster)) as r:
while True: poll_utils.plugin_option_poll(
if run.check_datanodes_count(r, datanodes_count): cluster, run.check_datanodes_count,
LOG.info( c_helper.DATANODES_STARTUP_TIMEOUT, l_message, 1, {
_LI('Datanodes on cluster {cluster} have been started') 'remote': r,
.format(cluster=cluster.name)) 'count': datanodes_count})
return
context.sleep(1)
if not g.check_cluster_exists(cluster):
LOG.debug('Stop waiting for datanodes on cluster {cluster}'
' since it has been deleted'.format(
cluster=cluster.name))
return
def _generate_hive_mysql_password(self, cluster): def _generate_hive_mysql_password(self, cluster):
extra = cluster.extra.to_dict() if cluster.extra else {} extra = cluster.extra.to_dict() if cluster.extra else {}

View File

@ -13,16 +13,13 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import functools
import re import re
from oslo_log import log as logging from oslo_log import log as logging
from oslo_utils import timeutils
import six import six
from sahara import conductor as c from sahara import conductor as c
from sahara import context from sahara import context
from sahara import exceptions as e
from sahara.i18n import _LI from sahara.i18n import _LI
from sahara.utils.notification import sender from sahara.utils.notification import sender
@ -165,47 +162,3 @@ def generate_auto_security_group_name(node_group):
def generate_aa_group_name(cluster_name): def generate_aa_group_name(cluster_name):
return ("%s-aa-group" % cluster_name).lower() return ("%s-aa-group" % cluster_name).lower()
def _get_consumed(start_time):
return timeutils.delta_seconds(start_time, timeutils.utcnow())
def get_obj_in_args(check_obj, *args, **kwargs):
for arg in args:
val = check_obj(arg)
if val is not None:
return val
for arg in kwargs.values():
val = check_obj(arg)
if val is not None:
return val
return None
def await_process(timeout, sleeping_time, op_name, check_object):
""""Awaiting something in cluster."""
def decorator(func):
@functools.wraps(func)
def handler(*args, **kwargs):
start_time = timeutils.utcnow()
cluster = get_obj_in_args(check_object, *args, **kwargs)
while _get_consumed(start_time) < timeout:
consumed = _get_consumed(start_time)
if func(*args, **kwargs):
LOG.info(
_LI("Operation {op_name} was successfully executed "
"in seconds: {sec}").format(op_name=op_name,
sec=consumed))
return
if not check_cluster_exists(cluster):
return
context.sleep(sleeping_time)
raise e.TimeoutException(timeout, op_name)
return handler
return decorator