Merge "Implemented support of placeholders in datasource URLs"
This commit is contained in:
commit
b79e9606f8
@ -241,6 +241,9 @@ class JobExecution(object):
|
||||
info
|
||||
oozie_job_id
|
||||
return_code
|
||||
job_configs
|
||||
extra
|
||||
data_source_urls
|
||||
"""
|
||||
|
||||
|
||||
|
@ -0,0 +1,36 @@
|
||||
# Copyright 2014 OpenStack Foundation.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Add data_source_urls to job_executions to support placeholders
|
||||
|
||||
Revision ID: 021
|
||||
Revises: 020
|
||||
Create Date: 2015-02-24 12:47:17.871520
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '021'
|
||||
down_revision = '020'
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
from sahara.db.sqlalchemy import types as st
|
||||
|
||||
|
||||
def upgrade():
    """Add the ``data_source_urls`` column to the ``job_executions`` table.

    The column uses the project's JSON-encoded column type.
    """
    data_source_urls_column = sa.Column('data_source_urls', st.JsonEncoded())
    op.add_column('job_executions', data_source_urls_column)
|
@ -299,6 +299,7 @@ class JobExecution(mb.SaharaBase):
|
||||
return_code = sa.Column(sa.String(80))
|
||||
job_configs = sa.Column(st.JsonDictType())
|
||||
extra = sa.Column(st.JsonDictType())
|
||||
data_source_urls = sa.Column(st.JsonDictType())
|
||||
|
||||
mains_association = sa.Table("mains_association",
|
||||
mb.SaharaBase.metadata,
|
||||
|
@ -87,8 +87,8 @@ def _get_cluster_hosts_information(host, cluster):
|
||||
return None
|
||||
|
||||
|
||||
def configure_cluster_for_hdfs(cluster, data_source):
|
||||
host = urlparse.urlparse(data_source.url).hostname
|
||||
def configure_cluster_for_hdfs(cluster, data_source_url):
|
||||
host = urlparse.urlparse(data_source_url).hostname
|
||||
|
||||
etc_hosts_information = _get_cluster_hosts_information(host, cluster)
|
||||
if etc_hosts_information is None:
|
||||
|
@ -13,6 +13,9 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import random
|
||||
import re
|
||||
import string
|
||||
import uuid
|
||||
|
||||
from oslo_config import cfg
|
||||
@ -65,13 +68,22 @@ def create_workflow_dir(where, path, job, use_uuid=None, chmod=""):
|
||||
return constructed_dir
|
||||
|
||||
|
||||
def get_data_sources(job_execution, job):
|
||||
def get_data_sources(job_execution, job, data_source_urls):
    """Look up the input and output data sources of a job execution.

    Java and Spark jobs yield ``(None, None)`` without any lookup.

    For every data source that is found and whose id is not yet a key of
    ``data_source_urls``, the URL produced by ``_construct_data_source_url``
    is stored under that id. ``data_source_urls`` is updated in place.

    :param job_execution: object providing ``id``, ``input_id`` and
                          ``output_id``
    :param job: object providing ``type``
    :param data_source_urls: mapping of data source id to resolved URL
    :returns: tuple ``(input_source, output_source)``
    """
    skip_lookup = edp.compare_job_type(
        job.type, edp.JOB_TYPE_JAVA, edp.JOB_TYPE_SPARK)
    if skip_lookup:
        return None, None

    ctx = context.ctx()

    found = []
    # Input is handled before output, one source at a time.
    for id_attr in ('input_id', 'output_id'):
        source = conductor.data_source_get(ctx, getattr(job_execution,
                                                        id_attr))
        if source and source.id not in data_source_urls:
            resolved_url = _construct_data_source_url(source.url,
                                                      job_execution.id)
            data_source_urls[source.id] = resolved_url
        found.append(source)

    input_source, output_source = found
    return input_source, output_source
|
||||
|
||||
|
||||
@ -169,7 +181,7 @@ def _add_credentials_for_data_sources(ds_list, configs):
|
||||
configs[sw.HADOOP_SWIFT_PASSWORD] = password
|
||||
|
||||
|
||||
def resolve_data_source_references(job_configs):
|
||||
def resolve_data_source_references(job_configs, job_exec_id, data_source_urls):
|
||||
"""Resolve possible data_source references in job_configs.
|
||||
|
||||
Look for any string values in the 'args', 'configs', and 'params'
|
||||
@ -222,7 +234,11 @@ def resolve_data_source_references(job_configs):
|
||||
if len(ds) == 1:
|
||||
ds = ds[0]
|
||||
ds_seen[ds.id] = ds
|
||||
return ds.url
|
||||
if ds.id not in data_source_urls:
|
||||
data_source_urls[ds.id] = _construct_data_source_url(
|
||||
ds.url, job_exec_id)
|
||||
|
||||
return data_source_urls[ds.id]
|
||||
return value
|
||||
|
||||
# Loop over configs/params/args and look up each value as a data_source.
|
||||
@ -251,3 +267,26 @@ def resolve_data_source_references(job_configs):
|
||||
k: v for k, v in six.iteritems(job_configs.get('proxy_configs'))}
|
||||
|
||||
return ds_seen, new_configs
|
||||
|
||||
|
||||
def _construct_data_source_url(url, job_exec_id):
    """Resolve placeholders in data_source URL.

    Supported placeholders:

    * %RANDSTR(len)% - will be replaced with random string of lowercase
                       letters of length `len`.
    * %JOB_EXEC_ID% - will be replaced with the job execution ID.

    :param url: data source URL, possibly containing placeholders
    :param job_exec_id: string substituted for every %JOB_EXEC_ID%
    :returns: the URL with all supported placeholders expanded; a URL
              without placeholders is returned unchanged
    """

    def _randstr(match):
        # ``length`` rather than ``len`` so the builtin is not shadowed.
        length = int(match.group(1))
        # ``range`` exists on both Python 2 and 3, unlike ``xrange``.
        return ''.join(random.choice(string.ascii_lowercase)
                       for _ in range(length))

    # The job execution ID is substituted first, then each %RANDSTR(n)%
    # occurrence gets its own independently generated random string.
    url = url.replace("%JOB_EXEC_ID%", job_exec_id)
    url = re.sub(r"%RANDSTR\((\d+)\)%", _randstr, url)

    return url
