Merge "Storm EDP implementation"
This commit is contained in:
commit
751a97d173
35
sahara/plugins/storm/edp_engine.py
Normal file
35
sahara/plugins/storm/edp_engine.py
Normal file
@ -0,0 +1,35 @@
|
||||
# Copyright (c) 2015 Telles Nobrega.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from sahara import exceptions as ex
|
||||
from sahara.i18n import _
|
||||
from sahara.service.edp.storm import engine as edp_engine
|
||||
|
||||
|
||||
class EdpEngine(edp_engine.StormJobEngine):
|
||||
|
||||
edp_base_version = "0.9.2"
|
||||
|
||||
@staticmethod
|
||||
def edp_supported(version):
|
||||
return version >= EdpEngine.edp_base_version
|
||||
|
||||
def validate_job_execution(self, cluster, job, data):
|
||||
if not self.edp_supported(cluster.hadoop_version):
|
||||
raise ex.InvalidDataException(
|
||||
_('Storm {base} required to run {type} jobs').format(
|
||||
base=EdpEngine.edp_base_version, type=job.type))
|
||||
|
||||
super(EdpEngine, self).validate_job_execution(cluster, job, data)
|
@ -25,6 +25,7 @@ from sahara.i18n import _LI
|
||||
from sahara.plugins import exceptions as ex
|
||||
from sahara.plugins import provisioning as p
|
||||
from sahara.plugins.storm import config_helper as c_helper
|
||||
from sahara.plugins.storm import edp_engine
|
||||
from sahara.plugins.storm import run_scripts as run
|
||||
from sahara.plugins import utils
|
||||
from sahara.utils import cluster_progress_ops as cpo
|
||||
@ -100,6 +101,25 @@ class StormProvider(p.ProvisioningPluginBase):
|
||||
cluster=cluster.name))
|
||||
self._set_cluster_info(cluster)
|
||||
|
||||
def get_edp_engine(self, cluster, job_type):
|
||||
if job_type in edp_engine.EdpEngine.get_supported_job_types():
|
||||
return edp_engine.EdpEngine(cluster)
|
||||
|
||||
return None
|
||||
|
||||
def get_edp_job_types(self, versions=[]):
|
||||
res = {}
|
||||
for vers in self.get_versions():
|
||||
if not versions or vers in versions:
|
||||
if edp_engine.EdpEngine.edp_supported(vers):
|
||||
res[vers] = edp_engine.EdpEngine.get_supported_job_types()
|
||||
return res
|
||||
|
||||
def get_edp_config_hints(self, job_type, version):
|
||||
if edp_engine.EdpEngine.edp_supported(version):
|
||||
return edp_engine.EdpEngine.get_possible_job_config(job_type)
|
||||
return {}
|
||||
|
||||
def _extract_configs_to_extra(self, cluster):
|
||||
st_master = utils.get_instance(cluster, "nimbus")
|
||||
zk_servers = utils.get_instances(cluster, "zookeeper")
|
||||
|
0
sahara/service/edp/storm/__init__.py
Normal file
0
sahara/service/edp/storm/__init__.py
Normal file
248
sahara/service/edp/storm/engine.py
Normal file
248
sahara/service/edp/storm/engine.py
Normal file
@ -0,0 +1,248 @@
|
||||
# Copyright (c) 2014 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import os
|
||||
|
||||
from oslo_config import cfg
|
||||
import six
|
||||
|
||||
import uuid
|
||||
|
||||
from sahara import conductor as c
|
||||
from sahara import context
|
||||
from sahara import exceptions as e
|
||||
from sahara.i18n import _
|
||||
from sahara.plugins import utils as plugin_utils
|
||||
from sahara.service.edp import base_engine
|
||||
from sahara.service.edp.binary_retrievers import dispatch
|
||||
from sahara.service.edp import job_utils
|
||||
from sahara.service.validations.edp import job_execution as j
|
||||
from sahara.utils import edp
|
||||
from sahara.utils import files
|
||||
from sahara.utils import general
|
||||
from sahara.utils import remote
|
||||
|
||||
conductor = c.API
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class StormJobEngine(base_engine.JobEngine):
|
||||
def __init__(self, cluster):
|
||||
self.cluster = cluster
|
||||
|
||||
def _get_topology_and_inst_id(self, job_id):
|
||||
try:
|
||||
topology_name, inst_id = job_id.split("@", 1)
|
||||
if topology_name and inst_id:
|
||||
return (topology_name, inst_id)
|
||||
except Exception:
|
||||
pass
|
||||
return "", ""
|
||||
|
||||
def _get_instance_if_running(self, job_execution):
|
||||
topology_name, inst_id = self._get_topology_and_inst_id(
|
||||
job_execution.oozie_job_id)
|
||||
if not topology_name or not inst_id or (
|
||||
job_execution.info['status'] in edp.JOB_STATUSES_TERMINATED):
|
||||
return None, None
|
||||
# TODO(tmckay): well, if there is a list index out of range
|
||||
# error here it probably means that the instance is gone. If we
|
||||
# have a job execution that is not terminated, and the instance
|
||||
# is gone, we should probably change the status somehow.
|
||||
# For now, do nothing.
|
||||
try:
|
||||
instance = general.get_instances(self.cluster, [inst_id])[0]
|
||||
except Exception:
|
||||
instance = None
|
||||
return topology_name, instance
|
||||
|
||||
def _get_topology_name(self, job_execution):
|
||||
topology_name, inst_id = self._get_topology_and_inst_id(
|
||||
job_execution.oozie_job_id)
|
||||
|
||||
return topology_name
|
||||
|
||||
def _generate_topology_name(self, name):
|
||||
return name + "_" + six.text_type(uuid.uuid4())
|
||||
|
||||
def _get_job_status_from_remote(self, job_execution):
|
||||
topology_name, inst_id = self._get_instance_if_running(job_execution)
|
||||
if topology_name is None or inst_id is None:
|
||||
return edp.JOB_STATUSES_TERMINATED
|
||||
|
||||
topology_name = self._get_topology_name(job_execution)
|
||||
master = plugin_utils.get_instance(self.cluster, "nimbus")
|
||||
|
||||
cmd = (
|
||||
"%(storm)s -c nimbus.host=%(host)s "
|
||||
"list | grep %(topology_name)s | awk '{print $2}'") % (
|
||||
{
|
||||
"storm": "/usr/local/storm/bin/storm",
|
||||
"host": master.hostname(),
|
||||
"topology_name": topology_name
|
||||
})
|
||||
|
||||
with remote.get_remote(master) as r:
|
||||
ret, stdout = r.execute_command("%s " % (cmd))
|
||||
# If the status is ACTIVE is there, it's still running
|
||||
if stdout.strip() == "ACTIVE":
|
||||
return {"status": edp.JOB_STATUS_RUNNING}
|
||||
else:
|
||||
return {"status": edp.JOB_STATUS_KILLED}
|
||||
|
||||
def _job_script(self):
|
||||
path = "service/edp/resources/launch_command.py"
|
||||
return files.get_file_text(path)
|
||||
|
||||
def _upload_job_files(self, where, job_dir, job, job_configs):
|
||||
|
||||
def upload(r, dir, job_file, proxy_configs):
|
||||
dst = os.path.join(dir, job_file.name)
|
||||
raw_data = dispatch.get_raw_binary(job_file, proxy_configs)
|
||||
r.write_file_to(dst, raw_data)
|
||||
return dst
|
||||
|
||||
uploaded_paths = []
|
||||
with remote.get_remote(where) as r:
|
||||
mains = list(job.mains) if job.mains else []
|
||||
libs = list(job.libs) if job.libs else []
|
||||
for job_file in mains+libs:
|
||||
uploaded_paths.append(
|
||||
upload(r, job_dir, job_file,
|
||||
job_configs.get('proxy_configs')))
|
||||
|
||||
return uploaded_paths
|
||||
|
||||
def _exec_cmd_on_remote_instance(self, master, cmd):
|
||||
if master is not None:
|
||||
with remote.get_remote(master) as r:
|
||||
ret, stdout = r.execute_command("%s > /dev/null 2>&1 & echo $!"
|
||||
% cmd)
|
||||
|
||||
return ret, stdout
|
||||
|
||||
def cancel_job(self, job_execution):
|
||||
topology_name, instance = self._get_instance_if_running(job_execution)
|
||||
if topology_name is None or instance is None:
|
||||
return None
|
||||
|
||||
topology_name = self._get_topology_name(job_execution)
|
||||
master = plugin_utils.get_instance(self.cluster, "nimbus")
|
||||
|
||||
cmd = (
|
||||
'%(storm_kill)s -c nimbus.host=%(host)s %(topology_name)s') % (
|
||||
{
|
||||
"storm_kill": "/usr/local/storm/bin/storm kill",
|
||||
"host": master.hostname(),
|
||||
"topology_name": topology_name
|
||||
})
|
||||
|
||||
ret, stdout = self._exec_cmd_on_remote_instance(instance, cmd)
|
||||
|
||||
if ret == 0:
|
||||
# We had some effect, check the status
|
||||
return self._get_job_status_from_remote(job_execution)
|
||||
|
||||
def get_job_status(self, job_execution):
|
||||
topology_name, instance = self._get_instance_if_running(job_execution)
|
||||
if instance is not None:
|
||||
return self._get_job_status_from_remote(job_execution)
|
||||
|
||||
def run_job(self, job_execution):
|
||||
ctx = context.ctx()
|
||||
job = conductor.job_get(ctx, job_execution.job_id)
|
||||
|
||||
additional_sources, updated_job_configs = (
|
||||
job_utils.resolve_data_source_references(job_execution.job_configs)
|
||||
)
|
||||
|
||||
# We'll always run the driver program on the master
|
||||
master = plugin_utils.get_instance(self.cluster, "nimbus")
|
||||
|
||||
# TODO(tmckay): wf_dir should probably be configurable.
|
||||
# The only requirement is that the dir is writable by the image user
|
||||
wf_dir = job_utils.create_workflow_dir(master, '/tmp/storm-edp', job,
|
||||
job_execution.id, "700")
|
||||
|
||||
paths = self._upload_job_files(master, wf_dir, job,
|
||||
updated_job_configs)
|
||||
|
||||
# We can shorten the paths in this case since we'll run out of wf_dir
|
||||
paths = [os.path.basename(p) for p in paths]
|
||||
|
||||
app_jar = paths.pop(0)
|
||||
job_class = updated_job_configs["configs"]["edp.java.main_class"]
|
||||
topology_name = self._generate_topology_name(job.name)
|
||||
|
||||
# Launch the storm job using storm jar
|
||||
host = master.hostname()
|
||||
args = updated_job_configs.get('args', [])
|
||||
args = " ".join([arg for arg in args])
|
||||
|
||||
if args:
|
||||
args = " " + args
|
||||
|
||||
cmd = (
|
||||
'%(storm_jar)s -c nimbus.host=%(host)s %(job_jar)s '
|
||||
'%(main_class)s %(topology_name)s%(args)s' % (
|
||||
{
|
||||
"storm_jar": "/usr/local/storm/bin/storm jar",
|
||||
"main_class": job_class,
|
||||
"job_jar": app_jar,
|
||||
"host": host,
|
||||
"topology_name": topology_name,
|
||||
"args": args
|
||||
}))
|
||||
|
||||
job_execution = conductor.job_execution_get(ctx, job_execution.id)
|
||||
if job_execution.info['status'] == edp.JOB_STATUS_TOBEKILLED:
|
||||
return (None, edp.JOB_STATUS_KILLED, None)
|
||||
|
||||
# If an exception is raised here, the job_manager will mark
|
||||
# the job failed and log the exception
|
||||
# The redirects of stdout and stderr will preserve output in the wf_dir
|
||||
with remote.get_remote(master) as r:
|
||||
# Upload the command launch script
|
||||
launch = os.path.join(wf_dir, "launch_command")
|
||||
r.write_file_to(launch, self._job_script())
|
||||
r.execute_command("chmod +x %s" % launch)
|
||||
ret, stdout = r.execute_command(
|
||||
"cd %s; ./launch_command %s > /dev/null 2>&1 & echo $!"
|
||||
% (wf_dir, cmd))
|
||||
|
||||
if ret == 0:
|
||||
# Success, we'll add the wf_dir in job_execution.extra and store
|
||||
# topology_name@instance_id as the job id
|
||||
# We know the job is running so return "RUNNING"
|
||||
return (topology_name + "@" + master.id,
|
||||
edp.JOB_STATUS_RUNNING,
|
||||
{'storm-path': wf_dir})
|
||||
|
||||
# Hmm, no execption but something failed.
|
||||
# Since we're using backgrounding with redirect, this is unlikely.
|
||||
raise e.EDPError(_("Storm job execution failed. Exit status = "
|
||||
"%(status)s, stdout = %(stdout)s") %
|
||||
{'status': ret, 'stdout': stdout})
|
||||
|
||||
def validate_job_execution(self, cluster, job, data):
|
||||
j.check_main_class_present(data, job)
|
||||
|
||||
@staticmethod
|
||||
def get_possible_job_config(job_type):
|
||||
return {'job_config': {'configs': [], 'args': []}}
|
||||
|
||||
@staticmethod
|
||||
def get_supported_job_types():
|
||||
return [edp.JOB_TYPE_STORM]
|
@ -78,14 +78,16 @@ def check_mains_libs(data, **kwargs):
|
||||
|
||||
# These types must have a value in mains and may also use libs
|
||||
if job_type in [edp.JOB_TYPE_PIG, edp.JOB_TYPE_HIVE,
|
||||
edp.JOB_TYPE_SHELL, edp.JOB_TYPE_SPARK]:
|
||||
edp.JOB_TYPE_SHELL, edp.JOB_TYPE_SPARK,
|
||||
edp.JOB_TYPE_STORM]:
|
||||
if not mains:
|
||||
if job_type == edp.JOB_TYPE_SPARK:
|
||||
if job_type in [edp.JOB_TYPE_SPARK, edp.JOB_TYPE_STORM]:
|
||||
msg = _(
|
||||
"%s job requires main application jar") % data.get("type")
|
||||
else:
|
||||
msg = _("%s flow requires main script") % data.get("type")
|
||||
raise e.InvalidDataException(msg)
|
||||
|
||||
# Check for overlap
|
||||
if set(mains).intersection(set(libs)):
|
||||
raise e.InvalidDataException(_("'mains' and 'libs' overlap"))
|
||||
|
0
sahara/tests/unit/service/edp/storm/__init__.py
Normal file
0
sahara/tests/unit/service/edp/storm/__init__.py
Normal file
383
sahara/tests/unit/service/edp/storm/test_storm.py
Normal file
383
sahara/tests/unit/service/edp/storm/test_storm.py
Normal file
@ -0,0 +1,383 @@
|
||||
# Copyright (c) 2015 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import os
|
||||
|
||||
import mock
|
||||
|
||||
import sahara.exceptions as ex
|
||||
from sahara.service.edp.storm import engine as se
|
||||
from sahara.tests.unit import base
|
||||
from sahara.utils import edp
|
||||
|
||||
|
||||
class TestStorm(base.SaharaTestCase):
|
||||
def setUp(self):
|
||||
super(TestStorm, self).setUp()
|
||||
|
||||
self.master_host = "master"
|
||||
self.master_inst = "6789"
|
||||
self.storm_topology_name = "MyJob_ed8347a9-39aa-477c-8108-066202eb6130"
|
||||
self.workflow_dir = "/wfdir"
|
||||
|
||||
def test_get_topology_and_inst_id(self):
|
||||
'''Test parsing of job ids
|
||||
|
||||
Test that job ids of the form topology_name@instance are
|
||||
split into topology_name and instance ids by
|
||||
eng._get_topology_name_and_inst_id() but anything else
|
||||
returns empty strings
|
||||
'''
|
||||
eng = se.StormJobEngine(None)
|
||||
for job_id in [None, "", "@", "something", "topology_name@",
|
||||
"@instance"]:
|
||||
topology_name, inst_id = eng._get_topology_and_inst_id(job_id)
|
||||
self.assertEqual(("", ""), (topology_name, inst_id))
|
||||
|
||||
topology_name, inst_id = eng._get_topology_and_inst_id(
|
||||
"topology_name@instance")
|
||||
self.assertEqual(("topology_name", "instance"),
|
||||
(topology_name, inst_id))
|
||||
|
||||
@mock.patch('sahara.utils.general.get_instances')
|
||||
def test_get_instance_if_running(self, get_instances):
|
||||
'''Test retrieval of topology_name and instance object for running job
|
||||
|
||||
If the job id is valid and the job status is non-terminated,
|
||||
_get_instance_if_running() should retrieve the instance
|
||||
based on the inst_id and return the topology_name and instance.
|
||||
|
||||
If the job is invalid or the job is terminated, it should
|
||||
return None, None.
|
||||
|
||||
If get_instances() throws an exception or returns an empty list,
|
||||
the instance returned should be None (topology_name might
|
||||
still be set)
|
||||
'''
|
||||
get_instances.return_value = ["instance"]
|
||||
job_exec = mock.Mock()
|
||||
eng = se.StormJobEngine("cluster")
|
||||
|
||||
job_exec.oozie_job_id = "invalid id"
|
||||
self.assertEqual((None, None),
|
||||
eng._get_instance_if_running(job_exec))
|
||||
|
||||
job_exec.oozie_job_id = "topology_name@inst_id"
|
||||
for state in edp.JOB_STATUSES_TERMINATED:
|
||||
job_exec.info = {'status': state}
|
||||
self.assertEqual((None, None),
|
||||
eng._get_instance_if_running(job_exec))
|
||||
|
||||
job_exec.info = {'status': edp.JOB_STATUS_RUNNING}
|
||||
self.assertEqual(("topology_name", "instance"),
|
||||
eng._get_instance_if_running(job_exec))
|
||||
get_instances.assert_called_with("cluster", ["inst_id"])
|
||||
|
||||
# Pretend get_instances returns nothing
|
||||
get_instances.return_value = []
|
||||
topology_name, instance = eng._get_instance_if_running(job_exec)
|
||||
self.assertIsNone(instance)
|
||||
|
||||
# Pretend get_instances throws an exception
|
||||
get_instances.side_effect = Exception("some failure")
|
||||
topology_name, instance = eng._get_instance_if_running(job_exec)
|
||||
self.assertIsNone(instance)
|
||||
|
||||
@mock.patch('sahara.plugins.utils.get_instance')
|
||||
@mock.patch('sahara.utils.general.get_instances')
|
||||
@mock.patch('sahara.utils.remote.get_remote')
|
||||
@mock.patch('sahara.conductor.API.job_get')
|
||||
@mock.patch('sahara.context.ctx', return_value="ctx")
|
||||
def test_get_job_status_from_remote(self, get_instance, get_instances,
|
||||
get_remote, ctx, job_get):
|
||||
'''Test retrieval of job status from remote instance
|
||||
|
||||
If the process is present, status is RUNNING
|
||||
If the process is not present, status depends on the result file
|
||||
If the result file is missing, status is DONEWITHERROR
|
||||
'''
|
||||
eng = se.StormJobEngine("cluster")
|
||||
job_exec = mock.Mock()
|
||||
|
||||
master_instance = self._make_master_instance()
|
||||
master_instance.execute_command.return_value = 0, "ACTIVE"
|
||||
get_remote.return_value.__enter__ = mock.Mock(
|
||||
return_value=master_instance)
|
||||
get_instance.return_value = master_instance
|
||||
get_instances.return_value = ["instance"]
|
||||
|
||||
# Pretend process is running
|
||||
job_exec.oozie_job_id = "topology_name@inst_id"
|
||||
job_exec.info = {'status': edp.JOB_STATUS_RUNNING}
|
||||
job_exec.job_configs = {"configs": {"topology_name": "topology_name"}}
|
||||
status = eng._get_job_status_from_remote(job_exec)
|
||||
self.assertEqual({"status": edp.JOB_STATUS_RUNNING}, status)
|
||||
|
||||
@mock.patch.object(se.StormJobEngine,
|
||||
'_get_job_status_from_remote',
|
||||
autospec=True)
|
||||
@mock.patch.object(se.StormJobEngine,
|
||||
'_get_instance_if_running',
|
||||
autospec=True)
|
||||
@mock.patch('sahara.utils.remote.get_remote')
|
||||
def test_get_job_status(self,
|
||||
get_remote,
|
||||
_get_instance_if_running,
|
||||
_get_job_status_from_remote):
|
||||
|
||||
# This is to mock "with remote.get_remote(instance) as r"
|
||||
remote_instance = mock.Mock()
|
||||
get_remote.return_value.__enter__ = mock.Mock(
|
||||
return_value=remote_instance)
|
||||
|
||||
# Pretend instance is not returned
|
||||
_get_instance_if_running.return_value = "topology_name", None
|
||||
job_exec = mock.Mock()
|
||||
eng = se.StormJobEngine("cluster")
|
||||
status = eng.get_job_status(job_exec)
|
||||
self.assertIsNone(status)
|
||||
|
||||
# Pretend we have an instance
|
||||
_get_instance_if_running.return_value = "topology_name", "instance"
|
||||
_get_job_status_from_remote.return_value = {"status":
|
||||
edp.JOB_STATUS_RUNNING}
|
||||
status = eng.get_job_status(job_exec)
|
||||
_get_job_status_from_remote.assert_called_with(eng,
|
||||
job_exec)
|
||||
self.assertEqual({"status": edp.JOB_STATUS_RUNNING}, status)
|
||||
|
||||
@mock.patch.object(se.StormJobEngine,
|
||||
'_get_instance_if_running',
|
||||
autospec=True,
|
||||
return_value=(None, None))
|
||||
@mock.patch('sahara.utils.remote.get_remote')
|
||||
@mock.patch('sahara.conductor.API.job_get')
|
||||
@mock.patch('sahara.context.ctx', return_value="ctx")
|
||||
def test_cancel_job_null_or_done(self,
|
||||
get_remote,
|
||||
_get_instance_if_running,
|
||||
job_get,
|
||||
ctx):
|
||||
'''Test cancel_job() when instance is None
|
||||
|
||||
Test that cancel_job() returns None and does not try to
|
||||
retrieve a remote instance if _get_instance_if_running() returns None
|
||||
'''
|
||||
eng = se.StormJobEngine("cluster")
|
||||
job_exec = mock.Mock()
|
||||
self.assertIsNone(eng.cancel_job(job_exec))
|
||||
self.assertFalse(get_remote.called)
|
||||
|
||||
@mock.patch.object(se.StormJobEngine,
|
||||
'_get_job_status_from_remote',
|
||||
autospec=True,
|
||||
return_value={"status": edp.JOB_STATUS_KILLED})
|
||||
@mock.patch('sahara.utils.general.get_instances')
|
||||
@mock.patch('sahara.plugins.utils.get_instance')
|
||||
@mock.patch('sahara.utils.remote.get_remote')
|
||||
def test_cancel_job(self, get_remote, get_instance, get_instances,
|
||||
_get_job_status_from_remote):
|
||||
master_instance = self._make_master_instance()
|
||||
status = self._setup_tests(master_instance)
|
||||
get_instance.return_value = master_instance
|
||||
get_instances.return_value = ["instance"]
|
||||
master_instance.execute_command.return_value = 0, "KILLED"
|
||||
|
||||
get_remote.return_value.__enter__ = mock.Mock(
|
||||
return_value=master_instance)
|
||||
eng = se.StormJobEngine("cluster")
|
||||
job_exec = mock.Mock()
|
||||
job_exec.oozie_job_id = "topology_name@inst_id"
|
||||
job_exec.info = {'status': edp.JOB_STATUS_RUNNING}
|
||||
job_exec.job_configs = {"configs": {"topology_name": "topology_name"}}
|
||||
status = eng.cancel_job(job_exec)
|
||||
|
||||
master_instance.execute_command.assert_called_with(
|
||||
"/usr/local/storm/bin/storm kill -c nimbus.host=%s topology_name "
|
||||
"> /dev/null 2>&1 & echo $!" % self.master_host)
|
||||
|
||||
self.assertEqual({"status": edp.JOB_STATUS_KILLED}, status)
|
||||
|
||||
@mock.patch('sahara.service.edp.binary_retrievers.dispatch.get_raw_binary')
|
||||
@mock.patch('sahara.utils.remote.get_remote')
|
||||
def test_upload_job_files(self, get_remote, get_raw_binary):
|
||||
main_names = ["main1", "main2", "main3"]
|
||||
lib_names = ["lib1", "lib2", "lib3"]
|
||||
|
||||
def make_data_objects(*args):
|
||||
objs = []
|
||||
for name in args:
|
||||
m = mock.Mock()
|
||||
m.name = name
|
||||
objs.append(m)
|
||||
return objs
|
||||
|
||||
job = mock.Mock()
|
||||
job.name = "job"
|
||||
job.mains = make_data_objects(*main_names)
|
||||
job.libs = make_data_objects(*lib_names)
|
||||
|
||||
# This is to mock "with remote.get_remote(instance) as r"
|
||||
remote_instance = mock.Mock()
|
||||
get_remote.return_value.__enter__ = mock.Mock(
|
||||
return_value=remote_instance)
|
||||
|
||||
get_raw_binary.return_value = "data"
|
||||
|
||||
eng = se.StormJobEngine("cluster")
|
||||
paths = eng._upload_job_files("where", "/somedir", job, {})
|
||||
self.assertEqual(["/somedir/" + n for n in main_names + lib_names],
|
||||
paths)
|
||||
for path in paths:
|
||||
remote_instance.write_file_to.assert_any_call(path, "data")
|
||||
|
||||
def _make_master_instance(self, return_code=0):
|
||||
master = mock.Mock()
|
||||
master.execute_command.return_value = (return_code,
|
||||
self.storm_topology_name)
|
||||
master.hostname.return_value = self.master_host
|
||||
master.id = self.master_inst
|
||||
return master
|
||||
|
||||
@mock.patch('sahara.conductor.API.job_execution_get')
|
||||
@mock.patch('sahara.utils.remote.get_remote')
|
||||
@mock.patch('sahara.plugins.utils.get_instance')
|
||||
@mock.patch('sahara.conductor.API.job_get')
|
||||
@mock.patch('sahara.context.ctx', return_value="ctx")
|
||||
def _setup_tests(self, master_instance, ctx, job_get,
|
||||
get_instance, get_remote, job_exec_get):
|
||||
|
||||
# This is to mock "with remote.get_remote(master) as r" in run_job
|
||||
get_remote.return_value.__enter__ = mock.Mock(
|
||||
return_value=master_instance)
|
||||
get_instance.return_value = master_instance
|
||||
|
||||
@mock.patch.object(se.StormJobEngine,
|
||||
'_generate_topology_name',
|
||||
autospec=True,
|
||||
return_value=(
|
||||
"MyJob_ed8347a9-39aa-477c-8108-066202eb6130"))
|
||||
@mock.patch('sahara.conductor.API.job_execution_get')
|
||||
@mock.patch('sahara.utils.remote.get_remote')
|
||||
@mock.patch('sahara.service.edp.job_utils.create_workflow_dir')
|
||||
@mock.patch('sahara.plugins.utils.get_instance')
|
||||
@mock.patch('sahara.conductor.API.job_get')
|
||||
@mock.patch('sahara.context.ctx', return_value="ctx")
|
||||
def _setup_run_job(self, master_instance, job_configs, files,
|
||||
ctx, job_get, get_instance, create_workflow_dir,
|
||||
get_remote, job_exec_get, _generate_topology_name):
|
||||
|
||||
def _upload_job_files(where, job_dir, job,
|
||||
libs_subdir=True, job_configs=None):
|
||||
paths = [os.path.join(self.workflow_dir, f) for f in files['jars']]
|
||||
return paths
|
||||
|
||||
job = mock.Mock()
|
||||
job.name = "MyJob"
|
||||
job_get.return_value = job
|
||||
|
||||
job_exec = mock.Mock()
|
||||
job_exec.job_configs = job_configs
|
||||
|
||||
create_workflow_dir.return_value = self.workflow_dir
|
||||
|
||||
# This is to mock "with remote.get_remote(master) as r" in run_job
|
||||
get_remote.return_value.__enter__ = mock.Mock(
|
||||
return_value=master_instance)
|
||||
get_instance.return_value = master_instance
|
||||
|
||||
eng = se.StormJobEngine("cluster")
|
||||
eng._upload_job_files = mock.Mock()
|
||||
eng._upload_job_files.side_effect = _upload_job_files
|
||||
status = eng.run_job(job_exec)
|
||||
|
||||
# Check that we launch on the master node
|
||||
get_instance.assert_called_with("cluster", "nimbus")
|
||||
|
||||
return status
|
||||
|
||||
def test_run_job_raise(self):
|
||||
job_configs = {
|
||||
'configs': {"edp.java.main_class": "org.me.myclass",
|
||||
"topology_name": "topology_name"},
|
||||
}
|
||||
|
||||
files = {'jars': ["app.jar"]}
|
||||
|
||||
# The object representing the storm master node
|
||||
# The storm jar command will be run on this instance
|
||||
master_instance = self._make_master_instance(return_code=1)
|
||||
|
||||
# If execute_command returns an error we should get a raise
|
||||
self.assertRaises(ex.EDPError,
|
||||
self._setup_run_job,
|
||||
master_instance, job_configs, files)
|
||||
|
||||
def test_run_job(self):
|
||||
job_configs = {
|
||||
'configs': {"edp.java.main_class": "org.me.myclass"}
|
||||
}
|
||||
|
||||
files = {'jars': ["app.jar"]}
|
||||
|
||||
# The object representing the storm master node
|
||||
# The storm jar command will be run on this instance
|
||||
master_instance = self._make_master_instance()
|
||||
status = self._setup_run_job(master_instance, job_configs, files)
|
||||
|
||||
# Check the command
|
||||
master_instance.execute_command.assert_called_with(
|
||||
'cd %(workflow_dir)s; '
|
||||
'./launch_command /usr/local/storm/bin/storm jar '
|
||||
'-c nimbus.host=master '
|
||||
'app.jar org.me.myclass %(topology_name)s '
|
||||
'> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir,
|
||||
"topology_name": (
|
||||
self.storm_topology_name)})
|
||||
|
||||
# Check result here
|
||||
self.assertEqual(("%s@%s" % (self.storm_topology_name,
|
||||
self.master_inst),
|
||||
edp.JOB_STATUS_RUNNING,
|
||||
{"storm-path": self.workflow_dir}), status)
|
||||
|
||||
def test_run_job_args(self):
|
||||
job_configs = {
|
||||
'configs': {"edp.java.main_class": "org.me.myclass"},
|
||||
'args': ['input_arg', 'output_arg']
|
||||
}
|
||||
|
||||
files = {'jars': ["app.jar"]}
|
||||
|
||||
# The object representing the spark master node
|
||||
# The spark-submit command will be run on this instance
|
||||
master_instance = self._make_master_instance()
|
||||
status = self._setup_run_job(master_instance, job_configs, files)
|
||||
|
||||
# Check the command
|
||||
master_instance.execute_command.assert_called_with(
|
||||
'cd %(workflow_dir)s; '
|
||||
'./launch_command /usr/local/storm/bin/storm jar '
|
||||
'-c nimbus.host=master '
|
||||
'app.jar org.me.myclass %(topology_name)s input_arg output_arg '
|
||||
'> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir,
|
||||
"topology_name": (
|
||||
self.storm_topology_name)})
|
||||
|
||||
# Check result here
|
||||
self.assertEqual(("%s@%s" % (self.storm_topology_name,
|
||||
self.master_inst),
|
||||
edp.JOB_STATUS_RUNNING,
|
||||
{"storm-path": self.workflow_dir}), status)
|
@ -44,6 +44,7 @@ JOB_TYPE_HIVE = 'Hive'
|
||||
JOB_TYPE_JAVA = 'Java'
|
||||
JOB_TYPE_MAPREDUCE = 'MapReduce'
|
||||
JOB_TYPE_SPARK = 'Spark'
|
||||
JOB_TYPE_STORM = 'Storm'
|
||||
JOB_TYPE_MAPREDUCE_STREAMING = (JOB_TYPE_MAPREDUCE + JOB_TYPE_SEP +
|
||||
JOB_SUBTYPE_STREAMING)
|
||||
JOB_TYPE_PIG = 'Pig'
|
||||
@ -57,7 +58,8 @@ JOB_TYPES_ALL = [
|
||||
JOB_TYPE_MAPREDUCE_STREAMING,
|
||||
JOB_TYPE_PIG,
|
||||
JOB_TYPE_SHELL,
|
||||
JOB_TYPE_SPARK
|
||||
JOB_TYPE_SPARK,
|
||||
JOB_TYPE_STORM
|
||||
]
|
||||
|
||||
ADAPT_FOR_OOZIE = 'edp.java.adapt_for_oozie'
|
||||
|
Loading…
Reference in New Issue
Block a user