Add an interface argument for job templates and jobs

Partially implements: blueprint unified-job-interface-map

Change-Id: Ie61ae1799458056fff7390895bc929efb5a53381
This commit is contained in:
Ethan Gafford
2015-06-11 16:53:27 -04:00
parent 26cf2237fc
commit 7435a98a0c
7 changed files with 98 additions and 59 deletions

View File

@@ -33,11 +33,13 @@ class JobExecutionsManager(base.ResourceManager):
def delete(self, obj_id):
    """Delete the job execution identified by *obj_id* via the REST API."""
    target = '/job-executions/%s' % obj_id
    self._delete(target)
# NOTE(review): diff fragment with +/- markers stripped — old and new lines are
# interleaved below. The next line is the signature REMOVED by this change:
def create(self, job_id, cluster_id, input_id, output_id, configs):
# ...and these two lines are its replacement, adding an optional ``interface``
# argument (a mapping of interface argument names to values for the run).
def create(self, job_id, cluster_id, input_id,
output_id, configs, interface=None):
url = "/jobs/%s/execute" % job_id
data = {
"cluster_id": cluster_id,
# Removed line (lacked the trailing comma required by the new key):
"job_configs": configs
# Replacement lines: ``interface`` defaults to an empty dict when not given,
# avoiding a mutable default argument.
"job_configs": configs,
"interface": interface or {}
}
# The method body continues beyond this hunk; the remainder is not visible here.
# Leave these out if they are null. For Java job types they

View File

@@ -33,11 +33,13 @@ class JobExecutionsManager(base.ResourceManager):
class JobsManager(base.ResourceManager):
resource_class = Job
# NOTE(review): diff fragment — the next line appears to be the pre-change
# signature, replaced by the one after it which adds an optional
# ``interface`` argument (a list describing the job template's interface).
def create(self, name, type, mains, libs, description):
def create(self, name, type, mains, libs, description, interface=None):
data = {
'name': name,
'type': type,
'description': description,
'mains': mains,
# Removed line (no trailing comma):
'libs': libs
# Replacement: ``interface`` defaults to an empty list when not supplied;
# note job templates use a list here while job executions use a dict.
'libs': libs,
'interface': interface or []
}
return self._create('/jobs', data, 'job')
def list(self, search_opts=None):

View File