|
||||
|
@ -102,9 +102,11 @@ class OozieJobEngine(base_engine.JobEngine):
|
||||
def run_job(self, job_execution):
|
||||
ctx = context.ctx()
|
||||
|
||||
data_source_urls = {}
|
||||
|
||||
job = conductor.job_get(ctx, job_execution.job_id)
|
||||
input_source, output_source = job_utils.get_data_sources(job_execution,
|
||||
job)
|
||||
input_source, output_source = job_utils.get_data_sources(
|
||||
job_execution, job, data_source_urls)
|
||||
|
||||
# Updated_job_configs will be a copy of job_execution.job_configs with
|
||||
# any name or uuid references to data_sources resolved to paths
|
||||
@ -113,9 +115,13 @@ class OozieJobEngine(base_engine.JobEngine):
|
||||
# just be a reference to job_execution.job_configs to avoid a copy.
|
||||
# Additional_sources will be a list of any data_sources found.
|
||||
additional_sources, updated_job_configs = (
|
||||
job_utils.resolve_data_source_references(job_execution.job_configs)
|
||||
job_utils.resolve_data_source_references(
|
||||
job_execution.job_configs, job_execution.id, data_source_urls)
|
||||
)
|
||||
|
||||
job_execution = conductor.job_execution_update(
|
||||
ctx, job_execution, {"data_source_urls": data_source_urls})
|
||||
|
||||
proxy_configs = updated_job_configs.get('proxy_configs')
|
||||
configs = updated_job_configs.get('configs', {})
|
||||
use_hbase_lib = configs.get('edp.hbase_common_lib', {})
|
||||
@ -130,7 +136,8 @@ class OozieJobEngine(base_engine.JobEngine):
|
||||
|
||||
for data_source in [input_source, output_source] + additional_sources:
|
||||
if data_source and data_source.type == 'hdfs':
|
||||
h.configure_cluster_for_hdfs(self.cluster, data_source)
|
||||
h.configure_cluster_for_hdfs(
|
||||
self.cluster, data_source_urls[data_source.id])
|
||||
break
|
||||
|
||||
hdfs_user = self.get_hdfs_user()
|
||||
@ -146,7 +153,7 @@ class OozieJobEngine(base_engine.JobEngine):
|
||||
wf_xml = workflow_factory.get_workflow_xml(
|
||||
job, self.cluster, updated_job_configs,
|
||||
input_source, output_source,
|
||||
hdfs_user)
|
||||
hdfs_user, data_source_urls)
|
||||
|
||||
path_to_workflow = self._upload_workflow_file(oozie_server, wf_dir,
|
||||
wf_xml, hdfs_user)
|
||||
|
@ -118,9 +118,9 @@ class BaseFactory(object):
|
||||
break
|
||||
return configs
|
||||
|
||||
def get_params(self, input_data, output_data):
|
||||
return {'INPUT': input_data.url,
|
||||
'OUTPUT': output_data.url}
|
||||
def get_params(self, input_data, output_data, data_source_urls):
    """Build the INPUT/OUTPUT parameters from resolved data source URLs.

    :param input_data: input data source; only its ``id`` is read
    :param output_data: output data source; only its ``id`` is read
    :param data_source_urls: mapping of data source id to resolved URL;
                             both ids must be present, a missing id
                             propagates the mapping's lookup error
    :returns: dict with the keys ``'INPUT'`` and ``'OUTPUT'``
    """
    params = {}
    params['INPUT'] = data_source_urls[input_data.id]
    params['OUTPUT'] = data_source_urls[output_data.id]
    return params
