Merge "Add ZooKeeper support in Vanilla cluster"
This commit is contained in:
commit
ef11342656
|
@ -53,6 +53,7 @@ PORTS_MAP = {
|
|||
"oozie": [11000],
|
||||
"hiveserver": [9999, 10000],
|
||||
"spark history server": [18080],
|
||||
"zookeeper": [2181, 2888, 3888]
|
||||
}
|
||||
|
||||
|
||||
|
@ -66,12 +67,52 @@ def configure_cluster(pctx, cluster):
|
|||
instances = utils.get_instances(cluster)
|
||||
configure_instances(pctx, instances)
|
||||
configure_topology_data(pctx, cluster)
|
||||
configure_zookeeper(cluster)
|
||||
configure_spark(cluster)
|
||||
|
||||
|
||||
def configure_zookeeper(cluster, instances=None):
|
||||
zk_servers = vu.get_zk_servers(cluster)
|
||||
if zk_servers:
|
||||
zk_conf = c_helper.generate_zk_basic_config(cluster)
|
||||
zk_conf += _form_zk_servers_to_quorum(cluster, instances)
|
||||
_push_zk_configs_to_nodes(cluster, zk_conf, instances)
|
||||
|
||||
|
||||
def _form_zk_servers_to_quorum(cluster, to_delete_instances=None):
|
||||
quorum = []
|
||||
instances = map(vu.get_instance_hostname, vu.get_zk_servers(cluster))
|
||||
if to_delete_instances:
|
||||
delete_instances = map(vu.get_instance_hostname, to_delete_instances)
|
||||
reserve_instances = list(set(instances) - set(delete_instances))
|
||||
# keep the original order of instances
|
||||
reserve_instances.sort(key=instances.index)
|
||||
else:
|
||||
reserve_instances = instances
|
||||
for index, instance in enumerate(reserve_instances):
|
||||
quorum.append("server.%s=%s:2888:3888" % (index, instance))
|
||||
return '\n'.join(quorum)
|
||||
|
||||
|
||||
def _push_zk_configs_to_nodes(cluster, zk_conf, to_delete_instances=None):
|
||||
instances = vu.get_zk_servers(cluster)
|
||||
if to_delete_instances:
|
||||
for instance in to_delete_instances:
|
||||
if instance in instances:
|
||||
instances.remove(instance)
|
||||
for index, instance in enumerate(instances):
|
||||
with instance.remote() as r:
|
||||
r.write_file_to('/opt/zookeeper/conf/zoo.cfg', zk_conf,
|
||||
run_as_root=True)
|
||||
r.execute_command(
|
||||
'sudo su - -c "echo %s > /var/zookeeper/myid" hadoop' % index)
|
||||
|
||||
|
||||
def configure_spark(cluster):
|
||||
extra = _extract_spark_configs_to_extra(cluster)
|
||||
_push_spark_configs_to_node(cluster, extra)
|
||||
spark_servers = vu.get_spark_history_server(cluster)
|
||||
if spark_servers:
|
||||
extra = _extract_spark_configs_to_extra(cluster)
|
||||
_push_spark_configs_to_node(cluster, extra)
|
||||
|
||||
|
||||
def _push_spark_configs_to_node(cluster, extra):
|
||||
|
|
|
@ -26,7 +26,6 @@ from sahara.utils import types
|
|||
CONF = cfg.CONF
|
||||
CONF.import_opt("enable_data_locality", "sahara.topology.topology_helper")
|
||||
|
||||
|
||||
HIDDEN_CONFS = [
|
||||
'dfs.hosts',
|
||||
'dfs.hosts.exclude',
|
||||
|
@ -132,6 +131,33 @@ SPARK_CONFS = {
|
|||
}
|
||||
}
|
||||
|
||||
ZOOKEEPER_CONFS = {
|
||||
"ZooKeeper": {
|
||||
"OPTIONS": [
|
||||
{
|
||||
'name': 'tickTime',
|
||||
'description': 'The number of milliseconds of each tick',
|
||||
'default': 2000,
|
||||
'priority': 2,
|
||||
},
|
||||
{
|
||||
'name': 'initLimit',
|
||||
'description': 'The number of ticks that the initial'
|
||||
' synchronization phase can take',
|
||||
'default': 10,
|
||||
'priority': 2,
|
||||
},
|
||||
{
|
||||
'name': 'syncLimit',
|
||||
'description': 'The number of ticks that can pass between'
|
||||
' sending a request and getting an acknowledgement',
|
||||
'default': 5,
|
||||
'priority': 2,
|
||||
},
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
# for now we have not so many cluster-wide configs
|
||||
# lets consider all of them having high priority
|
||||
PRIORITY_1_CONFS += CLUSTER_WIDE_CONFS
|
||||
|
@ -305,3 +331,17 @@ def generate_job_cleanup_config(cluster):
|
|||
|
||||
def get_spark_home(cluster):
|
||||
return utils.get_config_value_or_default("Spark", "Spark home", cluster)
|
||||
|
||||
|
||||
def generate_zk_basic_config(cluster):
|
||||
args = {
|
||||
'ticktime': utils.get_config_value_or_default(
|
||||
"ZooKeeper", "tickTime", cluster),
|
||||
'initlimit': utils.get_config_value_or_default(
|
||||
"ZooKeeper", "initLimit", cluster),
|
||||
'synclimit': utils.get_config_value_or_default(
|
||||
"ZooKeeper", "syncLimit", cluster)
|
||||
}
|
||||
zoo_cfg = f.get_file_text(
|
||||
'plugins/vanilla/hadoop2/resources/zoo_sample.cfg')
|
||||
return zoo_cfg.format(**args)
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
# The number of milliseconds of each tick
|
||||
tickTime={ticktime}
|
||||
# The number of ticks that the initial
|
||||
# synchronization phase can take
|
||||
initLimit={initlimit}
|
||||
# The number of ticks that can pass between
|
||||
# sending a request and getting an acknowledgement
|
||||
syncLimit={synclimit}
|
||||
# the directory where the snapshot is stored.
|
||||
# do not use /tmp for storage, /tmp here is just
|
||||
# example sakes.
|
||||
dataDir=/var/zookeeper
|
||||
# the port at which the clients will connect
|
||||
clientPort=2181
|
||||
# the maximum number of client connections.
|
||||
# increase this if you need to handle more clients
|
||||
#maxClientCnxns=60
|
||||
#
|
||||
# Be sure to read the maintenance section of the
|
||||
# administrator guide before turning on autopurge.
|
||||
#
|
||||
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
|
||||
#
|
||||
# The number of snapshots to retain in dataDir
|
||||
#autopurge.snapRetainCount=3
|
||||
# Purge task interval in hours
|
||||
# Set to "0" to disable auto purge feature
|
||||
#autopurge.purgeInterval=1
|
||||
|
|
@ -115,6 +115,46 @@ def start_spark_history_server(master):
|
|||
sp_home, 'sbin/start-history-server.sh'))
|
||||
|
||||
|
||||
def start_zk_server(instances):
|
||||
cpo.add_provisioning_step(
|
||||
instances[0].cluster_id,
|
||||
pu.start_process_event_message("ZooKeeper"),
|
||||
len(instances))
|
||||
|
||||
with context.ThreadGroup() as tg:
|
||||
for instance in instances:
|
||||
with context.set_current_instance_id(instance.instance_id):
|
||||
tg.spawn('ZK-start-processes-%s' % instance.instance_name,
|
||||
_start_zk_processes, instance, 'start')
|
||||
|
||||
|
||||
def refresh_zk_servers(cluster, to_delete_instances=None):
|
||||
instances = vu.get_zk_servers(cluster)
|
||||
if to_delete_instances:
|
||||
for instance in to_delete_instances:
|
||||
if instance in instances:
|
||||
instances.remove(instance)
|
||||
|
||||
cpo.add_provisioning_step(
|
||||
cluster.id,
|
||||
pu.start_process_event_message("ZooKeeper"),
|
||||
len(instances))
|
||||
|
||||
with context.ThreadGroup() as tg:
|
||||
for instance in instances:
|
||||
with context.set_current_instance_id(instance.instance_id):
|
||||
tg.spawn('ZK-restart-processes-%s' % instance.instance_name,
|
||||
_start_zk_processes, instance, 'restart')
|
||||
|
||||
|
||||
@cpo.event_wrapper(True)
|
||||
def _start_zk_processes(instance, operation):
|
||||
with instance.remote() as r:
|
||||
r.execute_command(
|
||||
'sudo su - -c "bash /opt/zookeeper/bin/zkServer.sh %s"'
|
||||
' hadoop' % operation)
|
||||
|
||||
|
||||
def format_namenode(instance):
|
||||
instance.remote().execute_command(
|
||||
'sudo su - -c "hdfs namenode -format" hadoop')
|
||||
|
|
|
@ -39,6 +39,8 @@ def scale_cluster(pctx, cluster, instances):
|
|||
config.configure_topology_data(pctx, cluster)
|
||||
run.start_dn_nm_processes(instances)
|
||||
swift_helper.install_ssl_certs(instances)
|
||||
config.configure_zookeeper(cluster)
|
||||
run.refresh_zk_servers(cluster)
|
||||
|
||||
|
||||
def _get_instances_with_service(instances, service):
|
||||
|
@ -87,6 +89,9 @@ def decommission_nodes(pctx, cluster, instances):
|
|||
run.refresh_hadoop_nodes(cluster)
|
||||
|
||||
config.configure_topology_data(pctx, cluster)
|
||||
config.configure_zookeeper(cluster, instances)
|
||||
# TODO(shuyingya):should invent a way to lastly restart the leader node
|
||||
run.refresh_zk_servers(cluster, instances)
|
||||
|
||||
|
||||
def _update_exclude_files(cluster, instances):
|
||||
|
|
|
@ -77,3 +77,9 @@ def start_spark(cluster):
|
|||
spark = vu.get_spark_history_server(cluster)
|
||||
if spark:
|
||||
run.start_spark_history_server(spark)
|
||||
|
||||
|
||||
def start_zookeeper(cluster):
|
||||
zk_servers = vu.get_zk_servers(cluster)
|
||||
if zk_servers:
|
||||
run.start_zk_server(zk_servers)
|
||||
|
|
|
@ -83,6 +83,12 @@ def validate_cluster_creating(pctx, cluster):
|
|||
raise ex.InvalidComponentCountException('hive', _('0 or 1'),
|
||||
hive_count)
|
||||
|
||||
zk_count = _get_inst_count(cluster, 'zookeeper')
|
||||
if zk_count > 0 and (zk_count % 2) != 1:
|
||||
raise ex.InvalidComponentCountException(
|
||||
'zookeeper', _('odd'), zk_count, _('Number of zookeeper nodes'
|
||||
'should be in odd.'))
|
||||
|
||||
|
||||
def validate_additional_ng_scaling(cluster, additional):
|
||||
rm = vu.get_resourcemanager(cluster)
|
||||
|
@ -125,8 +131,27 @@ def validate_existing_ng_scaling(pctx, cluster, existing):
|
|||
cluster.name, msg % rep_factor)
|
||||
|
||||
|
||||
def validate_zookeeper_node_count(zk_ng, existing, additional):
|
||||
zk_amount = 0
|
||||
for ng in zk_ng:
|
||||
if ng.id in existing:
|
||||
zk_amount += existing[ng.id]
|
||||
else:
|
||||
zk_amount += ng.count
|
||||
|
||||
for ng_id in additional:
|
||||
ng = gu.get_by_id(zk_ng, ng_id)
|
||||
if "zookeeper" in ng.node_processes:
|
||||
zk_amount += ng.count
|
||||
|
||||
if (zk_amount % 2) != 1:
|
||||
msg = _("Vanilla plugin cannot scale cluster because it must keep"
|
||||
" zookeeper service in odd.")
|
||||
raise ex.ClusterCannotBeScaled(zk_ng[0].cluster.name, msg)
|
||||
|
||||
|
||||
def _get_scalable_processes():
|
||||
return ['datanode', 'nodemanager']
|
||||
return ['datanode', 'nodemanager', 'zookeeper']
|
||||
|
||||
|
||||
def _get_inst_count(cluster, process):
|
||||
|
|
|
@ -60,6 +60,10 @@ def get_instance_hostname(instance):
|
|||
return instance.hostname() if instance else None
|
||||
|
||||
|
||||
def get_zk_servers(cluster):
|
||||
return u.get_instances(cluster, 'zookeeper')
|
||||
|
||||
|
||||
def generate_random_password():
|
||||
password = uuidutils.generate_uuid()
|
||||
return castellan.store_secret(password)
|
||||
|
|
|
@ -79,6 +79,7 @@ def _init_all_configs():
|
|||
configs.extend(PLUGIN_ENV_CONFIGS)
|
||||
configs.extend(c_helper.PLUGIN_GENERAL_CONFIGS)
|
||||
configs.extend(_get_spark_configs())
|
||||
configs.extend(_get_zookeeper_configs())
|
||||
return configs
|
||||
|
||||
|
||||
|
@ -96,6 +97,20 @@ def _get_spark_configs():
|
|||
return spark_configs
|
||||
|
||||
|
||||
def _get_zookeeper_configs():
|
||||
zk_configs = []
|
||||
for service, config_items in six.iteritems(c_helper.ZOOKEEPER_CONFS):
|
||||
for item in config_items['OPTIONS']:
|
||||
cfg = p.Config(name=item["name"],
|
||||
description=item["description"],
|
||||
default_value=item["default"],
|
||||
applicable_target=service,
|
||||
scope="cluster", is_optional=True,
|
||||
priority=item["priority"])
|
||||
zk_configs.append(cfg)
|
||||
return zk_configs
|
||||
|
||||
|
||||
PLUGIN_CONFIGS = _init_all_configs()
|
||||
|
||||
|
||||
|
|
|
@ -56,7 +56,8 @@ class VersionHandler(avm.AbstractVersionHandler):
|
|||
"YARN": ["resourcemanager", "nodemanager"],
|
||||
"JobFlow": ["oozie"],
|
||||
"Hive": ["hiveserver"],
|
||||
"Spark": ["spark history server"]
|
||||
"Spark": ["spark history server"],
|
||||
"ZooKeeper": ["zookeeper"]
|
||||
}
|
||||
|
||||
def validate(self, cluster):
|
||||
|
@ -81,6 +82,7 @@ class VersionHandler(avm.AbstractVersionHandler):
|
|||
s_scripts.start_historyserver(cluster)
|
||||
s_scripts.start_oozie(self.pctx, cluster)
|
||||
s_scripts.start_hiveserver(self.pctx, cluster)
|
||||
s_scripts.start_zookeeper(cluster)
|
||||
|
||||
swift_helper.install_ssl_certs(cluster_utils.get_instances(cluster))
|
||||
|
||||
|
@ -93,6 +95,9 @@ class VersionHandler(avm.AbstractVersionHandler):
|
|||
def validate_scaling(self, cluster, existing, additional):
|
||||
vl.validate_additional_ng_scaling(cluster, additional)
|
||||
vl.validate_existing_ng_scaling(self.pctx, cluster, existing)
|
||||
zk_ng = utils.get_node_groups(cluster, "zookeeper")
|
||||
if zk_ng:
|
||||
vl.validate_zookeeper_node_count(zk_ng, existing, additional)
|
||||
|
||||
def scale_cluster(self, cluster, instances):
|
||||
keypairs.provision_keypairs(cluster, instances)
|
||||
|
|
|
@ -191,3 +191,19 @@ class TestConfigHelper(base.SaharaTestCase):
|
|||
get_config_value_or_default.assert_called_once_with('Spark',
|
||||
'Spark home',
|
||||
self.cluster)
|
||||
|
||||
@mock.patch('sahara.utils.files.get_file_text')
|
||||
@mock.patch('sahara.plugins.utils.get_config_value_or_default')
|
||||
def test_generate_zk_basic_config(self, get_config_value_or_default,
|
||||
get_file_text):
|
||||
key = ('tickTime={ticktime}\n' +
|
||||
'initLimit={initlimit}\n' +
|
||||
'syncLimit={synclimit}\n')
|
||||
actual = 'tickTime=5\ninitLimit=5\nsyncLimit=5\n'
|
||||
|
||||
get_config_value_or_default.return_value = 5
|
||||
get_file_text.return_value = key
|
||||
|
||||
ret = c_helper.generate_zk_basic_config(self.cluster)
|
||||
self.assertEqual(get_config_value_or_default.call_count, 3)
|
||||
self.assertEqual(ret, actual)
|
||||
|
|
|
@ -39,6 +39,8 @@ class ScalingTest(base.SaharaTestCase):
|
|||
|
||||
@mock.patch('sahara.swift.swift_helper.install_ssl_certs')
|
||||
@mock.patch('sahara.plugins.vanilla.utils.get_resourcemanager')
|
||||
@mock.patch(PLUGINS_PATH + 'run_scripts.refresh_zk_servers')
|
||||
@mock.patch(PLUGINS_PATH + 'config.configure_zookeeper')
|
||||
@mock.patch(PLUGINS_PATH + 'run_scripts.start_dn_nm_processes')
|
||||
@mock.patch(PLUGINS_PATH + 'run_scripts.refresh_yarn_nodes')
|
||||
@mock.patch(PLUGINS_PATH + 'run_scripts.refresh_hadoop_nodes')
|
||||
|
@ -51,6 +53,7 @@ class ScalingTest(base.SaharaTestCase):
|
|||
refresh_hadoop_nodes,
|
||||
refresh_yarn_nodes,
|
||||
start_dn_nm_processes,
|
||||
configure_zk, refresh_zk,
|
||||
get_resourcemanager,
|
||||
install_ssl_certs):
|
||||
get_resourcemanager.return_value = 'node1'
|
||||
|
@ -64,6 +67,9 @@ class ScalingTest(base.SaharaTestCase):
|
|||
configure_topology_data.assert_called_once_with(pctx, self.cluster)
|
||||
start_dn_nm_processes.assert_called_once_with(self.instances)
|
||||
install_ssl_certs.assert_called_once_with(self.instances)
|
||||
configure_topology_data.assert_called_once_with(pctx, self.cluster)
|
||||
configure_zk.assert_called_once_with(self.cluster)
|
||||
refresh_zk.assert_called_once_with(self.cluster)
|
||||
|
||||
def test_get_instances_with_service(self):
|
||||
ins_1 = mock.Mock()
|
||||
|
@ -115,6 +121,8 @@ class ScalingTest(base.SaharaTestCase):
|
|||
self.r.execute_command.assert_has_calls(command_calls, any_order=True)
|
||||
|
||||
@mock.patch('sahara.plugins.vanilla.utils.get_resourcemanager')
|
||||
@mock.patch(PLUGINS_PATH + 'run_scripts.refresh_zk_servers')
|
||||
@mock.patch(PLUGINS_PATH + 'config.configure_zookeeper')
|
||||
@mock.patch(PLUGINS_PATH + 'config.configure_topology_data')
|
||||
@mock.patch(PLUGINS_PATH + 'run_scripts.refresh_yarn_nodes')
|
||||
@mock.patch(PLUGINS_PATH + 'run_scripts.refresh_hadoop_nodes')
|
||||
|
@ -130,6 +138,7 @@ class ScalingTest(base.SaharaTestCase):
|
|||
_update_include_files, _clear_exclude_files,
|
||||
_update_exclude_files, refresh_hadoop_nodes,
|
||||
refresh_yarn_nodes, configure_topology_data,
|
||||
configure_zk, refresh_zk,
|
||||
get_resourcemanager):
|
||||
data = 'test_data'
|
||||
_get_instances_with_service.return_value = data
|
||||
|
@ -152,6 +161,8 @@ class ScalingTest(base.SaharaTestCase):
|
|||
self.instances)
|
||||
_clear_exclude_files.assert_called_once_with(self.cluster)
|
||||
configure_topology_data.assert_called_once_with(pctx, self.cluster)
|
||||
configure_zk.assert_called_once_with(self.cluster, self.instances)
|
||||
refresh_zk.assert_called_once_with(self.cluster, self.instances)
|
||||
|
||||
@mock.patch(PLUGINS_PATH + 'scaling._get_instances_with_service')
|
||||
@mock.patch('sahara.plugins.utils.generate_fqdn_host_names')
|
||||
|
|
|
@ -89,6 +89,13 @@ class ValidationTest(base.SaharaTestCase):
|
|||
with testtools.ExpectedException(ex.InvalidComponentCountException):
|
||||
self._validate_case(1, 1, 0, 0, 3, 0, 0, 0, 2)
|
||||
|
||||
self.ng.append(tu.make_ng_dict("zk", "f1", ["zookeeper"], 0))
|
||||
|
||||
self._validate_case(1, 1, 0, 0, 3, 0, 0, 1, 1, 3)
|
||||
|
||||
with testtools.ExpectedException(ex.InvalidComponentCountException):
|
||||
self._validate_case(1, 1, 0, 0, 3, 0, 0, 1, 1, 2)
|
||||
|
||||
def _create_cluster(self, *args, **kwargs):
|
||||
lst = []
|
||||
for i in range(0, len(args)):
|
||||
|
|
|
@ -33,7 +33,10 @@ class TestConfigHelper(base.SaharaTestCase):
|
|||
@mock.patch(plugin_path + 'config_helper.PLUGIN_ENV_CONFIGS')
|
||||
@mock.patch(plugin_path + 'config_helper.PLUGIN_XML_CONFIGS')
|
||||
@mock.patch(plugin_path + 'config_helper._get_spark_configs')
|
||||
def test_init_all_configs(self, _get_spark_configs,
|
||||
@mock.patch(plugin_path + 'config_helper._get_zookeeper_configs')
|
||||
def test_init_all_configs(self,
|
||||
_get_zk_configs,
|
||||
_get_spark_configs,
|
||||
PLUGIN_XML_CONFIGS,
|
||||
PLUGIN_ENV_CONFIGS,
|
||||
PLUGIN_GENERAL_CONFIGS):
|
||||
|
@ -42,6 +45,7 @@ class TestConfigHelper(base.SaharaTestCase):
|
|||
configs.extend(PLUGIN_ENV_CONFIGS)
|
||||
configs.extend(PLUGIN_GENERAL_CONFIGS)
|
||||
configs.extend(_get_spark_configs())
|
||||
configs.extend(_get_zk_configs())
|
||||
init_configs = v_helper._init_all_configs()
|
||||
self.assertEqual(init_configs, configs)
|
||||
|
||||
|
|
|
@ -15,7 +15,10 @@
|
|||
|
||||
import mock
|
||||
import six
|
||||
import testtools
|
||||
|
||||
from sahara.conductor import resource as r
|
||||
from sahara.plugins import exceptions as ex
|
||||
from sahara.plugins.vanilla.v2_7_1.edp_engine import EdpOozieEngine
|
||||
from sahara.plugins.vanilla.v2_7_1.edp_engine import EdpSparkEngine
|
||||
from sahara.plugins.vanilla.v2_7_1 import versionhandler as v_h
|
||||
|
@ -110,11 +113,12 @@ class VersionHandlerTest(base.SaharaTestCase):
|
|||
cluster,
|
||||
instances)
|
||||
|
||||
@mock.patch('sahara.utils.general.get_by_id')
|
||||
@mock.patch(plugin_hadoop2_path +
|
||||
'validation.validate_additional_ng_scaling')
|
||||
@mock.patch(plugin_hadoop2_path +
|
||||
'validation.validate_existing_ng_scaling')
|
||||
def test_validate_scaling(self, vls, vla):
|
||||
def test_validate_scaling(self, vls, vla, get_by_id):
|
||||
self.vh.pctx['all_confs'] = [TestConfig('HDFS', 'dfs.replication', -1)]
|
||||
ng1 = testutils.make_ng_dict('ng1', '40', ['namenode'], 1)
|
||||
ng2 = testutils.make_ng_dict('ng2', '41', ['datanode'], 2)
|
||||
|
@ -127,6 +131,21 @@ class VersionHandlerTest(base.SaharaTestCase):
|
|||
vla.assert_called_once_with(cluster, additional)
|
||||
vls.assert_called_once_with(self.vh.pctx, cluster, existing)
|
||||
|
||||
ng4 = testutils.make_ng_dict('ng4', '43', ['datanode', 'zookeeper'], 3)
|
||||
ng5 = testutils.make_ng_dict('ng5', '44', ['datanode', 'zookeeper'], 1)
|
||||
existing = {ng4['id']: 2}
|
||||
additional = {ng5['id']}
|
||||
cluster = testutils.create_cluster('test-cluster', 'tenant1', 'fake',
|
||||
'0.1', [ng1, ng4])
|
||||
|
||||
with testtools.ExpectedException(ex.ClusterCannotBeScaled):
|
||||
self.vh.validate_scaling(cluster, existing, {})
|
||||
|
||||
get_by_id.return_value = r.NodeGroupResource(ng5)
|
||||
|
||||
with testtools.ExpectedException(ex.ClusterCannotBeScaled):
|
||||
self.vh.validate_scaling(cluster, {}, additional)
|
||||
|
||||
@mock.patch(plugin_hadoop2_path + 'scaling.scale_cluster')
|
||||
@mock.patch(plugin_hadoop2_path + 'keypairs.provision_keypairs')
|
||||
def test_scale_cluster(self, provision_keypairs, scale_cluster):
|
||||
|
|
Loading…
Reference in New Issue