python-saharaclient/saharaclient/tests/unit/osc/v1/test_jobs.py

366 lines
13 KiB
Python

# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from osc_lib.tests import utils as osc_utils
from unittest import mock
from saharaclient.api import job_executions as api_je
from saharaclient.osc.v1 import jobs as osc_je
from saharaclient.tests.unit.osc.v1 import fakes
JOB_EXECUTION_INFO = {
"is_public": False,
"id": "je_id",
"interface": [],
"is_protected": False,
"input_id": 'input_id',
"output_id": 'output_id',
"job_id": "job_id",
"cluster_id": 'cluster_id',
"start_time": "start",
"end_time": "end",
"engine_job_id": "engine_job_id",
"info": {
"status": 'SUCCEEDED'
},
"job_configs": {
"configs": {
"config1": "1",
"config2": "2"
},
"args": [
"arg1",
"arg2"
],
"params": {
"param2": "value2",
"param1": "value1"
}
}
}
class TestJobs(fakes.TestDataProcessing):
def setUp(self):
super(TestJobs, self).setUp()
self.je_mock = self.app.client_manager.data_processing.job_executions
self.je_mock.reset_mock()
self.app.api_version['data_processing'] = '1'
class TestExecuteJob(TestJobs):
# TODO(apavlov): check for execution with --interface, --configs, --json
def setUp(self):
super(TestExecuteJob, self).setUp()
self.je_mock.create.return_value = api_je.JobExecution(
None, JOB_EXECUTION_INFO)
self.ds_mock = self.app.client_manager.data_processing.data_sources
self.ds_mock.find_unique.return_value = mock.Mock(id='ds_id')
self.c_mock = self.app.client_manager.data_processing.clusters
self.c_mock.find_unique.return_value = mock.Mock(id='cluster_id')
self.jt_mock = self.app.client_manager.data_processing.jobs
self.jt_mock.find_unique.return_value = mock.Mock(id='job_id')
self.ds_mock.reset_mock()
self.c_mock.reset_mock()
self.jt_mock.reset_mock()
# Command to test
self.cmd = osc_je.ExecuteJob(self.app, None)
def test_job_execute_minimum_options(self):
arglist = ['--job-template', 'job-template', '--cluster', 'cluster']
verifylist = [('job_template', 'job-template'), ('cluster', 'cluster')]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.cmd.take_action(parsed_args)
# Check that correct arguments were passed
self.je_mock.create.assert_called_once_with(
cluster_id='cluster_id', configs={}, input_id=None,
interface=None, is_protected=False, is_public=False,
job_id='job_id', output_id=None)
def test_job_execute_with_input_output_option(self):
arglist = ['--job-template', 'job-template', '--cluster', 'cluster',
'--input', 'input', '--output', 'output']
verifylist = [('job_template', 'job-template'), ('cluster', 'cluster'),
('input', 'input'), ('output', 'output')]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.cmd.take_action(parsed_args)
self.je_mock.create.assert_called_once_with(
cluster_id='cluster_id', configs={}, input_id='ds_id',
interface=None, is_protected=False, is_public=False,
job_id='job_id', output_id='ds_id')
# without option --output
arglist = ['--job-template', 'job-template', '--cluster', 'cluster',
'--input', 'input']
verifylist = [('job_template', 'job-template'), ('cluster', 'cluster'),
('input', 'input')]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.cmd.take_action(parsed_args)
self.je_mock.create.assert_called_with(
cluster_id='cluster_id', configs={}, input_id='ds_id',
interface=None, is_protected=False, is_public=False,
job_id='job_id', output_id=None)
# without options --output and --input
arglist = ['--job-template', 'job-template', '--cluster', 'cluster']
verifylist = [('job_template', 'job-template'), ('cluster', 'cluster')]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.cmd.take_action(parsed_args)
self.je_mock.create.assert_called_with(
cluster_id='cluster_id', configs={}, input_id=None,
interface=None, is_protected=False, is_public=False,
job_id='job_id', output_id=None)
def test_job_execute_all_options(self):
arglist = ['--job-template', 'job-template', '--cluster', 'cluster',
'--input', 'input', '--output', 'output', '--params',
'param1:value1', 'param2:value2', '--args', 'arg1', 'arg2',
'--configs', 'config1:1', 'config2:2', '--public',
'--protected']
verifylist = [('job_template', 'job-template'), ('cluster', 'cluster'),
('input', 'input'), ('output', 'output'),
('params', ['param1:value1', 'param2:value2']),
('args', ['arg1', 'arg2']),
('configs', ['config1:1', 'config2:2']),
('public', True),
('protected', True)]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
# Check that correct arguments were passed
self.je_mock.create.assert_called_once_with(
cluster_id='cluster_id',
configs={'configs': {'config1': '1', 'config2': '2'},
'args': ['arg1', 'arg2'],
'params': {'param2': 'value2', 'param1': 'value1'}},
input_id='ds_id', interface=None, is_protected=True,
is_public=True, job_id='job_id', output_id='ds_id')
# Check that columns are correct
expected_columns = ('Cluster id', 'End time', 'Engine job id', 'Id',
'Input id', 'Is protected', 'Is public',
'Job template id', 'Output id', 'Start time',
'Status')
self.assertEqual(expected_columns, columns)
# Check that data is correct
expected_data = ('cluster_id', 'end', 'engine_job_id', 'je_id',
'input_id', False, False, 'job_id', 'output_id',
'start', 'SUCCEEDED')
self.assertEqual(expected_data, data)
class TestListJobs(TestJobs):
def setUp(self):
super(TestListJobs, self).setUp()
self.je_mock.list.return_value = [api_je.JobExecution(
None, JOB_EXECUTION_INFO)]
# Command to test
self.cmd = osc_je.ListJobs(self.app, None)
def test_jobs_list_no_options(self):
arglist = []
verifylist = []
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
# Check that columns are correct
expected_columns = ['Id', 'Cluster id', 'Job id', 'Status']
self.assertEqual(expected_columns, columns)
# Check that data is correct
expected_data = [('je_id', 'cluster_id', 'job_id', 'SUCCEEDED')]
self.assertEqual(expected_data, list(data))
def test_jobs_list_long(self):
arglist = ['--long']
verifylist = [('long', True)]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
# Check that columns are correct
expected_columns = ['Id', 'Cluster id', 'Job id', 'Status',
'Start time', 'End time']
self.assertEqual(expected_columns, columns)
# Check that data is correct
expected_data = [('je_id', 'cluster_id', 'job_id', 'SUCCEEDED',
'start', 'end')]
self.assertEqual(expected_data, list(data))
def test_jobs_list_extra_search_opts(self):
arglist = ['--status', 'succeeded']
verifylist = [('status', 'succeeded')]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
# Check that columns are correct
expected_columns = ['Id', 'Cluster id', 'Job id', 'Status']
self.assertEqual(expected_columns, columns)
# Check that data is correct
expected_data = [('je_id', 'cluster_id', 'job_id', 'SUCCEEDED')]
self.assertEqual(expected_data, list(data))
class TestShowJob(TestJobs):
def setUp(self):
super(TestShowJob, self).setUp()
self.je_mock.get.return_value = api_je.JobExecution(
None, JOB_EXECUTION_INFO)
# Command to test
self.cmd = osc_je.ShowJob(self.app, None)
def test_job_show(self):
arglist = ['job_id']
verifylist = [('job', 'job_id')]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
# Check that correct arguments were passed
self.je_mock.get.assert_called_once_with('job_id')
# Check that columns are correct
expected_columns = ('Cluster id', 'End time', 'Engine job id', 'Id',
'Input id', 'Is protected', 'Is public',
'Job template id', 'Output id', 'Start time',
'Status')
self.assertEqual(expected_columns, columns)
# Check that data is correct
expected_data = ('cluster_id', 'end', 'engine_job_id', 'je_id',
'input_id', False, False, 'job_id', 'output_id',
'start', 'SUCCEEDED')
self.assertEqual(expected_data, data)
class TestDeleteJob(TestJobs):
def setUp(self):
super(TestDeleteJob, self).setUp()
self.je_mock.get.return_value = api_je.JobExecution(
None, JOB_EXECUTION_INFO)
# Command to test
self.cmd = osc_je.DeleteJob(self.app, None)
def test_job_delete(self):
arglist = ['job_id']
verifylist = [('job', ['job_id'])]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.cmd.take_action(parsed_args)
# Check that correct arguments were passed
self.je_mock.delete.assert_called_once_with('job_id')
class TestUpdateJob(TestJobs):
def setUp(self):
super(TestUpdateJob, self).setUp()
self.je_mock.get.return_value = api_je.JobExecution(
None, JOB_EXECUTION_INFO)
self.je_mock.update.return_value = mock.Mock(
job_execution=JOB_EXECUTION_INFO.copy())
# Command to test
self.cmd = osc_je.UpdateJob(self.app, None)
def test_job_update_no_options(self):
arglist = []
verifylist = []
self.assertRaises(osc_utils.ParserException, self.check_parser,
self.cmd, arglist, verifylist)
def test_job_update_nothing_updated(self):
arglist = ['job_id']
verifylist = [('job', 'job_id')]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.cmd.take_action(parsed_args)
# Check that correct arguments were passed
self.je_mock.update.assert_called_once_with('job_id')
def test_job_update_public_protected(self):
arglist = ['job_id', '--public', '--protected']
verifylist = [('job', 'job_id'), ('is_public', True),
('is_protected', True)]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
# Check that correct arguments were passed
self.je_mock.update.assert_called_once_with(
'job_id', is_protected=True, is_public=True)
# Check that columns are correct
expected_columns = ('Cluster id', 'End time', 'Engine job id', 'Id',
'Input id', 'Is protected', 'Is public',
'Job template id', 'Output id', 'Start time',
'Status')
self.assertEqual(expected_columns, columns)
# Check that data is correct
expected_data = ('cluster_id', 'end', 'engine_job_id', 'je_id',
'input_id', False, False, 'job_id', 'output_id',
'start', 'SUCCEEDED')
self.assertEqual(expected_data, data)
def test_job_update_private_unprotected(self):
arglist = ['job_id', '--private', '--unprotected']
verifylist = [('job', 'job_id'), ('is_public', False),
('is_protected', False)]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.cmd.take_action(parsed_args)
# Check that correct arguments were passed
self.je_mock.update.assert_called_once_with(
'job_id', is_protected=False, is_public=False)