From 7bae4261d020737cfd37ab69773fb8169b58eb1a Mon Sep 17 00:00:00 2001 From: Andrew Lazarev Date: Tue, 24 Feb 2015 14:56:27 -0800 Subject: [PATCH] Implemented support of placeholders in datasource URLs Added ability to use placeholders in datasource URLs. Currently supported placeholders: * %RANDSTR(len)% - will be replaced with random string of lowercase letters of length `len`. * %JOB_EXEC_ID% - will be replaced with the job execution ID. Resulting URLs will be stored in a new field in the job_execution table. Using the 'info' field doesn't look like a good solution since it is reserved for oozie status. Next steps: * write documentation * update horizon Implements blueprint: edp-datasource-placeholders Change-Id: I1d9282b210047982c062b24bd03cf2331ab7599e --- sahara/conductor/objects.py | 3 ++ .../versions/021_datasource_placeholders.py | 36 +++++++++++++ sahara/db/sqlalchemy/models.py | 1 + sahara/service/edp/hdfs_helper.py | 4 +- sahara/service/edp/job_utils.py | 45 ++++++++++++++-- sahara/service/edp/oozie/engine.py | 17 ++++-- .../workflow_creator/workflow_factory.py | 28 +++++----- sahara/service/edp/spark/engine.py | 7 ++- sahara/service/edp/storm/engine.py | 8 ++- .../unit/db/migration/test_migrations.py | 3 ++ .../tests/unit/service/edp/edp_test_utils.py | 3 ++ .../unit/service/edp/spark/test_spark.py | 4 +- .../unit/service/edp/storm/test_storm.py | 4 +- .../unit/service/edp/test_job_manager.py | 40 +++++++++----- .../tests/unit/service/edp/test_job_utils.py | 53 +++++++++++++++---- 15 files changed, 206 insertions(+), 50 deletions(-) create mode 100644 sahara/db/migration/alembic_migrations/versions/021_datasource_placeholders.py diff --git a/sahara/conductor/objects.py b/sahara/conductor/objects.py index bccc6c90..85c13ad7 100644 --- a/sahara/conductor/objects.py +++ b/sahara/conductor/objects.py @@ -241,6 +241,9 @@ class JobExecution(object): info oozie_job_id return_code + job_configs + extra + data_source_urls """ diff --git 
a/sahara/db/migration/alembic_migrations/versions/021_datasource_placeholders.py b/sahara/db/migration/alembic_migrations/versions/021_datasource_placeholders.py new file mode 100644 index 00000000..4c1eeab1 --- /dev/null +++ b/sahara/db/migration/alembic_migrations/versions/021_datasource_placeholders.py @@ -0,0 +1,36 @@ +# Copyright 2014 OpenStack Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Add data_source_urls to job_executions to support placeholders + +Revision ID: 021 +Revises: 020 +Create Date: 2015-02-24 12:47:17.871520 + +""" + +# revision identifiers, used by Alembic. 
+revision = '021' +down_revision = '020' + +from alembic import op +import sqlalchemy as sa + +from sahara.db.sqlalchemy import types as st + + +def upgrade(): + op.add_column('job_executions', + sa.Column('data_source_urls', st.JsonEncoded())) diff --git a/sahara/db/sqlalchemy/models.py b/sahara/db/sqlalchemy/models.py index ca791e59..cbd2f3d8 100644 --- a/sahara/db/sqlalchemy/models.py +++ b/sahara/db/sqlalchemy/models.py @@ -299,6 +299,7 @@ class JobExecution(mb.SaharaBase): return_code = sa.Column(sa.String(80)) job_configs = sa.Column(st.JsonDictType()) extra = sa.Column(st.JsonDictType()) + data_source_urls = sa.Column(st.JsonDictType()) mains_association = sa.Table("mains_association", mb.SaharaBase.metadata, diff --git a/sahara/service/edp/hdfs_helper.py b/sahara/service/edp/hdfs_helper.py index c7875e5b..fc368d5e 100644 --- a/sahara/service/edp/hdfs_helper.py +++ b/sahara/service/edp/hdfs_helper.py @@ -87,8 +87,8 @@ def _get_cluster_hosts_information(host, cluster): return None -def configure_cluster_for_hdfs(cluster, data_source): - host = urlparse.urlparse(data_source.url).hostname +def configure_cluster_for_hdfs(cluster, data_source_url): + host = urlparse.urlparse(data_source_url).hostname etc_hosts_information = _get_cluster_hosts_information(host, cluster) if etc_hosts_information is None: diff --git a/sahara/service/edp/job_utils.py b/sahara/service/edp/job_utils.py index a97544ff..fe9aae8b 100644 --- a/sahara/service/edp/job_utils.py +++ b/sahara/service/edp/job_utils.py @@ -13,6 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import random +import re +import string import uuid from oslo_config import cfg @@ -65,13 +68,22 @@ def create_workflow_dir(where, path, job, use_uuid=None, chmod=""): return constructed_dir -def get_data_sources(job_execution, job): +def get_data_sources(job_execution, job, data_source_urls): if edp.compare_job_type(job.type, edp.JOB_TYPE_JAVA, edp.JOB_TYPE_SPARK): return None, None ctx = context.ctx() + input_source = conductor.data_source_get(ctx, job_execution.input_id) + if input_source and input_source.id not in data_source_urls: + data_source_urls[input_source.id] = _construct_data_source_url( + input_source.url, job_execution.id) + output_source = conductor.data_source_get(ctx, job_execution.output_id) + if output_source and output_source.id not in data_source_urls: + data_source_urls[output_source.id] = _construct_data_source_url( + output_source.url, job_execution.id) + return input_source, output_source @@ -169,7 +181,7 @@ def _add_credentials_for_data_sources(ds_list, configs): configs[sw.HADOOP_SWIFT_PASSWORD] = password -def resolve_data_source_references(job_configs): +def resolve_data_source_references(job_configs, job_exec_id, data_source_urls): """Resolve possible data_source references in job_configs. Look for any string values in the 'args', 'configs', and 'params' @@ -222,7 +234,11 @@ def resolve_data_source_references(job_configs): if len(ds) == 1: ds = ds[0] ds_seen[ds.id] = ds - return ds.url + if ds.id not in data_source_urls: + data_source_urls[ds.id] = _construct_data_source_url( + ds.url, job_exec_id) + + return data_source_urls[ds.id] return value # Loop over configs/params/args and look up each value as a data_source. @@ -251,3 +267,26 @@ def resolve_data_source_references(job_configs): k: v for k, v in six.iteritems(job_configs.get('proxy_configs'))} return ds_seen, new_configs + + +def _construct_data_source_url(url, job_exec_id): + """Resolve placeholders in data_source URL. 
+ + Supported placeholders: + + * %RANDSTR(len)% - will be replaced with random string of lowercase + letters of length `len`. + * %JOB_EXEC_ID% - will be replaced with the job execution ID. + + """ + + def _randstr(match): + len = int(match.group(1)) + return ''.join(random.choice(string.ascii_lowercase) + for _ in xrange(len)) + + url = url.replace("%JOB_EXEC_ID%", job_exec_id) + + url = re.sub(r"%RANDSTR\((\d+)\)%", _randstr, url) + + return url diff --git a/sahara/service/edp/oozie/engine.py b/sahara/service/edp/oozie/engine.py index 509baa38..7acb5f5c 100644 --- a/sahara/service/edp/oozie/engine.py +++ b/sahara/service/edp/oozie/engine.py @@ -102,9 +102,11 @@ class OozieJobEngine(base_engine.JobEngine): def run_job(self, job_execution): ctx = context.ctx() + data_source_urls = {} + job = conductor.job_get(ctx, job_execution.job_id) - input_source, output_source = job_utils.get_data_sources(job_execution, - job) + input_source, output_source = job_utils.get_data_sources( + job_execution, job, data_source_urls) # Updated_job_configs will be a copy of job_execution.job_configs with # any name or uuid references to data_sources resolved to paths @@ -113,9 +115,13 @@ class OozieJobEngine(base_engine.JobEngine): # just be a reference to job_execution.job_configs to avoid a copy. # Additional_sources will be a list of any data_sources found. 
additional_sources, updated_job_configs = ( - job_utils.resolve_data_source_references(job_execution.job_configs) + job_utils.resolve_data_source_references( + job_execution.job_configs, job_execution.id, data_source_urls) ) + job_execution = conductor.job_execution_update( + ctx, job_execution, {"data_source_urls": data_source_urls}) + proxy_configs = updated_job_configs.get('proxy_configs') configs = updated_job_configs.get('configs', {}) use_hbase_lib = configs.get('edp.hbase_common_lib', {}) @@ -130,7 +136,8 @@ class OozieJobEngine(base_engine.JobEngine): for data_source in [input_source, output_source] + additional_sources: if data_source and data_source.type == 'hdfs': - h.configure_cluster_for_hdfs(self.cluster, data_source) + h.configure_cluster_for_hdfs( + self.cluster, data_source_urls[data_source.id]) break hdfs_user = self.get_hdfs_user() @@ -146,7 +153,7 @@ class OozieJobEngine(base_engine.JobEngine): wf_xml = workflow_factory.get_workflow_xml( job, self.cluster, updated_job_configs, input_source, output_source, - hdfs_user) + hdfs_user, data_source_urls) path_to_workflow = self._upload_workflow_file(oozie_server, wf_dir, wf_xml, hdfs_user) diff --git a/sahara/service/edp/oozie/workflow_creator/workflow_factory.py b/sahara/service/edp/oozie/workflow_creator/workflow_factory.py index 889058ec..0a6fb23d 100644 --- a/sahara/service/edp/oozie/workflow_creator/workflow_factory.py +++ b/sahara/service/edp/oozie/workflow_creator/workflow_factory.py @@ -118,9 +118,9 @@ class BaseFactory(object): break return configs - def get_params(self, input_data, output_data): - return {'INPUT': input_data.url, - 'OUTPUT': output_data.url} + def get_params(self, input_data, output_data, data_source_urls): + return {'INPUT': data_source_urls[input_data.id], + 'OUTPUT': data_source_urls[output_data.id]} class PigFactory(BaseFactory): @@ -133,11 +133,12 @@ class PigFactory(BaseFactory): return conductor.job_main_name(context.ctx(), job) def get_workflow_xml(self, cluster, 
job_configs, input_data, output_data, - hdfs_user): + hdfs_user, data_source_urls): proxy_configs = job_configs.get('proxy_configs') job_dict = {'configs': self.get_configs(input_data, output_data, proxy_configs), - 'params': self.get_params(input_data, output_data), + 'params': self.get_params(input_data, output_data, + data_source_urls), 'args': []} self.update_job_dict(job_dict, job_configs) creator = pig_workflow.PigWorkflowCreator() @@ -158,11 +159,12 @@ class HiveFactory(BaseFactory): return conductor.job_main_name(context.ctx(), job) def get_workflow_xml(self, cluster, job_configs, input_data, output_data, - hdfs_user): + hdfs_user, data_source_urls): proxy_configs = job_configs.get('proxy_configs') job_dict = {'configs': self.get_configs(input_data, output_data, proxy_configs), - 'params': self.get_params(input_data, output_data)} + 'params': self.get_params(input_data, output_data, + data_source_urls)} self.update_job_dict(job_dict, job_configs) creator = hive_workflow.HiveWorkflowCreator() @@ -175,12 +177,13 @@ class HiveFactory(BaseFactory): class MapReduceFactory(BaseFactory): - def get_configs(self, input_data, output_data, proxy_configs): + def get_configs(self, input_data, output_data, proxy_configs, + data_source_urls): configs = super(MapReduceFactory, self).get_configs(input_data, output_data, proxy_configs) - configs['mapred.input.dir'] = input_data.url - configs['mapred.output.dir'] = output_data.url + configs['mapred.input.dir'] = data_source_urls[input_data.id] + configs['mapred.output.dir'] = data_source_urls[output_data.id] return configs def _get_streaming(self, job_dict): @@ -189,10 +192,11 @@ class MapReduceFactory(BaseFactory): job_dict['edp_configs']) if k.startswith(prefix)} def get_workflow_xml(self, cluster, job_configs, input_data, output_data, - hdfs_user): + hdfs_user, data_source_urls): proxy_configs = job_configs.get('proxy_configs') job_dict = {'configs': self.get_configs(input_data, output_data, - proxy_configs)} + 
proxy_configs, + data_source_urls)} self.update_job_dict(job_dict, job_configs) creator = mapreduce_workflow.MapReduceWorkFlowCreator() creator.build_workflow_xml(configuration=job_dict['configs'], diff --git a/sahara/service/edp/spark/engine.py b/sahara/service/edp/spark/engine.py index 85ae1bcb..3efa7195 100644 --- a/sahara/service/edp/spark/engine.py +++ b/sahara/service/edp/spark/engine.py @@ -184,10 +184,15 @@ class SparkJobEngine(base_engine.JobEngine): ctx = context.ctx() job = conductor.job_get(ctx, job_execution.job_id) + data_source_urls = {} additional_sources, updated_job_configs = ( - job_utils.resolve_data_source_references(job_execution.job_configs) + job_utils.resolve_data_source_references( + job_execution.job_configs, job_execution.id, data_source_urls) ) + job_execution = conductor.job_execution_update( + ctx, job_execution, {"data_source_urls": data_source_urls}) + for data_source in additional_sources: if data_source and data_source.type == 'hdfs': h.configure_cluster_for_hdfs(self.cluster, data_source) diff --git a/sahara/service/edp/storm/engine.py b/sahara/service/edp/storm/engine.py index a0c2c4e1..fe3652d2 100644 --- a/sahara/service/edp/storm/engine.py +++ b/sahara/service/edp/storm/engine.py @@ -164,10 +164,16 @@ class StormJobEngine(base_engine.JobEngine): ctx = context.ctx() job = conductor.job_get(ctx, job_execution.job_id) + data_source_urls = {} + additional_sources, updated_job_configs = ( - job_utils.resolve_data_source_references(job_execution.job_configs) + job_utils.resolve_data_source_references( + job_execution.job_configs, job_execution.id, data_source_urls) ) + job_execution = conductor.job_execution_update( + ctx, job_execution, {"data_source_urls": data_source_urls}) + # We'll always run the driver program on the master master = plugin_utils.get_instance(self.cluster, "nimbus") diff --git a/sahara/tests/unit/db/migration/test_migrations.py b/sahara/tests/unit/db/migration/test_migrations.py index c7db6685..d042b641 100644 
--- a/sahara/tests/unit/db/migration/test_migrations.py +++ b/sahara/tests/unit/db/migration/test_migrations.py @@ -461,6 +461,9 @@ class SaharaMigrationsCheckers(object): self.assertColumnNotExists(engine, 'cluster_provision_steps', 'started_at') + def _check_021(self, engine, data): + self.assertColumnExists(engine, 'job_executions', 'data_source_urls') + class TestMigrationsMySQL(SaharaMigrationsCheckers, base.BaseWalkMigrationTestCase, diff --git a/sahara/tests/unit/service/edp/edp_test_utils.py b/sahara/tests/unit/service/edp/edp_test_utils.py index 0893c465..7ff82108 100644 --- a/sahara/tests/unit/service/edp/edp_test_utils.py +++ b/sahara/tests/unit/service/edp/edp_test_utils.py @@ -13,8 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +import uuid import mock +import six from sahara import conductor as cond from sahara.utils import edp @@ -86,6 +88,7 @@ def create_data_source(url, name=None, id=None): def _create_job_exec(job_id, type, configs=None): j_exec = mock.Mock() + j_exec.id = six.text_type(uuid.uuid4()) j_exec.job_id = job_id j_exec.job_configs = configs if edp.compare_job_type(type, edp.JOB_TYPE_JAVA): diff --git a/sahara/tests/unit/service/edp/spark/test_spark.py b/sahara/tests/unit/service/edp/spark/test_spark.py index 1e0a89dc..cb9ead4b 100644 --- a/sahara/tests/unit/service/edp/spark/test_spark.py +++ b/sahara/tests/unit/service/edp/spark/test_spark.py @@ -363,6 +363,7 @@ class TestSpark(base.SaharaTestCase): ("Spark", "Executor extra classpath", "cluster"): self.driver_cp}[key] + @mock.patch('sahara.conductor.API.job_execution_update') @mock.patch('sahara.conductor.API.job_execution_get') @mock.patch('sahara.utils.remote.get_remote') @mock.patch('sahara.plugins.spark.config_helper.get_config_value') @@ -372,7 +373,8 @@ class TestSpark(base.SaharaTestCase): @mock.patch('sahara.context.ctx', return_value="ctx") def _setup_run_job(self, master_instance, job_configs, files, ctx, 
job_get, get_instance, create_workflow_dir, - get_config_value, get_remote, job_exec_get): + get_config_value, get_remote, job_exec_get, + job_exec_update): def _upload_job_files(where, job_dir, job, libs_subdir=True, job_configs=None): diff --git a/sahara/tests/unit/service/edp/storm/test_storm.py b/sahara/tests/unit/service/edp/storm/test_storm.py index f0ee1cda..f1b1da7b 100644 --- a/sahara/tests/unit/service/edp/storm/test_storm.py +++ b/sahara/tests/unit/service/edp/storm/test_storm.py @@ -269,6 +269,7 @@ class TestStorm(base.SaharaTestCase): autospec=True, return_value=( "MyJob_ed8347a9-39aa-477c-8108-066202eb6130")) + @mock.patch('sahara.conductor.API.job_execution_update') @mock.patch('sahara.conductor.API.job_execution_get') @mock.patch('sahara.utils.remote.get_remote') @mock.patch('sahara.service.edp.job_utils.create_workflow_dir') @@ -277,7 +278,8 @@ class TestStorm(base.SaharaTestCase): @mock.patch('sahara.context.ctx', return_value="ctx") def _setup_run_job(self, master_instance, job_configs, files, ctx, job_get, get_instance, create_workflow_dir, - get_remote, job_exec_get, _generate_topology_name): + get_remote, job_exec_get, job_exec_update, + _generate_topology_name): def _upload_job_files(where, job_dir, job, libs_subdir=True, job_configs=None): diff --git a/sahara/tests/unit/service/edp/test_job_manager.py b/sahara/tests/unit/service/edp/test_job_manager.py index 0947a04b..aa90b1ef 100644 --- a/sahara/tests/unit/service/edp/test_job_manager.py +++ b/sahara/tests/unit/service/edp/test_job_manager.py @@ -76,10 +76,12 @@ class TestJobManager(base.SaharaWithDbTestCase): input_data = u.create_data_source('swift://ex/i') output_data = u.create_data_source('swift://ex/o') + data_source_urls = {input_data.id: input_data.url, + output_data.id: output_data.url} res = workflow_factory.get_workflow_xml( job, u.create_cluster(), job_exec.job_configs, - input_data, output_data, 'hadoop') + input_data, output_data, 'hadoop', data_source_urls) self.assertIn(""" 
INPUT=swift://ex.sahara/i @@ -106,7 +108,7 @@ class TestJobManager(base.SaharaWithDbTestCase): res = workflow_factory.get_workflow_xml( job, u.create_cluster(), job_exec.job_configs, - input_data, output_data, 'hadoop') + input_data, output_data, 'hadoop', data_source_urls) self.assertIn(""" @@ -137,10 +139,12 @@ class TestJobManager(base.SaharaWithDbTestCase): input_data = u.create_data_source('swift://ex/i') output_data = u.create_data_source('hdfs://user/hadoop/out') + data_source_urls = {input_data.id: input_data.url, + output_data.id: output_data.url} res = workflow_factory.get_workflow_xml( job, u.create_cluster(), job_exec.job_configs, - input_data, output_data, 'hadoop') + input_data, output_data, 'hadoop', data_source_urls) self.assertIn(""" @@ -156,10 +160,12 @@ class TestJobManager(base.SaharaWithDbTestCase): input_data = u.create_data_source('hdfs://user/hadoop/in') output_data = u.create_data_source('swift://ex/o') + data_source_urls = {input_data.id: input_data.url, + output_data.id: output_data.url} res = workflow_factory.get_workflow_xml( job, u.create_cluster(), job_exec.job_configs, - input_data, output_data, 'hadoop') + input_data, output_data, 'hadoop', data_source_urls) self.assertIn(""" @@ -177,10 +183,12 @@ class TestJobManager(base.SaharaWithDbTestCase): edp.JOB_TYPE_PIG, configs={'configs': {'dummy': 'value'}}) input_data = u.create_data_source('hdfs://user/hadoop/in') output_data = u.create_data_source('hdfs://user/hadoop/out') + data_source_urls = {input_data.id: input_data.url, + output_data.id: output_data.url} res = workflow_factory.get_workflow_xml( job, u.create_cluster(), job_exec.job_configs, - input_data, output_data, 'hadoop') + input_data, output_data, 'hadoop', data_source_urls) self.assertIn(""" @@ -202,10 +210,12 @@ class TestJobManager(base.SaharaWithDbTestCase): input_data = u.create_data_source('swift://ex/i') output_data = u.create_data_source('swift://ex/o') + data_source_urls = {input_data.id: input_data.url, + 
output_data.id: output_data.url} res = workflow_factory.get_workflow_xml( job, u.create_cluster(), job_exec.job_configs, - input_data, output_data, 'hadoop') + input_data, output_data, 'hadoop', data_source_urls) if streaming: self.assertIn(""" @@ -247,7 +257,7 @@ class TestJobManager(base.SaharaWithDbTestCase): res = workflow_factory.get_workflow_xml( job, u.create_cluster(), job_exec.job_configs, - input_data, output_data, 'hadoop') + input_data, output_data, 'hadoop', data_source_urls) self.assertIn(""" @@ -353,10 +363,12 @@ class TestJobManager(base.SaharaWithDbTestCase): input_data = u.create_data_source('swift://ex/i') output_data = u.create_data_source('swift://ex/o') + data_source_urls = {input_data.id: input_data.url, + output_data.id: output_data.url} res = workflow_factory.get_workflow_xml( job, u.create_cluster(), job_exec.job_configs, - input_data, output_data, 'hadoop') + input_data, output_data, 'hadoop', data_source_urls) doc = xml.parseString(res) hive = doc.getElementsByTagName('hive')[0] @@ -384,7 +396,7 @@ class TestJobManager(base.SaharaWithDbTestCase): res = workflow_factory.get_workflow_xml( job, u.create_cluster(), job_exec.job_configs, - input_data, output_data, 'hadoop') + input_data, output_data, 'hadoop', data_source_urls) doc = xml.parseString(res) hive = doc.getElementsByTagName('hive')[0] @@ -489,7 +501,7 @@ class TestJobManager(base.SaharaWithDbTestCase): @mock.patch('sahara.conductor.API.data_source_get') def test_get_data_sources(self, ds): def _conductor_data_source_get(ctx, id): - return "obj_" + id + return mock.Mock(id=id, url="obj_" + id) job, job_exec = u.create_job_exec(edp.JOB_TYPE_PIG) @@ -498,10 +510,10 @@ class TestJobManager(base.SaharaWithDbTestCase): ds.side_effect = _conductor_data_source_get input_source, output_source = ( - job_utils.get_data_sources(job_exec, job)) + job_utils.get_data_sources(job_exec, job, {})) - self.assertEqual('obj_s1', input_source) - self.assertEqual('obj_s2', output_source) + 
self.assertEqual('obj_s1', input_source.url) + self.assertEqual('obj_s2', output_source.url) def test_get_data_sources_java(self): configs = {sw.HADOOP_SWIFT_USERNAME: 'admin', @@ -516,7 +528,7 @@ class TestJobManager(base.SaharaWithDbTestCase): job, job_exec = u.create_job_exec(edp.JOB_TYPE_JAVA, configs) input_source, output_source = ( - job_utils.get_data_sources(job_exec, job)) + job_utils.get_data_sources(job_exec, job, {})) self.assertIsNone(input_source) self.assertIsNone(output_source) diff --git a/sahara/tests/unit/service/edp/test_job_utils.py b/sahara/tests/unit/service/edp/test_job_utils.py index d81b0788..00c17338 100644 --- a/sahara/tests/unit/service/edp/test_job_utils.py +++ b/sahara/tests/unit/service/edp/test_job_utils.py @@ -161,14 +161,16 @@ class JobUtilsTestCase(testtools.TestCase): ctx.return_value = 'dummy' name_ref = job_utils.DATA_SOURCE_PREFIX+'input' + job_exec_id = six.text_type(uuid.uuid4()) input = u.create_data_source("swift://container/input", name="input", id=six.text_type(uuid.uuid4())) - output = u.create_data_source("swift://container/output", + output = u.create_data_source("swift://container/output.%JOB_EXEC_ID%", name="output", id=six.text_type(uuid.uuid4())) + output_url = "swift://container/output." 
+ job_exec_id by_name = {'input': input, 'output': output} @@ -199,9 +201,10 @@ class JobUtilsTestCase(testtools.TestCase): job_utils.DATA_SOURCE_SUBST_UUID: True}, 'args': [name_ref, output.id, input.id]} - ds, nc = job_utils.resolve_data_source_references(job_configs) + ds, nc = job_utils.resolve_data_source_references(job_configs, + job_exec_id, {}) self.assertEqual(2, len(ds)) - self.assertEqual([input.url, output.url, input.url], nc['args']) + self.assertEqual([input.url, output_url, input.url], nc['args']) # Swift configs should be filled in since they were blank self.assertEqual(input.credentials['user'], nc['configs']['fs.swift.service.sahara.username']) @@ -212,9 +215,10 @@ class JobUtilsTestCase(testtools.TestCase): 'fs.swift.service.sahara.password': 'gamgee', job_utils.DATA_SOURCE_SUBST_NAME: False, job_utils.DATA_SOURCE_SUBST_UUID: True} - ds, nc = job_utils.resolve_data_source_references(job_configs) + ds, nc = job_utils.resolve_data_source_references(job_configs, + job_exec_id, {}) self.assertEqual(2, len(ds)) - self.assertEqual([name_ref, output.url, input.url], nc['args']) + self.assertEqual([name_ref, output_url, input.url], nc['args']) # Swift configs should not be overwritten self.assertEqual(job_configs['configs'], nc['configs']) @@ -223,7 +227,8 @@ class JobUtilsTestCase(testtools.TestCase): job_configs['proxy_configs'] = {'proxy_username': 'john', 'proxy_password': 'smith', 'proxy_trust_id': 'trustme'} - ds, nc = job_utils.resolve_data_source_references(job_configs) + ds, nc = job_utils.resolve_data_source_references(job_configs, + job_exec_id, {}) self.assertEqual(1, len(ds)) self.assertEqual([input.url, output.id, input.id], nc['args']) @@ -234,7 +239,8 @@ class JobUtilsTestCase(testtools.TestCase): # Substitution not enabled job_configs['configs'] = {job_utils.DATA_SOURCE_SUBST_NAME: False, job_utils.DATA_SOURCE_SUBST_UUID: False} - ds, nc = job_utils.resolve_data_source_references(job_configs) + ds, nc = 
job_utils.resolve_data_source_references(job_configs, + job_exec_id, {}) self.assertEqual(0, len(ds)) self.assertEqual(job_configs['args'], nc['args']) self.assertEqual(job_configs['configs'], nc['configs']) @@ -243,7 +249,34 @@ class JobUtilsTestCase(testtools.TestCase): job_configs['configs'] = {job_utils.DATA_SOURCE_SUBST_NAME: True, job_utils.DATA_SOURCE_SUBST_UUID: True} job_configs['args'] = ['val1', 'val2', 'val3'] - ds, nc = job_utils.resolve_data_source_references(job_configs) + ds, nc = job_utils.resolve_data_source_references(job_configs, + job_exec_id, {}) self.assertEqual(0, len(ds)) - self.assertEqual(job_configs['args'], nc['args']) - self.assertEqual(job_configs['configs'], nc['configs']) + self.assertEqual(nc['args'], job_configs['args']) + self.assertEqual(nc['configs'], job_configs['configs']) + + def test_construct_data_source_url_no_placeholders(self): + base_url = "swift://container/input" + job_exec_id = six.text_type(uuid.uuid4()) + + url = job_utils._construct_data_source_url(base_url, job_exec_id) + + self.assertEqual(base_url, url) + + def test_construct_data_source_url_job_exec_id_placeholder(self): + base_url = "swift://container/input.%JOB_EXEC_ID%.out" + job_exec_id = six.text_type(uuid.uuid4()) + + url = job_utils._construct_data_source_url(base_url, job_exec_id) + + self.assertEqual( + "swift://container/input." + job_exec_id + ".out", url) + + def test_construct_data_source_url_randstr_placeholder(self): + base_url = "swift://container/input.%RANDSTR(4)%.%RANDSTR(7)%.out" + job_exec_id = six.text_type(uuid.uuid4()) + + url = job_utils._construct_data_source_url(base_url, job_exec_id) + + self.assertRegex( + url, "swift://container/input\.[a-z]{4}\.[a-z]{7}\.out")