diff --git a/sahara/conductor/objects.py b/sahara/conductor/objects.py
index bccc6c90..85c13ad7 100644
--- a/sahara/conductor/objects.py
+++ b/sahara/conductor/objects.py
@@ -241,6 +241,9 @@ class JobExecution(object):
info
oozie_job_id
return_code
+ job_configs
+ extra
+ data_source_urls
"""
diff --git a/sahara/db/migration/alembic_migrations/versions/021_datasource_placeholders.py b/sahara/db/migration/alembic_migrations/versions/021_datasource_placeholders.py
new file mode 100644
index 00000000..4c1eeab1
--- /dev/null
+++ b/sahara/db/migration/alembic_migrations/versions/021_datasource_placeholders.py
@@ -0,0 +1,36 @@
+# Copyright 2014 OpenStack Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Add data_source_urls to job_executions to support placeholders
+
+Revision ID: 021
+Revises: 020
+Create Date: 2015-02-24 12:47:17.871520
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '021'
+down_revision = '020'
+
+from alembic import op
+import sqlalchemy as sa
+
+from sahara.db.sqlalchemy import types as st
+
+
+def upgrade():
+ op.add_column('job_executions',
+ sa.Column('data_source_urls', st.JsonEncoded()))
diff --git a/sahara/db/sqlalchemy/models.py b/sahara/db/sqlalchemy/models.py
index ca791e59..cbd2f3d8 100644
--- a/sahara/db/sqlalchemy/models.py
+++ b/sahara/db/sqlalchemy/models.py
@@ -299,6 +299,7 @@ class JobExecution(mb.SaharaBase):
return_code = sa.Column(sa.String(80))
job_configs = sa.Column(st.JsonDictType())
extra = sa.Column(st.JsonDictType())
+ data_source_urls = sa.Column(st.JsonDictType())
mains_association = sa.Table("mains_association",
mb.SaharaBase.metadata,
diff --git a/sahara/service/edp/hdfs_helper.py b/sahara/service/edp/hdfs_helper.py
index c7875e5b..fc368d5e 100644
--- a/sahara/service/edp/hdfs_helper.py
+++ b/sahara/service/edp/hdfs_helper.py
@@ -87,8 +87,8 @@ def _get_cluster_hosts_information(host, cluster):
return None
-def configure_cluster_for_hdfs(cluster, data_source):
- host = urlparse.urlparse(data_source.url).hostname
+def configure_cluster_for_hdfs(cluster, data_source_url):
+ host = urlparse.urlparse(data_source_url).hostname
etc_hosts_information = _get_cluster_hosts_information(host, cluster)
if etc_hosts_information is None:
diff --git a/sahara/service/edp/job_utils.py b/sahara/service/edp/job_utils.py
index a97544ff..fe9aae8b 100644
--- a/sahara/service/edp/job_utils.py
+++ b/sahara/service/edp/job_utils.py
@@ -13,6 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import random
+import re
+import string
import uuid
from oslo_config import cfg
@@ -65,13 +68,22 @@ def create_workflow_dir(where, path, job, use_uuid=None, chmod=""):
return constructed_dir
-def get_data_sources(job_execution, job):
+def get_data_sources(job_execution, job, data_source_urls):
if edp.compare_job_type(job.type, edp.JOB_TYPE_JAVA, edp.JOB_TYPE_SPARK):
return None, None
ctx = context.ctx()
+
input_source = conductor.data_source_get(ctx, job_execution.input_id)
+ if input_source and input_source.id not in data_source_urls:
+ data_source_urls[input_source.id] = _construct_data_source_url(
+ input_source.url, job_execution.id)
+
output_source = conductor.data_source_get(ctx, job_execution.output_id)
+ if output_source and output_source.id not in data_source_urls:
+ data_source_urls[output_source.id] = _construct_data_source_url(
+ output_source.url, job_execution.id)
+
return input_source, output_source
@@ -169,7 +181,7 @@ def _add_credentials_for_data_sources(ds_list, configs):
configs[sw.HADOOP_SWIFT_PASSWORD] = password
-def resolve_data_source_references(job_configs):
+def resolve_data_source_references(job_configs, job_exec_id, data_source_urls):
"""Resolve possible data_source references in job_configs.
Look for any string values in the 'args', 'configs', and 'params'
@@ -222,7 +234,11 @@ def resolve_data_source_references(job_configs):
if len(ds) == 1:
ds = ds[0]
ds_seen[ds.id] = ds
- return ds.url
+ if ds.id not in data_source_urls:
+ data_source_urls[ds.id] = _construct_data_source_url(
+ ds.url, job_exec_id)
+
+ return data_source_urls[ds.id]
return value
# Loop over configs/params/args and look up each value as a data_source.
@@ -251,3 +267,26 @@ def resolve_data_source_references(job_configs):
k: v for k, v in six.iteritems(job_configs.get('proxy_configs'))}
return ds_seen, new_configs
+
+
+def _construct_data_source_url(url, job_exec_id):
+ """Resolve placeholders in data_source URL.
+
+ Supported placeholders:
+
+ * %RANDSTR(len)% - will be replaced with random string of lowercase
+ letters of length `len`.
+ * %JOB_EXEC_ID% - will be replaced with the job execution ID.
+
+ """
+
+    def _randstr(match):
+        rand_len = int(match.group(1))
+        return ''.join(random.choice(string.ascii_lowercase)
+                       for _ in six.moves.range(rand_len))
+
+ url = url.replace("%JOB_EXEC_ID%", job_exec_id)
+
+ url = re.sub(r"%RANDSTR\((\d+)\)%", _randstr, url)
+
+ return url
diff --git a/sahara/service/edp/oozie/engine.py b/sahara/service/edp/oozie/engine.py
index 509baa38..7acb5f5c 100644
--- a/sahara/service/edp/oozie/engine.py
+++ b/sahara/service/edp/oozie/engine.py
@@ -102,9 +102,11 @@ class OozieJobEngine(base_engine.JobEngine):
def run_job(self, job_execution):
ctx = context.ctx()
+ data_source_urls = {}
+
job = conductor.job_get(ctx, job_execution.job_id)
- input_source, output_source = job_utils.get_data_sources(job_execution,
- job)
+ input_source, output_source = job_utils.get_data_sources(
+ job_execution, job, data_source_urls)
# Updated_job_configs will be a copy of job_execution.job_configs with
# any name or uuid references to data_sources resolved to paths
@@ -113,9 +115,13 @@ class OozieJobEngine(base_engine.JobEngine):
# just be a reference to job_execution.job_configs to avoid a copy.
# Additional_sources will be a list of any data_sources found.
additional_sources, updated_job_configs = (
- job_utils.resolve_data_source_references(job_execution.job_configs)
+ job_utils.resolve_data_source_references(
+ job_execution.job_configs, job_execution.id, data_source_urls)
)
+ job_execution = conductor.job_execution_update(
+ ctx, job_execution, {"data_source_urls": data_source_urls})
+
proxy_configs = updated_job_configs.get('proxy_configs')
configs = updated_job_configs.get('configs', {})
use_hbase_lib = configs.get('edp.hbase_common_lib', {})
@@ -130,7 +136,8 @@ class OozieJobEngine(base_engine.JobEngine):
for data_source in [input_source, output_source] + additional_sources:
if data_source and data_source.type == 'hdfs':
- h.configure_cluster_for_hdfs(self.cluster, data_source)
+ h.configure_cluster_for_hdfs(
+ self.cluster, data_source_urls[data_source.id])
break
hdfs_user = self.get_hdfs_user()
@@ -146,7 +153,7 @@ class OozieJobEngine(base_engine.JobEngine):
wf_xml = workflow_factory.get_workflow_xml(
job, self.cluster, updated_job_configs,
input_source, output_source,
- hdfs_user)
+ hdfs_user, data_source_urls)
path_to_workflow = self._upload_workflow_file(oozie_server, wf_dir,
wf_xml, hdfs_user)
diff --git a/sahara/service/edp/oozie/workflow_creator/workflow_factory.py b/sahara/service/edp/oozie/workflow_creator/workflow_factory.py
index 5435307c..d9169421 100644
--- a/sahara/service/edp/oozie/workflow_creator/workflow_factory.py
+++ b/sahara/service/edp/oozie/workflow_creator/workflow_factory.py
@@ -118,9 +118,9 @@ class BaseFactory(object):
break
return configs
- def get_params(self, input_data, output_data):
- return {'INPUT': input_data.url,
- 'OUTPUT': output_data.url}
+ def get_params(self, input_data, output_data, data_source_urls):
+ return {'INPUT': data_source_urls[input_data.id],
+ 'OUTPUT': data_source_urls[output_data.id]}
class PigFactory(BaseFactory):
@@ -133,11 +133,12 @@ class PigFactory(BaseFactory):
return conductor.job_main_name(context.ctx(), job)
def get_workflow_xml(self, cluster, job_configs, input_data, output_data,
- hdfs_user):
+ hdfs_user, data_source_urls):
proxy_configs = job_configs.get('proxy_configs')
job_dict = {'configs': self.get_configs(input_data, output_data,
proxy_configs),
- 'params': self.get_params(input_data, output_data),
+ 'params': self.get_params(input_data, output_data,
+ data_source_urls),
'args': []}
self.update_job_dict(job_dict, job_configs)
creator = pig_workflow.PigWorkflowCreator()
@@ -158,11 +159,12 @@ class HiveFactory(BaseFactory):
return conductor.job_main_name(context.ctx(), job)
def get_workflow_xml(self, cluster, job_configs, input_data, output_data,
- hdfs_user):
+ hdfs_user, data_source_urls):
proxy_configs = job_configs.get('proxy_configs')
job_dict = {'configs': self.get_configs(input_data, output_data,
proxy_configs),
- 'params': self.get_params(input_data, output_data)}
+ 'params': self.get_params(input_data, output_data,
+ data_source_urls)}
self.update_job_dict(job_dict, job_configs)
creator = hive_workflow.HiveWorkflowCreator()
@@ -175,12 +177,13 @@ class HiveFactory(BaseFactory):
class MapReduceFactory(BaseFactory):
- def get_configs(self, input_data, output_data, proxy_configs):
+ def get_configs(self, input_data, output_data, proxy_configs,
+ data_source_urls):
configs = super(MapReduceFactory, self).get_configs(input_data,
output_data,
proxy_configs)
- configs['mapred.input.dir'] = input_data.url
- configs['mapred.output.dir'] = output_data.url
+ configs['mapred.input.dir'] = data_source_urls[input_data.id]
+ configs['mapred.output.dir'] = data_source_urls[output_data.id]
return configs
def _get_streaming(self, job_dict):
@@ -189,10 +192,11 @@ class MapReduceFactory(BaseFactory):
job_dict['edp_configs']) if k.startswith(prefix)}
def get_workflow_xml(self, cluster, job_configs, input_data, output_data,
- hdfs_user):
+ hdfs_user, data_source_urls):
proxy_configs = job_configs.get('proxy_configs')
job_dict = {'configs': self.get_configs(input_data, output_data,
- proxy_configs)}
+ proxy_configs,
+ data_source_urls)}
self.update_job_dict(job_dict, job_configs)
creator = mapreduce_workflow.MapReduceWorkFlowCreator()
creator.build_workflow_xml(configuration=job_dict['configs'],
diff --git a/sahara/service/edp/spark/engine.py b/sahara/service/edp/spark/engine.py
index 85ae1bcb..3efa7195 100644
--- a/sahara/service/edp/spark/engine.py
+++ b/sahara/service/edp/spark/engine.py
@@ -184,10 +184,16 @@ class SparkJobEngine(base_engine.JobEngine):
         ctx = context.ctx()
         job = conductor.job_get(ctx, job_execution.job_id)
 
+        data_source_urls = {}
         additional_sources, updated_job_configs = (
-            job_utils.resolve_data_source_references(job_execution.job_configs)
+            job_utils.resolve_data_source_references(
+                job_execution.job_configs, job_execution.id, data_source_urls)
         )
 
+        job_execution = conductor.job_execution_update(
+            ctx, job_execution, {"data_source_urls": data_source_urls})
+
         for data_source in additional_sources:
             if data_source and data_source.type == 'hdfs':
-                h.configure_cluster_for_hdfs(self.cluster, data_source)
+                h.configure_cluster_for_hdfs(
+                    self.cluster, data_source_urls[data_source.id])
diff --git a/sahara/service/edp/storm/engine.py b/sahara/service/edp/storm/engine.py
index a0c2c4e1..fe3652d2 100644
--- a/sahara/service/edp/storm/engine.py
+++ b/sahara/service/edp/storm/engine.py
@@ -164,10 +164,16 @@ class StormJobEngine(base_engine.JobEngine):
ctx = context.ctx()
job = conductor.job_get(ctx, job_execution.job_id)
+ data_source_urls = {}
+
additional_sources, updated_job_configs = (
- job_utils.resolve_data_source_references(job_execution.job_configs)
+ job_utils.resolve_data_source_references(
+ job_execution.job_configs, job_execution.id, data_source_urls)
)
+ job_execution = conductor.job_execution_update(
+ ctx, job_execution, {"data_source_urls": data_source_urls})
+
# We'll always run the driver program on the master
master = plugin_utils.get_instance(self.cluster, "nimbus")
diff --git a/sahara/tests/unit/db/migration/test_migrations.py b/sahara/tests/unit/db/migration/test_migrations.py
index c7db6685..d042b641 100644
--- a/sahara/tests/unit/db/migration/test_migrations.py
+++ b/sahara/tests/unit/db/migration/test_migrations.py
@@ -461,6 +461,9 @@ class SaharaMigrationsCheckers(object):
self.assertColumnNotExists(engine, 'cluster_provision_steps',
'started_at')
+ def _check_021(self, engine, data):
+ self.assertColumnExists(engine, 'job_executions', 'data_source_urls')
+
class TestMigrationsMySQL(SaharaMigrationsCheckers,
base.BaseWalkMigrationTestCase,
diff --git a/sahara/tests/unit/service/edp/edp_test_utils.py b/sahara/tests/unit/service/edp/edp_test_utils.py
index 0893c465..7ff82108 100644
--- a/sahara/tests/unit/service/edp/edp_test_utils.py
+++ b/sahara/tests/unit/service/edp/edp_test_utils.py
@@ -13,8 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import uuid
import mock
+import six
from sahara import conductor as cond
from sahara.utils import edp
@@ -86,6 +88,7 @@ def create_data_source(url, name=None, id=None):
def _create_job_exec(job_id, type, configs=None):
j_exec = mock.Mock()
+ j_exec.id = six.text_type(uuid.uuid4())
j_exec.job_id = job_id
j_exec.job_configs = configs
if edp.compare_job_type(type, edp.JOB_TYPE_JAVA):
diff --git a/sahara/tests/unit/service/edp/spark/test_spark.py b/sahara/tests/unit/service/edp/spark/test_spark.py
index 1e0a89dc..cb9ead4b 100644
--- a/sahara/tests/unit/service/edp/spark/test_spark.py
+++ b/sahara/tests/unit/service/edp/spark/test_spark.py
@@ -363,6 +363,7 @@ class TestSpark(base.SaharaTestCase):
("Spark", "Executor extra classpath",
"cluster"): self.driver_cp}[key]
+ @mock.patch('sahara.conductor.API.job_execution_update')
@mock.patch('sahara.conductor.API.job_execution_get')
@mock.patch('sahara.utils.remote.get_remote')
@mock.patch('sahara.plugins.spark.config_helper.get_config_value')
@@ -372,7 +373,8 @@ class TestSpark(base.SaharaTestCase):
@mock.patch('sahara.context.ctx', return_value="ctx")
def _setup_run_job(self, master_instance, job_configs, files,
ctx, job_get, get_instance, create_workflow_dir,
- get_config_value, get_remote, job_exec_get):
+ get_config_value, get_remote, job_exec_get,
+ job_exec_update):
def _upload_job_files(where, job_dir, job,
libs_subdir=True, job_configs=None):
diff --git a/sahara/tests/unit/service/edp/storm/test_storm.py b/sahara/tests/unit/service/edp/storm/test_storm.py
index f0ee1cda..f1b1da7b 100644
--- a/sahara/tests/unit/service/edp/storm/test_storm.py
+++ b/sahara/tests/unit/service/edp/storm/test_storm.py
@@ -269,6 +269,7 @@ class TestStorm(base.SaharaTestCase):
autospec=True,
return_value=(
"MyJob_ed8347a9-39aa-477c-8108-066202eb6130"))
+ @mock.patch('sahara.conductor.API.job_execution_update')
@mock.patch('sahara.conductor.API.job_execution_get')
@mock.patch('sahara.utils.remote.get_remote')
@mock.patch('sahara.service.edp.job_utils.create_workflow_dir')
@@ -277,7 +278,8 @@ class TestStorm(base.SaharaTestCase):
@mock.patch('sahara.context.ctx', return_value="ctx")
def _setup_run_job(self, master_instance, job_configs, files,
ctx, job_get, get_instance, create_workflow_dir,
- get_remote, job_exec_get, _generate_topology_name):
+ get_remote, job_exec_get, job_exec_update,
+ _generate_topology_name):
def _upload_job_files(where, job_dir, job,
libs_subdir=True, job_configs=None):
diff --git a/sahara/tests/unit/service/edp/test_job_manager.py b/sahara/tests/unit/service/edp/test_job_manager.py
index a9854905..d61d2e22 100644
--- a/sahara/tests/unit/service/edp/test_job_manager.py
+++ b/sahara/tests/unit/service/edp/test_job_manager.py
@@ -76,10 +76,12 @@ class TestJobManager(base.SaharaWithDbTestCase):
input_data = u.create_data_source('swift://ex/i')
output_data = u.create_data_source('swift://ex/o')
+ data_source_urls = {input_data.id: input_data.url,
+ output_data.id: output_data.url}
res = workflow_factory.get_workflow_xml(
job, u.create_cluster(), job_exec.job_configs,
- input_data, output_data, 'hadoop')
+ input_data, output_data, 'hadoop', data_source_urls)
self.assertIn("""
INPUT=swift://ex.sahara/i
@@ -106,7 +108,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
res = workflow_factory.get_workflow_xml(
job, u.create_cluster(), job_exec.job_configs,
- input_data, output_data, 'hadoop')
+ input_data, output_data, 'hadoop', data_source_urls)
self.assertIn("""
@@ -137,10 +139,12 @@ class TestJobManager(base.SaharaWithDbTestCase):
input_data = u.create_data_source('swift://ex/i')
output_data = u.create_data_source('hdfs://user/hadoop/out')
+ data_source_urls = {input_data.id: input_data.url,
+ output_data.id: output_data.url}
res = workflow_factory.get_workflow_xml(
job, u.create_cluster(), job_exec.job_configs,
- input_data, output_data, 'hadoop')
+ input_data, output_data, 'hadoop', data_source_urls)
self.assertIn("""
@@ -156,10 +160,12 @@ class TestJobManager(base.SaharaWithDbTestCase):
input_data = u.create_data_source('hdfs://user/hadoop/in')
output_data = u.create_data_source('swift://ex/o')
+ data_source_urls = {input_data.id: input_data.url,
+ output_data.id: output_data.url}
res = workflow_factory.get_workflow_xml(
job, u.create_cluster(), job_exec.job_configs,
- input_data, output_data, 'hadoop')
+ input_data, output_data, 'hadoop', data_source_urls)
self.assertIn("""
@@ -177,10 +183,12 @@ class TestJobManager(base.SaharaWithDbTestCase):
edp.JOB_TYPE_PIG, configs={'configs': {'dummy': 'value'}})
input_data = u.create_data_source('hdfs://user/hadoop/in')
output_data = u.create_data_source('hdfs://user/hadoop/out')
+ data_source_urls = {input_data.id: input_data.url,
+ output_data.id: output_data.url}
res = workflow_factory.get_workflow_xml(
job, u.create_cluster(), job_exec.job_configs,
- input_data, output_data, 'hadoop')
+ input_data, output_data, 'hadoop', data_source_urls)
self.assertIn("""
@@ -202,10 +210,12 @@ class TestJobManager(base.SaharaWithDbTestCase):
input_data = u.create_data_source('swift://ex/i')
output_data = u.create_data_source('swift://ex/o')
+ data_source_urls = {input_data.id: input_data.url,
+ output_data.id: output_data.url}
res = workflow_factory.get_workflow_xml(
job, u.create_cluster(), job_exec.job_configs,
- input_data, output_data, 'hadoop')
+ input_data, output_data, 'hadoop', data_source_urls)
if streaming:
self.assertIn("""
@@ -247,7 +257,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
res = workflow_factory.get_workflow_xml(
job, u.create_cluster(), job_exec.job_configs,
- input_data, output_data, 'hadoop')
+ input_data, output_data, 'hadoop', data_source_urls)
self.assertIn("""
@@ -368,10 +378,12 @@ class TestJobManager(base.SaharaWithDbTestCase):
input_data = u.create_data_source('swift://ex/i')
output_data = u.create_data_source('swift://ex/o')
+ data_source_urls = {input_data.id: input_data.url,
+ output_data.id: output_data.url}
res = workflow_factory.get_workflow_xml(
job, u.create_cluster(), job_exec.job_configs,
- input_data, output_data, 'hadoop')
+ input_data, output_data, 'hadoop', data_source_urls)
doc = xml.parseString(res)
hive = doc.getElementsByTagName('hive')[0]
@@ -399,7 +411,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
res = workflow_factory.get_workflow_xml(
job, u.create_cluster(), job_exec.job_configs,
- input_data, output_data, 'hadoop')
+ input_data, output_data, 'hadoop', data_source_urls)
doc = xml.parseString(res)
hive = doc.getElementsByTagName('hive')[0]
@@ -520,7 +532,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
@mock.patch('sahara.conductor.API.data_source_get')
def test_get_data_sources(self, ds):
def _conductor_data_source_get(ctx, id):
- return "obj_" + id
+ return mock.Mock(id=id, url="obj_" + id)
job, job_exec = u.create_job_exec(edp.JOB_TYPE_PIG)
@@ -529,10 +541,10 @@ class TestJobManager(base.SaharaWithDbTestCase):
ds.side_effect = _conductor_data_source_get
input_source, output_source = (
- job_utils.get_data_sources(job_exec, job))
+ job_utils.get_data_sources(job_exec, job, {}))
- self.assertEqual('obj_s1', input_source)
- self.assertEqual('obj_s2', output_source)
+ self.assertEqual('obj_s1', input_source.url)
+ self.assertEqual('obj_s2', output_source.url)
def test_get_data_sources_java(self):
configs = {sw.HADOOP_SWIFT_USERNAME: 'admin',
@@ -547,7 +559,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
job, job_exec = u.create_job_exec(edp.JOB_TYPE_JAVA, configs)
input_source, output_source = (
- job_utils.get_data_sources(job_exec, job))
+ job_utils.get_data_sources(job_exec, job, {}))
self.assertIsNone(input_source)
self.assertIsNone(output_source)
diff --git a/sahara/tests/unit/service/edp/test_job_utils.py b/sahara/tests/unit/service/edp/test_job_utils.py
index d81b0788..00c17338 100644
--- a/sahara/tests/unit/service/edp/test_job_utils.py
+++ b/sahara/tests/unit/service/edp/test_job_utils.py
@@ -161,14 +161,16 @@ class JobUtilsTestCase(testtools.TestCase):
ctx.return_value = 'dummy'
name_ref = job_utils.DATA_SOURCE_PREFIX+'input'
+ job_exec_id = six.text_type(uuid.uuid4())
input = u.create_data_source("swift://container/input",
name="input",
id=six.text_type(uuid.uuid4()))
- output = u.create_data_source("swift://container/output",
+ output = u.create_data_source("swift://container/output.%JOB_EXEC_ID%",
name="output",
id=six.text_type(uuid.uuid4()))
+ output_url = "swift://container/output." + job_exec_id
by_name = {'input': input,
'output': output}
@@ -199,9 +201,10 @@ class JobUtilsTestCase(testtools.TestCase):
job_utils.DATA_SOURCE_SUBST_UUID: True},
'args': [name_ref, output.id, input.id]}
- ds, nc = job_utils.resolve_data_source_references(job_configs)
+ ds, nc = job_utils.resolve_data_source_references(job_configs,
+ job_exec_id, {})
self.assertEqual(2, len(ds))
- self.assertEqual([input.url, output.url, input.url], nc['args'])
+ self.assertEqual([input.url, output_url, input.url], nc['args'])
# Swift configs should be filled in since they were blank
self.assertEqual(input.credentials['user'],
nc['configs']['fs.swift.service.sahara.username'])
@@ -212,9 +215,10 @@ class JobUtilsTestCase(testtools.TestCase):
'fs.swift.service.sahara.password': 'gamgee',
job_utils.DATA_SOURCE_SUBST_NAME: False,
job_utils.DATA_SOURCE_SUBST_UUID: True}
- ds, nc = job_utils.resolve_data_source_references(job_configs)
+ ds, nc = job_utils.resolve_data_source_references(job_configs,
+ job_exec_id, {})
self.assertEqual(2, len(ds))
- self.assertEqual([name_ref, output.url, input.url], nc['args'])
+ self.assertEqual([name_ref, output_url, input.url], nc['args'])
# Swift configs should not be overwritten
self.assertEqual(job_configs['configs'], nc['configs'])
@@ -223,7 +227,8 @@ class JobUtilsTestCase(testtools.TestCase):
job_configs['proxy_configs'] = {'proxy_username': 'john',
'proxy_password': 'smith',
'proxy_trust_id': 'trustme'}
- ds, nc = job_utils.resolve_data_source_references(job_configs)
+ ds, nc = job_utils.resolve_data_source_references(job_configs,
+ job_exec_id, {})
self.assertEqual(1, len(ds))
self.assertEqual([input.url, output.id, input.id], nc['args'])
@@ -234,7 +239,8 @@ class JobUtilsTestCase(testtools.TestCase):
# Substitution not enabled
job_configs['configs'] = {job_utils.DATA_SOURCE_SUBST_NAME: False,
job_utils.DATA_SOURCE_SUBST_UUID: False}
- ds, nc = job_utils.resolve_data_source_references(job_configs)
+ ds, nc = job_utils.resolve_data_source_references(job_configs,
+ job_exec_id, {})
self.assertEqual(0, len(ds))
self.assertEqual(job_configs['args'], nc['args'])
self.assertEqual(job_configs['configs'], nc['configs'])
@@ -243,7 +249,34 @@ class JobUtilsTestCase(testtools.TestCase):
job_configs['configs'] = {job_utils.DATA_SOURCE_SUBST_NAME: True,
job_utils.DATA_SOURCE_SUBST_UUID: True}
job_configs['args'] = ['val1', 'val2', 'val3']
- ds, nc = job_utils.resolve_data_source_references(job_configs)
+ ds, nc = job_utils.resolve_data_source_references(job_configs,
+ job_exec_id, {})
self.assertEqual(0, len(ds))
- self.assertEqual(job_configs['args'], nc['args'])
- self.assertEqual(job_configs['configs'], nc['configs'])
+ self.assertEqual(nc['args'], job_configs['args'])
+ self.assertEqual(nc['configs'], job_configs['configs'])
+
+ def test_construct_data_source_url_no_placeholders(self):
+ base_url = "swift://container/input"
+ job_exec_id = six.text_type(uuid.uuid4())
+
+ url = job_utils._construct_data_source_url(base_url, job_exec_id)
+
+ self.assertEqual(base_url, url)
+
+ def test_construct_data_source_url_job_exec_id_placeholder(self):
+ base_url = "swift://container/input.%JOB_EXEC_ID%.out"
+ job_exec_id = six.text_type(uuid.uuid4())
+
+ url = job_utils._construct_data_source_url(base_url, job_exec_id)
+
+ self.assertEqual(
+ "swift://container/input." + job_exec_id + ".out", url)
+
+ def test_construct_data_source_url_randstr_placeholder(self):
+ base_url = "swift://container/input.%RANDSTR(4)%.%RANDSTR(7)%.out"
+ job_exec_id = six.text_type(uuid.uuid4())
+
+ url = job_utils._construct_data_source_url(base_url, job_exec_id)
+
+        self.assertRegex(
+            url, r"swift://container/input\.[a-z]{4}\.[a-z]{7}\.out")