diff --git a/sahara/service/edp/workflow_creator/workflow_factory.py b/sahara/service/edp/workflow_creator/workflow_factory.py
index 25f5776b..d31daa29 100644
--- a/sahara/service/edp/workflow_creator/workflow_factory.py
+++ b/sahara/service/edp/workflow_creator/workflow_factory.py
@@ -14,6 +14,7 @@
# limitations under the License.
import six
+import six.moves.urllib.parse as urlparse
from sahara import conductor as c
from sahara import context
@@ -22,15 +23,14 @@ from sahara.service.edp.workflow_creator import hive_workflow
from sahara.service.edp.workflow_creator import java_workflow
from sahara.service.edp.workflow_creator import mapreduce_workflow
from sahara.service.edp.workflow_creator import pig_workflow
+from sahara.swift import swift_helper as sw
+from sahara.swift import utils as su
from sahara.utils import edp
from sahara.utils import xmlutils
conductor = c.API
-swift_username = 'fs.swift.service.sahara.username'
-swift_password = 'fs.swift.service.sahara.password'
-
class BaseFactory(object):
def _separate_edp_configs(self, job_dict):
@@ -69,6 +69,14 @@ class BaseFactory(object):
new_vals = src.get(key, {})
value.update(new_vals)
+    def inject_swift_url_suffix(self, url):
+        """Append the swift URL suffix to the container of a swift:// URL.
+
+        :param url: string to inspect
+        :returns: url with su.SWIFT_URL_SUFFIX appended to its netloc when
+                  url starts with "swift://" and the netloc does not
+                  already end with that suffix; otherwise url unchanged
+        """
+        if url.startswith("swift://"):
+            u = urlparse.urlparse(url)
+            if not u.netloc.endswith(su.SWIFT_URL_SUFFIX):
+                # Anchor the replacement on the "//" that precedes the
+                # netloc.  Replacing the bare netloc would rewrite the
+                # scheme instead whenever the container name also occurs
+                # in "swift" (for example containers "swift" or "s").
+                return url.replace("//" + u.netloc,
+                                   "//" + u.netloc + su.SWIFT_URL_SUFFIX, 1)
+        return url
+
def update_job_dict(self, job_dict, exec_dict):
pruned_exec_dict, edp_configs = self._prune_edp_configs(exec_dict)
self._update_dict(job_dict, pruned_exec_dict)
@@ -79,14 +87,29 @@ class BaseFactory(object):
# Args are listed, not named. Simply replace them.
job_dict['args'] = pruned_exec_dict.get('args', [])
+ # Find all swift:// paths in args, configs, and params and
+ # add the .sahara suffix to the container if it is not there
+ # already
+ job_dict['args'] = [
+ # TODO(tmckay) args for Pig can actually be -param name=value
+ # and value could conceivably contain swift paths
+ self.inject_swift_url_suffix(arg) for arg in job_dict['args']]
+
+ for k, v in six.iteritems(job_dict.get('configs', {})):
+ job_dict['configs'][k] = self.inject_swift_url_suffix(v)
+
+ for k, v in six.iteritems(job_dict.get('params', {})):
+ job_dict['params'][k] = self.inject_swift_url_suffix(v)
+
def get_configs(self, input_data, output_data):
configs = {}
for src in (input_data, output_data):
if src.type == "swift" and hasattr(src, "credentials"):
if "user" in src.credentials:
- configs[swift_username] = src.credentials['user']
+ configs[sw.HADOOP_SWIFT_USERNAME] = src.credentials['user']
if "password" in src.credentials:
- configs[swift_password] = src.credentials['password']
+ configs[
+ sw.HADOOP_SWIFT_PASSWORD] = src.credentials['password']
break
return configs
@@ -175,6 +198,7 @@ class JavaFactory(BaseFactory):
job_dict = {'configs': {},
'args': []}
self.update_job_dict(job_dict, execution.job_configs)
+
main_class, java_opts = self._get_java_configs(job_dict)
creator = java_workflow.JavaWorkflowCreator()
creator.build_workflow_xml(main_class,
diff --git a/sahara/service/validations/edp/data_source.py b/sahara/service/validations/edp/data_source.py
index 413d544d..65e0b6f2 100644
--- a/sahara/service/validations/edp/data_source.py
+++ b/sahara/service/validations/edp/data_source.py
@@ -66,8 +66,12 @@ def _check_swift_data_source_create(data):
if url.scheme != "swift":
raise ex.InvalidException("URL scheme must be 'swift'")
- # We must have the suffix, and the path must be more than '/'
- if not url.netloc.endswith(su.SWIFT_URL_SUFFIX) or len(url.path) <= 1:
+ # The swift url suffix does not have to be included in the netloc.
+ # However, if the swift suffix indicator is part of the netloc then
+ # we require the right suffix.
+ # Additionally, the path must be more than '/'
+ if (su.SWIFT_URL_SUFFIX_START in url.netloc and not url.netloc.endswith(
+ su.SWIFT_URL_SUFFIX)) or len(url.path) <= 1:
raise ex.InvalidException(
"URL must be of the form swift://container%s/object"
% su.SWIFT_URL_SUFFIX)
diff --git a/sahara/swift/utils.py b/sahara/swift/utils.py
index 581c0017..2a19ff87 100644
--- a/sahara/swift/utils.py
+++ b/sahara/swift/utils.py
@@ -26,7 +26,8 @@ CONF = cfg.CONF
SWIFT_INTERNAL_PREFIX = "swift://"
# TODO(mattf): remove support for OLD_SWIFT_INTERNAL_PREFIX
OLD_SWIFT_INTERNAL_PREFIX = "swift-internal://"
-SWIFT_URL_SUFFIX = '.sahara'
+SWIFT_URL_SUFFIX_START = '.'
+SWIFT_URL_SUFFIX = SWIFT_URL_SUFFIX_START + 'sahara'
def _get_service_address(service_type):
diff --git a/sahara/tests/integration/tests/edp.py b/sahara/tests/integration/tests/edp.py
index 5a7a58da..bfda91b8 100644
--- a/sahara/tests/integration/tests/edp.py
+++ b/sahara/tests/integration/tests/edp.py
@@ -19,6 +19,7 @@ import time
import uuid
from sahara.openstack.common import excutils
+from sahara.swift import swift_helper as sw
from sahara.tests.integration.tests import base
from sahara.utils import edp
@@ -107,16 +108,16 @@ class EDPTest(base.ITestCase):
self.sahara.data_sources.delete(output_id)
def _add_swift_configs(self, configs):
- swift_user = "fs.swift.service.sahara.username"
- swift_passw = "fs.swift.service.sahara.password"
if "configs" not in configs:
configs["configs"] = {}
- if swift_user not in configs["configs"]:
- configs["configs"][swift_user] = self.common_config.OS_USERNAME
- if swift_passw not in configs["configs"]:
- configs["configs"][swift_passw] = self.common_config.OS_PASSWORD
+ if sw.HADOOP_SWIFT_USERNAME not in configs["configs"]:
+ configs["configs"][
+ sw.HADOOP_SWIFT_USERNAME] = self.common_config.OS_USERNAME
+ if sw.HADOOP_SWIFT_PASSWORD not in configs["configs"]:
+ configs["configs"][
+ sw.HADOOP_SWIFT_PASSWORD] = self.common_config.OS_PASSWORD
@base.skip_test('SKIP_EDP_TEST',
'Test for EDP was skipped.')
diff --git a/sahara/tests/unit/service/edp/test_job_manager.py b/sahara/tests/unit/service/edp/test_job_manager.py
index 8c0d6a9c..90bba2bb 100644
--- a/sahara/tests/unit/service/edp/test_job_manager.py
+++ b/sahara/tests/unit/service/edp/test_job_manager.py
@@ -21,6 +21,7 @@ from sahara import conductor as cond
from sahara.plugins import base as pb
from sahara.service.edp import job_manager
from sahara.service.edp.workflow_creator import workflow_factory
+from sahara.swift import swift_helper as sw
from sahara.tests.unit import base
from sahara.utils import edp
from sahara.utils import patches as p
@@ -96,8 +97,8 @@ class TestJobManager(base.SaharaWithDbTestCase):
job, job_exec = _create_all_stack(edp.JOB_TYPE_PIG)
job_binary.return_value = {"name": "script.pig"}
- input_data = _create_data_source('swift://ex.sahara/i')
- output_data = _create_data_source('swift://ex.sahara/o')
+ input_data = _create_data_source('swift://ex/i')
+ output_data = _create_data_source('swift://ex/o')
creator = workflow_factory.get_creator(job)
@@ -129,7 +130,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
job, job_exec = _create_all_stack(edp.JOB_TYPE_PIG)
job_binary.return_value = {"name": "script.pig"}
- input_data = _create_data_source('swift://ex.sahara/i')
+ input_data = _create_data_source('swift://ex/i')
output_data = _create_data_source('hdfs://user/hadoop/out')
creator = workflow_factory.get_creator(job)
@@ -149,7 +150,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
""", res)
input_data = _create_data_source('hdfs://user/hadoop/in')
- output_data = _create_data_source('swift://ex.sahara/o')
+ output_data = _create_data_source('swift://ex/o')
creator = workflow_factory.get_creator(job)
@@ -196,8 +197,8 @@ class TestJobManager(base.SaharaWithDbTestCase):
job, job_exec = _create_all_stack(job_type, configs)
- input_data = _create_data_source('swift://ex.sahara/i')
- output_data = _create_data_source('swift://ex.sahara/o')
+ input_data = _create_data_source('swift://ex/i')
+ output_data = _create_data_source('swift://ex/o')
creator = workflow_factory.get_creator(job)
@@ -243,12 +244,12 @@ class TestJobManager(base.SaharaWithDbTestCase):
# If args include swift paths, user and password values
# will have to be supplied via configs instead of being
# lifted from input or output data sources
- configs = {workflow_factory.swift_username: 'admin',
- workflow_factory.swift_password: 'admin1'}
+ configs = {sw.HADOOP_SWIFT_USERNAME: 'admin',
+ sw.HADOOP_SWIFT_PASSWORD: 'admin1'}
configs = {
'configs': configs,
- 'args': ['input_path',
+ 'args': ['swift://ex/i',
'output_path']
}
@@ -269,7 +270,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
%s
%s
- input_path
+ swift://ex.sahara/i
output_path""" % (_java_main_class, _java_opts), res)
@mock.patch('sahara.conductor.API.job_binary_get')
@@ -278,8 +279,8 @@ class TestJobManager(base.SaharaWithDbTestCase):
job, job_exec = _create_all_stack(edp.JOB_TYPE_HIVE)
job_binary.return_value = {"name": "script.q"}
- input_data = _create_data_source('swift://ex.sahara/i')
- output_data = _create_data_source('swift://ex.sahara/o')
+ input_data = _create_data_source('swift://ex/i')
+ output_data = _create_data_source('swift://ex/o')
creator = workflow_factory.get_creator(job)
@@ -305,8 +306,8 @@ class TestJobManager(base.SaharaWithDbTestCase):
def _build_workflow_with_conf_common(self, job_type):
job, _ = _create_all_stack(job_type)
- input_data = _create_data_source('swift://ex.sahara/i')
- output_data = _create_data_source('swift://ex.sahara/o')
+ input_data = _create_data_source('swift://ex/i')
+ output_data = _create_data_source('swift://ex/o')
job_exec = _create_job_exec(job.id,
job_type, configs={"configs": {'c': 'f'}})
@@ -369,6 +370,15 @@ class TestJobManager(base.SaharaWithDbTestCase):
self.assertEqual(orig_exec_job_dict, exec_job_dict)
+    def test_inject_swift_url_suffix(self):
+        factory = workflow_factory.BaseFactory()
+        # Each pair is (URL handed to the factory, URL expected back):
+        # a bare swift container gains the suffix, an already suffixed
+        # container and a non-swift URL come back untouched.
+        cases = (
+            ("swift://ex/o", "swift://ex.sahara/o"),
+            ("swift://ex.sahara/o", "swift://ex.sahara/o"),
+            ("hdfs://my/path", "hdfs://my/path"),
+        )
+        for given, expected in cases:
+            self.assertEqual(factory.inject_swift_url_suffix(given),
+                             expected)
+
def _create_all_stack(type, configs=None):
b = _create_job_binary('1', type)
diff --git a/sahara/tests/unit/service/validation/edp/test_data_source.py b/sahara/tests/unit/service/validation/edp/test_data_source.py
index fae38846..6f190340 100644
--- a/sahara/tests/unit/service/validation/edp/test_data_source.py
+++ b/sahara/tests/unit/service/validation/edp/test_data_source.py
@@ -21,7 +21,8 @@ from sahara.service.validations.edp import data_source as ds
from sahara.swift import utils as su
from sahara.tests.unit.service.validation import utils as u
-SAMPLE_SWIFT_URL = "swift://1234%s/object" % su.SWIFT_URL_SUFFIX
+SAMPLE_SWIFT_URL = "swift://1234/object"
+SAMPLE_SWIFT_URL_WITH_SUFFIX = "swift://1234%s/object" % su.SWIFT_URL_SUFFIX
class TestDataSourceValidation(u.ValidationTestCase):
@@ -103,7 +104,7 @@ class TestDataSourceValidation(u.ValidationTestCase):
data = {
"name": "test_data_data_source",
- "url": "swif://1234%s/object" % su.SWIFT_URL_SUFFIX,
+ "url": "swif://1234/object",
"type": "swift",
"description": "incorrect url schema"
}
@@ -112,13 +113,31 @@ class TestDataSourceValidation(u.ValidationTestCase):
@mock.patch("sahara.service.validations."
"edp.base.check_data_source_unique_name")
- def test_swift_creation_missing_suffix(self,
- check_data_source_unique_name):
+ def test_swift_creation_explicit_suffix(self,
+ check_data_source_unique_name):
check_data_source_unique_name.return_value = True
data = {
"name": "test_data_data_source",
- "url": "swift://1234/object",
+ "url": SAMPLE_SWIFT_URL_WITH_SUFFIX,
+ "type": "swift",
+ "description": "incorrect url schema",
+ "credentials": {
+ "user": "user",
+ "password": "password"
+ }
+ }
+ self._assert_types(data)
+
+ @mock.patch("sahara.service.validations."
+ "edp.base.check_data_source_unique_name")
+ def test_swift_creation_wrong_suffix(self,
+ check_data_source_unique_name):
+ check_data_source_unique_name.return_value = True
+
+ data = {
+ "name": "test_data_data_source",
+ "url": "swift://1234.suffix/object",
"type": "swift",
"description": "incorrect url schema"
}
@@ -133,7 +152,7 @@ class TestDataSourceValidation(u.ValidationTestCase):
data = {
"name": "test_data_data_source",
- "url": "swift://1234%s/" % su.SWIFT_URL_SUFFIX,
+ "url": "swift://1234/",
"type": "swift",
"description": "incorrect url schema"
}