Storm EDP implementation

This patch implements Storm EDP, allowing users to submit Storm jobs
through the UI. It adds the Storm EDP engine and registers the Storm job
type.

Implements: bp storm-edp

Change-Id: I7d5937f26df715ef66826396a6387ab512af1f47
Telles Nobrega 2015-03-24 10:21:11 -03:00
parent 603b30f90a
commit 252b311dfc
8 changed files with 693 additions and 3 deletions
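
For context: the new engine expects the topology jar as the job's main
binary, the topology's entry point in the "edp.java.main_class" config, and
optional positional args. A minimal sketch (not part of the patch) of the
job_configs shape that run_job() below consumes; the class and jar names are
sample values taken from the unit tests:

    # Illustrative only -- keys are the ones read by StormJobEngine.run_job()
    job_configs = {
        "configs": {"edp.java.main_class": "org.me.myclass"},
        "args": ["input_arg", "output_arg"],
    }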


@@ -0,0 +1,35 @@
# Copyright (c) 2015 Telles Nobrega.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from sahara import exceptions as ex
from sahara.i18n import _
from sahara.service.edp.storm import engine as edp_engine


class EdpEngine(edp_engine.StormJobEngine):

    edp_base_version = "0.9.2"

    @staticmethod
    def edp_supported(version):
        return version >= EdpEngine.edp_base_version

    def validate_job_execution(self, cluster, job, data):
        if not self.edp_supported(cluster.hadoop_version):
            raise ex.InvalidDataException(
                _('Storm {base} required to run {type} jobs').format(
                    base=EdpEngine.edp_base_version, type=job.type))

        super(EdpEngine, self).validate_job_execution(cluster, job, data)
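
Note that edp_supported() above is a plain lexicographic string comparison
against edp_base_version, not a numeric version comparison. For the version
strings in play here that works, but the behaviour is worth keeping in mind;
a quick illustration, with example version values that are not part of the
patch:

    # String comparison, as in EdpEngine.edp_supported()
    base = "0.9.2"
    print("0.9.2" >= base)    # True  -> EDP supported
    print("0.9.1.2" >= base)  # False -> older version, not supported
    print("0.10.0" >= base)   # False -- lexicographic order, even though a
                              # hypothetical 0.10.0 release would be newer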


@@ -25,6 +25,7 @@ from sahara.i18n import _LI
 from sahara.plugins import exceptions as ex
 from sahara.plugins import provisioning as p
 from sahara.plugins.storm import config_helper as c_helper
+from sahara.plugins.storm import edp_engine
 from sahara.plugins.storm import run_scripts as run
 from sahara.plugins import utils
 from sahara.utils import cluster_progress_ops as cpo
@@ -100,6 +101,25 @@ class StormProvider(p.ProvisioningPluginBase):
                      cluster=cluster.name))
         self._set_cluster_info(cluster)
 
+    def get_edp_engine(self, cluster, job_type):
+        if job_type in edp_engine.EdpEngine.get_supported_job_types():
+            return edp_engine.EdpEngine(cluster)
+
+        return None
+
+    def get_edp_job_types(self, versions=[]):
+        res = {}
+        for vers in self.get_versions():
+            if not versions or vers in versions:
+                if edp_engine.EdpEngine.edp_supported(vers):
+                    res[vers] = edp_engine.EdpEngine.get_supported_job_types()
+        return res
+
+    def get_edp_config_hints(self, job_type, version):
+        if edp_engine.EdpEngine.edp_supported(version):
+            return edp_engine.EdpEngine.get_possible_job_config(job_type)
+        return {}
+
     def _extract_configs_to_extra(self, cluster):
         st_master = utils.get_instance(cluster, "nimbus")
         zk_servers = utils.get_instances(cluster, "zookeeper")
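
The three new hooks are thin wrappers around EdpEngine. As a rough sketch of
the values they report, assuming the plugin advertises a supported version of
"0.9.2" (the version key here is an assumption for this sketch):

    # Expected return shapes
    edp_job_types = {"0.9.2": ["Storm"]}         # get_edp_job_types()
    config_hints = {"job_config": {"configs": [],
                                   "args": []}}  # get_edp_config_hints(...)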


@@ -0,0 +1,248 @@
# Copyright (c) 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

from oslo_config import cfg
import six
import uuid

from sahara import conductor as c
from sahara import context
from sahara import exceptions as e
from sahara.i18n import _
from sahara.plugins import utils as plugin_utils
from sahara.service.edp import base_engine
from sahara.service.edp.binary_retrievers import dispatch
from sahara.service.edp import job_utils
from sahara.service.validations.edp import job_execution as j
from sahara.utils import edp
from sahara.utils import files
from sahara.utils import general
from sahara.utils import remote

conductor = c.API
CONF = cfg.CONF


class StormJobEngine(base_engine.JobEngine):
    def __init__(self, cluster):
        self.cluster = cluster

    def _get_topology_and_inst_id(self, job_id):
        try:
            topology_name, inst_id = job_id.split("@", 1)
            if topology_name and inst_id:
                return (topology_name, inst_id)
        except Exception:
            pass
        return "", ""

    def _get_instance_if_running(self, job_execution):
        topology_name, inst_id = self._get_topology_and_inst_id(
            job_execution.oozie_job_id)
        if not topology_name or not inst_id or (
                job_execution.info['status'] in edp.JOB_STATUSES_TERMINATED):
            return None, None
        # TODO(tmckay): well, if there is a list index out of range
        # error here it probably means that the instance is gone. If we
        # have a job execution that is not terminated, and the instance
        # is gone, we should probably change the status somehow.
        # For now, do nothing.
        try:
            instance = general.get_instances(self.cluster, [inst_id])[0]
        except Exception:
            instance = None
        return topology_name, instance

    def _get_topology_name(self, job_execution):
        topology_name, inst_id = self._get_topology_and_inst_id(
            job_execution.oozie_job_id)
        return topology_name

    def _generate_topology_name(self, name):
        return name + "_" + six.text_type(uuid.uuid4())

    def _get_job_status_from_remote(self, job_execution):
        topology_name, inst_id = self._get_instance_if_running(job_execution)
        if topology_name is None or inst_id is None:
            return edp.JOB_STATUSES_TERMINATED

        topology_name = self._get_topology_name(job_execution)
        master = plugin_utils.get_instance(self.cluster, "nimbus")

        cmd = (
            "%(storm)s -c nimbus.host=%(host)s "
            "list | grep %(topology_name)s | awk '{print $2}'") % (
            {
                "storm": "/usr/local/storm/bin/storm",
                "host": master.hostname(),
                "topology_name": topology_name
            })

        with remote.get_remote(master) as r:
            ret, stdout = r.execute_command("%s " % (cmd))

        # If the status ACTIVE is there, the topology is still running
        if stdout.strip() == "ACTIVE":
            return {"status": edp.JOB_STATUS_RUNNING}
        else:
            return {"status": edp.JOB_STATUS_KILLED}

    def _job_script(self):
        path = "service/edp/resources/launch_command.py"
        return files.get_file_text(path)

    def _upload_job_files(self, where, job_dir, job, job_configs):

        def upload(r, dir, job_file, proxy_configs):
            dst = os.path.join(dir, job_file.name)
            raw_data = dispatch.get_raw_binary(job_file, proxy_configs)
            r.write_file_to(dst, raw_data)
            return dst

        uploaded_paths = []
        with remote.get_remote(where) as r:
            mains = list(job.mains) if job.mains else []
            libs = list(job.libs) if job.libs else []
            for job_file in mains + libs:
                uploaded_paths.append(
                    upload(r, job_dir, job_file,
                           job_configs.get('proxy_configs')))
        return uploaded_paths

    def _exec_cmd_on_remote_instance(self, master, cmd):
        if master is not None:
            with remote.get_remote(master) as r:
                ret, stdout = r.execute_command("%s > /dev/null 2>&1 & echo $!"
                                                % cmd)
            return ret, stdout

    def cancel_job(self, job_execution):
        topology_name, instance = self._get_instance_if_running(job_execution)
        if topology_name is None or instance is None:
            return None

        topology_name = self._get_topology_name(job_execution)
        master = plugin_utils.get_instance(self.cluster, "nimbus")

        cmd = (
            '%(storm_kill)s -c nimbus.host=%(host)s %(topology_name)s') % (
            {
                "storm_kill": "/usr/local/storm/bin/storm kill",
                "host": master.hostname(),
                "topology_name": topology_name
            })

        ret, stdout = self._exec_cmd_on_remote_instance(instance, cmd)
        if ret == 0:
            # We had some effect, check the status
            return self._get_job_status_from_remote(job_execution)

    def get_job_status(self, job_execution):
        topology_name, instance = self._get_instance_if_running(job_execution)
        if instance is not None:
            return self._get_job_status_from_remote(job_execution)

    def run_job(self, job_execution):
        ctx = context.ctx()
        job = conductor.job_get(ctx, job_execution.job_id)

        additional_sources, updated_job_configs = (
            job_utils.resolve_data_source_references(job_execution.job_configs)
        )

        # We'll always run the driver program on the master
        master = plugin_utils.get_instance(self.cluster, "nimbus")

        # TODO(tmckay): wf_dir should probably be configurable.
        # The only requirement is that the dir is writable by the image user
        wf_dir = job_utils.create_workflow_dir(master, '/tmp/storm-edp', job,
                                               job_execution.id, "700")

        paths = self._upload_job_files(master, wf_dir, job,
                                       updated_job_configs)

        # We can shorten the paths in this case since we'll run out of wf_dir
        paths = [os.path.basename(p) for p in paths]

        app_jar = paths.pop(0)
        job_class = updated_job_configs["configs"]["edp.java.main_class"]
        topology_name = self._generate_topology_name(job.name)

        # Launch the storm job using storm jar
        host = master.hostname()
        args = updated_job_configs.get('args', [])
        args = " ".join([arg for arg in args])
        if args:
            args = " " + args

        cmd = (
            '%(storm_jar)s -c nimbus.host=%(host)s %(job_jar)s '
            '%(main_class)s %(topology_name)s%(args)s' % (
                {
                    "storm_jar": "/usr/local/storm/bin/storm jar",
                    "main_class": job_class,
                    "job_jar": app_jar,
                    "host": host,
                    "topology_name": topology_name,
                    "args": args
                }))

        job_execution = conductor.job_execution_get(ctx, job_execution.id)
        if job_execution.info['status'] == edp.JOB_STATUS_TOBEKILLED:
            return (None, edp.JOB_STATUS_KILLED, None)

        # If an exception is raised here, the job_manager will mark
        # the job failed and log the exception
        # The redirects of stdout and stderr will preserve output in the wf_dir
        with remote.get_remote(master) as r:
            # Upload the command launch script
            launch = os.path.join(wf_dir, "launch_command")
            r.write_file_to(launch, self._job_script())
            r.execute_command("chmod +x %s" % launch)
            ret, stdout = r.execute_command(
                "cd %s; ./launch_command %s > /dev/null 2>&1 & echo $!"
                % (wf_dir, cmd))

        if ret == 0:
            # Success, we'll add the wf_dir in job_execution.extra and store
            # topology_name@instance_id as the job id
            # We know the job is running so return "RUNNING"
            return (topology_name + "@" + master.id,
                    edp.JOB_STATUS_RUNNING,
                    {'storm-path': wf_dir})

        # Hmm, no exception but something failed.
        # Since we're using backgrounding with redirect, this is unlikely.
        raise e.EDPError(_("Storm job execution failed. Exit status = "
                           "%(status)s, stdout = %(stdout)s") %
                         {'status': ret, 'stdout': stdout})

    def validate_job_execution(self, cluster, job, data):
        j.check_main_class_present(data, job)

    @staticmethod
    def get_possible_job_config(job_type):
        return {'job_config': {'configs': [], 'args': []}}

    @staticmethod
    def get_supported_job_types():
        return [edp.JOB_TYPE_STORM]
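
To summarize the lifecycle implemented above: run_job() derives the topology
name as "<job name>_<uuid>", stores "<topology_name>@<master instance id>" as
the job id, and launches the topology via "storm jar" wrapped in
launch_command. A standalone sketch (not part of the patch) of the id
handling and of the command that gets assembled; host, jar and main class are
sample values borrowed from the unit tests:

    import uuid


    def generate_topology_name(job_name):
        # Same scheme as StormJobEngine._generate_topology_name()
        return job_name + "_" + str(uuid.uuid4())


    def parse_job_id(job_id):
        # Same parsing as StormJobEngine._get_topology_and_inst_id()
        try:
            topology_name, inst_id = job_id.split("@", 1)
            if topology_name and inst_id:
                return topology_name, inst_id
        except Exception:
            pass
        return "", ""


    topology_name = generate_topology_name("MyJob")
    job_id = "%s@%s" % (topology_name, "6789")   # <topology_name>@<inst_id>
    print(parse_job_id(job_id))

    # Command built by run_job() (sample host/jar/class, no extra args)
    cmd = (
        '%(storm_jar)s -c nimbus.host=%(host)s %(job_jar)s '
        '%(main_class)s %(topology_name)s' % {
            "storm_jar": "/usr/local/storm/bin/storm jar",
            "host": "master",
            "job_jar": "app.jar",
            "main_class": "org.me.myclass",
            "topology_name": topology_name,
        })
    print(cmd)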


@@ -78,14 +78,16 @@ def check_mains_libs(data, **kwargs):
     # These types must have a value in mains and may also use libs
     if job_type in [edp.JOB_TYPE_PIG, edp.JOB_TYPE_HIVE,
-                    edp.JOB_TYPE_SHELL, edp.JOB_TYPE_SPARK]:
+                    edp.JOB_TYPE_SHELL, edp.JOB_TYPE_SPARK,
+                    edp.JOB_TYPE_STORM]:
         if not mains:
-            if job_type == edp.JOB_TYPE_SPARK:
+            if job_type in [edp.JOB_TYPE_SPARK, edp.JOB_TYPE_STORM]:
                 msg = _(
                     "%s job requires main application jar") % data.get("type")
             else:
                 msg = _("%s flow requires main script") % data.get("type")
             raise e.InvalidDataException(msg)
 
         # Check for overlap
         if set(mains).intersection(set(libs)):
             raise e.InvalidDataException(_("'mains' and 'libs' overlap"))


@@ -0,0 +1,383 @@
# Copyright (c) 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

import mock

import sahara.exceptions as ex
from sahara.service.edp.storm import engine as se
from sahara.tests.unit import base
from sahara.utils import edp


class TestStorm(base.SaharaTestCase):
    def setUp(self):
        super(TestStorm, self).setUp()

        self.master_host = "master"
        self.master_inst = "6789"
        self.storm_topology_name = "MyJob_ed8347a9-39aa-477c-8108-066202eb6130"
        self.workflow_dir = "/wfdir"

    def test_get_topology_and_inst_id(self):
        '''Test parsing of job ids

        Test that job ids of the form topology_name@instance are
        split into topology_name and instance ids by
        eng._get_topology_name_and_inst_id() but anything else
        returns empty strings
        '''
        eng = se.StormJobEngine(None)
        for job_id in [None, "", "@", "something", "topology_name@",
                       "@instance"]:
            topology_name, inst_id = eng._get_topology_and_inst_id(job_id)
            self.assertEqual(("", ""), (topology_name, inst_id))

        topology_name, inst_id = eng._get_topology_and_inst_id(
            "topology_name@instance")
        self.assertEqual(("topology_name", "instance"),
                         (topology_name, inst_id))

    @mock.patch('sahara.utils.general.get_instances')
    def test_get_instance_if_running(self, get_instances):
        '''Test retrieval of topology_name and instance object for running job

        If the job id is valid and the job status is non-terminated,
        _get_instance_if_running() should retrieve the instance
        based on the inst_id and return the topology_name and instance.

        If the job is invalid or the job is terminated, it should
        return None, None.

        If get_instances() throws an exception or returns an empty list,
        the instance returned should be None (topology_name might
        still be set)
        '''
        get_instances.return_value = ["instance"]
        job_exec = mock.Mock()
        eng = se.StormJobEngine("cluster")

        job_exec.oozie_job_id = "invalid id"
        self.assertEqual((None, None),
                         eng._get_instance_if_running(job_exec))

        job_exec.oozie_job_id = "topology_name@inst_id"
        for state in edp.JOB_STATUSES_TERMINATED:
            job_exec.info = {'status': state}
            self.assertEqual((None, None),
                             eng._get_instance_if_running(job_exec))

        job_exec.info = {'status': edp.JOB_STATUS_RUNNING}
        self.assertEqual(("topology_name", "instance"),
                         eng._get_instance_if_running(job_exec))
        get_instances.assert_called_with("cluster", ["inst_id"])

        # Pretend get_instances returns nothing
        get_instances.return_value = []
        topology_name, instance = eng._get_instance_if_running(job_exec)
        self.assertIsNone(instance)

        # Pretend get_instances throws an exception
        get_instances.side_effect = Exception("some failure")
        topology_name, instance = eng._get_instance_if_running(job_exec)
        self.assertIsNone(instance)

    @mock.patch('sahara.plugins.utils.get_instance')
    @mock.patch('sahara.utils.general.get_instances')
    @mock.patch('sahara.utils.remote.get_remote')
    @mock.patch('sahara.conductor.API.job_get')
    @mock.patch('sahara.context.ctx', return_value="ctx")
    def test_get_job_status_from_remote(self, get_instance, get_instances,
                                        get_remote, ctx, job_get):
        '''Test retrieval of job status from remote instance

        If the process is present, status is RUNNING
        If the process is not present, status depends on the result file
        If the result file is missing, status is DONEWITHERROR
        '''
        eng = se.StormJobEngine("cluster")
        job_exec = mock.Mock()

        master_instance = self._make_master_instance()
        master_instance.execute_command.return_value = 0, "ACTIVE"
        get_remote.return_value.__enter__ = mock.Mock(
            return_value=master_instance)
        get_instance.return_value = master_instance
        get_instances.return_value = ["instance"]

        # Pretend process is running
        job_exec.oozie_job_id = "topology_name@inst_id"
        job_exec.info = {'status': edp.JOB_STATUS_RUNNING}
        job_exec.job_configs = {"configs": {"topology_name": "topology_name"}}
        status = eng._get_job_status_from_remote(job_exec)
        self.assertEqual({"status": edp.JOB_STATUS_RUNNING}, status)

    @mock.patch.object(se.StormJobEngine,
                       '_get_job_status_from_remote',
                       autospec=True)
    @mock.patch.object(se.StormJobEngine,
                       '_get_instance_if_running',
                       autospec=True)
    @mock.patch('sahara.utils.remote.get_remote')
    def test_get_job_status(self,
                            get_remote,
                            _get_instance_if_running,
                            _get_job_status_from_remote):

        # This is to mock "with remote.get_remote(instance) as r"
        remote_instance = mock.Mock()
        get_remote.return_value.__enter__ = mock.Mock(
            return_value=remote_instance)

        # Pretend instance is not returned
        _get_instance_if_running.return_value = "topology_name", None
        job_exec = mock.Mock()
        eng = se.StormJobEngine("cluster")
        status = eng.get_job_status(job_exec)
        self.assertIsNone(status)

        # Pretend we have an instance
        _get_instance_if_running.return_value = "topology_name", "instance"
        _get_job_status_from_remote.return_value = {"status":
                                                    edp.JOB_STATUS_RUNNING}
        status = eng.get_job_status(job_exec)
        _get_job_status_from_remote.assert_called_with(eng,
                                                       job_exec)
        self.assertEqual({"status": edp.JOB_STATUS_RUNNING}, status)

    @mock.patch.object(se.StormJobEngine,
                       '_get_instance_if_running',
                       autospec=True,
                       return_value=(None, None))
    @mock.patch('sahara.utils.remote.get_remote')
    @mock.patch('sahara.conductor.API.job_get')
    @mock.patch('sahara.context.ctx', return_value="ctx")
    def test_cancel_job_null_or_done(self,
                                     get_remote,
                                     _get_instance_if_running,
                                     job_get,
                                     ctx):
        '''Test cancel_job() when instance is None

        Test that cancel_job() returns None and does not try to
        retrieve a remote instance if _get_instance_if_running() returns None
        '''
        eng = se.StormJobEngine("cluster")
        job_exec = mock.Mock()
        self.assertIsNone(eng.cancel_job(job_exec))
        self.assertFalse(get_remote.called)

    @mock.patch.object(se.StormJobEngine,
                       '_get_job_status_from_remote',
                       autospec=True,
                       return_value={"status": edp.JOB_STATUS_KILLED})
    @mock.patch('sahara.utils.general.get_instances')
    @mock.patch('sahara.plugins.utils.get_instance')
    @mock.patch('sahara.utils.remote.get_remote')
    def test_cancel_job(self, get_remote, get_instance, get_instances,
                        _get_job_status_from_remote):
        master_instance = self._make_master_instance()
        status = self._setup_tests(master_instance)
        get_instance.return_value = master_instance
        get_instances.return_value = ["instance"]

        master_instance.execute_command.return_value = 0, "KILLED"

        get_remote.return_value.__enter__ = mock.Mock(
            return_value=master_instance)
        eng = se.StormJobEngine("cluster")
        job_exec = mock.Mock()
        job_exec.oozie_job_id = "topology_name@inst_id"
        job_exec.info = {'status': edp.JOB_STATUS_RUNNING}
        job_exec.job_configs = {"configs": {"topology_name": "topology_name"}}
        status = eng.cancel_job(job_exec)

        master_instance.execute_command.assert_called_with(
            "/usr/local/storm/bin/storm kill -c nimbus.host=%s topology_name "
            "> /dev/null 2>&1 & echo $!" % self.master_host)

        self.assertEqual({"status": edp.JOB_STATUS_KILLED}, status)

    @mock.patch('sahara.service.edp.binary_retrievers.dispatch.get_raw_binary')
    @mock.patch('sahara.utils.remote.get_remote')
    def test_upload_job_files(self, get_remote, get_raw_binary):
        main_names = ["main1", "main2", "main3"]
        lib_names = ["lib1", "lib2", "lib3"]

        def make_data_objects(*args):
            objs = []
            for name in args:
                m = mock.Mock()
                m.name = name
                objs.append(m)
            return objs

        job = mock.Mock()
        job.name = "job"
        job.mains = make_data_objects(*main_names)
        job.libs = make_data_objects(*lib_names)

        # This is to mock "with remote.get_remote(instance) as r"
        remote_instance = mock.Mock()
        get_remote.return_value.__enter__ = mock.Mock(
            return_value=remote_instance)

        get_raw_binary.return_value = "data"

        eng = se.StormJobEngine("cluster")
        paths = eng._upload_job_files("where", "/somedir", job, {})
        self.assertEqual(["/somedir/" + n for n in main_names + lib_names],
                         paths)
        for path in paths:
            remote_instance.write_file_to.assert_any_call(path, "data")

    def _make_master_instance(self, return_code=0):
        master = mock.Mock()
        master.execute_command.return_value = (return_code,
                                               self.storm_topology_name)
        master.hostname.return_value = self.master_host
        master.id = self.master_inst
        return master

    @mock.patch('sahara.conductor.API.job_execution_get')
    @mock.patch('sahara.utils.remote.get_remote')
    @mock.patch('sahara.plugins.utils.get_instance')
    @mock.patch('sahara.conductor.API.job_get')
    @mock.patch('sahara.context.ctx', return_value="ctx")
    def _setup_tests(self, master_instance, ctx, job_get,
                     get_instance, get_remote, job_exec_get):

        # This is to mock "with remote.get_remote(master) as r" in run_job
        get_remote.return_value.__enter__ = mock.Mock(
            return_value=master_instance)
        get_instance.return_value = master_instance

    @mock.patch.object(se.StormJobEngine,
                       '_generate_topology_name',
                       autospec=True,
                       return_value=(
                           "MyJob_ed8347a9-39aa-477c-8108-066202eb6130"))
    @mock.patch('sahara.conductor.API.job_execution_get')
    @mock.patch('sahara.utils.remote.get_remote')
    @mock.patch('sahara.service.edp.job_utils.create_workflow_dir')
    @mock.patch('sahara.plugins.utils.get_instance')
    @mock.patch('sahara.conductor.API.job_get')
    @mock.patch('sahara.context.ctx', return_value="ctx")
    def _setup_run_job(self, master_instance, job_configs, files,
                       ctx, job_get, get_instance, create_workflow_dir,
                       get_remote, job_exec_get, _generate_topology_name):

        def _upload_job_files(where, job_dir, job,
                              libs_subdir=True, job_configs=None):
            paths = [os.path.join(self.workflow_dir, f) for f in files['jars']]
            return paths

        job = mock.Mock()
        job.name = "MyJob"
        job_get.return_value = job

        job_exec = mock.Mock()
        job_exec.job_configs = job_configs

        create_workflow_dir.return_value = self.workflow_dir

        # This is to mock "with remote.get_remote(master) as r" in run_job
        get_remote.return_value.__enter__ = mock.Mock(
            return_value=master_instance)
        get_instance.return_value = master_instance

        eng = se.StormJobEngine("cluster")
        eng._upload_job_files = mock.Mock()
        eng._upload_job_files.side_effect = _upload_job_files
        status = eng.run_job(job_exec)

        # Check that we launch on the master node
        get_instance.assert_called_with("cluster", "nimbus")

        return status

    def test_run_job_raise(self):
        job_configs = {
            'configs': {"edp.java.main_class": "org.me.myclass",
                        "topology_name": "topology_name"},
        }

        files = {'jars': ["app.jar"]}

        # The object representing the storm master node
        # The storm jar command will be run on this instance
        master_instance = self._make_master_instance(return_code=1)

        # If execute_command returns an error we should get a raise
        self.assertRaises(ex.EDPError,
                          self._setup_run_job,
                          master_instance, job_configs, files)

    def test_run_job(self):
        job_configs = {
            'configs': {"edp.java.main_class": "org.me.myclass"}
        }

        files = {'jars': ["app.jar"]}

        # The object representing the storm master node
        # The storm jar command will be run on this instance
        master_instance = self._make_master_instance()
        status = self._setup_run_job(master_instance, job_configs, files)

        # Check the command
        master_instance.execute_command.assert_called_with(
            'cd %(workflow_dir)s; '
            './launch_command /usr/local/storm/bin/storm jar '
            '-c nimbus.host=master '
            'app.jar org.me.myclass %(topology_name)s '
            '> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir,
                                            "topology_name": (
                                                self.storm_topology_name)})

        # Check result here
        self.assertEqual(("%s@%s" % (self.storm_topology_name,
                                     self.master_inst),
                          edp.JOB_STATUS_RUNNING,
                          {"storm-path": self.workflow_dir}), status)

    def test_run_job_args(self):
        job_configs = {
            'configs': {"edp.java.main_class": "org.me.myclass"},
            'args': ['input_arg', 'output_arg']
        }

        files = {'jars': ["app.jar"]}

        # The object representing the storm master node
        # The storm jar command will be run on this instance
        master_instance = self._make_master_instance()
        status = self._setup_run_job(master_instance, job_configs, files)

        # Check the command
        master_instance.execute_command.assert_called_with(
            'cd %(workflow_dir)s; '
            './launch_command /usr/local/storm/bin/storm jar '
            '-c nimbus.host=master '
            'app.jar org.me.myclass %(topology_name)s input_arg output_arg '
            '> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir,
                                            "topology_name": (
                                                self.storm_topology_name)})

        # Check result here
        self.assertEqual(("%s@%s" % (self.storm_topology_name,
                                     self.master_inst),
                          edp.JOB_STATUS_RUNNING,
                          {"storm-path": self.workflow_dir}), status)


@@ -44,6 +44,7 @@ JOB_TYPE_HIVE = 'Hive'
 JOB_TYPE_JAVA = 'Java'
 JOB_TYPE_MAPREDUCE = 'MapReduce'
 JOB_TYPE_SPARK = 'Spark'
+JOB_TYPE_STORM = 'Storm'
 JOB_TYPE_MAPREDUCE_STREAMING = (JOB_TYPE_MAPREDUCE + JOB_TYPE_SEP +
                                 JOB_SUBTYPE_STREAMING)
 JOB_TYPE_PIG = 'Pig'
@@ -57,7 +58,8 @@ JOB_TYPES_ALL = [
     JOB_TYPE_MAPREDUCE_STREAMING,
     JOB_TYPE_PIG,
     JOB_TYPE_SHELL,
-    JOB_TYPE_SPARK
+    JOB_TYPE_SPARK,
+    JOB_TYPE_STORM
 ]
 ADAPT_FOR_OOZIE = 'edp.java.adapt_for_oozie'