Merge "Added new unittest to oozie module"
@@ -93,6 +93,10 @@ def _create_job_exec(job_id, type, configs=None, info=None):
    j_exec.job_id = job_id
    j_exec.job_configs = configs
    j_exec.info = info
    j_exec.input_id = 4
    j_exec.output_id = 5
    j_exec.engine_job_id = None
    j_exec.data_source_urls = {}
    if not j_exec.job_configs:
        j_exec.job_configs = {}
    if edp.compare_job_type(type, edp.JOB_TYPE_JAVA):
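The lines added here give every fake job execution a cleared engine_job_id and an empty data_source_urls map, so individual tests can set them explicitly. A sketch of how the new tests below consume this, assuming u.create_job_exec is the public wrapper around this helper (which is how the test module calls it):

    # Usage sketch, not part of the diff; u.create_job_exec is assumed to
    # delegate to _create_job_exec above.
    _, job_exec = u.create_job_exec(edp.JOB_TYPE_PIG)
    assert job_exec.engine_job_id is None   # default added by this hunk
    assert job_exec.data_source_urls == {}  # default added by this hunk
    job_exec.engine_job_id = 1              # tests flip this to reach the client path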
@@ -15,6 +15,7 @@

import mock

from sahara import context as ctx
from sahara.plugins import base as pb
from sahara.service.edp.oozie import engine as oe
from sahara.tests.unit import base
@@ -27,6 +28,20 @@ class TestOozieEngine(base.SaharaTestCase):
        super(TestOozieEngine, self).setUp()
        pb.setup_plugins()

    def test_get_job_status(self):
        oje = FakeOozieJobEngine(u.create_cluster())
        client_class = mock.MagicMock()
        client_class.add_job = mock.MagicMock(return_value=1)
        client_class.get_job_info = mock.MagicMock(
            return_value={'status': 'PENDING'})
        oje._get_client = mock.MagicMock(return_value=client_class)

        _, job_exec = u.create_job_exec(edp.JOB_TYPE_PIG)
        self.assertIsNone(oje.get_job_status(job_exec))

        job_exec.engine_job_id = 1
        self.assertEqual({'status': 'PENDING'}, oje.get_job_status(job_exec))

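The None-then-PENDING sequence works because the engine can only query Oozie once the execution has an engine-side id. A minimal sketch of the guard this test implies (an assumption about the real method in sahara/service/edp/oozie/engine.py, not its verbatim body):

    # Hedged sketch of OozieJobEngine.get_job_status; the test pins down
    # only the observable behavior, not this exact implementation.
    def get_job_status(self, job_execution):
        if job_execution.engine_job_id is None:
            return None  # nothing has been submitted to Oozie yet
        return self._get_client().get_job_info(job_execution)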
    def test_add_postfix(self):
        oje = FakeOozieJobEngine(u.create_cluster())

@@ -65,6 +80,14 @@ class TestOozieEngine(base.SaharaTestCase):
        self.assertEqual("hdfs://localhost:8020/user/"
                         "sahara-hbase-lib", job_params['oozie.libpath'])

        job_execution_type = 'scheduled'
        job_params = oje._get_oozie_job_params('hadoop',
                                               '/tmp', oozie_params, True,
                                               scheduled_params, job_dir,
                                               job_execution_type)
        for i in ["start", "end", "frequency"]:
            self.assertEqual(scheduled_params[i], job_params[i])

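For a 'scheduled' execution type, _get_oozie_job_params is expected to copy the start/end/frequency values straight into the returned Oozie properties, which is exactly what the loop asserts. The fixture is defined earlier in this test; illustrative values consistent with Oozie coordinator settings would be:

    # Illustrative values only; the actual fixture lives earlier in the test.
    scheduled_params = {'start': '2015-01-01T00:00Z',
                        'end': '2015-01-02T00:00Z',
                        'frequency': '10'}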
    @mock.patch('sahara.utils.remote.get_remote')
    @mock.patch('sahara.utils.ssh_remote.InstanceInteropHelper')
    @mock.patch('sahara.conductor.API.job_binary_internal_get_raw_data')
@@ -93,6 +116,16 @@ class TestOozieEngine(base.SaharaTestCase):
                                       'hdfs')
        self.assertEqual("test/workflow.xml", res)

    @mock.patch('sahara.utils.remote.get_remote')
    def test_upload_coordinator_file(self, remote_get):
        oje = FakeOozieJobEngine(u.create_cluster())
        remote_class = mock.MagicMock()
        remote_class.__exit__.return_value = 'closed'
        remote_get.return_value = remote_class
        res = oje._upload_coordinator_file(remote_get, "test", "hadoop.xml",
                                           'hdfs')
        self.assertEqual("test/coordinator.xml", res)

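Note that the destination name is fixed: whatever content is passed in ("hadoop.xml" here), the helper is expected to land it at <job_dir>/coordinator.xml. A behavioral sketch of what the assertion pins down (the real method's HDFS write call is not visible from this test, so only the return value is shown):

    # Hedged sketch: _upload_coordinator_file as constrained by the test above.
    def _upload_coordinator_file(self, where, job_dir, file_content, hdfs_user):
        # ... write file_content to HDFS under job_dir (helper not shown) ...
        return job_dir + '/coordinator.xml'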
    @mock.patch('sahara.utils.remote.get_remote')
    def test_hdfs_create_workflow_dir(self, remote):
        remote_class = mock.MagicMock()
@@ -143,6 +176,72 @@ class TestOozieEngine(base.SaharaTestCase):
        oje.cancel_job(job_exec)
        self.assertEqual(1, kill_get.call_count)

    @mock.patch('sahara.service.edp.oozie.workflow_creator.'
                'workflow_factory.get_workflow_xml')
    @mock.patch('sahara.utils.remote.get_remote')
    @mock.patch('sahara.conductor.API.job_execution_update')
    @mock.patch('sahara.conductor.API.data_source_get')
    @mock.patch('sahara.conductor.API.job_get')
    def test_prepare_run_job(self, job, data_source, update,
                             remote, wf_factory):
        wf_factory.return_value = mock.MagicMock()

        remote_class = mock.MagicMock()
        remote_class.__exit__.return_value = 'closed'
        remote.return_value = remote_class

        job_class = mock.MagicMock()
        job_class.name = "myJob"
        job.return_value = job_class

        source = mock.MagicMock()
        source.url = "localhost"

        data_source.return_value = source
        oje = FakeOozieJobEngine(u.create_cluster())
        _, job_exec = u.create_job_exec(edp.JOB_TYPE_PIG)
        update.return_value = job_exec

        res = oje._prepare_run_job(job_exec)
        self.assertEqual(ctx.ctx(), res['context'])
        self.assertEqual('hadoop', res['hdfs_user'])
        self.assertEqual(job_exec, res['job_execution'])
        self.assertEqual({}, res['oozie_params'])

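A note on the mock wiring above: stacked @mock.patch decorators are applied bottom-up, so the decorator closest to the function supplies the first mock argument. For test_prepare_run_job that gives:

    # Decorator-to-argument mapping (bottom-up rule of mock.patch stacking):
    #   job         <- 'sahara.conductor.API.job_get'
    #   data_source <- 'sahara.conductor.API.data_source_get'
    #   update      <- 'sahara.conductor.API.job_execution_update'
    #   remote      <- 'sahara.utils.remote.get_remote'
    #   wf_factory  <- '...workflow_creator.workflow_factory.get_workflow_xml'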
    @mock.patch('sahara.service.edp.oozie.workflow_creator.'
                'workflow_factory.get_workflow_xml')
    @mock.patch('sahara.utils.remote.get_remote')
    @mock.patch('sahara.conductor.API.job_execution_update')
    @mock.patch('sahara.conductor.API.data_source_get')
    @mock.patch('sahara.conductor.API.job_get')
    @mock.patch('sahara.conductor.API.job_execution_get')
    def test_run_job(self, exec_get, job, data_source,
                     update, remote, wf_factory):
        wf_factory.return_value = mock.MagicMock()
        remote_class = mock.MagicMock()
        remote_class.__exit__.return_value = 'closed'
        remote.return_value = remote_class

        job_class = mock.MagicMock()
        job.return_value = job_class
        job_class.name = "myJob"

        source = mock.MagicMock()
        source.url = "localhost"
        data_source.return_value = source

        oje = FakeOozieJobEngine(u.create_cluster())
        client_class = mock.MagicMock()
        client_class.add_job = mock.MagicMock(return_value=1)
        client_class.get_job_info = mock.MagicMock(
            return_value={'status': 'PENDING'})
        oje._get_client = mock.MagicMock(return_value=client_class)

        _, job_exec = u.create_job_exec(edp.JOB_TYPE_PIG)
        update.return_value = job_exec

        self.assertEqual((1, 'PENDING', None), oje.run_job(job_exec))

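The final assertion fixes run_job's contract: it returns a (engine_job_id, status, extra) triple. Here the id comes from the mocked client.add_job, the status from an immediate client.get_job_info poll, and the third element is None on this path (that reading is implied by this test, not a documented guarantee):

    # What the assertion pins down:
    #   oje.run_job(job_exec) == (1, 'PENDING', None)
    #   1         <- client_class.add_job (mocked return_value=1)
    #   'PENDING' <- client_class.get_job_info()['status']
    #   None      <- extra info, unused on this code path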


class FakeOozieJobEngine(oe.OozieJobEngine):
    def get_hdfs_user(self):
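        # The diff view truncates here. A minimal body consistent with the
        # assertEqual('hadoop', res['hdfs_user']) check above would be the
        # following; this is an assumption, and the real FakeOozieJobEngine
        # overrides further abstract helpers not shown in this hunk.
        return 'hadoop'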