Implemented support for placeholders in datasource URLs

Added the ability to use placeholders in datasource URLs. Currently
supported placeholders:
* %RANDSTR(len)% - will be replaced with a random string of
  lowercase letters of length `len`.
* %JOB_EXEC_ID% - will be replaced with the job execution ID.

Resulting URLs will be stored in a new field in the job_executions
table. Reusing the 'info' field does not look like a good solution,
since it is reserved for Oozie status.
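
For illustration (container and path are made up), a datasource URL such as

    swift://container/out.%JOB_EXEC_ID%.%RANDSTR(4)%

resolves to something like

    swift://container/out.<job execution ID>.kqzw

and the {data_source id: resolved URL} mapping is what the new field holds.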

Next steps:
* write documentation
* update horizon

Implements blueprint: edp-datasource-placeholders

Change-Id: I1d9282b210047982c062b24bd03cf2331ab7599e
Andrew Lazarev 2015-02-24 14:56:27 -08:00
parent 751a97d173
commit 7bae4261d0
15 changed files with 206 additions and 50 deletions

View File

@ -241,6 +241,9 @@ class JobExecution(object):
info
oozie_job_id
return_code
job_configs
extra
data_source_urls
"""

View File

@ -0,0 +1,36 @@
# Copyright 2014 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Add data_source_urls to job_executions to support placeholders
Revision ID: 021
Revises: 020
Create Date: 2015-02-24 12:47:17.871520
"""
# revision identifiers, used by Alembic.
revision = '021'
down_revision = '020'
from alembic import op
import sqlalchemy as sa
from sahara.db.sqlalchemy import types as st
def upgrade():
op.add_column('job_executions',
sa.Column('data_source_urls', st.JsonEncoded()))

View File

@ -299,6 +299,7 @@ class JobExecution(mb.SaharaBase):
return_code = sa.Column(sa.String(80))
job_configs = sa.Column(st.JsonDictType())
extra = sa.Column(st.JsonDictType())
data_source_urls = sa.Column(st.JsonDictType())
mains_association = sa.Table("mains_association",
mb.SaharaBase.metadata,

View File

@ -87,8 +87,8 @@ def _get_cluster_hosts_information(host, cluster):
return None
def configure_cluster_for_hdfs(cluster, data_source):
host = urlparse.urlparse(data_source.url).hostname
def configure_cluster_for_hdfs(cluster, data_source_url):
host = urlparse.urlparse(data_source_url).hostname
etc_hosts_information = _get_cluster_hosts_information(host, cluster)
if etc_hosts_information is None:
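
The helper only ever needed the hostname of the data source, so passing the
already-resolved URL string is enough. A quick illustration (the hostname is
made up):

from six.moves.urllib import parse as urlparse

# Only the hostname is extracted from the URL when wiring up /etc/hosts.
url = "hdfs://namenode.example.com:8020/user/hadoop/out"
print(urlparse.urlparse(url).hostname)
# -> namenode.example.com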

View File

@ -13,6 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import re
import string
import uuid
from oslo_config import cfg
@ -65,13 +68,22 @@ def create_workflow_dir(where, path, job, use_uuid=None, chmod=""):
return constructed_dir
def get_data_sources(job_execution, job):
def get_data_sources(job_execution, job, data_source_urls):
if edp.compare_job_type(job.type, edp.JOB_TYPE_JAVA, edp.JOB_TYPE_SPARK):
return None, None
ctx = context.ctx()
input_source = conductor.data_source_get(ctx, job_execution.input_id)
if input_source and input_source.id not in data_source_urls:
data_source_urls[input_source.id] = _construct_data_source_url(
input_source.url, job_execution.id)
output_source = conductor.data_source_get(ctx, job_execution.output_id)
if output_source and output_source.id not in data_source_urls:
data_source_urls[output_source.id] = _construct_data_source_url(
output_source.url, job_execution.id)
return input_source, output_source
@ -169,7 +181,7 @@ def _add_credentials_for_data_sources(ds_list, configs):
configs[sw.HADOOP_SWIFT_PASSWORD] = password
def resolve_data_source_references(job_configs):
def resolve_data_source_references(job_configs, job_exec_id, data_source_urls):
"""Resolve possible data_source references in job_configs.
Look for any string values in the 'args', 'configs', and 'params'
@ -222,7 +234,11 @@ def resolve_data_source_references(job_configs):
if len(ds) == 1:
ds = ds[0]
ds_seen[ds.id] = ds
return ds.url
if ds.id not in data_source_urls:
data_source_urls[ds.id] = _construct_data_source_url(
ds.url, job_exec_id)
return data_source_urls[ds.id]
return value
# Loop over configs/params/args and look up each value as a data_source.
@ -251,3 +267,26 @@ def resolve_data_source_references(job_configs):
k: v for k, v in six.iteritems(job_configs.get('proxy_configs'))}
return ds_seen, new_configs
def _construct_data_source_url(url, job_exec_id):
"""Resolve placeholders in data_source URL.
Supported placeholders:
* %RANDSTR(len)% - will be replaced with a random string of lowercase
letters of length `len`.
* %JOB_EXEC_ID% - will be replaced with the job execution ID.
"""
def _randstr(match):
len = int(match.group(1))
return ''.join(random.choice(string.ascii_lowercase)
for _ in xrange(len))
url = url.replace("%JOB_EXEC_ID%", job_exec_id)
url = re.sub(r"%RANDSTR\((\d+)\)%", _randstr, url)
return url
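
A quick usage sketch of the new helper (the URL is invented and the call
assumes a sahara environment where the module imports cleanly):

import uuid

from sahara.service.edp import job_utils

job_exec_id = str(uuid.uuid4())
url = "swift://container/out.%JOB_EXEC_ID%.%RANDSTR(4)%"
print(job_utils._construct_data_source_url(url, job_exec_id))
# -> swift://container/out.<job_exec_id>.<four random lowercase letters>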

View File

@ -102,9 +102,11 @@ class OozieJobEngine(base_engine.JobEngine):
def run_job(self, job_execution):
ctx = context.ctx()
data_source_urls = {}
job = conductor.job_get(ctx, job_execution.job_id)
input_source, output_source = job_utils.get_data_sources(job_execution,
job)
input_source, output_source = job_utils.get_data_sources(
job_execution, job, data_source_urls)
# Updated_job_configs will be a copy of job_execution.job_configs with
# any name or uuid references to data_sources resolved to paths
@ -113,9 +115,13 @@ class OozieJobEngine(base_engine.JobEngine):
# just be a reference to job_execution.job_configs to avoid a copy.
# Additional_sources will be a list of any data_sources found.
additional_sources, updated_job_configs = (
job_utils.resolve_data_source_references(job_execution.job_configs)
job_utils.resolve_data_source_references(
job_execution.job_configs, job_execution.id, data_source_urls)
)
job_execution = conductor.job_execution_update(
ctx, job_execution, {"data_source_urls": data_source_urls})
proxy_configs = updated_job_configs.get('proxy_configs')
configs = updated_job_configs.get('configs', {})
use_hbase_lib = configs.get('edp.hbase_common_lib', {})
@ -130,7 +136,8 @@ class OozieJobEngine(base_engine.JobEngine):
for data_source in [input_source, output_source] + additional_sources:
if data_source and data_source.type == 'hdfs':
h.configure_cluster_for_hdfs(self.cluster, data_source)
h.configure_cluster_for_hdfs(
self.cluster, data_source_urls[data_source.id])
break
hdfs_user = self.get_hdfs_user()
@ -146,7 +153,7 @@ class OozieJobEngine(base_engine.JobEngine):
wf_xml = workflow_factory.get_workflow_xml(
job, self.cluster, updated_job_configs,
input_source, output_source,
hdfs_user)
hdfs_user, data_source_urls)
path_to_workflow = self._upload_workflow_file(oozie_server, wf_dir,
wf_xml, hdfs_user)
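
Each engine (Oozie here, Spark and Storm below) now builds one dict per run
that maps data source IDs to resolved URLs and persists it on the job
execution. A minimal, self-contained sketch of that caching pattern
(FakeSource and the URLs are invented; the real code uses conductor data
source objects):

import uuid

from sahara.service.edp import job_utils


class FakeSource(object):
    def __init__(self, url):
        self.id = str(uuid.uuid4())
        self.url = url


job_exec_id = str(uuid.uuid4())
sources = [FakeSource("swift://ex/i"),
           FakeSource("hdfs://user/hadoop/out.%RANDSTR(6)%")]

data_source_urls = {}
for src in sources:
    if src.id not in data_source_urls:  # each source is resolved once per run
        data_source_urls[src.id] = job_utils._construct_data_source_url(
            src.url, job_exec_id)

# The engine then persists the mapping, roughly:
# conductor.job_execution_update(ctx, job_execution,
#                                {"data_source_urls": data_source_urls})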

View File

@ -118,9 +118,9 @@ class BaseFactory(object):
break
return configs
def get_params(self, input_data, output_data):
return {'INPUT': input_data.url,
'OUTPUT': output_data.url}
def get_params(self, input_data, output_data, data_source_urls):
return {'INPUT': data_source_urls[input_data.id],
'OUTPUT': data_source_urls[output_data.id]}
class PigFactory(BaseFactory):
@ -133,11 +133,12 @@ class PigFactory(BaseFactory):
return conductor.job_main_name(context.ctx(), job)
def get_workflow_xml(self, cluster, job_configs, input_data, output_data,
hdfs_user):
hdfs_user, data_source_urls):
proxy_configs = job_configs.get('proxy_configs')
job_dict = {'configs': self.get_configs(input_data, output_data,
proxy_configs),
'params': self.get_params(input_data, output_data),
'params': self.get_params(input_data, output_data,
data_source_urls),
'args': []}
self.update_job_dict(job_dict, job_configs)
creator = pig_workflow.PigWorkflowCreator()
@ -158,11 +159,12 @@ class HiveFactory(BaseFactory):
return conductor.job_main_name(context.ctx(), job)
def get_workflow_xml(self, cluster, job_configs, input_data, output_data,
hdfs_user):
hdfs_user, data_source_urls):
proxy_configs = job_configs.get('proxy_configs')
job_dict = {'configs': self.get_configs(input_data, output_data,
proxy_configs),
'params': self.get_params(input_data, output_data)}
'params': self.get_params(input_data, output_data,
data_source_urls)}
self.update_job_dict(job_dict, job_configs)
creator = hive_workflow.HiveWorkflowCreator()
@ -175,12 +177,13 @@ class HiveFactory(BaseFactory):
class MapReduceFactory(BaseFactory):
def get_configs(self, input_data, output_data, proxy_configs):
def get_configs(self, input_data, output_data, proxy_configs,
data_source_urls):
configs = super(MapReduceFactory, self).get_configs(input_data,
output_data,
proxy_configs)
configs['mapred.input.dir'] = input_data.url
configs['mapred.output.dir'] = output_data.url
configs['mapred.input.dir'] = data_source_urls[input_data.id]
configs['mapred.output.dir'] = data_source_urls[output_data.id]
return configs
def _get_streaming(self, job_dict):
@ -189,10 +192,11 @@ class MapReduceFactory(BaseFactory):
job_dict['edp_configs']) if k.startswith(prefix)}
def get_workflow_xml(self, cluster, job_configs, input_data, output_data,
hdfs_user):
hdfs_user, data_source_urls):
proxy_configs = job_configs.get('proxy_configs')
job_dict = {'configs': self.get_configs(input_data, output_data,
proxy_configs)}
proxy_configs,
data_source_urls)}
self.update_job_dict(job_dict, job_configs)
creator = mapreduce_workflow.MapReduceWorkFlowCreator()
creator.build_workflow_xml(configuration=job_dict['configs'],

View File

@ -184,10 +184,15 @@ class SparkJobEngine(base_engine.JobEngine):
ctx = context.ctx()
job = conductor.job_get(ctx, job_execution.job_id)
data_source_urls = {}
additional_sources, updated_job_configs = (
job_utils.resolve_data_source_references(job_execution.job_configs)
job_utils.resolve_data_source_references(
job_execution.job_configs, job_execution.id, data_source_urls)
)
job_execution = conductor.job_execution_update(
ctx, job_execution, {"data_source_urls": data_source_urls})
for data_source in additional_sources:
if data_source and data_source.type == 'hdfs':
h.configure_cluster_for_hdfs(self.cluster, data_source)

View File

@ -164,10 +164,16 @@ class StormJobEngine(base_engine.JobEngine):
ctx = context.ctx()
job = conductor.job_get(ctx, job_execution.job_id)
data_source_urls = {}
additional_sources, updated_job_configs = (
job_utils.resolve_data_source_references(job_execution.job_configs)
job_utils.resolve_data_source_references(
job_execution.job_configs, job_execution.id, data_source_urls)
)
job_execution = conductor.job_execution_update(
ctx, job_execution, {"data_source_urls": data_source_urls})
# We'll always run the driver program on the master
master = plugin_utils.get_instance(self.cluster, "nimbus")

View File

@ -461,6 +461,9 @@ class SaharaMigrationsCheckers(object):
self.assertColumnNotExists(engine, 'cluster_provision_steps',
'started_at')
def _check_021(self, engine, data):
self.assertColumnExists(engine, 'job_executions', 'data_source_urls')
class TestMigrationsMySQL(SaharaMigrationsCheckers,
base.BaseWalkMigrationTestCase,

View File

@ -13,8 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
import mock
import six
from sahara import conductor as cond
from sahara.utils import edp
@ -86,6 +88,7 @@ def create_data_source(url, name=None, id=None):
def _create_job_exec(job_id, type, configs=None):
j_exec = mock.Mock()
j_exec.id = six.text_type(uuid.uuid4())
j_exec.job_id = job_id
j_exec.job_configs = configs
if edp.compare_job_type(type, edp.JOB_TYPE_JAVA):

View File

@ -363,6 +363,7 @@ class TestSpark(base.SaharaTestCase):
("Spark", "Executor extra classpath",
"cluster"): self.driver_cp}[key]
@mock.patch('sahara.conductor.API.job_execution_update')
@mock.patch('sahara.conductor.API.job_execution_get')
@mock.patch('sahara.utils.remote.get_remote')
@mock.patch('sahara.plugins.spark.config_helper.get_config_value')
@ -372,7 +373,8 @@ class TestSpark(base.SaharaTestCase):
@mock.patch('sahara.context.ctx', return_value="ctx")
def _setup_run_job(self, master_instance, job_configs, files,
ctx, job_get, get_instance, create_workflow_dir,
get_config_value, get_remote, job_exec_get):
get_config_value, get_remote, job_exec_get,
job_exec_update):
def _upload_job_files(where, job_dir, job,
libs_subdir=True, job_configs=None):

View File

@ -269,6 +269,7 @@ class TestStorm(base.SaharaTestCase):
autospec=True,
return_value=(
"MyJob_ed8347a9-39aa-477c-8108-066202eb6130"))
@mock.patch('sahara.conductor.API.job_execution_update')
@mock.patch('sahara.conductor.API.job_execution_get')
@mock.patch('sahara.utils.remote.get_remote')
@mock.patch('sahara.service.edp.job_utils.create_workflow_dir')
@ -277,7 +278,8 @@ class TestStorm(base.SaharaTestCase):
@mock.patch('sahara.context.ctx', return_value="ctx")
def _setup_run_job(self, master_instance, job_configs, files,
ctx, job_get, get_instance, create_workflow_dir,
get_remote, job_exec_get, _generate_topology_name):
get_remote, job_exec_get, job_exec_update,
_generate_topology_name):
def _upload_job_files(where, job_dir, job,
libs_subdir=True, job_configs=None):

View File

@ -76,10 +76,12 @@ class TestJobManager(base.SaharaWithDbTestCase):
input_data = u.create_data_source('swift://ex/i')
output_data = u.create_data_source('swift://ex/o')
data_source_urls = {input_data.id: input_data.url,
output_data.id: output_data.url}
res = workflow_factory.get_workflow_xml(
job, u.create_cluster(), job_exec.job_configs,
input_data, output_data, 'hadoop')
input_data, output_data, 'hadoop', data_source_urls)
self.assertIn("""
<param>INPUT=swift://ex.sahara/i</param>
@ -106,7 +108,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
res = workflow_factory.get_workflow_xml(
job, u.create_cluster(), job_exec.job_configs,
input_data, output_data, 'hadoop')
input_data, output_data, 'hadoop', data_source_urls)
self.assertIn("""
<configuration>
@ -137,10 +139,12 @@ class TestJobManager(base.SaharaWithDbTestCase):
input_data = u.create_data_source('swift://ex/i')
output_data = u.create_data_source('hdfs://user/hadoop/out')
data_source_urls = {input_data.id: input_data.url,
output_data.id: output_data.url}
res = workflow_factory.get_workflow_xml(
job, u.create_cluster(), job_exec.job_configs,
input_data, output_data, 'hadoop')
input_data, output_data, 'hadoop', data_source_urls)
self.assertIn("""
<configuration>
@ -156,10 +160,12 @@ class TestJobManager(base.SaharaWithDbTestCase):
input_data = u.create_data_source('hdfs://user/hadoop/in')
output_data = u.create_data_source('swift://ex/o')
data_source_urls = {input_data.id: input_data.url,
output_data.id: output_data.url}
res = workflow_factory.get_workflow_xml(
job, u.create_cluster(), job_exec.job_configs,
input_data, output_data, 'hadoop')
input_data, output_data, 'hadoop', data_source_urls)
self.assertIn("""
<configuration>
@ -177,10 +183,12 @@ class TestJobManager(base.SaharaWithDbTestCase):
edp.JOB_TYPE_PIG, configs={'configs': {'dummy': 'value'}})
input_data = u.create_data_source('hdfs://user/hadoop/in')
output_data = u.create_data_source('hdfs://user/hadoop/out')
data_source_urls = {input_data.id: input_data.url,
output_data.id: output_data.url}
res = workflow_factory.get_workflow_xml(
job, u.create_cluster(), job_exec.job_configs,
input_data, output_data, 'hadoop')
input_data, output_data, 'hadoop', data_source_urls)
self.assertIn("""
<configuration>
@ -202,10 +210,12 @@ class TestJobManager(base.SaharaWithDbTestCase):
input_data = u.create_data_source('swift://ex/i')
output_data = u.create_data_source('swift://ex/o')
data_source_urls = {input_data.id: input_data.url,
output_data.id: output_data.url}
res = workflow_factory.get_workflow_xml(
job, u.create_cluster(), job_exec.job_configs,
input_data, output_data, 'hadoop')
input_data, output_data, 'hadoop', data_source_urls)
if streaming:
self.assertIn("""
@ -247,7 +257,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
res = workflow_factory.get_workflow_xml(
job, u.create_cluster(), job_exec.job_configs,
input_data, output_data, 'hadoop')
input_data, output_data, 'hadoop', data_source_urls)
self.assertIn("""
<property>
@ -353,10 +363,12 @@ class TestJobManager(base.SaharaWithDbTestCase):
input_data = u.create_data_source('swift://ex/i')
output_data = u.create_data_source('swift://ex/o')
data_source_urls = {input_data.id: input_data.url,
output_data.id: output_data.url}
res = workflow_factory.get_workflow_xml(
job, u.create_cluster(), job_exec.job_configs,
input_data, output_data, 'hadoop')
input_data, output_data, 'hadoop', data_source_urls)
doc = xml.parseString(res)
hive = doc.getElementsByTagName('hive')[0]
@ -384,7 +396,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
res = workflow_factory.get_workflow_xml(
job, u.create_cluster(), job_exec.job_configs,
input_data, output_data, 'hadoop')
input_data, output_data, 'hadoop', data_source_urls)
doc = xml.parseString(res)
hive = doc.getElementsByTagName('hive')[0]
@ -489,7 +501,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
@mock.patch('sahara.conductor.API.data_source_get')
def test_get_data_sources(self, ds):
def _conductor_data_source_get(ctx, id):
return "obj_" + id
return mock.Mock(id=id, url="obj_" + id)
job, job_exec = u.create_job_exec(edp.JOB_TYPE_PIG)
@ -498,10 +510,10 @@ class TestJobManager(base.SaharaWithDbTestCase):
ds.side_effect = _conductor_data_source_get
input_source, output_source = (
job_utils.get_data_sources(job_exec, job))
job_utils.get_data_sources(job_exec, job, {}))
self.assertEqual('obj_s1', input_source)
self.assertEqual('obj_s2', output_source)
self.assertEqual('obj_s1', input_source.url)
self.assertEqual('obj_s2', output_source.url)
def test_get_data_sources_java(self):
configs = {sw.HADOOP_SWIFT_USERNAME: 'admin',
@ -516,7 +528,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
job, job_exec = u.create_job_exec(edp.JOB_TYPE_JAVA, configs)
input_source, output_source = (
job_utils.get_data_sources(job_exec, job))
job_utils.get_data_sources(job_exec, job, {}))
self.assertIsNone(input_source)
self.assertIsNone(output_source)

View File

@ -161,14 +161,16 @@ class JobUtilsTestCase(testtools.TestCase):
ctx.return_value = 'dummy'
name_ref = job_utils.DATA_SOURCE_PREFIX+'input'
job_exec_id = six.text_type(uuid.uuid4())
input = u.create_data_source("swift://container/input",
name="input",
id=six.text_type(uuid.uuid4()))
output = u.create_data_source("swift://container/output",
output = u.create_data_source("swift://container/output.%JOB_EXEC_ID%",
name="output",
id=six.text_type(uuid.uuid4()))
output_url = "swift://container/output." + job_exec_id
by_name = {'input': input,
'output': output}
@ -199,9 +201,10 @@ class JobUtilsTestCase(testtools.TestCase):
job_utils.DATA_SOURCE_SUBST_UUID: True},
'args': [name_ref, output.id, input.id]}
ds, nc = job_utils.resolve_data_source_references(job_configs)
ds, nc = job_utils.resolve_data_source_references(job_configs,
job_exec_id, {})
self.assertEqual(2, len(ds))
self.assertEqual([input.url, output.url, input.url], nc['args'])
self.assertEqual([input.url, output_url, input.url], nc['args'])
# Swift configs should be filled in since they were blank
self.assertEqual(input.credentials['user'],
nc['configs']['fs.swift.service.sahara.username'])
@ -212,9 +215,10 @@ class JobUtilsTestCase(testtools.TestCase):
'fs.swift.service.sahara.password': 'gamgee',
job_utils.DATA_SOURCE_SUBST_NAME: False,
job_utils.DATA_SOURCE_SUBST_UUID: True}
ds, nc = job_utils.resolve_data_source_references(job_configs)
ds, nc = job_utils.resolve_data_source_references(job_configs,
job_exec_id, {})
self.assertEqual(2, len(ds))
self.assertEqual([name_ref, output.url, input.url], nc['args'])
self.assertEqual([name_ref, output_url, input.url], nc['args'])
# Swift configs should not be overwritten
self.assertEqual(job_configs['configs'], nc['configs'])
@ -223,7 +227,8 @@ class JobUtilsTestCase(testtools.TestCase):
job_configs['proxy_configs'] = {'proxy_username': 'john',
'proxy_password': 'smith',
'proxy_trust_id': 'trustme'}
ds, nc = job_utils.resolve_data_source_references(job_configs)
ds, nc = job_utils.resolve_data_source_references(job_configs,
job_exec_id, {})
self.assertEqual(1, len(ds))
self.assertEqual([input.url, output.id, input.id], nc['args'])
@ -234,7 +239,8 @@ class JobUtilsTestCase(testtools.TestCase):
# Substitution not enabled
job_configs['configs'] = {job_utils.DATA_SOURCE_SUBST_NAME: False,
job_utils.DATA_SOURCE_SUBST_UUID: False}
ds, nc = job_utils.resolve_data_source_references(job_configs)
ds, nc = job_utils.resolve_data_source_references(job_configs,
job_exec_id, {})
self.assertEqual(0, len(ds))
self.assertEqual(job_configs['args'], nc['args'])
self.assertEqual(job_configs['configs'], nc['configs'])
@ -243,7 +249,34 @@ class JobUtilsTestCase(testtools.TestCase):
job_configs['configs'] = {job_utils.DATA_SOURCE_SUBST_NAME: True,
job_utils.DATA_SOURCE_SUBST_UUID: True}
job_configs['args'] = ['val1', 'val2', 'val3']
ds, nc = job_utils.resolve_data_source_references(job_configs)
ds, nc = job_utils.resolve_data_source_references(job_configs,
job_exec_id, {})
self.assertEqual(0, len(ds))
self.assertEqual(job_configs['args'], nc['args'])
self.assertEqual(job_configs['configs'], nc['configs'])
self.assertEqual(nc['args'], job_configs['args'])
self.assertEqual(nc['configs'], job_configs['configs'])
def test_construct_data_source_url_no_placeholders(self):
base_url = "swift://container/input"
job_exec_id = six.text_type(uuid.uuid4())
url = job_utils._construct_data_source_url(base_url, job_exec_id)
self.assertEqual(base_url, url)
def test_construct_data_source_url_job_exec_id_placeholder(self):
base_url = "swift://container/input.%JOB_EXEC_ID%.out"
job_exec_id = six.text_type(uuid.uuid4())
url = job_utils._construct_data_source_url(base_url, job_exec_id)
self.assertEqual(
"swift://container/input." + job_exec_id + ".out", url)
def test_construct_data_source_url_randstr_placeholder(self):
base_url = "swift://container/input.%RANDSTR(4)%.%RANDSTR(7)%.out"
job_exec_id = six.text_type(uuid.uuid4())
url = job_utils._construct_data_source_url(base_url, job_exec_id)
self.assertRegex(
url, "swift://container/input\.[a-z]{4}\.[a-z]{7}\.out")