@@ -756,10 +756,8 @@ def do_job_template_show(cs, args):
@utils.arg('--name',
# NOTE(review): per the hunk header (-10/+8) two lines were removed in this
# region — presumably the two ``required=True,`` lines for --name and --type,
# since those values can now also come from the --json template. Confirm
# against the repository; the stripped +/- markers make this ambiguous.
required=True,
help='Name of the job template.')
@utils.arg('--type',
required=True,
help='Type of the job template.')
@utils.arg('--main',
action='append',
@@ -772,11 +770,28 @@ def do_job_template_show(cs, args):
@utils.arg('--description',
default='',
help='Description of the job template.')
# Added by this change: an optional --json flag whose argparse.FileType('r')
# value is an open file handle containing a JSON job template.
@utils.arg('--json',
default=None,
type=argparse.FileType('r'),
help='JSON representation of job template.')
def do_job_template_create(cs, args):
"""Create a job template."""
# Old implementation (replaced): passed CLI flags positionally.
_show_job_template(cs.jobs.create(args.name, args.type,
args.main, args.lib,
args.description))
# New implementation: load the JSON template (if given), drop keys the client
# call does not accept, then merge — explicit CLI flags win over JSON values.
template = json.loads(args.json.read()) if args.json else {}
_filter_call_args(template, cs.jobs.create)
template = {
"name": args.name or template.get("name") or None,
"type": args.type or template.get("type") or None,
"mains": args.main or template.get("mains") or [],
"libs": args.lib or template.get("libs") or [],
"description": args.description or template.get("description") or '',
"interface": template.get("interface") or []
}
# NOTE(review): raising bare ``Exception`` is unusual for CLI validation;
# verify against the real file whether a more specific error type is used.
if not template["name"]:
raise Exception("name is required")
if not template["type"]:
raise Exception("type is required")
_show_job_template(cs.jobs.create(**template))
@utils.arg('--name',
@@ -830,7 +845,7 @@ def do_job_show(cs, args):
required=True,
help='ID of the job template to run.')
@utils.arg('--cluster',
# --cluster changed from required to optional: the cluster id may now be
# supplied via the --json job description instead (validated below).
required=True,
required=False,
help='ID of the cluster to run the job in.')
@utils.arg('--input-data',
default=None,
@@ -852,14 +867,30 @@ def do_job_show(cs, args):
action='append',
default=[],
help='Config parameters to add to the job, repeatable.')
# Added by this change: optional --json flag carrying a JSON job description.
@utils.arg('--json',
default=None,
type=argparse.FileType('r'),
help='JSON representation of the job.')
def do_job_create(cs, args):
"""Create a job."""
job = json.loads(args.json.read()) if args.json else {}
# Map the JSON key "job_configs" onto the client parameter name "configs"
# before filtering out unsupported keys.
remap = {"job_configs": "configs"}
_filter_call_args(job, cs.job_executions.create, remap)
# Turn repeatable "key=value" CLI options into a dict.
# NOTE(review): assigning a lambda to a name is flagged by PEP 8 (E731);
# a small ``def`` would be the conventional form.
_convert = lambda ls: dict(map(lambda i: i.split('=', 1), ls))
# Old call (replaced): positional flags, configs built inline.
_show_job(cs.job_executions.create(args.job_template, args.cluster,
args.input_data, args.output_data,
{'params': _convert(args.param),
'args': args.arg,
'configs': _convert(args.config)}))
# New: merge CLI flags with the JSON job description; CLI flags win, and any
# config/param/arg flag overrides the JSON-supplied configs wholesale.
job = {
"cluster_id": args.cluster or job.get("cluster_id") or None,
"input_id": args.input_data or job.get("input_id") or None,
"output_id": args.output_data or job.get("output_id") or None,
"interface": job.get("interface") or [],
"configs": job.get("configs") or {}
}
if any((args.config, args.param, args.arg)):
job["configs"] = {"configs": _convert(args.config),
"args": args.arg,
"params": _convert(args.param)}
if not job["cluster_id"]:
raise Exception("cluster is required")
_show_job(cs.job_executions.create(args.job_template, **job))
@utils.arg('--id',

View File

@@ -17,8 +17,6 @@ import logging
import shlex
import subprocess
# NOTE(review): the hunk header (-8/+6) says two lines were removed from this
# import region — likely ``import six``, since the six.iteritems() loops in the
# old job_create below were dropped by this change. Confirm against the repo.
import six
from saharaclient.tests.integration.configs import config as cfg
cfg = cfg.ITConfig()
@@ -144,34 +142,16 @@ class CLICommands(CommandBase):
params = '--id %s' % id
return self.sahara('job-binary-data-delete', params=params)
# Old implementation (replaced by the --json based variant further below):
# built the command line from individual name/type/main/lib flags.
def job_template_create(self, name, jobtype, main='', lib=''):
params = '--name %s --type %s' % (name, jobtype)
if main:
params += ' --main %s' % main
if lib:
params += ' --lib %s' % lib
def job_template_create(self, filename):
    """Invoke ``sahara job-template-create`` with a JSON template file."""
    cli_args = '--json %s' % filename
    return self.sahara('job-template-create', params=cli_args)
def job_template_delete(self, id):
    """Invoke ``sahara job-template-delete`` for the template *id*."""
    return self.sahara('job-template-delete', params='--id %s' % id)
# Old implementation of job_create, removed by this change in favour of the
# --json variant below. NOTE(review): this removed code contains a visible
# bug — the ``job_params`` branch iterates ``six.iteritems(params)`` (the
# command-line *string* being built) instead of ``six.iteritems(job_params)``,
# which would raise at runtime whenever job_params was non-empty.
def job_create(self, job_id, cluster_id, input_id='', output_id='',
job_params=None, args=None, configs=None):
params = '--job-template %s --cluster %s' % (job_id, cluster_id)
if input_id:
params += ' --input-data %s' % input_id
if output_id:
params += ' --output-data %s' % output_id
if job_params:
for k, v in six.iteritems(params):
params += ' --param %s=%s' % (k, v)
if args:
for arg in args:
params += ' --arg %s' % arg
if configs:
for k, v in six.iteritems(configs):
params += ' --config %s=%s' % (k, v)
def job_create(self, job_template_id, filename):
    """Invoke ``sahara job-create`` with a template id and a JSON job file."""
    cli_args = '--job-template %s --json %s' % (job_template_id, filename)
    return self.sahara('job-create', params=cli_args)
def job_delete(self, id):

View File

@@ -94,9 +94,12 @@ class EDPTest(base.ITestBase):
return info
def edp_common(self, job_type, lib=None, main=None, configs=None,
# NOTE(review): the next line is the OLD continuation of the signature; the
# two lines after it are the replacement, adding job_interface (a list for
# the template) and execution_interface (a dict for the run).
add_data_sources=True, pass_data_sources_as_args=False):
add_data_sources=True, pass_data_sources_as_args=False,
job_interface=None, execution_interface=None):
# Generate a new marker for this so we can keep containers separate
# and create some input data
# Default the new parameters here rather than in the signature, avoiding
# mutable default arguments.
job_interface = job_interface or []
execution_interface = execution_interface or {}
marker = "%s-%s" % (job_type.replace(".", ""), os.getpid())
container = self.swift_utils.create_container(marker)
self.swift_utils.generate_input(container, 'input')
@@ -127,12 +130,17 @@ class EDPTest(base.ITestBase):
# Create a job template
job_template_name = marker
# Old call (replaced): passed template fields as individual CLI flags.
self.cli.job_template_create(job_template_name,
job_type,
main=self.main_binary and (
self.main_binary.id or ''),
lib=self.lib_binary and (
self.lib_binary.id or ''))
# New: serialize the whole template (including the interface) to a JSON
# file and pass that file to the CLI.
job_template_dict = {
"name": job_template_name,
"type": job_type,
"mains": [self.main_binary.id] if (self.main_binary and
self.main_binary.id) else [],
"libs": [self.lib_binary.id] if (self.lib_binary and
self.lib_binary.id) else [],
"interface": job_interface
}
f = self.util.generate_json_file(job_template_dict)
self.cli.job_template_create(f.name)
self.job_template = self.util.find_job_template_by_name(
job_template_name)
self.assertIsNotNone(self.job_template)
@@ -152,14 +160,19 @@ class EDPTest(base.ITestBase):
args = [input_url, output_url]
else:
args = None
# Old call (replaced): passed execution fields as keyword arguments.
self.cli.job_create(self.job_template.id,
self.cluster.id,
input_id=self.input_source and (
self.input_source.id or ''),
output_id=self.output_source and (
self.output_source.id or ''),
args=args,
configs=configs)
# New: serialize the job execution (including the execution interface)
# to a JSON file and pass that to the CLI.
job_dict = {
"cluster_id": self.cluster.id,
"input_id": self.input_source and (
self.input_source.id or None),
"output_id": self.output_source and (
self.output_source.id or None),
"job_configs": {"configs": configs,
"args": args},
"interface": execution_interface
}
f = self.util.generate_json_file(job_dict)
self.cli.job_create(self.job_template.id, f.name)
# Find the job using the job_template_id
self.job = self.util.find_job_by_job_template_id(self.job_template.id)
@@ -197,16 +210,25 @@ class EDPTest(base.ITestBase):
def java_edp(self):
configs = {
# NOTE(review): the interface entry added below maps "Swift Username" to
# the fs.swift...username config location, so this explicit username line
# may be the removed side of the hunk — confirm against the repository.
'fs.swift.service.sahara.username': common.OS_USERNAME,
'fs.swift.service.sahara.password': common.OS_PASSWORD,
'edp.java.main_class': 'org.openstack.sahara.examples.WordCount'
}
# New in this change: declare one interface argument on the template and
# supply its value through the execution interface.
job_interface = [{
"name": "Swift Username",
"mapping_type": "configs",
"location": "fs.swift.service.sahara.username",
"value_type": "string",
"required": True
}]
execution_interface = {"Swift Username": common.OS_USERNAME}
self.edp_common('Java',
lib=self._binary_info('edp-java.jar',
relative_path='edp-java/'),
configs=configs,
add_data_sources=False,
# Old closing line of the call, followed by its replacement which threads
# the two interface arguments through:
pass_data_sources_as_args=True)
pass_data_sources_as_args=True,
job_interface=job_interface,
execution_interface=execution_interface)
def run_edp_jobs(self, config):
try:

View File

@@ -23,12 +23,14 @@ class JobExecutionTest(base.BaseTestCase):
'job_id': 'job_id',
'cluster_id': 'cluster_id',
'configs': {},
# Added by this change: executions carry an 'interface' dict (cf. the
# ``interface or {}`` default in JobExecutionsManager.create above).
'interface': {},
'input_id': None,
'output_id': None
}
response = {
'cluster_id': 'cluster_id',
'job_configs': {},
# Added: the mocked API response echoes the interface dict back.
'interface': {}
}
def test_create_job_execution_with_io(self):

View File

@@ -24,7 +24,8 @@ class JobTest(base.BaseTestCase):
'type': 'pig',
'mains': ['job_binary_id'],
'libs': [],
# Removed line (no trailing comma), followed by its replacement which adds
# the new 'interface' list (job templates use a list, executions a dict).
'description': 'descr'
'description': 'descr',
'interface': []
}
def test_create_job(self):
@@ -65,7 +66,8 @@ class JobTest(base.BaseTestCase):
# NOTE(review): "job_config" (singular) differs from the "job_configs"
# spelling used elsewhere in this commit — possibly a typo in this mocked
# response; confirm against the repository.
"job_config": {
"args": [],
"configs": []
# Old closing brace, then the replacement lines adding 'interface'.
}
},
"interface": []
}
self.responses.get(url, json=response)