Merge "Allow Sahara native urls and runtime urls to differ for datasources"

This commit is contained in:
Jenkins 2015-08-18 15:51:29 +00:00 committed by Gerrit Code Review
commit 033e2228e5
5 changed files with 70 additions and 19 deletions

View File

@ -69,17 +69,17 @@ def create_workflow_dir(where, path, job, use_uuid=None, chmod=""):
def get_data_sources(job_execution, job, data_source_urls):
    """Look up the input and output data sources for a job execution.

    As a side effect, records a ``(native_url, runtime_url)`` tuple in
    *data_source_urls* (keyed by data source id) for each source that is
    not already present, so callers can later map ids to either url form.

    :param job_execution: the job execution holding input_id/output_id
    :param job: the job object (kept for interface compatibility;
        not used in the lookup itself)
    :param data_source_urls: dict updated in place with url tuples
    :returns: tuple of (input_source, output_source); either may be
        falsy if the corresponding id does not resolve
    """
    # NOTE(review): this span contained stale pre-merge lines interleaved
    # by the diff rendering; the merged version routes both lookups
    # through the single _construct() helper below.
    def _construct(ctx, ds_id):
        # Fetch the source and memoize its (native, runtime) url pair.
        source = conductor.data_source_get(ctx, ds_id)
        if source and source.id not in data_source_urls:
            url = _construct_data_source_url(source.url, job_execution.id)
            runtime_url = _runtime_url(url)
            data_source_urls[source.id] = (url, runtime_url)
        return source

    ctx = context.ctx()
    input_source = _construct(ctx, job_execution.input_id)
    output_source = _construct(ctx, job_execution.output_id)
    return input_source, output_source
@ -232,10 +232,11 @@ def resolve_data_source_references(job_configs, job_exec_id, data_source_urls):
ds = ds[0]
ds_seen[ds.id] = ds
if ds.id not in data_source_urls:
data_source_urls[ds.id] = _construct_data_source_url(
ds.url, job_exec_id)
url = _construct_data_source_url(ds.url, job_exec_id)
runtime_url = _runtime_url(url)
data_source_urls[ds.id] = (url, runtime_url)
return data_source_urls[ds.id]
return data_source_urls[ds.id][1]
return value
# Loop over configs/params/args and look up each value as a data_source.
@ -287,3 +288,12 @@ def _construct_data_source_url(url, job_exec_id):
url = re.sub(r"%RANDSTR\((\d+)\)%", _randstr, url)
return url
def _runtime_url(url):
return url
def to_url_dict(data_source_urls, runtime=False):
    """Flatten a dict of url tuples into a dict of single urls.

    :param data_source_urls: dict of
        ``{data_source_id: (native_url, runtime_url)}``
    :param runtime: if True select the runtime url, otherwise the
        native url
    :returns: dict of ``{data_source_id: url}``
    """
    idx = 1 if runtime else 0
    # ds_id instead of "id" so the builtin id() is not shadowed.
    return {ds_id: urls[idx]
            for ds_id, urls in six.iteritems(data_source_urls)}

View File

@ -102,6 +102,8 @@ class OozieJobEngine(base_engine.JobEngine):
def run_job(self, job_execution):
ctx = context.ctx()
# This will be a dictionary of tuples, (native_url, runtime_url)
# keyed by data_source id
data_source_urls = {}
job = conductor.job_get(ctx, job_execution.job_id)
@ -120,7 +122,13 @@ class OozieJobEngine(base_engine.JobEngine):
)
job_execution = conductor.job_execution_update(
ctx, job_execution, {"data_source_urls": data_source_urls})
ctx, job_execution,
{"data_source_urls": job_utils.to_url_dict(data_source_urls)})
# Now that we've recorded the native urls, we can switch to the
# runtime urls
data_source_urls = job_utils.to_url_dict(data_source_urls,
runtime=True)
proxy_configs = updated_job_configs.get('proxy_configs')
configs = updated_job_configs.get('configs', {})

View File

