Merge "Add ".sahara" suffix automatically to swift URLs in workflows"
This commit is contained in:
commit
c1d6d02ab7
@ -14,6 +14,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
import six
|
||||
import six.moves.urllib.parse as urlparse
|
||||
|
||||
from sahara import conductor as c
|
||||
from sahara import context
|
||||
@ -22,15 +23,14 @@ from sahara.service.edp.workflow_creator import hive_workflow
|
||||
from sahara.service.edp.workflow_creator import java_workflow
|
||||
from sahara.service.edp.workflow_creator import mapreduce_workflow
|
||||
from sahara.service.edp.workflow_creator import pig_workflow
|
||||
from sahara.swift import swift_helper as sw
|
||||
from sahara.swift import utils as su
|
||||
from sahara.utils import edp
|
||||
from sahara.utils import xmlutils
|
||||
|
||||
|
||||
conductor = c.API
|
||||
|
||||
swift_username = 'fs.swift.service.sahara.username'
|
||||
swift_password = 'fs.swift.service.sahara.password'
|
||||
|
||||
|
||||
class BaseFactory(object):
|
||||
def _separate_edp_configs(self, job_dict):
|
||||
@ -69,6 +69,14 @@ class BaseFactory(object):
|
||||
new_vals = src.get(key, {})
|
||||
value.update(new_vals)
|
||||
|
||||
def inject_swift_url_suffix(self, url):
|
||||
if url.startswith("swift://"):
|
||||
u = urlparse.urlparse(url)
|
||||
if not u.netloc.endswith(su.SWIFT_URL_SUFFIX):
|
||||
return url.replace(u.netloc,
|
||||
u.netloc+"%s" % su.SWIFT_URL_SUFFIX, 1)
|
||||
return url
|
||||
|
||||
def update_job_dict(self, job_dict, exec_dict):
|
||||
pruned_exec_dict, edp_configs = self._prune_edp_configs(exec_dict)
|
||||
self._update_dict(job_dict, pruned_exec_dict)
|
||||
@ -79,14 +87,29 @@ class BaseFactory(object):
|
||||
# Args are listed, not named. Simply replace them.
|
||||
job_dict['args'] = pruned_exec_dict.get('args', [])
|
||||
|
||||
# Find all swift:// paths in args, configs, and params and
|
||||
# add the .sahara suffix to the container if it is not there
|
||||
# already
|
||||
job_dict['args'] = [
|
||||
# TODO(tmckay) args for Pig can actually be -param name=value
|
||||
# and value could conceivably contain swift paths
|
||||
self.inject_swift_url_suffix(arg) for arg in job_dict['args']]
|
||||
|
||||
for k, v in six.iteritems(job_dict.get('configs', {})):
|
||||
job_dict['configs'][k] = self.inject_swift_url_suffix(v)
|
||||
|
||||
for k, v in six.iteritems(job_dict.get('params', {})):
|
||||
job_dict['params'][k] = self.inject_swift_url_suffix(v)
|
||||
|
||||
def get_configs(self, input_data, output_data):
|
||||
configs = {}
|
||||
for src in (input_data, output_data):
|
||||
if src.type == "swift" and hasattr(src, "credentials"):
|
||||
if "user" in src.credentials:
|
||||
configs[swift_username] = src.credentials['user']
|
||||
configs[sw.HADOOP_SWIFT_USERNAME] = src.credentials['user']
|
||||
if "password" in src.credentials:
|
||||
configs[swift_password] = src.credentials['password']
|
||||
configs[
|
||||
sw.HADOOP_SWIFT_PASSWORD] = src.credentials['password']
|
||||
break
|
||||
return configs
|
||||
|
||||
@ -175,6 +198,7 @@ class JavaFactory(BaseFactory):
|
||||
job_dict = {'configs': {},
|
||||
'args': []}
|
||||
self.update_job_dict(job_dict, execution.job_configs)
|
||||
|
||||
main_class, java_opts = self._get_java_configs(job_dict)
|
||||
creator = java_workflow.JavaWorkflowCreator()
|
||||
creator.build_workflow_xml(main_class,
|
||||
|
@ -66,8 +66,12 @@ def _check_swift_data_source_create(data):
|
||||
if url.scheme != "swift":
|
||||
raise ex.InvalidException("URL scheme must be 'swift'")
|
||||
|
||||
# We must have the suffix, and the path must be more than '/'
|
||||
if not url.netloc.endswith(su.SWIFT_URL_SUFFIX) or len(url.path) <= 1:
|
||||
# The swift url suffix does not have to be included in the netloc.
|
||||
# However, if the swift suffix indicator is part of the netloc then
|
||||
# we require the right suffix.
|
||||
# Additionally, the path must be more than '/'
|
||||
if (su.SWIFT_URL_SUFFIX_START in url.netloc and not url.netloc.endswith(
|
||||
su.SWIFT_URL_SUFFIX)) or len(url.path) <= 1:
|
||||
raise ex.InvalidException(
|
||||
"URL must be of the form swift://container%s/object"
|
||||
% su.SWIFT_URL_SUFFIX)
|
||||
|
@ -26,7 +26,8 @@ CONF = cfg.CONF
|
||||
SWIFT_INTERNAL_PREFIX = "swift://"
|
||||
# TODO(mattf): remove support for OLD_SWIFT_INTERNAL_PREFIX
|
||||
OLD_SWIFT_INTERNAL_PREFIX = "swift-internal://"
|
||||
SWIFT_URL_SUFFIX = '.sahara'
|
||||
SWIFT_URL_SUFFIX_START = '.'
|
||||
SWIFT_URL_SUFFIX = SWIFT_URL_SUFFIX_START + 'sahara'
|
||||
|
||||
|
||||
def _get_service_address(service_type):
|
||||
|
@ -19,6 +19,7 @@ import time
|
||||
import uuid
|
||||
|
||||
from sahara.openstack.common import excutils
|
||||
from sahara.swift import swift_helper as sw
|
||||
from sahara.tests.integration.tests import base
|
||||
from sahara.utils import edp
|
||||
|
||||
@ -107,16 +108,16 @@ class EDPTest(base.ITestCase):
|
||||
self.sahara.data_sources.delete(output_id)
|
||||
|
||||
def _add_swift_configs(self, configs):
|
||||
swift_user = "fs.swift.service.sahara.username"
|
||||
swift_passw = "fs.swift.service.sahara.password"
|
||||
|
||||
if "configs" not in configs:
|
||||
configs["configs"] = {}
|
||||
|
||||
if swift_user not in configs["configs"]:
|
||||
configs["configs"][swift_user] = self.common_config.OS_USERNAME
|
||||
if swift_passw not in configs["configs"]:
|
||||
configs["configs"][swift_passw] = self.common_config.OS_PASSWORD
|
||||
if sw.HADOOP_SWIFT_USERNAME not in configs["configs"]:
|
||||
configs["configs"][
|
||||
sw.HADOOP_SWIFT_USERNAME] = self.common_config.OS_USERNAME
|
||||
if sw.HADOOP_SWIFT_PASSWORD not in configs["configs"]:
|
||||
configs["configs"][
|
||||
sw.HADOOP_SWIFT_PASSWORD] = self.common_config.OS_PASSWORD
|
||||
|
||||
@base.skip_test('SKIP_EDP_TEST',
|
||||
'Test for EDP was skipped.')
|
||||
|
@ -21,6 +21,7 @@ from sahara import conductor as cond
|
||||
from sahara.plugins import base as pb
|
||||
from sahara.service.edp import job_manager
|
||||
from sahara.service.edp.workflow_creator import workflow_factory
|
||||
from sahara.swift import swift_helper as sw
|
||||
from sahara.tests.unit import base
|
||||
from sahara.utils import edp
|
||||
from sahara.utils import patches as p
|
||||
@ -96,8 +97,8 @@ class TestJobManager(base.SaharaWithDbTestCase):
|
||||
job, job_exec = _create_all_stack(edp.JOB_TYPE_PIG)
|
||||
job_binary.return_value = {"name": "script.pig"}
|
||||
|
||||
input_data = _create_data_source('swift://ex.sahara/i')
|
||||
output_data = _create_data_source('swift://ex.sahara/o')
|
||||
input_data = _create_data_source('swift://ex/i')
|
||||
output_data = _create_data_source('swift://ex/o')
|
||||
|
||||
creator = workflow_factory.get_creator(job)
|
||||
|
||||
@ -129,7 +130,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
|
||||
job, job_exec = _create_all_stack(edp.JOB_TYPE_PIG)
|
||||
job_binary.return_value = {"name": "script.pig"}
|
||||
|
||||
input_data = _create_data_source('swift://ex.sahara/i')
|
||||
input_data = _create_data_source('swift://ex/i')
|
||||
output_data = _create_data_source('hdfs://user/hadoop/out')
|
||||
|
||||
creator = workflow_factory.get_creator(job)
|
||||
@ -149,7 +150,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
|
||||
</configuration>""", res)
|
||||
|
||||
input_data = _create_data_source('hdfs://user/hadoop/in')
|
||||
output_data = _create_data_source('swift://ex.sahara/o')
|
||||
output_data = _create_data_source('swift://ex/o')
|
||||
|
||||
creator = workflow_factory.get_creator(job)
|
||||
|
||||
@ -196,8 +197,8 @@ class TestJobManager(base.SaharaWithDbTestCase):
|
||||
|
||||
job, job_exec = _create_all_stack(job_type, configs)
|
||||
|
||||
input_data = _create_data_source('swift://ex.sahara/i')
|
||||
output_data = _create_data_source('swift://ex.sahara/o')
|
||||
input_data = _create_data_source('swift://ex/i')
|
||||
output_data = _create_data_source('swift://ex/o')
|
||||
|
||||
creator = workflow_factory.get_creator(job)
|
||||
|
||||
@ -243,12 +244,12 @@ class TestJobManager(base.SaharaWithDbTestCase):
|
||||
# If args include swift paths, user and password values
|
||||
# will have to be supplied via configs instead of being
|
||||
# lifted from input or output data sources
|
||||
configs = {workflow_factory.swift_username: 'admin',
|
||||
workflow_factory.swift_password: 'admin1'}
|
||||
configs = {sw.HADOOP_SWIFT_USERNAME: 'admin',
|
||||
sw.HADOOP_SWIFT_PASSWORD: 'admin1'}
|
||||
|
||||
configs = {
|
||||
'configs': configs,
|
||||
'args': ['input_path',
|
||||
'args': ['swift://ex/i',
|
||||
'output_path']
|
||||
}
|
||||
|
||||
@ -269,7 +270,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
|
||||
</configuration>
|
||||
<main-class>%s</main-class>
|
||||
<java-opts>%s</java-opts>
|
||||
<arg>input_path</arg>
|
||||
<arg>swift://ex.sahara/i</arg>
|
||||
<arg>output_path</arg>""" % (_java_main_class, _java_opts), res)
|
||||
|
||||
@mock.patch('sahara.conductor.API.job_binary_get')
|
||||
@ -278,8 +279,8 @@ class TestJobManager(base.SaharaWithDbTestCase):
|
||||
job, job_exec = _create_all_stack(edp.JOB_TYPE_HIVE)
|
||||
job_binary.return_value = {"name": "script.q"}
|
||||
|
||||
input_data = _create_data_source('swift://ex.sahara/i')
|
||||
output_data = _create_data_source('swift://ex.sahara/o')
|
||||
input_data = _create_data_source('swift://ex/i')
|
||||
output_data = _create_data_source('swift://ex/o')
|
||||
|
||||
creator = workflow_factory.get_creator(job)
|
||||
|
||||
@ -305,8 +306,8 @@ class TestJobManager(base.SaharaWithDbTestCase):
|
||||
def _build_workflow_with_conf_common(self, job_type):
|
||||
job, _ = _create_all_stack(job_type)
|
||||
|
||||
input_data = _create_data_source('swift://ex.sahara/i')
|
||||
output_data = _create_data_source('swift://ex.sahara/o')
|
||||
input_data = _create_data_source('swift://ex/i')
|
||||
output_data = _create_data_source('swift://ex/o')
|
||||
|
||||
job_exec = _create_job_exec(job.id,
|
||||
job_type, configs={"configs": {'c': 'f'}})
|
||||
@ -369,6 +370,15 @@ class TestJobManager(base.SaharaWithDbTestCase):
|
||||
|
||||
self.assertEqual(orig_exec_job_dict, exec_job_dict)
|
||||
|
||||
def test_inject_swift_url_suffix(self):
|
||||
w = workflow_factory.BaseFactory()
|
||||
self.assertEqual(w.inject_swift_url_suffix("swift://ex/o"),
|
||||
"swift://ex.sahara/o")
|
||||
self.assertEqual(w.inject_swift_url_suffix("swift://ex.sahara/o"),
|
||||
"swift://ex.sahara/o")
|
||||
self.assertEqual(w.inject_swift_url_suffix("hdfs://my/path"),
|
||||
"hdfs://my/path")
|
||||
|
||||
|
||||
def _create_all_stack(type, configs=None):
|
||||
b = _create_job_binary('1', type)
|
||||
|
@ -21,7 +21,8 @@ from sahara.service.validations.edp import data_source as ds
|
||||
from sahara.swift import utils as su
|
||||
from sahara.tests.unit.service.validation import utils as u
|
||||
|
||||
SAMPLE_SWIFT_URL = "swift://1234%s/object" % su.SWIFT_URL_SUFFIX
|
||||
SAMPLE_SWIFT_URL = "swift://1234/object"
|
||||
SAMPLE_SWIFT_URL_WITH_SUFFIX = "swift://1234%s/object" % su.SWIFT_URL_SUFFIX
|
||||
|
||||
|
||||
class TestDataSourceValidation(u.ValidationTestCase):
|
||||
@ -103,7 +104,7 @@ class TestDataSourceValidation(u.ValidationTestCase):
|
||||
|
||||
data = {
|
||||
"name": "test_data_data_source",
|
||||
"url": "swif://1234%s/object" % su.SWIFT_URL_SUFFIX,
|
||||
"url": "swif://1234/object",
|
||||
"type": "swift",
|
||||
"description": "incorrect url schema"
|
||||
}
|
||||
@ -112,13 +113,31 @@ class TestDataSourceValidation(u.ValidationTestCase):
|
||||
|
||||
@mock.patch("sahara.service.validations."
|
||||
"edp.base.check_data_source_unique_name")
|
||||
def test_swift_creation_missing_suffix(self,
|
||||
check_data_source_unique_name):
|
||||
def test_swift_creation_explicit_suffix(self,
|
||||
check_data_source_unique_name):
|
||||
check_data_source_unique_name.return_value = True
|
||||
|
||||
data = {
|
||||
"name": "test_data_data_source",
|
||||
"url": "swift://1234/object",
|
||||
"url": SAMPLE_SWIFT_URL_WITH_SUFFIX,
|
||||
"type": "swift",
|
||||
"description": "incorrect url schema",
|
||||
"credentials": {
|
||||
"user": "user",
|
||||
"password": "password"
|
||||
}
|
||||
}
|
||||
self._assert_types(data)
|
||||
|
||||
@mock.patch("sahara.service.validations."
|
||||
"edp.base.check_data_source_unique_name")
|
||||
def test_swift_creation_wrong_suffix(self,
|
||||
check_data_source_unique_name):
|
||||
check_data_source_unique_name.return_value = True
|
||||
|
||||
data = {
|
||||
"name": "test_data_data_source",
|
||||
"url": "swift://1234.suffix/object",
|
||||
"type": "swift",
|
||||
"description": "incorrect url schema"
|
||||
}
|
||||
@ -133,7 +152,7 @@ class TestDataSourceValidation(u.ValidationTestCase):
|
||||
|
||||
data = {
|
||||
"name": "test_data_data_source",
|
||||
"url": "swift://1234%s/" % su.SWIFT_URL_SUFFIX,
|
||||
"url": "swift://1234/",
|
||||
"type": "swift",
|
||||
"description": "incorrect url schema"
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user