From 252b311dfcc0c1507244fece69c61e566d25d666 Mon Sep 17 00:00:00 2001
From: Telles Nobrega
Date: Tue, 24 Mar 2015 10:21:11 -0300
Subject: [PATCH] Storm EDP implementation

Implementation of Storm EDP. This allows users to submit Storm jobs
via the UI.

This patch implements the EDP engine and the Storm job type.

Implements: bp storm-edp

Change-Id: I7d5937f26df715ef66826396a6387ab512af1f47
---
 sahara/plugins/storm/edp_engine.py            |  35 ++
 sahara/plugins/storm/plugin.py                |  20 +
 sahara/service/edp/storm/__init__.py          |   0
 sahara/service/edp/storm/engine.py            | 248 ++++++++++++
 sahara/service/validations/edp/job.py         |   6 +-
 .../tests/unit/service/edp/storm/__init__.py  |   0
 .../unit/service/edp/storm/test_storm.py      | 383 ++++++++++++++++++
 sahara/utils/edp.py                           |   4 +-
 8 files changed, 693 insertions(+), 3 deletions(-)
 create mode 100644 sahara/plugins/storm/edp_engine.py
 create mode 100644 sahara/service/edp/storm/__init__.py
 create mode 100644 sahara/service/edp/storm/engine.py
 create mode 100644 sahara/tests/unit/service/edp/storm/__init__.py
 create mode 100644 sahara/tests/unit/service/edp/storm/test_storm.py

diff --git a/sahara/plugins/storm/edp_engine.py b/sahara/plugins/storm/edp_engine.py
new file mode 100644
index 00000000..fb8e07de
--- /dev/null
+++ b/sahara/plugins/storm/edp_engine.py
@@ -0,0 +1,35 @@
+# Copyright (c) 2015 Telles Nobrega.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ +from sahara import exceptions as ex +from sahara.i18n import _ +from sahara.service.edp.storm import engine as edp_engine + + +class EdpEngine(edp_engine.StormJobEngine): + + edp_base_version = "0.9.2" + + @staticmethod + def edp_supported(version): + return version >= EdpEngine.edp_base_version + + def validate_job_execution(self, cluster, job, data): + if not self.edp_supported(cluster.hadoop_version): + raise ex.InvalidDataException( + _('Storm {base} required to run {type} jobs').format( + base=EdpEngine.edp_base_version, type=job.type)) + + super(EdpEngine, self).validate_job_execution(cluster, job, data) diff --git a/sahara/plugins/storm/plugin.py b/sahara/plugins/storm/plugin.py index ae82fb40..dc94d47e 100644 --- a/sahara/plugins/storm/plugin.py +++ b/sahara/plugins/storm/plugin.py @@ -25,6 +25,7 @@ from sahara.i18n import _LI from sahara.plugins import exceptions as ex from sahara.plugins import provisioning as p from sahara.plugins.storm import config_helper as c_helper +from sahara.plugins.storm import edp_engine from sahara.plugins.storm import run_scripts as run from sahara.plugins import utils from sahara.utils import cluster_progress_ops as cpo @@ -100,6 +101,25 @@ class StormProvider(p.ProvisioningPluginBase): cluster=cluster.name)) self._set_cluster_info(cluster) + def get_edp_engine(self, cluster, job_type): + if job_type in edp_engine.EdpEngine.get_supported_job_types(): + return edp_engine.EdpEngine(cluster) + + return None + + def get_edp_job_types(self, versions=[]): + res = {} + for vers in self.get_versions(): + if not versions or vers in versions: + if edp_engine.EdpEngine.edp_supported(vers): + res[vers] = edp_engine.EdpEngine.get_supported_job_types() + return res + + def get_edp_config_hints(self, job_type, version): + if edp_engine.EdpEngine.edp_supported(version): + return edp_engine.EdpEngine.get_possible_job_config(job_type) + return {} + def _extract_configs_to_extra(self, cluster): st_master = utils.get_instance(cluster, "nimbus") zk_servers = utils.get_instances(cluster, "zookeeper") diff --git a/sahara/service/edp/storm/__init__.py b/sahara/service/edp/storm/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/sahara/service/edp/storm/engine.py b/sahara/service/edp/storm/engine.py new file mode 100644 index 00000000..a0c2c4e1 --- /dev/null +++ b/sahara/service/edp/storm/engine.py @@ -0,0 +1,248 @@ +# Copyright (c) 2014 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+import os
+
+from oslo_config import cfg
+import six
+
+import uuid
+
+from sahara import conductor as c
+from sahara import context
+from sahara import exceptions as e
+from sahara.i18n import _
+from sahara.plugins import utils as plugin_utils
+from sahara.service.edp import base_engine
+from sahara.service.edp.binary_retrievers import dispatch
+from sahara.service.edp import job_utils
+from sahara.service.validations.edp import job_execution as j
+from sahara.utils import edp
+from sahara.utils import files
+from sahara.utils import general
+from sahara.utils import remote
+
+conductor = c.API
+CONF = cfg.CONF
+
+
+class StormJobEngine(base_engine.JobEngine):
+    def __init__(self, cluster):
+        self.cluster = cluster
+
+    def _get_topology_and_inst_id(self, job_id):
+        try:
+            topology_name, inst_id = job_id.split("@", 1)
+            if topology_name and inst_id:
+                return (topology_name, inst_id)
+        except Exception:
+            pass
+        return "", ""
+
+    def _get_instance_if_running(self, job_execution):
+        topology_name, inst_id = self._get_topology_and_inst_id(
+            job_execution.oozie_job_id)
+        if not topology_name or not inst_id or (
+                job_execution.info['status'] in edp.JOB_STATUSES_TERMINATED):
+            return None, None
+        # TODO(tmckay): well, if there is a list index out of range
+        # error here it probably means that the instance is gone. If we
+        # have a job execution that is not terminated, and the instance
+        # is gone, we should probably change the status somehow.
+        # For now, do nothing.
+        try:
+            instance = general.get_instances(self.cluster, [inst_id])[0]
+        except Exception:
+            instance = None
+        return topology_name, instance
+
+    def _get_topology_name(self, job_execution):
+        topology_name, inst_id = self._get_topology_and_inst_id(
+            job_execution.oozie_job_id)
+
+        return topology_name
+
+    def _generate_topology_name(self, name):
+        return name + "_" + six.text_type(uuid.uuid4())
+
+    def _get_job_status_from_remote(self, job_execution):
+        topology_name, inst_id = self._get_instance_if_running(job_execution)
+        if topology_name is None or inst_id is None:
+            return edp.JOB_STATUSES_TERMINATED
+
+        topology_name = self._get_topology_name(job_execution)
+        master = plugin_utils.get_instance(self.cluster, "nimbus")
+
+        cmd = (
+            "%(storm)s -c nimbus.host=%(host)s "
+            "list | grep %(topology_name)s | awk '{print $2}'") % (
+            {
+                "storm": "/usr/local/storm/bin/storm",
+                "host": master.hostname(),
+                "topology_name": topology_name
+            })
+
+        with remote.get_remote(master) as r:
+            ret, stdout = r.execute_command("%s " % (cmd))
+            # If the status is ACTIVE, the topology is still running
+            if stdout.strip() == "ACTIVE":
+                return {"status": edp.JOB_STATUS_RUNNING}
+            else:
+                return {"status": edp.JOB_STATUS_KILLED}
+
+    def _job_script(self):
+        path = "service/edp/resources/launch_command.py"
+        return files.get_file_text(path)
+
+    def _upload_job_files(self, where, job_dir, job, job_configs):
+
+        def upload(r, dir, job_file, proxy_configs):
+            dst = os.path.join(dir, job_file.name)
+            raw_data = dispatch.get_raw_binary(job_file, proxy_configs)
+            r.write_file_to(dst, raw_data)
+            return dst
+
+        uploaded_paths = []
+        with remote.get_remote(where) as r:
+            mains = list(job.mains) if job.mains else []
+            libs = list(job.libs) if job.libs else []
+            for job_file in mains+libs:
+                uploaded_paths.append(
+                    upload(r, job_dir, job_file,
+                           job_configs.get('proxy_configs')))
+
+        return uploaded_paths
+
+    def _exec_cmd_on_remote_instance(self, master, cmd):
+        if master is not None:
+            with remote.get_remote(master) as r:
+                ret, stdout = r.execute_command("%s > /dev/null 2>&1 & echo $!"
+                                                % cmd)
+
+                return ret, stdout
+
+    def cancel_job(self, job_execution):
+        topology_name, instance = self._get_instance_if_running(job_execution)
+        if topology_name is None or instance is None:
+            return None
+
+        topology_name = self._get_topology_name(job_execution)
+        master = plugin_utils.get_instance(self.cluster, "nimbus")
+
+        cmd = (
+            '%(storm_kill)s -c nimbus.host=%(host)s %(topology_name)s') % (
+            {
+                "storm_kill": "/usr/local/storm/bin/storm kill",
+                "host": master.hostname(),
+                "topology_name": topology_name
+            })
+
+        ret, stdout = self._exec_cmd_on_remote_instance(instance, cmd)
+
+        if ret == 0:
+            # We had some effect, check the status
+            return self._get_job_status_from_remote(job_execution)
+
+    def get_job_status(self, job_execution):
+        topology_name, instance = self._get_instance_if_running(job_execution)
+        if instance is not None:
+            return self._get_job_status_from_remote(job_execution)
+
+    def run_job(self, job_execution):
+        ctx = context.ctx()
+        job = conductor.job_get(ctx, job_execution.job_id)
+
+        additional_sources, updated_job_configs = (
+            job_utils.resolve_data_source_references(job_execution.job_configs)
+        )
+
+        # We'll always run the driver program on the master
+        master = plugin_utils.get_instance(self.cluster, "nimbus")
+
+        # TODO(tmckay): wf_dir should probably be configurable.
+        # The only requirement is that the dir is writable by the image user
+        wf_dir = job_utils.create_workflow_dir(master, '/tmp/storm-edp', job,
+                                               job_execution.id, "700")
+
+        paths = self._upload_job_files(master, wf_dir, job,
+                                       updated_job_configs)
+
+        # We can shorten the paths in this case since we'll run out of wf_dir
+        paths = [os.path.basename(p) for p in paths]
+
+        app_jar = paths.pop(0)
+        job_class = updated_job_configs["configs"]["edp.java.main_class"]
+        topology_name = self._generate_topology_name(job.name)
+
+        # Launch the storm job using storm jar
+        host = master.hostname()
+        args = updated_job_configs.get('args', [])
+        args = " ".join([arg for arg in args])
+
+        if args:
+            args = " " + args
+
+        cmd = (
+            '%(storm_jar)s -c nimbus.host=%(host)s %(job_jar)s '
+            '%(main_class)s %(topology_name)s%(args)s' % (
+                {
+                    "storm_jar": "/usr/local/storm/bin/storm jar",
+                    "main_class": job_class,
+                    "job_jar": app_jar,
+                    "host": host,
+                    "topology_name": topology_name,
+                    "args": args
+                }))
+
+        job_execution = conductor.job_execution_get(ctx, job_execution.id)
+        if job_execution.info['status'] == edp.JOB_STATUS_TOBEKILLED:
+            return (None, edp.JOB_STATUS_KILLED, None)
+
+        # If an exception is raised here, the job_manager will mark
+        # the job failed and log the exception
+        # The redirects of stdout and stderr will preserve output in the wf_dir
+        with remote.get_remote(master) as r:
+            # Upload the command launch script
+            launch = os.path.join(wf_dir, "launch_command")
+            r.write_file_to(launch, self._job_script())
+            r.execute_command("chmod +x %s" % launch)
+            ret, stdout = r.execute_command(
+                "cd %s; ./launch_command %s > /dev/null 2>&1 & echo $!"
+                % (wf_dir, cmd))
+
+        if ret == 0:
+            # Success, we'll add the wf_dir in job_execution.extra and store
+            # topology_name@instance_id as the job id
+            # We know the job is running so return "RUNNING"
+            return (topology_name + "@" + master.id,
+                    edp.JOB_STATUS_RUNNING,
+                    {'storm-path': wf_dir})
+
+        # Hmm, no exception but something failed.
+        # Since we're using backgrounding with redirect, this is unlikely.
+        raise e.EDPError(_("Storm job execution failed. 
Exit status = " + "%(status)s, stdout = %(stdout)s") % + {'status': ret, 'stdout': stdout}) + + def validate_job_execution(self, cluster, job, data): + j.check_main_class_present(data, job) + + @staticmethod + def get_possible_job_config(job_type): + return {'job_config': {'configs': [], 'args': []}} + + @staticmethod + def get_supported_job_types(): + return [edp.JOB_TYPE_STORM] diff --git a/sahara/service/validations/edp/job.py b/sahara/service/validations/edp/job.py index 3d51c967..8f658501 100644 --- a/sahara/service/validations/edp/job.py +++ b/sahara/service/validations/edp/job.py @@ -78,14 +78,16 @@ def check_mains_libs(data, **kwargs): # These types must have a value in mains and may also use libs if job_type in [edp.JOB_TYPE_PIG, edp.JOB_TYPE_HIVE, - edp.JOB_TYPE_SHELL, edp.JOB_TYPE_SPARK]: + edp.JOB_TYPE_SHELL, edp.JOB_TYPE_SPARK, + edp.JOB_TYPE_STORM]: if not mains: - if job_type == edp.JOB_TYPE_SPARK: + if job_type in [edp.JOB_TYPE_SPARK, edp.JOB_TYPE_STORM]: msg = _( "%s job requires main application jar") % data.get("type") else: msg = _("%s flow requires main script") % data.get("type") raise e.InvalidDataException(msg) + # Check for overlap if set(mains).intersection(set(libs)): raise e.InvalidDataException(_("'mains' and 'libs' overlap")) diff --git a/sahara/tests/unit/service/edp/storm/__init__.py b/sahara/tests/unit/service/edp/storm/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/sahara/tests/unit/service/edp/storm/test_storm.py b/sahara/tests/unit/service/edp/storm/test_storm.py new file mode 100644 index 00000000..f0ee1cda --- /dev/null +++ b/sahara/tests/unit/service/edp/storm/test_storm.py @@ -0,0 +1,383 @@ +# Copyright (c) 2015 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import os + +import mock + +import sahara.exceptions as ex +from sahara.service.edp.storm import engine as se +from sahara.tests.unit import base +from sahara.utils import edp + + +class TestStorm(base.SaharaTestCase): + def setUp(self): + super(TestStorm, self).setUp() + + self.master_host = "master" + self.master_inst = "6789" + self.storm_topology_name = "MyJob_ed8347a9-39aa-477c-8108-066202eb6130" + self.workflow_dir = "/wfdir" + + def test_get_topology_and_inst_id(self): + '''Test parsing of job ids + + Test that job ids of the form topology_name@instance are + split into topology_name and instance ids by + eng._get_topology_name_and_inst_id() but anything else + returns empty strings + ''' + eng = se.StormJobEngine(None) + for job_id in [None, "", "@", "something", "topology_name@", + "@instance"]: + topology_name, inst_id = eng._get_topology_and_inst_id(job_id) + self.assertEqual(("", ""), (topology_name, inst_id)) + + topology_name, inst_id = eng._get_topology_and_inst_id( + "topology_name@instance") + self.assertEqual(("topology_name", "instance"), + (topology_name, inst_id)) + + @mock.patch('sahara.utils.general.get_instances') + def test_get_instance_if_running(self, get_instances): + '''Test retrieval of topology_name and instance object for running job + + If the job id is valid and the job status is non-terminated, + _get_instance_if_running() should retrieve the instance + based on the inst_id and return the topology_name and instance. + + If the job is invalid or the job is terminated, it should + return None, None. + + If get_instances() throws an exception or returns an empty list, + the instance returned should be None (topology_name might + still be set) + ''' + get_instances.return_value = ["instance"] + job_exec = mock.Mock() + eng = se.StormJobEngine("cluster") + + job_exec.oozie_job_id = "invalid id" + self.assertEqual((None, None), + eng._get_instance_if_running(job_exec)) + + job_exec.oozie_job_id = "topology_name@inst_id" + for state in edp.JOB_STATUSES_TERMINATED: + job_exec.info = {'status': state} + self.assertEqual((None, None), + eng._get_instance_if_running(job_exec)) + + job_exec.info = {'status': edp.JOB_STATUS_RUNNING} + self.assertEqual(("topology_name", "instance"), + eng._get_instance_if_running(job_exec)) + get_instances.assert_called_with("cluster", ["inst_id"]) + + # Pretend get_instances returns nothing + get_instances.return_value = [] + topology_name, instance = eng._get_instance_if_running(job_exec) + self.assertIsNone(instance) + + # Pretend get_instances throws an exception + get_instances.side_effect = Exception("some failure") + topology_name, instance = eng._get_instance_if_running(job_exec) + self.assertIsNone(instance) + + @mock.patch('sahara.plugins.utils.get_instance') + @mock.patch('sahara.utils.general.get_instances') + @mock.patch('sahara.utils.remote.get_remote') + @mock.patch('sahara.conductor.API.job_get') + @mock.patch('sahara.context.ctx', return_value="ctx") + def test_get_job_status_from_remote(self, get_instance, get_instances, + get_remote, ctx, job_get): + '''Test retrieval of job status from remote instance + + If the process is present, status is RUNNING + If the process is not present, status depends on the result file + If the result file is missing, status is DONEWITHERROR + ''' + eng = se.StormJobEngine("cluster") + job_exec = mock.Mock() + + master_instance = self._make_master_instance() + master_instance.execute_command.return_value = 0, "ACTIVE" + get_remote.return_value.__enter__ = mock.Mock( + 
return_value=master_instance) + get_instance.return_value = master_instance + get_instances.return_value = ["instance"] + + # Pretend process is running + job_exec.oozie_job_id = "topology_name@inst_id" + job_exec.info = {'status': edp.JOB_STATUS_RUNNING} + job_exec.job_configs = {"configs": {"topology_name": "topology_name"}} + status = eng._get_job_status_from_remote(job_exec) + self.assertEqual({"status": edp.JOB_STATUS_RUNNING}, status) + + @mock.patch.object(se.StormJobEngine, + '_get_job_status_from_remote', + autospec=True) + @mock.patch.object(se.StormJobEngine, + '_get_instance_if_running', + autospec=True) + @mock.patch('sahara.utils.remote.get_remote') + def test_get_job_status(self, + get_remote, + _get_instance_if_running, + _get_job_status_from_remote): + + # This is to mock "with remote.get_remote(instance) as r" + remote_instance = mock.Mock() + get_remote.return_value.__enter__ = mock.Mock( + return_value=remote_instance) + + # Pretend instance is not returned + _get_instance_if_running.return_value = "topology_name", None + job_exec = mock.Mock() + eng = se.StormJobEngine("cluster") + status = eng.get_job_status(job_exec) + self.assertIsNone(status) + + # Pretend we have an instance + _get_instance_if_running.return_value = "topology_name", "instance" + _get_job_status_from_remote.return_value = {"status": + edp.JOB_STATUS_RUNNING} + status = eng.get_job_status(job_exec) + _get_job_status_from_remote.assert_called_with(eng, + job_exec) + self.assertEqual({"status": edp.JOB_STATUS_RUNNING}, status) + + @mock.patch.object(se.StormJobEngine, + '_get_instance_if_running', + autospec=True, + return_value=(None, None)) + @mock.patch('sahara.utils.remote.get_remote') + @mock.patch('sahara.conductor.API.job_get') + @mock.patch('sahara.context.ctx', return_value="ctx") + def test_cancel_job_null_or_done(self, + get_remote, + _get_instance_if_running, + job_get, + ctx): + '''Test cancel_job() when instance is None + + Test that cancel_job() returns None and does not try to + retrieve a remote instance if _get_instance_if_running() returns None + ''' + eng = se.StormJobEngine("cluster") + job_exec = mock.Mock() + self.assertIsNone(eng.cancel_job(job_exec)) + self.assertFalse(get_remote.called) + + @mock.patch.object(se.StormJobEngine, + '_get_job_status_from_remote', + autospec=True, + return_value={"status": edp.JOB_STATUS_KILLED}) + @mock.patch('sahara.utils.general.get_instances') + @mock.patch('sahara.plugins.utils.get_instance') + @mock.patch('sahara.utils.remote.get_remote') + def test_cancel_job(self, get_remote, get_instance, get_instances, + _get_job_status_from_remote): + master_instance = self._make_master_instance() + status = self._setup_tests(master_instance) + get_instance.return_value = master_instance + get_instances.return_value = ["instance"] + master_instance.execute_command.return_value = 0, "KILLED" + + get_remote.return_value.__enter__ = mock.Mock( + return_value=master_instance) + eng = se.StormJobEngine("cluster") + job_exec = mock.Mock() + job_exec.oozie_job_id = "topology_name@inst_id" + job_exec.info = {'status': edp.JOB_STATUS_RUNNING} + job_exec.job_configs = {"configs": {"topology_name": "topology_name"}} + status = eng.cancel_job(job_exec) + + master_instance.execute_command.assert_called_with( + "/usr/local/storm/bin/storm kill -c nimbus.host=%s topology_name " + "> /dev/null 2>&1 & echo $!" 
% self.master_host) + + self.assertEqual({"status": edp.JOB_STATUS_KILLED}, status) + + @mock.patch('sahara.service.edp.binary_retrievers.dispatch.get_raw_binary') + @mock.patch('sahara.utils.remote.get_remote') + def test_upload_job_files(self, get_remote, get_raw_binary): + main_names = ["main1", "main2", "main3"] + lib_names = ["lib1", "lib2", "lib3"] + + def make_data_objects(*args): + objs = [] + for name in args: + m = mock.Mock() + m.name = name + objs.append(m) + return objs + + job = mock.Mock() + job.name = "job" + job.mains = make_data_objects(*main_names) + job.libs = make_data_objects(*lib_names) + + # This is to mock "with remote.get_remote(instance) as r" + remote_instance = mock.Mock() + get_remote.return_value.__enter__ = mock.Mock( + return_value=remote_instance) + + get_raw_binary.return_value = "data" + + eng = se.StormJobEngine("cluster") + paths = eng._upload_job_files("where", "/somedir", job, {}) + self.assertEqual(["/somedir/" + n for n in main_names + lib_names], + paths) + for path in paths: + remote_instance.write_file_to.assert_any_call(path, "data") + + def _make_master_instance(self, return_code=0): + master = mock.Mock() + master.execute_command.return_value = (return_code, + self.storm_topology_name) + master.hostname.return_value = self.master_host + master.id = self.master_inst + return master + + @mock.patch('sahara.conductor.API.job_execution_get') + @mock.patch('sahara.utils.remote.get_remote') + @mock.patch('sahara.plugins.utils.get_instance') + @mock.patch('sahara.conductor.API.job_get') + @mock.patch('sahara.context.ctx', return_value="ctx") + def _setup_tests(self, master_instance, ctx, job_get, + get_instance, get_remote, job_exec_get): + + # This is to mock "with remote.get_remote(master) as r" in run_job + get_remote.return_value.__enter__ = mock.Mock( + return_value=master_instance) + get_instance.return_value = master_instance + + @mock.patch.object(se.StormJobEngine, + '_generate_topology_name', + autospec=True, + return_value=( + "MyJob_ed8347a9-39aa-477c-8108-066202eb6130")) + @mock.patch('sahara.conductor.API.job_execution_get') + @mock.patch('sahara.utils.remote.get_remote') + @mock.patch('sahara.service.edp.job_utils.create_workflow_dir') + @mock.patch('sahara.plugins.utils.get_instance') + @mock.patch('sahara.conductor.API.job_get') + @mock.patch('sahara.context.ctx', return_value="ctx") + def _setup_run_job(self, master_instance, job_configs, files, + ctx, job_get, get_instance, create_workflow_dir, + get_remote, job_exec_get, _generate_topology_name): + + def _upload_job_files(where, job_dir, job, + libs_subdir=True, job_configs=None): + paths = [os.path.join(self.workflow_dir, f) for f in files['jars']] + return paths + + job = mock.Mock() + job.name = "MyJob" + job_get.return_value = job + + job_exec = mock.Mock() + job_exec.job_configs = job_configs + + create_workflow_dir.return_value = self.workflow_dir + + # This is to mock "with remote.get_remote(master) as r" in run_job + get_remote.return_value.__enter__ = mock.Mock( + return_value=master_instance) + get_instance.return_value = master_instance + + eng = se.StormJobEngine("cluster") + eng._upload_job_files = mock.Mock() + eng._upload_job_files.side_effect = _upload_job_files + status = eng.run_job(job_exec) + + # Check that we launch on the master node + get_instance.assert_called_with("cluster", "nimbus") + + return status + + def test_run_job_raise(self): + job_configs = { + 'configs': {"edp.java.main_class": "org.me.myclass", + "topology_name": "topology_name"}, + } + + 
files = {'jars': ["app.jar"]}
+
+        # The object representing the storm master node
+        # The storm jar command will be run on this instance
+        master_instance = self._make_master_instance(return_code=1)
+
+        # If execute_command returns an error we should get a raise
+        self.assertRaises(ex.EDPError,
+                          self._setup_run_job,
+                          master_instance, job_configs, files)
+
+    def test_run_job(self):
+        job_configs = {
+            'configs': {"edp.java.main_class": "org.me.myclass"}
+        }
+
+        files = {'jars': ["app.jar"]}
+
+        # The object representing the storm master node
+        # The storm jar command will be run on this instance
+        master_instance = self._make_master_instance()
+        status = self._setup_run_job(master_instance, job_configs, files)
+
+        # Check the command
+        master_instance.execute_command.assert_called_with(
+            'cd %(workflow_dir)s; '
+            './launch_command /usr/local/storm/bin/storm jar '
+            '-c nimbus.host=master '
+            'app.jar org.me.myclass %(topology_name)s '
+            '> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir,
+                                            "topology_name": (
+                                                self.storm_topology_name)})
+
+        # Check result here
+        self.assertEqual(("%s@%s" % (self.storm_topology_name,
+                                     self.master_inst),
+                          edp.JOB_STATUS_RUNNING,
+                          {"storm-path": self.workflow_dir}), status)
+
+    def test_run_job_args(self):
+        job_configs = {
+            'configs': {"edp.java.main_class": "org.me.myclass"},
+            'args': ['input_arg', 'output_arg']
+        }
+
+        files = {'jars': ["app.jar"]}
+
+        # The object representing the storm master node
+        # The storm jar command will be run on this instance
+        master_instance = self._make_master_instance()
+        status = self._setup_run_job(master_instance, job_configs, files)
+
+        # Check the command
+        master_instance.execute_command.assert_called_with(
+            'cd %(workflow_dir)s; '
+            './launch_command /usr/local/storm/bin/storm jar '
+            '-c nimbus.host=master '
+            'app.jar org.me.myclass %(topology_name)s input_arg output_arg '
+            '> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir,
+                                            "topology_name": (
+                                                self.storm_topology_name)})
+
+        # Check result here
+        self.assertEqual(("%s@%s" % (self.storm_topology_name,
+                                     self.master_inst),
+                          edp.JOB_STATUS_RUNNING,
+                          {"storm-path": self.workflow_dir}), status)
diff --git a/sahara/utils/edp.py b/sahara/utils/edp.py
index effe3e92..7db636d3 100644
--- a/sahara/utils/edp.py
+++ b/sahara/utils/edp.py
@@ -44,6 +44,7 @@ JOB_TYPE_HIVE = 'Hive'
 JOB_TYPE_JAVA = 'Java'
 JOB_TYPE_MAPREDUCE = 'MapReduce'
 JOB_TYPE_SPARK = 'Spark'
+JOB_TYPE_STORM = 'Storm'
 JOB_TYPE_MAPREDUCE_STREAMING = (JOB_TYPE_MAPREDUCE + JOB_TYPE_SEP +
                                 JOB_SUBTYPE_STREAMING)
 JOB_TYPE_PIG = 'Pig'
@@ -57,7 +58,8 @@ JOB_TYPES_ALL = [
     JOB_TYPE_MAPREDUCE_STREAMING,
     JOB_TYPE_PIG,
     JOB_TYPE_SHELL,
-    JOB_TYPE_SPARK
+    JOB_TYPE_SPARK,
+    JOB_TYPE_STORM
 ]
 
 ADAPT_FOR_OOZIE = 'edp.java.adapt_for_oozie'
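
Reviewer note (not part of the patch itself): the engine records a running job as
"<topology_name>@<instance_id>" in oozie_job_id and later splits that string to find
the topology name and the nimbus instance. The short standalone sketch below mirrors
that convention in plain Python so it can be tried outside Sahara; the helper names
here are illustrative only and are not part of this change.

    import uuid


    def generate_topology_name(job_name):
        # Mirrors StormJobEngine._generate_topology_name: "<job name>_<uuid4>"
        return "%s_%s" % (job_name, uuid.uuid4())


    def split_job_id(job_id):
        # Mirrors StormJobEngine._get_topology_and_inst_id: split on the first "@"
        # and fall back to ("", "") for anything not shaped "<topology>@<instance>".
        try:
            topology_name, inst_id = job_id.split("@", 1)
            if topology_name and inst_id:
                return topology_name, inst_id
        except (AttributeError, ValueError):
            pass
        return "", ""


    if __name__ == "__main__":
        job_id = generate_topology_name("MyJob") + "@" + "6789"
        print(split_job_id(job_id))       # ('MyJob_<uuid>', '6789')
        print(split_job_id("no-marker"))  # ('', '')

Keeping the instance id inside the job id is what lets cancel_job() and
get_job_status() locate the right node later without any extra lookup table.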