diff --git a/sahara/plugins/storm/config_helper.py b/sahara/plugins/storm/config_helper.py index e81ec66a..ae6c98c3 100644 --- a/sahara/plugins/storm/config_helper.py +++ b/sahara/plugins/storm/config_helper.py @@ -42,10 +42,17 @@ def get_plugin_configs(): return {} -def generate_storm_config(master_hostname, zk_hostnames): +def generate_storm_config(master_hostname, zk_hostnames, version): + + if version == '1.0.1': + host_cfg = 'nimbus.seeds' + master_value = [master_hostname.encode('ascii', 'ignore')] + else: + host_cfg = 'nimbus.host' + master_value = master_hostname.encode('ascii', 'ignore') cfg = { - "nimbus.host": master_hostname.encode('ascii', 'ignore'), + host_cfg: master_value, "worker.childopts": "-Xmx768m -Djava.net.preferIPv4Stack=true", "nimbus.childopts": "-Xmx1024m -Djava.net.preferIPv4Stack=true", "supervisor.childopts": "-Djava.net.preferIPv4Stack=true", diff --git a/sahara/plugins/storm/plugin.py b/sahara/plugins/storm/plugin.py index 634c32cc..f65fe462 100644 --- a/sahara/plugins/storm/plugin.py +++ b/sahara/plugins/storm/plugin.py @@ -53,7 +53,7 @@ class StormProvider(p.ProvisioningPluginBase): "cluster without any management consoles.")) def get_versions(self): - return ['0.9.2'] + return ['0.9.2', '1.0.1'] def get_configs(self, storm_version): return c_helper.get_plugin_configs() @@ -148,7 +148,8 @@ class StormProvider(p.ProvisioningPluginBase): config_instances = c_helper.generate_storm_config( st_master.hostname(), - zknames) + zknames, + cluster.hadoop_version) config = self._convert_dict_to_yaml(config_instances) supervisor_conf = c_helper.generate_slave_supervisor_conf() diff --git a/sahara/tests/unit/plugins/storm/test_plugin.py b/sahara/tests/unit/plugins/storm/test_plugin.py index 4b00f2e4..3a394f25 100644 --- a/sahara/tests/unit/plugins/storm/test_plugin.py +++ b/sahara/tests/unit/plugins/storm/test_plugin.py @@ -20,7 +20,9 @@ from sahara import context from sahara.plugins import base as pb from sahara.plugins import exceptions as ex from sahara.plugins.storm import plugin as pl +from sahara.service.edp.storm import engine from sahara.tests.unit import base +from sahara.utils import edp conductor = cond.API @@ -43,120 +45,149 @@ class StormPluginTest(base.SaharaWithDbTestCase): master.id = self.master_inst return master + def _get_cluster(self, name, version): + cluster_dict = { + 'name': name, + 'plugin_name': 'storm', + 'hadoop_version': version, + 'node_groups': []} + return cluster_dict + def test_validate_existing_ng_scaling(self): - data = {'name': "cluster", - 'plugin_name': "storm", - 'hadoop_version': "0.9.2", - 'node_groups': [ - {'name': 'master', - 'flavor_id': '42', - 'count': 1, - 'node_processes': ['nimbus']}, - {'name': 'slave', - 'flavor_id': '42', - 'count': 1, - 'node_processes': ['supervisor']}, - {'name': 'zookeeper', - 'flavor_id': '42', - 'count': 1, - 'node_processes': ['zookeeper']} - ] - } - cluster = conductor.cluster_create(context.ctx(), data) - plugin = pb.PLUGINS.get_plugin(cluster.plugin_name) - supervisor_id = [node.id for node in cluster.node_groups - if node.name == 'supervisor'] - self.assertIsNone(plugin._validate_existing_ng_scaling(cluster, - supervisor_id)) + data = [ + {'name': 'master', + 'flavor_id': '42', + 'count': 1, + 'node_processes': ['nimbus']}, + {'name': 'slave', + 'flavor_id': '42', + 'count': 1, + 'node_processes': ['supervisor']}, + {'name': 'zookeeper', + 'flavor_id': '42', + 'count': 1, + 'node_processes': ['zookeeper']} + ] + + cluster_data_092 = self._get_cluster('cluster_0.9.2', '0.9.2') + cluster_data_101 = self._get_cluster('cluster_1.0.1', '1.0.1') + cluster_data_092['node_groups'] = data + cluster_data_101['node_groups'] = data + + clusters = [cluster_data_092, cluster_data_101] + + for cluster_data in clusters: + cluster = conductor.cluster_create(context.ctx(), cluster_data) + plugin = pb.PLUGINS.get_plugin(cluster.plugin_name) + supervisor_id = [node.id for node in cluster.node_groups + if node.name == 'supervisor'] + self.assertIsNone( + plugin._validate_existing_ng_scaling(cluster, + supervisor_id)) def test_validate_additional_ng_scaling(self): - data = {'name': "cluster", - 'plugin_name': "storm", - 'hadoop_version': "0.9.2", - 'node_groups': [ - {'name': 'master', - 'flavor_id': '42', - 'count': 1, - 'node_processes': ['nimbus']}, - {'name': 'slave', - 'flavor_id': '42', - 'count': 1, - 'node_processes': ['supervisor']}, - {'name': 'zookeeper', - 'flavor_id': '42', - 'count': 1, - 'node_processes': ['zookeeper']}, - {'name': 'slave2', - 'flavor_id': '42', - 'count': 0, - 'node_processes': ['supervisor']} - ] - } - cluster = conductor.cluster_create(context.ctx(), data) - plugin = pb.PLUGINS.get_plugin(cluster.plugin_name) - supervisor_id = [node.id for node in cluster.node_groups - if node.name == 'supervisor'] - self.assertIsNone(plugin._validate_additional_ng_scaling(cluster, - supervisor_id) - ) + data = [ + {'name': 'master', + 'flavor_id': '42', + 'count': 1, + 'node_processes': ['nimbus']}, + {'name': 'slave', + 'flavor_id': '42', + 'count': 1, + 'node_processes': ['supervisor']}, + {'name': 'zookeeper', + 'flavor_id': '42', + 'count': 1, + 'node_processes': ['zookeeper']}, + {'name': 'slave2', + 'flavor_id': '42', + 'count': 0, + 'node_processes': ['supervisor']} + ] + + cluster_data_092 = self._get_cluster('cluster_0.9.2', '0.9.2') + cluster_data_101 = self._get_cluster('cluster_1.0.1', '1.0.1') + cluster_data_092['node_groups'] = data + cluster_data_101['node_groups'] = data + + clusters = [cluster_data_092, cluster_data_101] + + for cluster_data in clusters: + cluster = conductor.cluster_create(context.ctx(), cluster_data) + plugin = pb.PLUGINS.get_plugin(cluster.plugin_name) + supervisor_id = [node.id for node in cluster.node_groups + if node.name == 'supervisor'] + self.assertIsNone( + plugin._validate_additional_ng_scaling(cluster, + supervisor_id)) def test_validate_existing_ng_scaling_raises(self): - data = {'name': "cluster", - 'plugin_name': "storm", - 'hadoop_version': "0.9.2", - 'node_groups': [ - {'name': 'master', - 'flavor_id': '42', - 'count': 1, - 'node_processes': ['nimbus']}, - {'name': 'slave', - 'flavor_id': '42', - 'count': 1, - 'node_processes': ['supervisor']}, - {'name': 'zookeeper', - 'flavor_id': '42', - 'count': 1, - 'node_processes': ['zookeeper']} - ] - } - cluster = conductor.cluster_create(context.ctx(), data) - plugin = pb.PLUGINS.get_plugin(cluster.plugin_name) - master_id = [node.id for node in cluster.node_groups - if node.name == 'master'] - self.assertRaises(ex.NodeGroupCannotBeScaled, - plugin._validate_existing_ng_scaling, - cluster, master_id) + data = [ + {'name': 'master', + 'flavor_id': '42', + 'count': 1, + 'node_processes': ['nimbus']}, + {'name': 'slave', + 'flavor_id': '42', + 'count': 1, + 'node_processes': ['supervisor']}, + {'name': 'zookeeper', + 'flavor_id': '42', + 'count': 1, + 'node_processes': ['zookeeper']} + ] + + cluster_data_092 = self._get_cluster('cluster_0.9.2', '0.9.2') + cluster_data_101 = self._get_cluster('cluster_1.0.1', '1.0.1') + cluster_data_092['node_groups'] = data + cluster_data_101['node_groups'] = data + + clusters = [cluster_data_092, cluster_data_101] + + for cluster_data in clusters: + cluster = conductor.cluster_create(context.ctx(), cluster_data) + plugin = pb.PLUGINS.get_plugin(cluster.plugin_name) + master_id = [node.id for node in cluster.node_groups + if node.name == 'master'] + self.assertRaises(ex.NodeGroupCannotBeScaled, + plugin._validate_existing_ng_scaling, + cluster, master_id) def test_validate_additional_ng_scaling_raises(self): - data = {'name': "cluster", - 'plugin_name': "storm", - 'hadoop_version': "0.9.2", - 'node_groups': [ - {'name': 'master', - 'flavor_id': '42', - 'count': 1, - 'node_processes': ['nimbus']}, - {'name': 'slave', - 'flavor_id': '42', - 'count': 1, - 'node_processes': ['supervisor']}, - {'name': 'zookeeper', - 'flavor_id': '42', - 'count': 1, - 'node_processes': ['zookeeper']}, - {'name': 'master2', - 'flavor_id': '42', - 'count': 0, - 'node_processes': ['nimbus']} - ] - } - cluster = conductor.cluster_create(context.ctx(), data) - plugin = pb.PLUGINS.get_plugin(cluster.plugin_name) - master_id = [node.id for node in cluster.node_groups - if node.name == 'master2'] - self.assertRaises(ex.NodeGroupCannotBeScaled, - plugin._validate_existing_ng_scaling, - cluster, master_id) + data = [ + {'name': 'master', + 'flavor_id': '42', + 'count': 1, + 'node_processes': ['nimbus']}, + {'name': 'slave', + 'flavor_id': '42', + 'count': 1, + 'node_processes': ['supervisor']}, + {'name': 'zookeeper', + 'flavor_id': '42', + 'count': 1, + 'node_processes': ['zookeeper']}, + {'name': 'master2', + 'flavor_id': '42', + 'count': 0, + 'node_processes': ['nimbus']} + ] + + cluster_data_092 = self._get_cluster('cluster_0.9.2', '0.9.2') + cluster_data_101 = self._get_cluster('cluster_1.0.1', '1.0.1') + cluster_data_092['node_groups'] = data + cluster_data_101['node_groups'] = data + + clusters = [cluster_data_092, cluster_data_101] + + for cluster_data in clusters: + cluster = conductor.cluster_create(context.ctx(), cluster_data) + plugin = pb.PLUGINS.get_plugin(cluster.plugin_name) + master_id = [node.id for node in cluster.node_groups + if node.name == 'master2'] + self.assertRaises(ex.NodeGroupCannotBeScaled, + plugin._validate_existing_ng_scaling, + cluster, master_id) def test_get_open_port(self): plugin_storm = pl.StormProvider() @@ -167,3 +198,18 @@ class StormPluginTest(base.SaharaWithDbTestCase): ng.cluster = cluster ports = plugin_storm.get_open_ports(ng) self.assertEqual([8080], ports) + + def _test_engine(self, version, job_type, eng): + cluster_dict = self._get_cluster('demo', version) + + cluster = conductor.cluster_create(context.ctx(), cluster_dict) + plugin = pb.PLUGINS.get_plugin(cluster.plugin_name) + self.assertIsInstance(plugin.get_edp_engine(cluster, job_type), eng) + + def test_plugin092_edp_engine(self): + self._test_engine('0.9.2', edp.JOB_TYPE_STORM, + engine.StormJobEngine) + + def test_plugin101_edp_engine(self): + self._test_engine('1.0.1', edp.JOB_TYPE_STORM, + engine.StormJobEngine)