Merge "Configure NTP service on cluster instances"
This commit is contained in:
commit
adfb084b0f
@ -431,3 +431,17 @@ set to ``True`` and some extra configurations are needed:
|
||||
|
||||
It should be noted that in a situation when the host has no space for volume
|
||||
creation, the created volume will have an ``Error`` state and can not be used.
|
||||
|
||||
NTP service configuration
|
||||
-------------------------
|
||||
|
||||
By default sahara will enable the NTP service on all cluster instances if the
|
||||
NTP package is included in the image (the sahara disk image builder will
|
||||
include NTP in all images it generates). The default NTP server will be
|
||||
``pool.ntp.org``; this can be overridden using the ``default_ntp_server``
|
||||
setting in the ``DEFAULT`` section of the sahara configuration file.
|
||||
If you would like to specify a different NTP server for a particular cluster
|
||||
template, use the ``URL of NTP server`` setting in the ``General Parameters``
|
||||
section when you create the template. If you would like to disable NTP for a
|
||||
particular cluster template, deselect the ``Enable NTP service`` checkbox in
|
||||
the ``General Parameters`` section when you create the template.
|
||||
|
@ -94,6 +94,13 @@ class ProvisioningPluginBase(plugins_base.PluginInterface):
|
||||
def recommend_configs(self, cluster):
|
||||
pass
|
||||
|
||||
def get_all_configs(self, hadoop_version):
|
||||
common = list_of_common_configs()
|
||||
plugin_specific_configs = self.get_configs(hadoop_version)
|
||||
if plugin_specific_configs:
|
||||
common.extend(plugin_specific_configs)
|
||||
return common
|
||||
|
||||
def to_dict(self):
|
||||
res = super(ProvisioningPluginBase, self).to_dict()
|
||||
res['versions'] = self.get_versions()
|
||||
@ -102,7 +109,7 @@ class ProvisioningPluginBase(plugins_base.PluginInterface):
|
||||
# Some helpers for plugins
|
||||
|
||||
def _map_to_user_inputs(self, hadoop_version, configs):
|
||||
config_objs = self.get_configs(hadoop_version)
|
||||
config_objs = self.get_all_configs(hadoop_version)
|
||||
|
||||
# convert config objects to applicable_target -> config_name -> obj
|
||||
config_objs_map = {}
|
||||
@ -203,3 +210,23 @@ class ValidationError(object):
|
||||
|
||||
def __repr__(self):
|
||||
return "<ValidationError %s>" % self.config.name
|
||||
|
||||
# COMMON FOR ALL PLUGINS CONFIGS
|
||||
|
||||
NTP_URL = Config(
|
||||
"URL of NTP server", 'general', 'cluster', priority=1,
|
||||
default_value='', is_optional=True,
|
||||
description='URL of the NTP server for synchronization time on cluster'
|
||||
' instances'
|
||||
)
|
||||
|
||||
NTP_ENABLED = Config(
|
||||
"Enable NTP service", 'general', 'cluster', priority=1, default_value=True,
|
||||
config_type="bool",
|
||||
description='Enables NTP service for synchronization time on cluster '
|
||||
'instances'
|
||||
)
|
||||
|
||||
|
||||
def list_of_common_configs():
|
||||
return [NTP_ENABLED, NTP_URL]
|
||||
|
@ -87,7 +87,7 @@ def get_config_value_or_default(service, name, cluster):
|
||||
# Find and return the default
|
||||
|
||||
plugin = plugins_base.PLUGINS.get_plugin(cluster.plugin_name)
|
||||
configs = plugin.get_configs(cluster.hadoop_version)
|
||||
configs = plugin.get_all_configs(cluster.hadoop_version)
|
||||
|
||||
for config in configs:
|
||||
if config.applicable_target == service and config.name == name:
|
||||
|
@ -221,7 +221,7 @@ def get_plugin(plugin_name, version=None):
|
||||
res = plugin.as_resource()
|
||||
if version:
|
||||
if version in plugin.get_versions():
|
||||
configs = plugin.get_configs(version)
|
||||
configs = plugin.get_all_configs(version)
|
||||
res._info['configs'] = [c.dict for c in configs]
|
||||
processes = plugin.get_node_processes(version)
|
||||
res._info['node_processes'] = processes
|
||||
|
123
sahara/service/ntp_service.py
Normal file
123
sahara/service/ntp_service.py
Normal file
@ -0,0 +1,123 @@
|
||||
# Copyright (c) 2015 Mirantis Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
||||
from sahara import conductor as cond
|
||||
from sahara import context
|
||||
from sahara.i18n import _LI
|
||||
from sahara.i18n import _LW
|
||||
from sahara.plugins import provisioning as common_configs
|
||||
from sahara.utils import general as g
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
conductor = cond.API
|
||||
|
||||
ntp_opts = [
|
||||
cfg.StrOpt('default_ntp_server',
|
||||
default="pool.ntp.org",
|
||||
help="Default ntp server for time sync")
|
||||
]
|
||||
|
||||
CONF.register_opts(ntp_opts)
|
||||
|
||||
|
||||
def _get_os_distrib(remote):
|
||||
return remote.execute_command('lsb_release -is')[1].strip().lower()
|
||||
|
||||
|
||||
def _sudo(remote, cmd):
|
||||
remote.execute_command(cmd, run_as_root=True)
|
||||
|
||||
|
||||
def _restart_ntp(remote):
|
||||
distrib = _get_os_distrib(remote)
|
||||
cmd = "service %s restart"
|
||||
if distrib == 'ubuntu':
|
||||
cmd = cmd % "ntp"
|
||||
else:
|
||||
cmd = cmd % "ntpd"
|
||||
|
||||
_sudo(remote, cmd)
|
||||
|
||||
|
||||
def _verify_installation(remote):
|
||||
distrib = _get_os_distrib(remote)
|
||||
if distrib == 'ubuntu':
|
||||
return remote.execute_command("dpkg -s ntp")
|
||||
else:
|
||||
return remote.execute_command("rpm -q ntp")
|
||||
|
||||
|
||||
def _check_ntp_installed(remote):
|
||||
try:
|
||||
exit_code, stdout = _verify_installation(remote)
|
||||
if exit_code != 0:
|
||||
return False
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def _configure_ntp_on_instance(instance, url):
|
||||
with context.set_current_instance_id(instance.instance_id):
|
||||
LOG.debug("Configuring ntp server")
|
||||
with instance.remote() as r:
|
||||
if not _check_ntp_installed(r):
|
||||
# missing ntp service
|
||||
LOG.warning(_LW("Unable to configure NTP service"))
|
||||
return
|
||||
|
||||
r.append_to_file(
|
||||
"/etc/ntp.conf", "server {url}".format(url=url),
|
||||
run_as_root=True)
|
||||
_restart_ntp(r)
|
||||
_sudo(r, "ntpdate -u {url}".format(url=url))
|
||||
LOG.info(_LI("NTP successfully configured"))
|
||||
|
||||
|
||||
def is_ntp_enabled(cluster):
|
||||
target = common_configs.NTP_ENABLED.applicable_target
|
||||
name = common_configs.NTP_ENABLED.name
|
||||
cl_configs = cluster.cluster_configs
|
||||
if target not in cl_configs or name not in cl_configs[target]:
|
||||
return common_configs.NTP_ENABLED.default_value
|
||||
return cl_configs[target][name]
|
||||
|
||||
|
||||
def retrieve_ntp_server_url(cluster):
|
||||
target = common_configs.NTP_URL.applicable_target
|
||||
name = common_configs.NTP_URL.name
|
||||
cl_configs = cluster.cluster_configs
|
||||
if target not in cl_configs or name not in cl_configs[target]:
|
||||
return CONF.default_ntp_server
|
||||
return cl_configs[target][name]
|
||||
|
||||
|
||||
def configure_ntp(cluster_id):
|
||||
cluster = conductor.cluster_get(context.ctx(), cluster_id)
|
||||
if not is_ntp_enabled(cluster):
|
||||
LOG.debug("Don't configure NTP on cluster")
|
||||
return
|
||||
instances = g.get_instances(cluster)
|
||||
url = retrieve_ntp_server_url(cluster)
|
||||
with context.ThreadGroup() as tg:
|
||||
for instance in instances:
|
||||
tg.spawn("configure-ntp-%s" % instance.instance_name,
|
||||
_configure_ntp_on_instance, instance, url)
|
@ -28,6 +28,7 @@ from sahara.i18n import _
|
||||
from sahara.i18n import _LE
|
||||
from sahara.plugins import base as plugin_base
|
||||
from sahara.service.edp import job_manager
|
||||
from sahara.service import ntp_service
|
||||
from sahara.service import trusts
|
||||
from sahara.utils import general as g
|
||||
from sahara.utils import remote
|
||||
@ -257,6 +258,7 @@ def _provision_cluster(cluster_id):
|
||||
plugin.configure_cluster(cluster)
|
||||
|
||||
# starting prepared and configured cluster
|
||||
ntp_service.configure_ntp(cluster_id)
|
||||
cluster = g.change_cluster_status(cluster, "Starting")
|
||||
context.set_step_type(_("Plugin: start cluster"))
|
||||
plugin.start_cluster(cluster)
|
||||
@ -301,6 +303,7 @@ def _provision_scaled_cluster(cluster_id, node_group_id_map):
|
||||
|
||||
# Setting up new nodes with the plugin
|
||||
if instance_ids:
|
||||
ntp_service.configure_ntp(cluster_id)
|
||||
cluster = g.change_cluster_status(cluster, "Configuring")
|
||||
instances = g.get_instances(cluster, instance_ids)
|
||||
context.set_step_type(_("Plugin: scale cluster"))
|
||||
|
@ -40,7 +40,7 @@ MAX_HOSTNAME_LENGTH = 64
|
||||
def _get_plugin_configs(plugin_name, hadoop_version, scope=None):
|
||||
pl_confs = {}
|
||||
for config in plugin_base.PLUGINS.get_plugin(
|
||||
plugin_name).get_configs(hadoop_version):
|
||||
plugin_name).get_all_configs(hadoop_version):
|
||||
if pl_confs.get(config.applicable_target):
|
||||
pl_confs[config.applicable_target].append(config.name)
|
||||
else:
|
||||
|
@ -111,6 +111,9 @@ class FakePlugin(object):
|
||||
def get_versions(self):
|
||||
return ['0.1', '0.2']
|
||||
|
||||
def get_all_configs(self, version):
|
||||
return self.get_configs(version)
|
||||
|
||||
def get_required_image_tags(self, version):
|
||||
return ['fake']
|
||||
|
||||
|
99
sahara/tests/unit/service/test_ntp_service.py
Normal file
99
sahara/tests/unit/service/test_ntp_service.py
Normal file
@ -0,0 +1,99 @@
|
||||
# Copyright (c) 2015 Mirantis Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from sahara.service import ntp_service as ntp
|
||||
from sahara.tests.unit import base as test_base
|
||||
|
||||
|
||||
class FakeRemote(object):
|
||||
def __init__(self, effects):
|
||||
self.effects = effects
|
||||
self.idx = 0
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, *args):
|
||||
# validate number of executions
|
||||
if self.idx != len(self.effects):
|
||||
raise ValueError()
|
||||
|
||||
def _get_effect(self):
|
||||
self.idx += 1
|
||||
return self.effects[self.idx - 1]
|
||||
|
||||
def execute_command(self, cmd, run_as_root=False):
|
||||
effect = self._get_effect()
|
||||
if isinstance(effect, RuntimeError):
|
||||
raise effect
|
||||
return 0, effect
|
||||
|
||||
def append_to_file(self, file, text, run_as_root=False):
|
||||
return self.execute_command(file, run_as_root)
|
||||
|
||||
|
||||
class FakeInstance(object):
|
||||
def __init__(self, effects, id):
|
||||
self.id = id
|
||||
self.instance_name = id
|
||||
self.instance_id = id
|
||||
self.effects = effects
|
||||
|
||||
def remote(self):
|
||||
return FakeRemote(self.effects)
|
||||
|
||||
|
||||
class NTPServiceTest(test_base.SaharaTestCase):
|
||||
@mock.patch('sahara.service.ntp_service.LOG.warning')
|
||||
@mock.patch('sahara.service.ntp_service.conductor.cluster_get')
|
||||
def test_configuring_ntp_unable_to_configure(self, cl_get, logger):
|
||||
instance = FakeInstance(["ubuntu", RuntimeError()], "1")
|
||||
ng = mock.Mock(instances=[instance])
|
||||
cl_get.return_value = mock.Mock(
|
||||
node_groups=[ng], cluster_configs={})
|
||||
ntp.configure_ntp('1')
|
||||
self.assertEqual(
|
||||
[mock.call("Unable to configure NTP service")],
|
||||
logger.call_args_list)
|
||||
|
||||
@mock.patch('sahara.service.ntp_service.LOG.info')
|
||||
@mock.patch('sahara.service.ntp_service.conductor.cluster_get')
|
||||
def test_configuring_success(self, cl_get, logger):
|
||||
instance = FakeInstance(
|
||||
['centos', "cat", "batman", "vs", "superman", "boom"], "1")
|
||||
ng = mock.Mock(instances=[instance])
|
||||
cl_get.return_value = mock.Mock(node_groups=[ng], cluster_configs={})
|
||||
ntp.configure_ntp('1')
|
||||
self.assertEqual([mock.call("NTP successfully configured")],
|
||||
logger.call_args_list)
|
||||
|
||||
def test_retrieve_url(self):
|
||||
cl = mock.Mock(
|
||||
cluster_configs={'general': {"URL of NTP server": "batman.org"}})
|
||||
self.assertEqual("batman.org", ntp.retrieve_ntp_server_url(cl))
|
||||
self.override_config('default_ntp_server', "superman.org")
|
||||
cl = mock.Mock(cluster_configs={'general': {}})
|
||||
self.assertEqual("superman.org", ntp.retrieve_ntp_server_url(cl))
|
||||
|
||||
@mock.patch('sahara.service.ntp_service.conductor.cluster_get')
|
||||
@mock.patch('sahara.service.ntp_service.retrieve_ntp_server_url')
|
||||
def test_is_ntp_enabled(self, ntp_url, cl_get):
|
||||
cl = mock.Mock(
|
||||
cluster_configs={'general': {"Enable NTP service": False}})
|
||||
cl_get.return_value = cl
|
||||
ntp.configure_ntp('1')
|
||||
self.assertEqual(0, ntp_url.call_count)
|
@ -99,6 +99,7 @@ class TestOPS(base.SaharaWithDbTestCase):
|
||||
'configure_cluster', 'start_cluster'], self.SEQUENCE,
|
||||
'Order of calls is wrong')
|
||||
|
||||
@mock.patch('sahara.service.ntp_service.configure_ntp')
|
||||
@mock.patch('sahara.service.ops.CONF')
|
||||
@mock.patch('sahara.service.ops._prepare_provisioning',
|
||||
return_value=(mock.Mock(), mock.Mock(), FakePlugin()))
|
||||
@ -106,7 +107,7 @@ class TestOPS(base.SaharaWithDbTestCase):
|
||||
return_value=FakePlugin())
|
||||
@mock.patch('sahara.utils.general.get_instances')
|
||||
def test_provision_scaled_cluster(self, p_get_instances, p_change_status,
|
||||
p_prep_provisioning, p_conf):
|
||||
p_prep_provisioning, p_conf, p_ntp):
|
||||
del self.SEQUENCE[:]
|
||||
ops.INFRA = FakeINFRA()
|
||||
p_conf.use_identity_api_v3 = True
|
||||
|
Loading…
Reference in New Issue
Block a user