@ -195,6 +195,9 @@ class SparkJobEngine(base_engine.JobEngine):
ctx = context.ctx()
job = conductor.job_get(ctx, job_execution.job_id)
indep_params = {}
# This will be a dictionary of tuples, (native_url, runtime_url)
# keyed by data_source id
data_source_urls = {}
additional_sources, updated_job_configs = (
job_utils.resolve_data_source_references(
@ -202,7 +205,13 @@ class SparkJobEngine(base_engine.JobEngine):
)
job_execution = conductor.job_execution_update(
ctx, job_execution, {"data_source_urls": data_source_urls})
ctx, job_execution,
{"data_source_urls": job_utils.to_url_dict(data_source_urls)})
# Now that we've recorded the native urls, we can switch to the
# runtime urls
data_source_urls = job_utils.to_url_dict(data_source_urls,
runtime=True)
for data_source in additional_sources:
if data_source and data_source.type == 'hdfs':

View File

@ -169,6 +169,8 @@ class StormJobEngine(base_engine.JobEngine):
ctx = context.ctx()
job = conductor.job_get(ctx, job_execution.job_id)
# This will be a dictionary of tuples, (native_url, runtime_url)
# keyed by data_source id
data_source_urls = {}
additional_sources, updated_job_configs = (
@ -177,7 +179,13 @@ class StormJobEngine(base_engine.JobEngine):
)
job_execution = conductor.job_execution_update(
ctx, job_execution, {"data_source_urls": data_source_urls})
ctx, job_execution,
{"data_source_urls": job_utils.to_url_dict(data_source_urls)})
# Now that we've recorded the native urls, we can switch to the
# runtime urls
data_source_urls = job_utils.to_url_dict(data_source_urls,
runtime=True)
# We'll always run the driver program on the master
master = plugin_utils.get_instance(self.cluster, "nimbus")

View File

@ -163,7 +163,8 @@ class JobUtilsTestCase(testtools.TestCase):
name_ref = job_utils.DATA_SOURCE_PREFIX+'input'
job_exec_id = six.text_type(uuid.uuid4())
input = u.create_data_source("swift://container/input",
input_url = "swift://container/input"
input = u.create_data_source(input_url,
name="input",
id=six.text_type(uuid.uuid4()))
@ -200,9 +201,9 @@ class JobUtilsTestCase(testtools.TestCase):
job_utils.DATA_SOURCE_SUBST_NAME: True,
job_utils.DATA_SOURCE_SUBST_UUID: True},
'args': [name_ref, output.id, input.id]}
urls = {}
ds, nc = job_utils.resolve_data_source_references(job_configs,
job_exec_id, {})
job_exec_id, urls)
self.assertEqual(2, len(ds))
self.assertEqual([input.url, output_url, input.url], nc['args'])
# Swift configs should be filled in since they were blank
@ -210,6 +211,9 @@ class JobUtilsTestCase(testtools.TestCase):
nc['configs']['fs.swift.service.sahara.username'])
self.assertEqual(input.credentials['password'],
nc['configs']['fs.swift.service.sahara.password'])
self.assertEqual(2, len(urls))
self.assertItemsEqual({input.id: (input_url, input_url),
output.id: (output_url, output_url)}, urls)
job_configs['configs'] = {'fs.swift.service.sahara.username': 'sam',
'fs.swift.service.sahara.password': 'gamgee',
@ -255,6 +259,18 @@ class JobUtilsTestCase(testtools.TestCase):
self.assertEqual(nc['args'], job_configs['args'])
self.assertEqual(nc['configs'], job_configs['configs'])
def test_to_url_dict(self):
    """to_url_dict() should select native or runtime urls by flag."""
    data_source_urls = {'1': ('1_native', '1_runtime'),
                        '2': ('2_native', '2_runtime')}
    # assertEqual compares keys AND values; assertItemsEqual on two
    # dicts iterates them, so it would compare only the keys and the
    # selected urls would never actually be checked.
    self.assertEqual({'1': '1_native',
                      '2': '2_native'},
                     job_utils.to_url_dict(data_source_urls))
    self.assertEqual({'1': '1_runtime',
                      '2': '2_runtime'},
                     job_utils.to_url_dict(data_source_urls,
                                           runtime=True))
def test_construct_data_source_url_no_placeholders(self):
base_url = "swift://container/input"
job_exec_id = six.text_type(uuid.uuid4())