From 559f2f1e5553c941e78cf92b9faafa23d5827e21 Mon Sep 17 00:00:00 2001 From: Telles Nobrega Date: Mon, 27 Apr 2015 11:51:47 -0300 Subject: [PATCH] Implementation of Storm scaling Implementation of Storm scaling. This will allow users to quickly scale a storm cluster. Change-Id: Id2f00e0aadc8aac5c79ebec5e55e13c2964c7fec Implements: bp storm-scaling --- sahara/plugins/storm/plugin.py | 73 ++++++++ sahara/tests/unit/plugins/storm/__init__.py | 0 .../tests/unit/plugins/storm/test_plugin.py | 159 ++++++++++++++++++ 3 files changed, 232 insertions(+) create mode 100644 sahara/tests/unit/plugins/storm/__init__.py create mode 100644 sahara/tests/unit/plugins/storm/test_plugin.py diff --git a/sahara/plugins/storm/plugin.py b/sahara/plugins/storm/plugin.py index dc94d47e..77801daa 100644 --- a/sahara/plugins/storm/plugin.py +++ b/sahara/plugins/storm/plugin.py @@ -29,6 +29,7 @@ from sahara.plugins.storm import edp_engine from sahara.plugins.storm import run_scripts as run from sahara.plugins import utils from sahara.utils import cluster_progress_ops as cpo +from sahara.utils import general as ug from sahara.utils import remote conductor = conductor.API @@ -295,3 +296,75 @@ class StormProvider(p.ProvisioningPluginBase): def _push_supervisor_configs(self, r, files): r.append_to_files(files, run_as_root=True) + + # Scaling + + def _get_running_topologies_names(self, cluster): + master = utils.get_instance(cluster, "nimbus") + + cmd = ( + "%(storm)s -c nimbus.host=%(host)s " + "list | grep ACTIVE | awk '{print $1}'") % ( + { + "storm": "/usr/local/storm/bin/storm", + "host": master.hostname() + }) + + with remote.get_remote(master) as r: + ret, stdout = r.execute_command(cmd) + names = stdout.split('\n') + topology_names = names[0:len(names)-1] + return topology_names + + @cpo.event_wrapper(True, step=_("Rebalance Topology"), + param=('cluster', 1)) + def rebalance_topology(self, cluster): + topology_names = self._get_running_topologies_names(cluster) + master = utils.get_instance(cluster, "nimbus") + + for topology_name in topology_names: + cmd = ( + '%(rebalance)s -c nimbus.host=%(host)s %(topology_name)s') % ( + { + "rebalance": "/usr/local/storm/bin/storm rebalance", + "host": master.hostname(), + "topology_name": topology_name + }) + + with remote.get_remote(master) as r: + ret, stdout = r.execute_command(cmd) + + def validate_scaling(self, cluster, existing, additional): + self._validate_existing_ng_scaling(cluster, existing) + self._validate_additional_ng_scaling(cluster, additional) + + def scale_cluster(self, cluster, instances): + self._setup_instances(cluster, instances) + # start storm slaves + self._start_slave_processes(instances) + self.rebalance_topology(cluster) + LOG.info(_LI("Storm scaling has been started.")) + + def _get_scalable_processes(self): + return ["supervisor"] + + def _validate_additional_ng_scaling(self, cluster, additional): + scalable_processes = self._get_scalable_processes() + + for ng_id in additional: + ng = ug.get_by_id(cluster.node_groups, ng_id) + if not set(ng.node_processes).issubset(scalable_processes): + raise ex.NodeGroupCannotBeScaled( + ng.name, _("Storm plugin cannot scale nodegroup" + " with processes: %s") % + ' '.join(ng.node_processes)) + + def _validate_existing_ng_scaling(self, cluster, existing): + scalable_processes = self._get_scalable_processes() + for ng in cluster.node_groups: + if ng.id in existing: + if not set(ng.node_processes).issubset(scalable_processes): + raise ex.NodeGroupCannotBeScaled( + ng.name, _("Storm plugin cannot scale nodegroup" + " with processes: %s") % + ' '.join(ng.node_processes)) diff --git a/sahara/tests/unit/plugins/storm/__init__.py b/sahara/tests/unit/plugins/storm/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/sahara/tests/unit/plugins/storm/test_plugin.py b/sahara/tests/unit/plugins/storm/test_plugin.py new file mode 100644 index 00000000..d5fac8cf --- /dev/null +++ b/sahara/tests/unit/plugins/storm/test_plugin.py @@ -0,0 +1,159 @@ +# Copyright (c) 2015 TellesNobrega +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock + +from sahara import conductor as cond +from sahara import context +from sahara.plugins import base as pb +from sahara.plugins import exceptions as ex +from sahara.tests.unit import base + + +conductor = cond.API + + +class StormPluginTest(base.SaharaWithDbTestCase): + def setUp(self): + super(StormPluginTest, self).setUp() + self.override_config("plugins", ["storm"]) + self.master_host = "master" + self.master_inst = "6789" + self.storm_topology_name = 'topology1' + pb.setup_plugins() + + def _make_master_instance(self, return_code=0): + master = mock.Mock() + master.execute_command.return_value = (return_code, + self.storm_topology_name) + master.hostname.return_value = self.master_host + master.id = self.master_inst + return master + + def test_validate_existing_ng_scaling(self): + data = {'name': "cluster", + 'plugin_name': "storm", + 'hadoop_version': "0.9.2", + 'node_groups': [ + {'name': 'master', + 'flavor_id': '42', + 'count': 1, + 'node_processes': ['nimbus']}, + {'name': 'slave', + 'flavor_id': '42', + 'count': 1, + 'node_processes': ['supervisor']}, + {'name': 'zookeeper', + 'flavor_id': '42', + 'count': 1, + 'node_processes': ['zookeeper']} + ] + } + cluster = conductor.cluster_create(context.ctx(), data) + plugin = pb.PLUGINS.get_plugin(cluster.plugin_name) + supervisor_id = [node.id for node in cluster.node_groups + if node.name == 'supervisor'] + self.assertEqual(None, + plugin._validate_existing_ng_scaling(cluster, + supervisor_id)) + + def test_validate_additional_ng_scaling(self): + data = {'name': "cluster", + 'plugin_name': "storm", + 'hadoop_version': "0.9.2", + 'node_groups': [ + {'name': 'master', + 'flavor_id': '42', + 'count': 1, + 'node_processes': ['nimbus']}, + {'name': 'slave', + 'flavor_id': '42', + 'count': 1, + 'node_processes': ['supervisor']}, + {'name': 'zookeeper', + 'flavor_id': '42', + 'count': 1, + 'node_processes': ['zookeeper']}, + {'name': 'slave2', + 'flavor_id': '42', + 'count': 0, + 'node_processes': ['supervisor']} + ] + } + cluster = conductor.cluster_create(context.ctx(), data) + plugin = pb.PLUGINS.get_plugin(cluster.plugin_name) + supervisor_id = [node.id for node in cluster.node_groups + if node.name == 'supervisor'] + self.assertEqual(None, + plugin._validate_additional_ng_scaling(cluster, + supervisor_id)) + + def test_validate_existing_ng_scaling_raises(self): + data = {'name': "cluster", + 'plugin_name': "storm", + 'hadoop_version': "0.9.2", + 'node_groups': [ + {'name': 'master', + 'flavor_id': '42', + 'count': 1, + 'node_processes': ['nimbus']}, + {'name': 'slave', + 'flavor_id': '42', + 'count': 1, + 'node_processes': ['supervisor']}, + {'name': 'zookeeper', + 'flavor_id': '42', + 'count': 1, + 'node_processes': ['zookeeper']} + ] + } + cluster = conductor.cluster_create(context.ctx(), data) + plugin = pb.PLUGINS.get_plugin(cluster.plugin_name) + master_id = [node.id for node in cluster.node_groups + if node.name == 'master'] + self.assertRaises(ex.NodeGroupCannotBeScaled, + plugin._validate_existing_ng_scaling, + cluster, master_id) + + def test_validate_additional_ng_scaling_raises(self): + data = {'name': "cluster", + 'plugin_name': "storm", + 'hadoop_version': "0.9.2", + 'node_groups': [ + {'name': 'master', + 'flavor_id': '42', + 'count': 1, + 'node_processes': ['nimbus']}, + {'name': 'slave', + 'flavor_id': '42', + 'count': 1, + 'node_processes': ['supervisor']}, + {'name': 'zookeeper', + 'flavor_id': '42', + 'count': 1, + 'node_processes': ['zookeeper']}, + {'name': 'master2', + 'flavor_id': '42', + 'count': 0, + 'node_processes': ['nimbus']} + ] + } + cluster = conductor.cluster_create(context.ctx(), data) + plugin = pb.PLUGINS.get_plugin(cluster.plugin_name) + master_id = [node.id for node in cluster.node_groups + if node.name == 'master2'] + self.assertRaises(ex.NodeGroupCannotBeScaled, + plugin._validate_existing_ng_scaling, + cluster, master_id)