Merge "Add usages of plugin poll - part 2"

commit 0259e727b4
@@ -16,7 +16,6 @@
 import functools
 
 from oslo_log import log as logging
-from oslo_utils import timeutils
 
 from sahara import context
 from sahara.i18n import _
@@ -25,7 +24,7 @@ from sahara.plugins.cdh.client import services
 from sahara.plugins.cdh import db_helper
 from sahara.plugins import exceptions as ex
 from sahara.utils import cluster_progress_ops as cpo
-
+from sahara.utils import poll_utils
 
 LOG = logging.getLogger(__name__)
 
@@ -219,30 +218,22 @@ class ClouderaUtils(object):
                 _("Process %(process)s is not supported by CDH plugin") %
                 {'process': process})
 
-    @cpo.event_wrapper(True, step=_("Await agents"), param=('cluster', 1))
-    def await_agents(self, cluster, instances):
-        api = self.get_api_client(instances[0].cluster)
-        timeout = 300
-        LOG.debug("Waiting {timeout} seconds for agent connected to manager"
-                  .format(timeout=timeout))
-        s_time = timeutils.utcnow()
-        while timeutils.delta_seconds(s_time, timeutils.utcnow()) < timeout:
-            hostnames = [i.fqdn() for i in instances]
-            hostnames_to_manager = [h.hostname for h in
-                                    api.get_all_hosts('full')]
-            is_ok = True
-            for hostname in hostnames:
-                if hostname not in hostnames_to_manager:
-                    is_ok = False
-                    break
+    def _agents_connected(self, instances, api):
+        hostnames = [i.fqdn() for i in instances]
+        hostnames_to_manager = [h.hostname for h in
+                                api.get_all_hosts('full')]
+        for hostname in hostnames:
+            if hostname not in hostnames_to_manager:
+                return False
+        return True
 
-            if not is_ok:
-                context.sleep(5)
-            else:
-                break
-        else:
-            raise ex.HadoopProvisionError(_("Cloudera agents failed to connect"
-                                            " to Cloudera Manager"))
+    @cpo.event_wrapper(True, step=_("Await agents"), param=('cluster', 1))
+    def _await_agents(self, cluster, instances, timeout_config):
+        api = self.get_api_client(instances[0].cluster)
+        poll_utils.plugin_option_poll(
+            cluster, self._agents_connected, timeout_config,
+            _("Await Cloudera agents"), 5, {
+                'instances': instances, 'api': api})
 
     def configure_instances(self, instances, cluster=None):
         # instances non-empty
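
For context: poll_utils.plugin_option_poll(cluster, get_status, option, operation_name, sleep_time, kwargs) keeps calling get_status(**kwargs) until it returns True, and raises once the timeout resolved from the cluster's plugin configs has expired. A minimal sketch of that contract; the config lookup and the raised exception type are assumptions, not the actual sahara/utils/poll_utils.py source:

    import time

    def plugin_option_poll(cluster, get_status, option, operation_name,
                           sleep_time, kwargs):
        # A cluster-level override of the option wins; otherwise the
        # p.Config default applies (e.g. 300 s for AWAIT_AGENTS_TIMEOUT).
        section = cluster.cluster_configs.get(option.applicable_target, {})
        timeout = section.get(option.name, option.default_value)

        deadline = time.time() + timeout
        while time.time() < deadline:
            if get_status(**kwargs):       # plugin-supplied predicate
                return
            time.sleep(sleep_time)         # 5 s between agent checks above
        raise RuntimeError("%s timed out after %s seconds"
                           % (operation_name, timeout))
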
@@ -19,17 +19,15 @@ import os
 import telnetlib
 
 from oslo_log import log as logging
-from oslo_utils import timeutils
 
 from sahara.conductor import resource as res
-from sahara import context
 from sahara.i18n import _
-from sahara.i18n import _LI
 from sahara.plugins.cdh import commands as cmd
 from sahara.plugins import exceptions as ex
 from sahara.plugins import utils as u
 from sahara.utils import cluster_progress_ops as cpo
 from sahara.utils import edp as edp_u
+from sahara.utils import poll_utils
 
 
 PATH_TO_CORE_SITE_XML = '/etc/hadoop/conf/core-site.xml'
@@ -262,35 +260,24 @@ class AbstractPluginUtils(object):
                 extjs_vm_location_path, extjs_vm_location_dir),
                 run_as_root=True)
 
+    def _check_cloudera_manager_started(self, manager):
+        try:
+            conn = telnetlib.Telnet(manager.management_ip, CM_API_PORT)
+            conn.close()
+            return True
+        except IOError:
+            return False
+
     @cpo.event_wrapper(
-        True, step=_("Start Cloudera manager"), param=('cluster', 1))
-    def start_cloudera_manager(self, cluster):
+        True, step=_("Start Cloudera Manager"), param=('cluster', 1))
+    def _start_cloudera_manager(self, cluster, timeout_config):
         manager = self.get_manager(cluster)
         with manager.remote() as r:
             cmd.start_cloudera_db(r)
             cmd.start_manager(r)
-
-        timeout = 300
-        LOG.debug("Waiting {timeout} seconds for Manager to start: "
-                  .format(timeout=timeout))
-        s_time = timeutils.utcnow()
-        while timeutils.delta_seconds(s_time, timeutils.utcnow()) < timeout:
-            try:
-                conn = telnetlib.Telnet(manager.management_ip, CM_API_PORT)
-                conn.close()
-                break
-            except IOError:
-                context.sleep(2)
-        else:
-            message = _("Cloudera Manager failed to start in %(timeout)s "
-                        "minutes on node '%(node)s' of cluster "
-                        "'%(cluster)s'") % {
-                'timeout': timeout / 60,
-                'node': manager.management_ip,
-                'cluster': cluster.name}
-            raise ex.HadoopProvisionError(message)
-
-        LOG.info(_LI("Cloudera Manager has been started"))
+        poll_utils.plugin_option_poll(
+            cluster, self._check_cloudera_manager_started, timeout_config,
+            _("Await starting Cloudera Manager"), 2, {'manager': manager})
 
     def configure_os(self, instances):
         # instances non-empty
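
The probe logic itself is unchanged; extracting it into _check_cloudera_manager_started just gives plugin_option_poll a boolean predicate to call. A standalone illustration of the same check, with a documentation placeholder address rather than a real manager:

    import telnetlib

    def manager_listening(ip, port=7180, timeout=5):
        # Same idea as _check_cloudera_manager_started: a bare TCP
        # connect to the CM API port, True once something accepts it.
        try:
            telnetlib.Telnet(ip, port, timeout).close()
            return True
        except IOError:    # covers connection refused and socket timeouts
            return False

    print(manager_listening('203.0.113.10'))   # placeholder address
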
@@ -17,6 +17,7 @@ import six
 
 from sahara.i18n import _
 from sahara.plugins.cdh import cloudera_utils as cu
+from sahara.plugins.cdh.v5 import config_helper as c_helper
 from sahara.plugins.cdh.v5 import plugin_utils as pu
 from sahara.plugins.cdh.v5 import validation as v
 from sahara.swift import swift_helper
@@ -24,8 +25,6 @@ from sahara.utils import cluster_progress_ops as cpo
 from sahara.utils import xmlutils
 
 
-CM_API_PORT = 7180
-
 HDFS_SERVICE_TYPE = 'HDFS'
 YARN_SERVICE_TYPE = 'YARN'
 OOZIE_SERVICE_TYPE = 'OOZIE'
@@ -117,6 +116,9 @@ class ClouderaUtilsV5(cu.ClouderaUtils):
             cm_cluster.create_service(self.HBASE_SERVICE_NAME,
                                       HBASE_SERVICE_TYPE)
 
+    def await_agents(self, cluster, instances):
+        self._await_agents(cluster, instances, c_helper.AWAIT_AGENTS_TIMEOUT)
+
     @cpo.event_wrapper(
         True, step=_("Configure services"), param=('cluster', 1))
     def configure_services(self, cluster):
@@ -92,11 +92,23 @@ EXTJS_LIB_URL = p.Config(
     description=("Ext 2.2 library is required for Oozie Web Console. "
                  "The file will be downloaded from VM with oozie."))
 
+AWAIT_AGENTS_TIMEOUT = p.Config(
+    'Await Cloudera agents timeout', 'general', 'cluster', config_type='int',
+    priority=1, default_value=300, is_optional=True,
+    description="Timeout for Cloudera agents connecting to Cloudera Manager, "
+                "in seconds")
+
+AWAIT_MANAGER_STARTING_TIMEOUT = p.Config(
+    'Timeout for Cloudera Manager starting', 'general', 'cluster',
+    config_type='int', priority=1, default_value=300, is_optional=True,
+    description='Timeout for Cloudera Manager starting, in seconds')
+
 
 def _get_cluster_plugin_configs():
     return [CDH5_REPO_URL, CDH5_REPO_KEY_URL, CM5_REPO_URL, CM5_REPO_KEY_URL,
             ENABLE_SWIFT, ENABLE_HBASE_COMMON_LIB, SWIFT_LIB_URL,
-            EXTJS_LIB_URL]
+            EXTJS_LIB_URL, AWAIT_MANAGER_STARTING_TIMEOUT,
+            AWAIT_AGENTS_TIMEOUT]
 
 
 # ng wide configs
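
With the options registered in _get_cluster_plugin_configs, an operator can raise these limits per cluster instead of patching the plugin; the Config names above become keys under the 'general' applicable target. A hypothetical cluster-template fragment (shape assumed, unrelated fields omitted):

    cluster_template = {
        'plugin_name': 'cdh',
        'cluster_configs': {
            'general': {
                # keys must match the p.Config names registered above
                'Await Cloudera agents timeout': 600,
                'Timeout for Cloudera Manager starting': 600,
            },
        },
    }
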
@@ -53,3 +53,7 @@ class PluginUtilsV5(pu.AbstractPluginUtils):
             'sudo su - -c "hadoop fs -mkdir -p /tmp/hive-hive" hdfs')
         r.execute_command(
             'sudo su - -c "hadoop fs -chown hive /tmp/hive-hive" hdfs')
+
+    def start_cloudera_manager(self, cluster):
+        self._start_cloudera_manager(
+            cluster, c_helper.AWAIT_MANAGER_STARTING_TIMEOUT)
@@ -25,8 +25,6 @@ from sahara.utils import cluster_progress_ops as cpo
 from sahara.utils import xmlutils
 
 
-CM_API_PORT = 7180
-
 HDFS_SERVICE_TYPE = 'HDFS'
 YARN_SERVICE_TYPE = 'YARN'
 OOZIE_SERVICE_TYPE = 'OOZIE'
@@ -148,6 +146,9 @@ class ClouderaUtilsV530(cu.ClouderaUtils):
             cm_cluster.create_service(self.IMPALA_SERVICE_NAME,
                                       IMPALA_SERVICE_TYPE)
 
+    def await_agents(self, cluster, instances):
+        self._await_agents(cluster, instances, c_helper.AWAIT_AGENTS_TIMEOUT)
+
     @cpo.event_wrapper(
         True, step=_("Configure services"), param=('cluster', 1))
     def configure_services(self, cluster):
@@ -149,11 +149,23 @@ EXTJS_LIB_URL = p.Config(
     description=("Ext 2.2 library is required for Oozie Web Console. "
                  "The file will be downloaded from VM with oozie."))
 
+AWAIT_AGENTS_TIMEOUT = p.Config(
+    'Await Cloudera agents timeout', 'general', 'cluster', config_type='int',
+    priority=1, default_value=300, is_optional=True,
+    description='Timeout for Cloudera agents connecting to Cloudera'
+                ' Manager, in seconds')
+
+AWAIT_MANAGER_STARTING_TIMEOUT = p.Config(
+    'Timeout for Cloudera Manager starting', 'general', 'cluster',
+    config_type='int', priority=1, default_value=300, is_optional=True,
+    description='Timeout for Cloudera Manager starting, in seconds')
+
 
 def _get_cluster_plugin_configs():
     return [CDH5_REPO_URL, CDH5_REPO_KEY_URL, CM5_REPO_URL, CM5_REPO_KEY_URL,
             ENABLE_SWIFT, ENABLE_HBASE_COMMON_LIB, SWIFT_LIB_URL,
-            EXTJS_LIB_URL]
+            EXTJS_LIB_URL, AWAIT_AGENTS_TIMEOUT,
+            AWAIT_MANAGER_STARTING_TIMEOUT]
 
 
 # ng wide configs
@@ -123,3 +123,7 @@ class PluginUtilsV530(pu.AbstractPluginUtils):
         manager = self.get_manager(cluster)
         with manager.remote() as r:
             self.db_helper.create_sentry_database(cluster, r)
+
+    def start_cloudera_manager(self, cluster):
+        self._start_cloudera_manager(
+            cluster, c_helper.AWAIT_MANAGER_STARTING_TIMEOUT)
@@ -126,6 +126,28 @@ ENABLE_DATA_LOCALITY = p.Config('Enable Data Locality', 'general', 'cluster',
                                 default_value=True, is_optional=True)
 
 
+DATANODES_DECOMMISSIONING_TIMEOUT = p.Config(
+    'DataNodes decommissioning timeout', 'general',
+    'cluster', config_type='int', priority=1,
+    default_value=3600 * 4, is_optional=True,
+    description='Timeout for datanode decommissioning operation'
+                ' during scaling, in seconds')
+
+
+NODEMANAGERS_DECOMMISSIONING_TIMEOUT = p.Config(
+    'NodeManagers decommissioning timeout', 'general',
+    'cluster', config_type='int', priority=1,
+    default_value=300, is_optional=True,
+    description='Timeout for NodeManager decommissioning operation'
+                ' during scaling, in seconds')
+
+
+DATANODES_STARTUP_TIMEOUT = p.Config(
+    'DataNodes startup timeout', 'general', 'cluster', config_type='int',
+    priority=1, default_value=10800, is_optional=True,
+    description='Timeout for DataNodes startup, in seconds')
+
+
 def init_env_configs(env_confs):
     configs = []
     for service, config_items in env_confs.iteritems():
@@ -138,7 +160,9 @@ def init_env_configs(env_confs):
 
 
 def _init_general_configs():
-    configs = [ENABLE_SWIFT, ENABLE_MYSQL]
+    configs = [ENABLE_SWIFT, ENABLE_MYSQL, DATANODES_STARTUP_TIMEOUT,
+               DATANODES_DECOMMISSIONING_TIMEOUT,
+               NODEMANAGERS_DECOMMISSIONING_TIMEOUT]
     if CONF.enable_data_locality:
         configs.append(ENABLE_DATA_LOCALITY)
     return configs
@@ -26,7 +26,7 @@ from sahara.plugins.vanilla import utils as vu
 from sahara.utils import cluster_progress_ops as cpo
 from sahara.utils import edp
 from sahara.utils import files
-from sahara.utils import general as g
+from sahara.utils import poll_utils
 
 LOG = logging.getLogger(__name__)
 
@@ -158,23 +158,12 @@ def await_datanodes(cluster):
     if datanodes_count < 1:
         return
 
-    LOG.debug("Waiting {count} datanodes to start up".format(
-        count=datanodes_count))
+    l_message = _("Waiting on %s datanodes to start up") % datanodes_count
     with vu.get_namenode(cluster).remote() as r:
-        while True:
-            if _check_datanodes_count(r, datanodes_count):
-                LOG.info(
-                    _LI('Datanodes on cluster {cluster} have been started')
-                    .format(cluster=cluster.name))
-                return
-
-            context.sleep(1)
-
-            if not g.check_cluster_exists(cluster):
-                LOG.info(
-                    _LI('Stop waiting for datanodes on cluster {cluster} since'
-                        ' it has been deleted').format(cluster=cluster.name))
-                return
+        poll_utils.plugin_option_poll(
+            cluster, _check_datanodes_count,
+            c_helper.DATANODES_STARTUP_TIMEOUT, l_message, 1, {
+                'remote': r, 'count': datanodes_count})
 
 
 def _check_datanodes_count(remote, count):
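
The old loop had no overall bound (while True with one-second sleeps) and bailed out only when the cluster had been deleted; the poll version caps the wait with DATANODES_STARTUP_TIMEOUT (default 10800 s). Note also how the kwargs dict wires into the predicate: plugin_option_poll expands it by keyword, so the keys must match the predicate's parameter names exactly. A toy illustration of that pairing (the body is a stand-in; the real _check_datanodes_count counts live datanodes over the remote):

    def _check_datanodes_count(remote, count):
        return count == 0      # stand-in logic only

    kwargs = {'remote': None, 'count': 0}
    assert _check_datanodes_count(**kwargs)
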
@@ -13,17 +13,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from oslo_utils import timeutils
-
-from sahara import context
 from sahara.i18n import _
-from sahara.plugins import exceptions as ex
 from sahara.plugins import utils as u
 from sahara.plugins.vanilla.hadoop2 import config
+from sahara.plugins.vanilla.hadoop2 import config_helper as c_helper
 from sahara.plugins.vanilla.hadoop2 import run_scripts as run
 from sahara.plugins.vanilla.hadoop2 import utils as pu
 from sahara.plugins.vanilla import utils as vu
 from sahara.utils import cluster_progress_ops as cpo
+from sahara.utils import poll_utils
 
+
 HADOOP_CONF_DIR = config.HADOOP_CONF_DIR
 
@@ -116,33 +115,30 @@ def _clear_exclude_files(cluster):
         'sudo su - -c "echo > %s/nm-exclude" hadoop' % HADOOP_CONF_DIR)
 
 
-def _check_decommission(cluster, instances, check_func, timeout):
-    s_time = timeutils.utcnow()
-    while timeutils.delta_seconds(s_time, timeutils.utcnow()) < timeout:
-        statuses = check_func(cluster)
-        dec_ok = True
-        for instance in instances:
-            if statuses[instance.fqdn()] != 'decommissioned':
-                dec_ok = False
+def is_decommissioned(cluster, check_func, instances):
+    statuses = check_func(cluster)
+    for instance in instances:
+        if statuses[instance.fqdn()] != 'decommissioned':
+            return False
+    return True
-
-        if dec_ok:
-            return
-        else:
-            context.sleep(5)
-    else:
-        ex.DecommissionError(
-            _("Cannot finish decommission of cluster %(cluster)s in "
-              "%(seconds)d seconds") %
-            {"cluster": cluster, "seconds": timeout})
+
+def _check_decommission(cluster, instances, check_func, option):
+    poll_utils.plugin_option_poll(
+        cluster, is_decommissioned, option, _("Wait for decommissioning"),
+        5, {'cluster': cluster, 'check_func': check_func,
+            'instances': instances})
+
 
 @cpo.event_wrapper(
     True, step=_("Decommission %s") % "NodeManagers", param=('cluster', 0))
 def _check_nodemanagers_decommission(cluster, instances):
-    _check_decommission(cluster, instances, pu.get_nodemanagers_status, 300)
+    _check_decommission(cluster, instances, pu.get_nodemanagers_status,
+                        c_helper.NODEMANAGERS_DECOMMISSIONING_TIMEOUT)
 
 
 @cpo.event_wrapper(
     True, step=_("Decommission %s") % "DataNodes", param=('cluster', 0))
 def _check_datanodes_decommission(cluster, instances):
-    _check_decommission(cluster, instances, pu.get_datanodes_status, 3600 * 4)
+    _check_decommission(cluster, instances, pu.get_datanodes_status,
+                        c_helper.DATANODES_DECOMMISSIONING_TIMEOUT)
 
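
A behavioral fix rides along with this refactor: the removed timeout branch constructed its exception but never raised it, so a decommissioning timeout was silently swallowed:

    # the deleted code, minus the loop: note the missing 'raise'
    ex.DecommissionError(
        _("Cannot finish decommission of cluster %(cluster)s in "
          "%(seconds)d seconds") %
        {"cluster": cluster, "seconds": timeout})

plugin_option_poll raises on timeout itself, which closes that gap.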