|
||||
|
||||
|
||||
class PigFactory(BaseFactory):
|
||||
@ -133,11 +133,12 @@ class PigFactory(BaseFactory):
|
||||
return conductor.job_main_name(context.ctx(), job)
|
||||
|
||||
def get_workflow_xml(self, cluster, job_configs, input_data, output_data,
|
||||
hdfs_user):
|
||||
hdfs_user, data_source_urls):
|
||||
proxy_configs = job_configs.get('proxy_configs')
|
||||
job_dict = {'configs': self.get_configs(input_data, output_data,
|
||||
proxy_configs),
|
||||
'params': self.get_params(input_data, output_data),
|
||||
'params': self.get_params(input_data, output_data,
|
||||
data_source_urls),
|
||||
'args': []}
|
||||
self.update_job_dict(job_dict, job_configs)
|
||||
creator = pig_workflow.PigWorkflowCreator()
|
||||
@ -158,11 +159,12 @@ class HiveFactory(BaseFactory):
|
||||
return conductor.job_main_name(context.ctx(), job)
|
||||
|
||||
def get_workflow_xml(self, cluster, job_configs, input_data, output_data,
|
||||
hdfs_user):
|
||||
hdfs_user, data_source_urls):
|
||||
proxy_configs = job_configs.get('proxy_configs')
|
||||
job_dict = {'configs': self.get_configs(input_data, output_data,
|
||||
proxy_configs),
|
||||
'params': self.get_params(input_data, output_data)}
|
||||
'params': self.get_params(input_data, output_data,
|
||||
data_source_urls)}
|
||||
self.update_job_dict(job_dict, job_configs)
|
||||
|
||||
creator = hive_workflow.HiveWorkflowCreator()
|
||||
@ -175,12 +177,13 @@ class HiveFactory(BaseFactory):
|
||||
|
||||
class MapReduceFactory(BaseFactory):
|
||||
|
||||
def get_configs(self, input_data, output_data, proxy_configs):
|
||||
def get_configs(self, input_data, output_data, proxy_configs,
|
||||
data_source_urls):
|
||||
configs = super(MapReduceFactory, self).get_configs(input_data,
|
||||
output_data,
|
||||
proxy_configs)
|
||||
configs['mapred.input.dir'] = input_data.url
|
||||
configs['mapred.output.dir'] = output_data.url
|
||||
configs['mapred.input.dir'] = data_source_urls[input_data.id]
|
||||
configs['mapred.output.dir'] = data_source_urls[output_data.id]
|
||||
return configs
|
||||
|
||||
def _get_streaming(self, job_dict):
|
||||
@ -189,10 +192,11 @@ class MapReduceFactory(BaseFactory):
|
||||
job_dict['edp_configs']) if k.startswith(prefix)}
|
||||
|
||||
def get_workflow_xml(self, cluster, job_configs, input_data, output_data,
|
||||
hdfs_user):
|
||||
hdfs_user, data_source_urls):
|
||||
proxy_configs = job_configs.get('proxy_configs')
|
||||
job_dict = {'configs': self.get_configs(input_data, output_data,
|
||||
proxy_configs)}
|
||||
proxy_configs,
|
||||
data_source_urls)}
|
||||
self.update_job_dict(job_dict, job_configs)
|
||||
creator = mapreduce_workflow.MapReduceWorkFlowCreator()
|
||||
creator.build_workflow_xml(configuration=job_dict['configs'],
|
||||
|
@ -184,10 +184,15 @@ class SparkJobEngine(base_engine.JobEngine):
|
||||
ctx = context.ctx()
|
||||
job = conductor.job_get(ctx, job_execution.job_id)
|
||||
|
||||
data_source_urls = {}
|
||||
additional_sources, updated_job_configs = (
|
||||
job_utils.resolve_data_source_references(job_execution.job_configs)
|
||||
job_utils.resolve_data_source_references(
|
||||
job_execution.job_configs, job_execution.id, data_source_urls)
|
||||
)
|
||||
|
||||
job_execution = conductor.job_execution_update(
|
||||
ctx, job_execution, {"data_source_urls": data_source_urls})
|
||||
|
||||
for data_source in additional_sources:
|
||||
if data_source and data_source.type == 'hdfs':
|
||||
h.configure_cluster_for_hdfs(self.cluster, data_source)
|
||||
|
@ -164,10 +164,16 @@ class StormJobEngine(base_engine.JobEngine):
|
||||
ctx = context.ctx()
|
||||
job = conductor.job_get(ctx, job_execution.job_id)
|
||||
|
||||
data_source_urls = {}
|
||||
|
||||
additional_sources, updated_job_configs = (
|
||||
job_utils.resolve_data_source_references(job_execution.job_configs)
|
||||
job_utils.resolve_data_source_references(
|
||||
job_execution.job_configs, job_execution.id, data_source_urls)
|
||||
)
|
||||
|
||||
job_execution = conductor.job_execution_update(
|
||||
ctx, job_execution, {"data_source_urls": data_source_urls})
|
||||
|
||||
# We'll always run the driver program on the master
|
||||
master = plugin_utils.get_instance(self.cluster, "nimbus")
|
||||
|
||||
|
@ -461,6 +461,9 @@ class SaharaMigrationsCheckers(object):
|
||||
self.assertColumnNotExists(engine, 'cluster_provision_steps',
|
||||
'started_at')
|
||||
|
||||
def _check_021(self, engine, data):
    """Check that ``job_executions`` has a ``data_source_urls`` column.

    ``data`` is accepted but not used by this check.
    """
    table_name, column_name = 'job_executions', 'data_source_urls'
    self.assertColumnExists(engine, table_name, column_name)
|
||||
|
||||
|
||||
class TestMigrationsMySQL(SaharaMigrationsCheckers,
|
||||
base.BaseWalkMigrationTestCase,
|
||||
|
@ -13,8 +13,10 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import uuid
|
||||
|
||||
import mock
|
||||
import six
|
||||
|
||||
from sahara import conductor as cond
|
||||
from sahara.utils import edp
|
||||
@ -86,6 +88,7 @@ def create_data_source(url, name=None, id=None):
|
||||
|
||||
def _create_job_exec(job_id, type, configs=None):
|
||||
j_exec = mock.Mock()
|
||||
j_exec.id = six.text_type(uuid.uuid4())
|
||||
j_exec.job_id = job_id
|
||||
j_exec.job_configs = configs
|
||||
if edp.compare_job_type(type, edp.JOB_TYPE_JAVA):
|
||||
|
@ -363,6 +363,7 @@ class TestSpark(base.SaharaTestCase):
|
||||
("Spark", "Executor extra classpath",
|
||||
"cluster"): self.driver_cp}[key]
|
||||
|
||||
@mock.patch('sahara.conductor.API.job_execution_update')
|
||||
@mock.patch('sahara.conductor.API.job_execution_get')
|
||||
@mock.patch('sahara.utils.remote.get_remote')
|
||||
@mock.patch('sahara.plugins.spark.config_helper.get_config_value')
|
||||
@ -372,7 +373,8 @@ class TestSpark(base.SaharaTestCase):
|
||||
@mock.patch('sahara.context.ctx', return_value="ctx")
|
||||
def _setup_run_job(self, master_instance, job_configs, files,
|
||||
ctx, job_get, get_instance, create_workflow_dir,
|
||||
get_config_value, get_remote, job_exec_get):
|
||||
get_config_value, get_remote, job_exec_get,
|
||||
job_exec_update):
|
||||
|
||||
def _upload_job_files(where, job_dir, job,
|
||||
libs_subdir=True, job_configs=None):
|
||||
|
@ -269,6 +269,7 @@ class TestStorm(base.SaharaTestCase):
|
||||
autospec=True,
|
||||
return_value=(
|
||||
"MyJob_ed8347a9-39aa-477c-8108-066202eb6130"))
|
||||
@mock.patch('sahara.conductor.API.job_execution_update')
|
||||
@mock.patch('sahara.conductor.API.job_execution_get')
|
||||
@mock.patch('sahara.utils.remote.get_remote')
|
||||
@mock.patch('sahara.service.edp.job_utils.create_workflow_dir')
|
||||
@ -277,7 +278,8 @@ class TestStorm(base.SaharaTestCase):
|
||||
@mock.patch('sahara.context.ctx', return_value="ctx")
|
||||
def _setup_run_job(self, master_instance, job_configs, files,
|
||||
ctx, job_get, get_instance, create_workflow_dir,
|
||||
get_remote, job_exec_get, _generate_topology_name):
|
||||
get_remote, job_exec_get, job_exec_update,
|
||||
_generate_topology_name):
|
||||
|
||||
def _upload_job_files(where, job_dir, job,
|
||||
libs_subdir=True, job_configs=None):
|
||||
|
@ -76,10 +76,12 @@ class TestJobManager(base.SaharaWithDbTestCase):
|
||||
|
||||
input_data = u.create_data_source('swift://ex/i')
|
||||
output_data = u.create_data_source('swift://ex/o')
|
||||
data_source_urls = {input_data.id: input_data.url,
|
||||
output_data.id: output_data.url}
|
||||
|
||||
res = workflow_factory.get_workflow_xml(
|
||||
job, u.create_cluster(), job_exec.job_configs,
|
||||
input_data, output_data, 'hadoop')
|
||||
input_data, output_data, 'hadoop', data_source_urls)
|
||||
|
||||
self.assertIn("""
|
||||
<param>INPUT=swift://ex.sahara/i</param>
|
||||
@ -106,7 +108,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
|
||||
|
||||
res = workflow_factory.get_workflow_xml(
|
||||
job, u.create_cluster(), job_exec.job_configs,
|
||||
input_data, output_data, 'hadoop')
|
||||
input_data, output_data, 'hadoop', data_source_urls)
|
||||
|
||||
self.assertIn("""
|
||||
<configuration>
|
||||
@ -137,10 +139,12 @@ class TestJobManager(base.SaharaWithDbTestCase):
|
||||
|
||||
input_data = u.create_data_source('swift://ex/i')
|
||||
output_data = u.create_data_source('hdfs://user/hadoop/out')
|
||||
data_source_urls = {input_data.id: input_data.url,
|
||||
output_data.id: output_data.url}
|
||||
|
||||
res = workflow_factory.get_workflow_xml(
|
||||
job, u.create_cluster(), job_exec.job_configs,
|
||||
input_data, output_data, 'hadoop')
|
||||
input_data, output_data, 'hadoop', data_source_urls)
|
||||
|
||||
self.assertIn("""
|
||||
<configuration>
|
||||
@ -156,10 +160,12 @@ class TestJobManager(base.SaharaWithDbTestCase):
|
||||
|
||||
input_data = u.create_data_source('hdfs://user/hadoop/in')
|
||||
output_data = u.create_data_source('swift://ex/o')
|
||||
data_source_urls = {input_data.id: input_data.url,
|
||||
output_data.id: output_data.url}
|
||||
|
||||
res = workflow_factory.get_workflow_xml(
|
||||
job, u.create_cluster(), job_exec.job_configs,
|
||||
input_data, output_data, 'hadoop')
|
||||
input_data, output_data, 'hadoop', data_source_urls)
|
||||
|
||||
self.assertIn("""
|
||||
<configuration>
|
||||
@ -177,10 +183,12 @@ class TestJobManager(base.SaharaWithDbTestCase):
|
||||
edp.JOB_TYPE_PIG, configs={'configs': {'dummy': 'value'}})
|
||||
input_data = u.create_data_source('hdfs://user/hadoop/in')
|
||||
output_data = u.create_data_source('hdfs://user/hadoop/out')
|
||||
data_source_urls = {input_data.id: input_data.url,
|
||||
output_data.id: output_data.url}
|
||||
|
||||
res = workflow_factory.get_workflow_xml(
|
||||
job, u.create_cluster(), job_exec.job_configs,
|
||||
input_data, output_data, 'hadoop')
|
||||
input_data, output_data, 'hadoop', data_source_urls)
|
||||
|
||||
self.assertIn("""
|
||||
<configuration>
|
||||
@ -202,10 +210,12 @@ class TestJobManager(base.SaharaWithDbTestCase):
|
||||
|
||||
input_data = u.create_data_source('swift://ex/i')
|
||||
output_data = u.create_data_source('swift://ex/o')
|
||||
data_source_urls = {input_data.id: input_data.url,
|
||||
output_data.id: output_data.url}
|
||||
|
||||
res = workflow_factory.get_workflow_xml(
|
||||
job, u.create_cluster(), job_exec.job_configs,
|
||||
input_data, output_data, 'hadoop')
|
||||
input_data, output_data, 'hadoop', data_source_urls)
|
||||
|
||||
if streaming:
|
||||
self.assertIn("""
|
||||
@ -247,7 +257,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
|
||||
|
||||
res = workflow_factory.get_workflow_xml(
|
||||
job, u.create_cluster(), job_exec.job_configs,
|
||||
input_data, output_data, 'hadoop')
|
||||
input_data, output_data, 'hadoop', data_source_urls)
|
||||
|
||||
self.assertIn("""
|
||||
<property>
|
||||
@ -368,10 +378,12 @@ class TestJobManager(base.SaharaWithDbTestCase):
|
||||
|
||||
input_data = u.create_data_source('swift://ex/i')
|
||||
output_data = u.create_data_source('swift://ex/o')
|
||||
data_source_urls = {input_data.id: input_data.url,
|
||||
output_data.id: output_data.url}
|
||||
|
||||
res = workflow_factory.get_workflow_xml(
|
||||
job, u.create_cluster(), job_exec.job_configs,
|
||||
input_data, output_data, 'hadoop')
|
||||
input_data, output_data, 'hadoop', data_source_urls)
|
||||
|
||||
doc = xml.parseString(res)
|
||||
hive = doc.getElementsByTagName('hive')[0]
|
||||
@ -399,7 +411,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
|
||||
|
||||
res = workflow_factory.get_workflow_xml(
|
||||
job, u.create_cluster(), job_exec.job_configs,
|
||||
input_data, output_data, 'hadoop')
|
||||
input_data, output_data, 'hadoop', data_source_urls)
|
||||
|
||||
doc = xml.parseString(res)
|
||||
hive = doc.getElementsByTagName('hive')[0]
|
||||
@ -520,7 +532,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
|
||||
@mock.patch('sahara.conductor.API.data_source_get')
|
||||
def test_get_data_sources(self, ds):
|
||||
def _conductor_data_source_get(ctx, id):
|
||||
return "obj_" + id
|
||||
return mock.Mock(id=id, url="obj_" + id)
|
||||
|
||||
job, job_exec = u.create_job_exec(edp.JOB_TYPE_PIG)
|
||||
|
||||
@ -529,10 +541,10 @@ class TestJobManager(base.SaharaWithDbTestCase):
|
||||
|
||||
ds.side_effect = _conductor_data_source_get
|
||||
input_source, output_source = (
|
||||
job_utils.get_data_sources(job_exec, job))
|
||||
job_utils.get_data_sources(job_exec, job, {}))
|
||||
|
||||
self.assertEqual('obj_s1', input_source)
|
||||
self.assertEqual('obj_s2', output_source)
|
||||
self.assertEqual('obj_s1', input_source.url)
|
||||
self.assertEqual('obj_s2', output_source.url)
|
||||
|
||||
def test_get_data_sources_java(self):
|
||||
configs = {sw.HADOOP_SWIFT_USERNAME: 'admin',
|
||||
@ -547,7 +559,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
|
||||
job, job_exec = u.create_job_exec(edp.JOB_TYPE_JAVA, configs)
|
||||
|
||||
input_source, output_source = (
|
||||
job_utils.get_data_sources(job_exec, job))
|
||||
job_utils.get_data_sources(job_exec, job, {}))
|
||||
|
||||
self.assertIsNone(input_source)
|
||||
self.assertIsNone(output_source)
|
||||
|
@ -161,14 +161,16 @@ class JobUtilsTestCase(testtools.TestCase):
|
||||
ctx.return_value = 'dummy'
|
||||
|
||||
name_ref = job_utils.DATA_SOURCE_PREFIX+'input'
|
||||
job_exec_id = six.text_type(uuid.uuid4())
|
||||
|
||||
input = u.create_data_source("swift://container/input",
|
||||
name="input",
|
||||
id=six.text_type(uuid.uuid4()))
|
||||
|
||||
output = u.create_data_source("swift://container/output",
|
||||
output = u.create_data_source("swift://container/output.%JOB_EXEC_ID%",
|
||||
name="output",
|
||||
id=six.text_type(uuid.uuid4()))
|
||||
output_url = "swift://container/output." + job_exec_id
|
||||
|
||||
by_name = {'input': input,
|
||||
'output': output}
|
||||
@ -199,9 +201,10 @@ class JobUtilsTestCase(testtools.TestCase):
|
||||
job_utils.DATA_SOURCE_SUBST_UUID: True},
|
||||
'args': [name_ref, output.id, input.id]}
|
||||
|
||||
ds, nc = job_utils.resolve_data_source_references(job_configs)
|
||||
ds, nc = job_utils.resolve_data_source_references(job_configs,
|
||||
job_exec_id, {})
|
||||
self.assertEqual(2, len(ds))
|
||||
self.assertEqual([input.url, output.url, input.url], nc['args'])
|
||||
self.assertEqual([input.url, output_url, input.url], nc['args'])
|
||||
# Swift configs should be filled in since they were blank
|
||||
self.assertEqual(input.credentials['user'],
|
||||
nc['configs']['fs.swift.service.sahara.username'])
|
||||
@ -212,9 +215,10 @@ class JobUtilsTestCase(testtools.TestCase):
|
||||
'fs.swift.service.sahara.password': 'gamgee',
|
||||
job_utils.DATA_SOURCE_SUBST_NAME: False,
|
||||
job_utils.DATA_SOURCE_SUBST_UUID: True}
|
||||
ds, nc = job_utils.resolve_data_source_references(job_configs)
|
||||
ds, nc = job_utils.resolve_data_source_references(job_configs,
|
||||
job_exec_id, {})
|
||||
self.assertEqual(2, len(ds))
|
||||
self.assertEqual([name_ref, output.url, input.url], nc['args'])
|
||||
self.assertEqual([name_ref, output_url, input.url], nc['args'])
|
||||
# Swift configs should not be overwritten
|
||||
self.assertEqual(job_configs['configs'], nc['configs'])
|
||||
|
||||
@ -223,7 +227,8 @@ class JobUtilsTestCase(testtools.TestCase):
|
||||
job_configs['proxy_configs'] = {'proxy_username': 'john',
|
||||
'proxy_password': 'smith',
|
||||
'proxy_trust_id': 'trustme'}
|
||||
ds, nc = job_utils.resolve_data_source_references(job_configs)
|
||||
ds, nc = job_utils.resolve_data_source_references(job_configs,
|
||||
job_exec_id, {})
|
||||
self.assertEqual(1, len(ds))
|
||||
self.assertEqual([input.url, output.id, input.id], nc['args'])
|
||||
|
||||
@ -234,7 +239,8 @@ class JobUtilsTestCase(testtools.TestCase):
|
||||
# Substitution not enabled
|
||||
job_configs['configs'] = {job_utils.DATA_SOURCE_SUBST_NAME: False,
|
||||
job_utils.DATA_SOURCE_SUBST_UUID: False}
|
||||
ds, nc = job_utils.resolve_data_source_references(job_configs)
|
||||
ds, nc = job_utils.resolve_data_source_references(job_configs,
|
||||
job_exec_id, {})
|
||||
self.assertEqual(0, len(ds))
|
||||
self.assertEqual(job_configs['args'], nc['args'])
|
||||
self.assertEqual(job_configs['configs'], nc['configs'])
|
||||
@ -243,7 +249,34 @@ class JobUtilsTestCase(testtools.TestCase):
|
||||
job_configs['configs'] = {job_utils.DATA_SOURCE_SUBST_NAME: True,
|
||||
job_utils.DATA_SOURCE_SUBST_UUID: True}
|
||||
job_configs['args'] = ['val1', 'val2', 'val3']
|
||||
ds, nc = job_utils.resolve_data_source_references(job_configs)
|
||||
ds, nc = job_utils.resolve_data_source_references(job_configs,
|
||||
job_exec_id, {})
|
||||
self.assertEqual(0, len(ds))
|
||||
self.assertEqual(job_configs['args'], nc['args'])
|
||||
self.assertEqual(job_configs['configs'], nc['configs'])
|
||||
self.assertEqual(nc['args'], job_configs['args'])
|
||||
self.assertEqual(nc['configs'], job_configs['configs'])
|
||||
|
||||
def test_construct_data_source_url_no_placeholders(self):
    """A URL without placeholders must come back unchanged."""
    base_url = "swift://container/input"
    job_exec_id = six.text_type(uuid.uuid4())

    self.assertEqual(
        base_url,
        job_utils._construct_data_source_url(base_url, job_exec_id))
|
||||
|
||||
def test_construct_data_source_url_job_exec_id_placeholder(self):
    """%JOB_EXEC_ID% must be replaced by the job execution ID."""
    base_url = "swift://container/input.%JOB_EXEC_ID%.out"
    job_exec_id = six.text_type(uuid.uuid4())
    expected_url = "swift://container/input." + job_exec_id + ".out"

    actual_url = job_utils._construct_data_source_url(base_url, job_exec_id)

    self.assertEqual(expected_url, actual_url)
|
||||
|
||||
def test_construct_data_source_url_randstr_placeholder(self):
    """Each %RANDSTR(n)% placeholder expands to n lowercase letters."""
    base_url = "swift://container/input.%RANDSTR(4)%.%RANDSTR(7)%.out"
    job_exec_id = six.text_type(uuid.uuid4())

    url = job_utils._construct_data_source_url(base_url, job_exec_id)

    # A raw string keeps the ``\.`` escapes as regex syntax instead of
    # (invalid) string escapes. The anchors make the assertion fail if
    # anything besides the expanded placeholders surrounds the URL.
    self.assertRegex(
        url, r"^swift://container/input\.[a-z]{4}\.[a-z]{7}\.out$")
|
||||
|
Loading…
Reference in New Issue
Block a user