Implementation of Storm scaling

Implementation of Storm scaling. This allows users to quickly scale a
Storm cluster: only supervisor node groups can be added or resized, and
running topologies are rebalanced once the new nodes are started.

Change-Id: Id2f00e0aadc8aac5c79ebec5e55e13c2964c7fec
Implements: bp storm-scaling
Telles Nobrega 2015-04-27 11:51:47 -03:00
parent 50e82cbe8e
commit 559f2f1e55
3 changed files with 232 additions and 0 deletions
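
For context, a rough sketch of how Sahara's scaling engine is expected to
drive the new plugin hooks added below. This is an illustration only, not
code from the patch; the function and argument names (scale_storm_cluster,
existing, additional, new_instances) are placeholders.

# Illustrative sketch only -- not part of this change.
def scale_storm_cluster(plugin, cluster, existing, additional, new_instances):
    # Reject any node group that runs processes other than "supervisor".
    plugin.validate_scaling(cluster, existing, additional)
    # Configure the new instances, start the supervisor processes on them,
    # then rebalance the running topologies across the grown cluster.
    plugin.scale_cluster(cluster, new_instances)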


@@ -29,6 +29,7 @@ from sahara.plugins.storm import edp_engine
from sahara.plugins.storm import run_scripts as run
from sahara.plugins import utils
from sahara.utils import cluster_progress_ops as cpo
from sahara.utils import general as ug
from sahara.utils import remote

conductor = conductor.API
@@ -295,3 +296,75 @@ class StormProvider(p.ProvisioningPluginBase):

    def _push_supervisor_configs(self, r, files):
        r.append_to_files(files, run_as_root=True)

    # Scaling

    def _get_running_topologies_names(self, cluster):
        master = utils.get_instance(cluster, "nimbus")

        cmd = (
            "%(storm)s -c nimbus.host=%(host)s "
            "list | grep ACTIVE | awk '{print $1}'") % (
            {
                "storm": "/usr/local/storm/bin/storm",
                "host": master.hostname()
            })

        with remote.get_remote(master) as r:
            ret, stdout = r.execute_command(cmd)

        names = stdout.split('\n')
        topology_names = names[0:len(names)-1]
        return topology_names

    @cpo.event_wrapper(True, step=_("Rebalance Topology"),
                       param=('cluster', 1))
    def rebalance_topology(self, cluster):
        topology_names = self._get_running_topologies_names(cluster)
        master = utils.get_instance(cluster, "nimbus")

        for topology_name in topology_names:
            cmd = (
                '%(rebalance)s -c nimbus.host=%(host)s %(topology_name)s') % (
                {
                    "rebalance": "/usr/local/storm/bin/storm rebalance",
                    "host": master.hostname(),
                    "topology_name": topology_name
                })

            with remote.get_remote(master) as r:
                ret, stdout = r.execute_command(cmd)

    def validate_scaling(self, cluster, existing, additional):
        self._validate_existing_ng_scaling(cluster, existing)
        self._validate_additional_ng_scaling(cluster, additional)

    def scale_cluster(self, cluster, instances):
        self._setup_instances(cluster, instances)
        # start storm slaves
        self._start_slave_processes(instances)
        self.rebalance_topology(cluster)
        LOG.info(_LI("Storm scaling has been started."))

    def _get_scalable_processes(self):
        return ["supervisor"]

    def _validate_additional_ng_scaling(self, cluster, additional):
        scalable_processes = self._get_scalable_processes()

        for ng_id in additional:
            ng = ug.get_by_id(cluster.node_groups, ng_id)
            if not set(ng.node_processes).issubset(scalable_processes):
                raise ex.NodeGroupCannotBeScaled(
                    ng.name, _("Storm plugin cannot scale nodegroup"
                               " with processes: %s") %
                    ' '.join(ng.node_processes))

    def _validate_existing_ng_scaling(self, cluster, existing):
        scalable_processes = self._get_scalable_processes()

        for ng in cluster.node_groups:
            if ng.id in existing:
                if not set(ng.node_processes).issubset(scalable_processes):
                    raise ex.NodeGroupCannotBeScaled(
                        ng.name, _("Storm plugin cannot scale nodegroup"
                                   " with processes: %s") %
                        ' '.join(ng.node_processes))

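
For reference, the two Storm CLI invocations composed above expand to commands
of the following shape. This is a standalone sketch with placeholder host and
topology names, not code from the patch.

# Standalone sketch of the shell commands built by the methods above.
STORM_BIN = "/usr/local/storm/bin/storm"


def list_active_topologies_cmd(nimbus_host):
    # Mirrors _get_running_topologies_names: list topologies, keep ACTIVE ones.
    return ("%(storm)s -c nimbus.host=%(host)s "
            "list | grep ACTIVE | awk '{print $1}'" % {
                "storm": STORM_BIN,
                "host": nimbus_host,
            })


def rebalance_cmd(nimbus_host, topology_name):
    # Mirrors rebalance_topology: rebalance a single running topology.
    return "%(rebalance)s -c nimbus.host=%(host)s %(topology_name)s" % {
        "rebalance": STORM_BIN + " rebalance",
        "host": nimbus_host,
        "topology_name": topology_name,
    }


print(list_active_topologies_cmd("nimbus-host"))
print(rebalance_cmd("nimbus-host", "topology1"))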

@@ -0,0 +1,159 @@
# Copyright (c) 2015 TellesNobrega
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import mock

from sahara import conductor as cond
from sahara import context
from sahara.plugins import base as pb
from sahara.plugins import exceptions as ex
from sahara.tests.unit import base

conductor = cond.API


class StormPluginTest(base.SaharaWithDbTestCase):
    def setUp(self):
        super(StormPluginTest, self).setUp()
        self.override_config("plugins", ["storm"])
        self.master_host = "master"
        self.master_inst = "6789"
        self.storm_topology_name = 'topology1'
        pb.setup_plugins()

    def _make_master_instance(self, return_code=0):
        master = mock.Mock()
        master.execute_command.return_value = (return_code,
                                               self.storm_topology_name)
        master.hostname.return_value = self.master_host
        master.id = self.master_inst
        return master

    def test_validate_existing_ng_scaling(self):
        data = {'name': "cluster",
                'plugin_name': "storm",
                'hadoop_version': "0.9.2",
                'node_groups': [
                    {'name': 'master',
                     'flavor_id': '42',
                     'count': 1,
                     'node_processes': ['nimbus']},
                    {'name': 'slave',
                     'flavor_id': '42',
                     'count': 1,
                     'node_processes': ['supervisor']},
                    {'name': 'zookeeper',
                     'flavor_id': '42',
                     'count': 1,
                     'node_processes': ['zookeeper']}
                ]
                }
        cluster = conductor.cluster_create(context.ctx(), data)
        plugin = pb.PLUGINS.get_plugin(cluster.plugin_name)
        supervisor_id = [node.id for node in cluster.node_groups
                         if node.name == 'slave']
        self.assertEqual(None,
                         plugin._validate_existing_ng_scaling(cluster,
                                                              supervisor_id))

    def test_validate_additional_ng_scaling(self):
        data = {'name': "cluster",
                'plugin_name': "storm",
                'hadoop_version': "0.9.2",
                'node_groups': [
                    {'name': 'master',
                     'flavor_id': '42',
                     'count': 1,
                     'node_processes': ['nimbus']},
                    {'name': 'slave',
                     'flavor_id': '42',
                     'count': 1,
                     'node_processes': ['supervisor']},
                    {'name': 'zookeeper',
                     'flavor_id': '42',
                     'count': 1,
                     'node_processes': ['zookeeper']},
                    {'name': 'slave2',
                     'flavor_id': '42',
                     'count': 0,
                     'node_processes': ['supervisor']}
                ]
                }
        cluster = conductor.cluster_create(context.ctx(), data)
        plugin = pb.PLUGINS.get_plugin(cluster.plugin_name)
        supervisor_id = [node.id for node in cluster.node_groups
                         if node.name == 'slave2']
        self.assertEqual(None,
                         plugin._validate_additional_ng_scaling(cluster,
                                                                supervisor_id))

    def test_validate_existing_ng_scaling_raises(self):
        data = {'name': "cluster",
                'plugin_name': "storm",
                'hadoop_version': "0.9.2",
                'node_groups': [
                    {'name': 'master',
                     'flavor_id': '42',
                     'count': 1,
                     'node_processes': ['nimbus']},
                    {'name': 'slave',
                     'flavor_id': '42',
                     'count': 1,
                     'node_processes': ['supervisor']},
                    {'name': 'zookeeper',
                     'flavor_id': '42',
                     'count': 1,
                     'node_processes': ['zookeeper']}
                ]
                }
        cluster = conductor.cluster_create(context.ctx(), data)
        plugin = pb.PLUGINS.get_plugin(cluster.plugin_name)
        master_id = [node.id for node in cluster.node_groups
                     if node.name == 'master']
        self.assertRaises(ex.NodeGroupCannotBeScaled,
                          plugin._validate_existing_ng_scaling,
                          cluster, master_id)

    def test_validate_additional_ng_scaling_raises(self):
        data = {'name': "cluster",
                'plugin_name': "storm",
                'hadoop_version': "0.9.2",
                'node_groups': [
                    {'name': 'master',
                     'flavor_id': '42',
                     'count': 1,
                     'node_processes': ['nimbus']},
                    {'name': 'slave',
                     'flavor_id': '42',
                     'count': 1,
                     'node_processes': ['supervisor']},
                    {'name': 'zookeeper',
                     'flavor_id': '42',
                     'count': 1,
                     'node_processes': ['zookeeper']},
                    {'name': 'master2',
                     'flavor_id': '42',
                     'count': 0,
                     'node_processes': ['nimbus']}
                ]
                }
        cluster = conductor.cluster_create(context.ctx(), data)
        plugin = pb.PLUGINS.get_plugin(cluster.plugin_name)
        master_id = [node.id for node in cluster.node_groups
                     if node.name == 'master2']
        self.assertRaises(ex.NodeGroupCannotBeScaled,
                          plugin._validate_additional_ng_scaling,
                          cluster, master_id)
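
The _make_master_instance helper defined above is not exercised by the tests
in this hunk. A possible follow-up test for _get_running_topologies_names
could look roughly like the sketch below; the patched module path
sahara.plugins.storm.plugin is an assumption based on the imports shown, and
the trailing newline added to the mocked command output is there so the
topology name survives the split('\n') slicing in the plugin.

class StormPluginScalingSketch(StormPluginTest):
    # Hedged sketch only, not part of the patch.
    @mock.patch("sahara.plugins.storm.plugin.utils.get_instance")
    @mock.patch("sahara.plugins.storm.plugin.remote.get_remote")
    def test_get_running_topologies_names(self, get_remote, get_instance):
        master = self._make_master_instance()
        # End the mocked output with '\n' so the plugin's slice keeps the name.
        master.execute_command.return_value = (
            0, self.storm_topology_name + "\n")
        get_instance.return_value = master
        # remote.get_remote(master) is used as a context manager by the plugin.
        get_remote.return_value.__enter__.return_value = master

        plugin = pb.PLUGINS.get_plugin("storm")
        names = plugin._get_running_topologies_names(mock.Mock())
        self.assertEqual([self.storm_topology_name], names)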