diff --git a/saharaclient/api/job_executions.py b/saharaclient/api/job_executions.py index c5af07e9..37b2b06f 100644 --- a/saharaclient/api/job_executions.py +++ b/saharaclient/api/job_executions.py @@ -33,11 +33,13 @@ class JobExecutionsManager(base.ResourceManager): def delete(self, obj_id): self._delete('/job-executions/%s' % obj_id) - def create(self, job_id, cluster_id, input_id, output_id, configs): + def create(self, job_id, cluster_id, input_id, + output_id, configs, interface=None): url = "/jobs/%s/execute" % job_id data = { "cluster_id": cluster_id, - "job_configs": configs + "job_configs": configs, + "interface": interface or {} } # Leave these out if they are null. For Java job types they diff --git a/saharaclient/api/jobs.py b/saharaclient/api/jobs.py index 6e10ccd5..44bdd3e5 100644 --- a/saharaclient/api/jobs.py +++ b/saharaclient/api/jobs.py @@ -23,15 +23,15 @@ class Job(base.Resource): class JobsManager(base.ResourceManager): resource_class = Job - def create(self, name, type, mains, libs, description): + def create(self, name, type, mains, libs, description, interface=None): data = { 'name': name, 'type': type, 'description': description, 'mains': mains, - 'libs': libs + 'libs': libs, + 'interface': interface or [] } - return self._create('/jobs', data, 'job') def list(self, search_opts=None): diff --git a/saharaclient/api/shell.py b/saharaclient/api/shell.py index 3b46057b..da00ecc8 100644 --- a/saharaclient/api/shell.py +++ b/saharaclient/api/shell.py @@ -756,10 +756,8 @@ def do_job_template_show(cs, args): @utils.arg('--name', - required=True, help='Name of the job template.') @utils.arg('--type', - required=True, help='Type of the job template.') @utils.arg('--main', action='append', @@ -772,11 +770,28 @@ def do_job_template_show(cs, args): @utils.arg('--description', default='', help='Description of the job template.') +@utils.arg('--json', + default=None, + type=argparse.FileType('r'), + help='JSON representation of job template.') def do_job_template_create(cs, args): """Create a job template.""" - _show_job_template(cs.jobs.create(args.name, args.type, - args.main, args.lib, - args.description)) + template = json.loads(args.json.read()) if args.json else {} + _filter_call_args(template, cs.jobs.create) + template = { + "name": args.name or template.get("name") or None, + "type": args.type or template.get("type") or None, + "mains": args.main or template.get("mains") or [], + "libs": args.lib or template.get("libs") or [], + "description": args.description or template.get("description") or '', + "interface": template.get("interface") or [] + } + if not template["name"]: + raise Exception("name is required") + if not template["type"]: + raise Exception("type is required") + + _show_job_template(cs.jobs.create(**template)) @utils.arg('--name', @@ -830,7 +845,7 @@ def do_job_show(cs, args): required=True, help='ID of the job template to run.') @utils.arg('--cluster', - required=True, + required=False, help='ID of the cluster to run the job in.') @utils.arg('--input-data', default=None, @@ -852,14 +867,30 @@ def do_job_show(cs, args): action='append', default=[], help='Config parameters to add to the job, repeatable.') +@utils.arg('--json', + default=None, + type=argparse.FileType('r'), + help='JSON representation of the job.') def do_job_create(cs, args): """Create a job.""" + job = json.loads(args.json.read()) if args.json else {} + remap = {"job_configs": "configs"} + _filter_call_args(job, cs.job_executions.create, remap) _convert = lambda ls: dict(map(lambda i: i.split('=', 1), ls)) - _show_job(cs.job_executions.create(args.job_template, args.cluster, - args.input_data, args.output_data, - {'params': _convert(args.param), - 'args': args.arg, - 'configs': _convert(args.config)})) + job = { + "cluster_id": args.cluster or job.get("cluster_id") or None, + "input_id": args.input_data or job.get("input_id") or None, + "output_id": args.output_data or job.get("output_id") or None, + "interface": job.get("interface") or [], + "configs": job.get("configs") or {} + } + if any((args.config, args.param, args.arg)): + job["configs"] = {"configs": _convert(args.config), + "args": args.arg, + "params": _convert(args.param)} + if not job["cluster_id"]: + raise Exception("cluster is required") + _show_job(cs.job_executions.create(args.job_template, **job)) @utils.arg('--id', diff --git a/saharaclient/tests/integration/tests/clidriver.py b/saharaclient/tests/integration/tests/clidriver.py index c40720bd..6089ccbc 100644 --- a/saharaclient/tests/integration/tests/clidriver.py +++ b/saharaclient/tests/integration/tests/clidriver.py @@ -17,8 +17,6 @@ import logging import shlex import subprocess -import six - from saharaclient.tests.integration.configs import config as cfg cfg = cfg.ITConfig() @@ -144,34 +142,16 @@ class CLICommands(CommandBase): params = '--id %s' % id return self.sahara('job-binary-data-delete', params=params) - def job_template_create(self, name, jobtype, main='', lib=''): - params = '--name %s --type %s' % (name, jobtype) - if main: - params += ' --main %s' % main - if lib: - params += ' --lib %s' % lib + def job_template_create(self, filename): + params = '--json %s' % (filename) return self.sahara('job-template-create', params=params) def job_template_delete(self, id): params = '--id %s' % id return self.sahara('job-template-delete', params=params) - def job_create(self, job_id, cluster_id, input_id='', output_id='', - job_params=None, args=None, configs=None): - params = '--job-template %s --cluster %s' % (job_id, cluster_id) - if input_id: - params += ' --input-data %s' % input_id - if output_id: - params += ' --output-data %s' % output_id - if job_params: - for k, v in six.iteritems(params): - params += ' --param %s=%s' % (k, v) - if args: - for arg in args: - params += ' --arg %s' % arg - if configs: - for k, v in six.iteritems(configs): - params += ' --config %s=%s' % (k, v) + def job_create(self, job_template_id, filename): + params = '--job-template %s --json %s' % (job_template_id, filename) return self.sahara('job-create', params=params) def job_delete(self, id): diff --git a/saharaclient/tests/integration/tests/edp.py b/saharaclient/tests/integration/tests/edp.py index 917b2e9f..ab006daa 100644 --- a/saharaclient/tests/integration/tests/edp.py +++ b/saharaclient/tests/integration/tests/edp.py @@ -94,9 +94,12 @@ class EDPTest(base.ITestBase): return info def edp_common(self, job_type, lib=None, main=None, configs=None, - add_data_sources=True, pass_data_sources_as_args=False): + add_data_sources=True, pass_data_sources_as_args=False, + job_interface=None, execution_interface=None): # Generate a new marker for this so we can keep containers separate # and create some input data + job_interface = job_interface or [] + execution_interface = execution_interface or {} marker = "%s-%s" % (job_type.replace(".", ""), os.getpid()) container = self.swift_utils.create_container(marker) self.swift_utils.generate_input(container, 'input') @@ -127,12 +130,17 @@ class EDPTest(base.ITestBase): # Create a job template job_template_name = marker - self.cli.job_template_create(job_template_name, - job_type, - main=self.main_binary and ( - self.main_binary.id or ''), - lib=self.lib_binary and ( - self.lib_binary.id or '')) + job_template_dict = { + "name": job_template_name, + "type": job_type, + "mains": [self.main_binary.id] if (self.main_binary and + self.main_binary.id) else [], + "libs": [self.lib_binary.id] if (self.lib_binary and + self.lib_binary.id) else [], + "interface": job_interface + } + f = self.util.generate_json_file(job_template_dict) + self.cli.job_template_create(f.name) self.job_template = self.util.find_job_template_by_name( job_template_name) self.assertIsNotNone(self.job_template) @@ -152,14 +160,19 @@ class EDPTest(base.ITestBase): args = [input_url, output_url] else: args = None - self.cli.job_create(self.job_template.id, - self.cluster.id, - input_id=self.input_source and ( - self.input_source.id or ''), - output_id=self.output_source and ( - self.output_source.id or ''), - args=args, - configs=configs) + + job_dict = { + "cluster_id": self.cluster.id, + "input_id": self.input_source and ( + self.input_source.id or None), + "output_id": self.output_source and ( + self.output_source.id or None), + "job_configs": {"configs": configs, + "args": args}, + "interface": execution_interface + } + f = self.util.generate_json_file(job_dict) + self.cli.job_create(self.job_template.id, f.name) # Find the job using the job_template_id self.job = self.util.find_job_by_job_template_id(self.job_template.id) @@ -197,16 +210,25 @@ class EDPTest(base.ITestBase): def java_edp(self): configs = { - 'fs.swift.service.sahara.username': common.OS_USERNAME, 'fs.swift.service.sahara.password': common.OS_PASSWORD, 'edp.java.main_class': 'org.openstack.sahara.examples.WordCount' } + job_interface = [{ + "name": "Swift Username", + "mapping_type": "configs", + "location": "fs.swift.service.sahara.username", + "value_type": "string", + "required": True + }] + execution_interface = {"Swift Username": common.OS_USERNAME} self.edp_common('Java', lib=self._binary_info('edp-java.jar', relative_path='edp-java/'), configs=configs, add_data_sources=False, - pass_data_sources_as_args=True) + pass_data_sources_as_args=True, + job_interface=job_interface, + execution_interface=execution_interface) def run_edp_jobs(self, config): try: diff --git a/saharaclient/tests/unit/test_job_executions.py b/saharaclient/tests/unit/test_job_executions.py index 7f545b11..8ac48fd1 100644 --- a/saharaclient/tests/unit/test_job_executions.py +++ b/saharaclient/tests/unit/test_job_executions.py @@ -23,12 +23,14 @@ class JobExecutionTest(base.BaseTestCase): 'job_id': 'job_id', 'cluster_id': 'cluster_id', 'configs': {}, + 'interface': {}, 'input_id': None, 'output_id': None } response = { 'cluster_id': 'cluster_id', 'job_configs': {}, + 'interface': {} } def test_create_job_execution_with_io(self): diff --git a/saharaclient/tests/unit/test_jobs.py b/saharaclient/tests/unit/test_jobs.py index 13dcfb2a..62ff4b69 100644 --- a/saharaclient/tests/unit/test_jobs.py +++ b/saharaclient/tests/unit/test_jobs.py @@ -24,7 +24,8 @@ class JobTest(base.BaseTestCase): 'type': 'pig', 'mains': ['job_binary_id'], 'libs': [], - 'description': 'descr' + 'description': 'descr', + 'interface': [] } def test_create_job(self): @@ -65,7 +66,8 @@ class JobTest(base.BaseTestCase): "job_config": { "args": [], "configs": [] - } + }, + "interface": [] } self.responses.get(url, json=response)