diff --git a/sahara/service/edp/oozie/workflow_creator/workflow_factory.py b/sahara/service/edp/oozie/workflow_creator/workflow_factory.py
index fd2847811f..fc783dc0c3 100644
--- a/sahara/service/edp/oozie/workflow_creator/workflow_factory.py
+++ b/sahara/service/edp/oozie/workflow_creator/workflow_factory.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from oslo.config import cfg
 import six
 import six.moves.urllib.parse as urlparse
 
@@ -29,6 +30,7 @@ from sahara.utils import xmlutils
 
 conductor = c.API
 
+CONF = cfg.CONF
 
 
 class BaseFactory(object):
@@ -95,13 +97,25 @@ class BaseFactory(object):
                 self.inject_swift_url_suffix(arg)
                 for arg in job_dict['args']]
         for k, v in six.iteritems(job_dict.get('configs', {})):
-            job_dict['configs'][k] = self.inject_swift_url_suffix(v)
+            if k != 'proxy_configs':
+                job_dict['configs'][k] = self.inject_swift_url_suffix(v)
         for k, v in six.iteritems(job_dict.get('params', {})):
             job_dict['params'][k] = self.inject_swift_url_suffix(v)
 
-    def get_configs(self, input_data, output_data):
+    def get_configs(self, input_data, output_data, proxy_configs=None):
         configs = {}
+
+        if proxy_configs:
+            configs[sw.HADOOP_SWIFT_USERNAME] = proxy_configs.get(
+                'proxy_username')
+            configs[sw.HADOOP_SWIFT_PASSWORD] = proxy_configs.get(
+                'proxy_password')
+            configs[sw.HADOOP_SWIFT_TRUST_ID] = proxy_configs.get(
+                'proxy_trust_id')
+            configs[sw.HADOOP_SWIFT_DOMAIN_NAME] = CONF.proxy_user_domain_name
+            return configs
+
         for src in (input_data, output_data):
             if src.type == "swift" and hasattr(src, "credentials"):
                 if "user" in src.credentials:
@@ -128,7 +142,9 @@ class PigFactory(BaseFactory):
 
     def get_workflow_xml(self, cluster, execution, input_data, output_data,
                          hdfs_user):
-        job_dict = {'configs': self.get_configs(input_data, output_data),
+        proxy_configs = execution.job_configs.get('proxy_configs')
+        job_dict = {'configs': self.get_configs(input_data, output_data,
+                                                proxy_configs),
                     'params': self.get_params(input_data, output_data),
                     'args': []}
         self.update_job_dict(job_dict, execution.job_configs)
@@ -151,7 +167,9 @@ class HiveFactory(BaseFactory):
 
     def get_workflow_xml(self, cluster, execution, input_data, output_data,
                          hdfs_user):
-        job_dict = {'configs': self.get_configs(input_data, output_data),
+        proxy_configs = execution.job_configs.get('proxy_configs')
+        job_dict = {'configs': self.get_configs(input_data, output_data,
+                                                proxy_configs),
                     'params': self.get_params(input_data, output_data)}
         self.update_job_dict(job_dict, execution.job_configs)
@@ -165,9 +183,10 @@ class HiveFactory(BaseFactory):
 
 class MapReduceFactory(BaseFactory):
 
-    def get_configs(self, input_data, output_data):
+    def get_configs(self, input_data, output_data, proxy_configs):
         configs = super(MapReduceFactory, self).get_configs(input_data,
-                                                            output_data)
+                                                            output_data,
+                                                            proxy_configs)
         configs['mapred.input.dir'] = input_data.url
         configs['mapred.output.dir'] = output_data.url
         return configs
@@ -179,7 +198,9 @@ class MapReduceFactory(BaseFactory):
 
     def get_workflow_xml(self, cluster, execution, input_data, output_data,
                          hdfs_user):
-        job_dict = {'configs': self.get_configs(input_data, output_data)}
+        proxy_configs = execution.job_configs.get('proxy_configs')
+        job_dict = {'configs': self.get_configs(input_data, output_data,
+                                                proxy_configs)}
         self.update_job_dict(job_dict, execution.job_configs)
         creator = mapreduce_workflow.MapReduceWorkFlowCreator()
         creator.build_workflow_xml(configuration=job_dict['configs'],
@@ -194,8 +215,24 @@ class JavaFactory(BaseFactory):
         java_opts = job_dict['edp_configs'].get('edp.java.java_opts', None)
         return main_class, java_opts
 
+    def get_configs(self, proxy_configs=None):
+        configs = {}
+
+        if proxy_configs:
+            configs[sw.HADOOP_SWIFT_USERNAME] = proxy_configs.get(
+                'proxy_username')
+            configs[sw.HADOOP_SWIFT_PASSWORD] = proxy_configs.get(
+                'proxy_password')
+            configs[sw.HADOOP_SWIFT_TRUST_ID] = proxy_configs.get(
+                'proxy_trust_id')
+            configs[sw.HADOOP_SWIFT_DOMAIN_NAME] = CONF.proxy_user_domain_name
+            return configs
+
+        return configs
+
     def get_workflow_xml(self, cluster, execution, *args, **kwargs):
-        job_dict = {'configs': {},
+        proxy_configs = execution.job_configs.get('proxy_configs')
+        job_dict = {'configs': self.get_configs(proxy_configs=proxy_configs),
                     'args': []}
         self.update_job_dict(job_dict, execution.job_configs)
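Note: the proxy branch added to BaseFactory.get_configs above makes the
workflow use the job execution's proxy credentials instead of credentials
stored on the data sources. A minimal sketch of the mapping that branch
produces (all values invented for illustration; in the real code the domain
name is read from CONF.proxy_user_domain_name):

    # Sketch only -- mirrors the proxy branch of BaseFactory.get_configs.
    # The key strings are the constants added to sahara/swift/swift_helper.py
    # further down in this patch.
    proxy_configs = {
        'proxy_username': 'job_1234',      # invented example value
        'proxy_password': 'secret',       # invented example value
        'proxy_trust_id': 'trust-abcdef',  # invented example value
    }

    configs = {
        'fs.swift.service.sahara.username': proxy_configs.get('proxy_username'),
        'fs.swift.service.sahara.password': proxy_configs.get('proxy_password'),
        'fs.swift.service.sahara.trust.id': proxy_configs.get('proxy_trust_id'),
        # CONF.proxy_user_domain_name in the real code:
        'fs.swift.service.sahara.domain.name': 'sahara_proxy_domain',
    }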
diff --git a/sahara/service/validations/edp/data_source.py b/sahara/service/validations/edp/data_source.py
index 65e0b6f21b..e2b84de5f1 100644
--- a/sahara/service/validations/edp/data_source.py
+++ b/sahara/service/validations/edp/data_source.py
@@ -13,12 +13,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from oslo.config import cfg
 import six.moves.urllib.parse as urlparse
 
 import sahara.exceptions as ex
 import sahara.service.validations.edp.base as b
 from sahara.swift import utils as su
 
+CONF = cfg.CONF
 
 DATA_SOURCE_SCHEMA = {
     "type": "object",
@@ -76,12 +78,14 @@ def _check_swift_data_source_create(data):
             "URL must be of the form swift://container%s/object"
             % su.SWIFT_URL_SUFFIX)
 
-    if "credentials" not in data:
+    if not CONF.use_domain_for_proxy_users and "credentials" not in data:
         raise ex.InvalidCredentials("No credentials provided for Swift")
-    if "user" not in data["credentials"]:
+    if not CONF.use_domain_for_proxy_users and (
+            "user" not in data["credentials"]):
         raise ex.InvalidCredentials(
             "User is not provided in credentials for Swift")
-    if "password" not in data["credentials"]:
+    if not CONF.use_domain_for_proxy_users and (
+            "password" not in data["credentials"]):
         raise ex.InvalidCredentials(
             "Password is not provided in credentials for Swift")
diff --git a/sahara/swift/swift_helper.py b/sahara/swift/swift_helper.py
index 92f74787d9..be9a5f7fcf 100644
--- a/sahara/swift/swift_helper.py
+++ b/sahara/swift/swift_helper.py
@@ -30,6 +30,8 @@ HADOOP_SWIFT_TENANT = 'fs.swift.service.sahara.tenant'
 HADOOP_SWIFT_USERNAME = 'fs.swift.service.sahara.username'
 HADOOP_SWIFT_PASSWORD = 'fs.swift.service.sahara.password'
 HADOOP_SWIFT_REGION = 'fs.swift.service.sahara.region'
+HADOOP_SWIFT_TRUST_ID = 'fs.swift.service.sahara.trust.id'
+HADOOP_SWIFT_DOMAIN_NAME = 'fs.swift.service.sahara.domain.name'
 
 
 def retrieve_tenant():
diff --git a/sahara/swift/utils.py b/sahara/swift/utils.py
index cad709f8e0..8824fc16ab 100644
--- a/sahara/swift/utils.py
+++ b/sahara/swift/utils.py
@@ -34,7 +34,15 @@ def retrieve_auth_url():
     """
     info = urlparse.urlparse(context.current().auth_uri)
 
-    return "%s://%s:%s/%s/" % (info.scheme, info.hostname, info.port, 'v2.0')
+    if CONF.use_domain_for_proxy_users:
+        url = 'v3/auth'
+    else:
+        url = 'v2.0'
+
+    return '{scheme}://{hostname}:{port}/{url}/'.format(scheme=info.scheme,
+                                                        hostname=info.hostname,
+                                                        port=info.port,
+                                                        url=url)
 
 
 def retrieve_preauth_url():
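Note: the retrieve_auth_url() change above switches the Identity endpoint
between Keystone v2.0 and v3 depending on the proxy-user setting. A
standalone restatement of the new logic, with the CONF flag passed in as a
parameter and an invented auth URI:

    from six.moves.urllib.parse import urlparse

    def _auth_url(auth_uri, use_domain_for_proxy_users):
        # Mirrors retrieve_auth_url() minus the sahara context/CONF plumbing;
        # the boolean stands in for CONF.use_domain_for_proxy_users.
        info = urlparse(auth_uri)
        url = 'v3/auth' if use_domain_for_proxy_users else 'v2.0'
        return '{scheme}://{hostname}:{port}/{url}/'.format(
            scheme=info.scheme, hostname=info.hostname,
            port=info.port, url=url)

    assert _auth_url('http://keystone:5000', False) == 'http://keystone:5000/v2.0/'
    assert _auth_url('http://keystone:5000', True) == 'http://keystone:5000/v3/auth/'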
b/sahara/tests/unit/service/edp/edp_test_utils.py
@@ -26,10 +26,11 @@
 _java_main_class = "org.apache.hadoop.examples.WordCount"
 _java_opts = "-Dparam1=val1 -Dparam2=val2"
 
 
-def create_job_exec(type, configs=None):
+def create_job_exec(type, configs=None, proxy=False):
     b = create_job_binary('1', type)
     j = _create_job('2', b, type)
-    e = _create_job_exec(j.id, type, configs)
+    _cje_func = _create_job_exec_with_proxy if proxy else _create_job_exec
+    e = _cje_func(j.id, type, configs)
     return j, e
 
 
@@ -87,3 +88,16 @@ def _create_job_exec(job_id, type, configs=None):
         j_exec.job_configs['configs']['edp.java.main_class'] = _java_main_class
         j_exec.job_configs['configs']['edp.java.java_opts'] = _java_opts
     return j_exec
+
+
+def _create_job_exec_with_proxy(job_id, type, configs=None):
+    j_exec = _create_job_exec(job_id, type, configs)
+    j_exec.id = '00000000-1111-2222-3333-4444444444444444'
+    if not j_exec.job_configs:
+        j_exec.job_configs = {}
+    j_exec.job_configs['proxy_configs'] = {
+        'proxy_username': 'job_' + j_exec.id,
+        'proxy_password': '55555555-6666-7777-8888-999999999999',
+        'proxy_trust_id': '0123456789abcdef0123456789abcdef'
+    }
+    return j_exec
diff --git a/sahara/tests/unit/service/edp/test_job_manager.py b/sahara/tests/unit/service/edp/test_job_manager.py
index c42594763b..942b67f5aa 100644
--- a/sahara/tests/unit/service/edp/test_job_manager.py
+++ b/sahara/tests/unit/service/edp/test_job_manager.py
@@ -112,7 +112,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
 
     @mock.patch('sahara.conductor.API.job_binary_get')
     def test_build_workflow_for_job_pig(self, job_binary):
-        job, job_exec = u.create_job_exec(edp.JOB_TYPE_PIG)
+        job, job_exec = u.create_job_exec(edp.JOB_TYPE_PIG, configs={})
         job_binary.return_value = {"name": "script.pig"}
 
         input_data = u.create_data_source('swift://ex/i')
@@ -140,11 +140,40 @@ class TestJobManager(base.SaharaWithDbTestCase):
 
         self.assertIn("<script>script.pig</script>", res)
 
+        # testing workflow creation with a proxy domain
+        self.override_config('use_domain_for_proxy_users', True)
+        self.override_config("proxy_user_domain_name", 'sahara_proxy_domain')
+        job, job_exec = u.create_job_exec(edp.JOB_TYPE_PIG, proxy=True)
+
+        res = workflow_factory.get_workflow_xml(
+            job, u.create_cluster(), job_exec, input_data, output_data,
+            'hadoop')
+
+        self.assertIn("""
+      <configuration>
+        <property>
+          <name>fs.swift.service.sahara.domain.name</name>
+          <value>sahara_proxy_domain</value>
+        </property>
+        <property>
+          <name>fs.swift.service.sahara.password</name>
+          <value>55555555-6666-7777-8888-999999999999</value>
+        </property>
+        <property>
+          <name>fs.swift.service.sahara.trust.id</name>
+          <value>0123456789abcdef0123456789abcdef</value>
+        </property>
+        <property>
+          <name>fs.swift.service.sahara.username</name>
+          <value>job_00000000-1111-2222-3333-4444444444444444</value>
+        </property>
+      </configuration>""", res)
+
     @mock.patch('sahara.conductor.API.job_binary_get')
     def test_build_workflow_swift_configs(self, job_binary):
 
         # Test that swift configs come from either input or output data sources
-        job, job_exec = u.create_job_exec(edp.JOB_TYPE_PIG)
+        job, job_exec = u.create_job_exec(edp.JOB_TYPE_PIG, configs={})
         job_binary.return_value = {"name": "script.pig"}
 
         input_data = u.create_data_source('swift://ex/i')
@@ -202,7 +231,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
         </property>""", res)
 
-    def _build_workflow_common(self, job_type, streaming=False):
+    def _build_workflow_common(self, job_type, streaming=False, proxy=False):
         if streaming:
             configs = {'edp.streaming.mapper': '/usr/bin/cat',
                        'edp.streaming.reducer': '/usr/bin/wc'}
@@ -238,21 +267,53 @@ class TestJobManager(base.SaharaWithDbTestCase):
           <value>swift://ex.sahara/i</value>
         </property>""", res)
 
-        self.assertIn("""
+        if not proxy:
+            self.assertIn("""
        <property>
          <name>fs.swift.service.sahara.password</name>
          <value>admin1</value>
        </property>""", res)
 
-        self.assertIn("""
+            self.assertIn("""
        <property>
          <name>fs.swift.service.sahara.username</name>
          <value>admin</value>
        </property>""", res)
+        else:
+            # testing workflow creation with a proxy domain
+            self.override_config('use_domain_for_proxy_users', True)
+            self.override_config("proxy_user_domain_name",
+                                 'sahara_proxy_domain')
+            job, job_exec = u.create_job_exec(job_type, proxy=True)
+
+            res = workflow_factory.get_workflow_xml(
+                job, u.create_cluster(), job_exec, input_data, output_data,
+                'hadoop')
+
+            self.assertIn("""
+        <property>
+          <name>fs.swift.service.sahara.domain.name</name>
+          <value>sahara_proxy_domain</value>
+        </property>
+        <property>
+          <name>fs.swift.service.sahara.password</name>
+          <value>55555555-6666-7777-8888-999999999999</value>
+        </property>
+        <property>
+          <name>fs.swift.service.sahara.trust.id</name>
+          <value>0123456789abcdef0123456789abcdef</value>
+        </property>
+        <property>
+          <name>fs.swift.service.sahara.username</name>
+          <value>job_00000000-1111-2222-3333-4444444444444444</value>
+        </property>""", res)
 
     def test_build_workflow_for_job_mapreduce(self):
         self._build_workflow_common(edp.JOB_TYPE_MAPREDUCE)
         self._build_workflow_common(edp.JOB_TYPE_MAPREDUCE, streaming=True)
+        self._build_workflow_common(edp.JOB_TYPE_MAPREDUCE, proxy=True)
+        self._build_workflow_common(edp.JOB_TYPE_MAPREDUCE, streaming=True,
+                                    proxy=True)
 
     def test_build_workflow_for_job_java(self):
         # If args include swift paths, user and password values
@@ -287,10 +348,48 @@ class TestJobManager(base.SaharaWithDbTestCase):
       <arg>swift://ex.sahara/i</arg>
       <arg>output_path</arg>""" % (_java_main_class, _java_opts), res)
 
+        # testing workflow creation with a proxy domain
+        self.override_config('use_domain_for_proxy_users', True)
+        self.override_config("proxy_user_domain_name", 'sahara_proxy_domain')
+        configs = {
+            'configs': {},
+            'args': ['swift://ex/i',
+                     'output_path']
+        }
+
+        job, job_exec = u.create_job_exec(edp.JOB_TYPE_JAVA, configs,
+                                          proxy=True)
+        res = workflow_factory.get_workflow_xml(job, u.create_cluster(),
+                                                job_exec)
+
+        self.assertIn("""
+      <configuration>
+        <property>
+          <name>fs.swift.service.sahara.domain.name</name>
+          <value>sahara_proxy_domain</value>
+        </property>
+        <property>
+          <name>fs.swift.service.sahara.password</name>
+          <value>55555555-6666-7777-8888-999999999999</value>
+        </property>
+        <property>
+          <name>fs.swift.service.sahara.trust.id</name>
+          <value>0123456789abcdef0123456789abcdef</value>
+        </property>
+        <property>
+          <name>fs.swift.service.sahara.username</name>
+          <value>job_00000000-1111-2222-3333-4444444444444444</value>
+        </property>
+      </configuration>
+      <main-class>%s</main-class>
+      <java-opts>%s</java-opts>
+      <arg>swift://ex.sahara/i</arg>
+      <arg>output_path</arg>""" % (_java_main_class, _java_opts), res)
+
     @mock.patch('sahara.conductor.API.job_binary_get')
     def test_build_workflow_for_job_hive(self, job_binary):
-        job, job_exec = u.create_job_exec(edp.JOB_TYPE_HIVE)
+        job, job_exec = u.create_job_exec(edp.JOB_TYPE_HIVE, configs={})
         job_binary.return_value = {"name": "script.q"}
 
         input_data = u.create_data_source('swift://ex/i')
@@ -316,38 +415,39 @@ class TestJobManager(base.SaharaWithDbTestCase):
       <param>INPUT=swift://ex.sahara/i</param>
       <param>OUTPUT=swift://ex.sahara/o</param>""", res)
 
-    def _build_workflow_with_conf_common(self, job_type):
+        # testing workflow creation with a proxy domain
+        self.override_config('use_domain_for_proxy_users', True)
+        self.override_config("proxy_user_domain_name", 'sahara_proxy_domain')
 
-        input_data = u.create_data_source('swift://ex/i')
-        output_data = u.create_data_source('swift://ex/o')
-
-        job, job_exec = u.create_job_exec(job_type,
-                                          configs={"configs": {'c': 'f'}})
+        job, job_exec = u.create_job_exec(edp.JOB_TYPE_HIVE, proxy=True)
 
         res = workflow_factory.get_workflow_xml(
             job, u.create_cluster(), job_exec, input_data, output_data,
             'hadoop')
 
         self.assertIn("""
+      <job-xml>/user/hadoop/conf/hive-site.xml</job-xml>
+      <configuration>
         <property>
-          <name>c</name>
-          <value>f</value>
-        </property>""", res)
-
-        self.assertIn("""
+          <name>fs.swift.service.sahara.domain.name</name>
+          <value>sahara_proxy_domain</value>
+        </property>
         <property>
-          <name>mapred.input.dir</name>
-          <value>swift://ex.sahara/i</value>
-        </property>""", res)
-
-        self.assertIn("""
+          <name>fs.swift.service.sahara.password</name>
+          <value>55555555-6666-7777-8888-999999999999</value>
+        </property>
         <property>
-          <name>mapred.output.dir</name>
-          <value>swift://ex.sahara/o</value>
-        </property>""", res)
-
-    def test_build_workflow_for_job_mapreduce_with_conf(self):
-        self._build_workflow_with_conf_common(edp.JOB_TYPE_MAPREDUCE)
+          <name>fs.swift.service.sahara.trust.id</name>
+          <value>0123456789abcdef0123456789abcdef</value>
+        </property>
+        <property>
+          <name>fs.swift.service.sahara.username</name>
+          <value>job_00000000-1111-2222-3333-4444444444444444</value>
+        </property>
+      </configuration>
+      <script>script.q</script>
+      <param>INPUT=swift://ex.sahara/i</param>
+      <param>OUTPUT=swift://ex.sahara/o</param>""", res)
 
     def test_update_job_dict(self):
         w = workflow_factory.BaseFactory()
diff --git a/sahara/tests/unit/service/validation/edp/test_data_source.py b/sahara/tests/unit/service/validation/edp/test_data_source.py
index 9791f4bb5a..a1d5d7af65 100644
--- a/sahara/tests/unit/service/validation/edp/test_data_source.py
+++ b/sahara/tests/unit/service/validation/edp/test_data_source.py
@@ -60,6 +60,9 @@ class TestDataSourceValidation(u.ValidationTestCase):
         }
         with testtools.ExpectedException(ex.InvalidCredentials):
             ds.check_data_source_create(data)
+        # proxy enabled should allow creation without credentials
+        self.override_config('use_domain_for_proxy_users', True)
+        ds.check_data_source_create(data)
 
     @mock.patch("sahara.service.validations."
                 "edp.base.check_data_source_unique_name")
@@ -79,6 +82,9 @@ class TestDataSourceValidation(u.ValidationTestCase):
         }
         with testtools.ExpectedException(ex.InvalidCredentials):
             ds.check_data_source_create(data)
+        # proxy enabled should allow creation without credentials
+        self.override_config('use_domain_for_proxy_users', True)
+        ds.check_data_source_create(data)
 
     @mock.patch("sahara.service.validations."
                 "edp.base.check_data_source_unique_name")
@@ -98,6 +104,9 @@ class TestDataSourceValidation(u.ValidationTestCase):
         }
         with testtools.ExpectedException(ex.InvalidCredentials):
             ds.check_data_source_create(data)
+        # proxy enabled should allow creation without credentials
+        self.override_config('use_domain_for_proxy_users', True)
+        ds.check_data_source_create(data)
 
     @mock.patch("sahara.service.validations."
                 "edp.base.check_data_source_unique_name")
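Note: this patch reads two configuration options, use_domain_for_proxy_users
and proxy_user_domain_name, whose registration is not part of this diff and
is assumed to happen elsewhere in the proxy-user series. A minimal sketch of
how such options could be declared with oslo.config (defaults and help text
are guesses, not taken from this patch):

    from oslo.config import cfg

    # Hypothetical registration -- option names match the ones used above.
    proxy_opts = [
        cfg.BoolOpt('use_domain_for_proxy_users',
                    default=False,
                    help='Use a Keystone domain of proxy users for Swift '
                         'access instead of requiring credentials on data '
                         'sources.'),
        cfg.StrOpt('proxy_user_domain_name',
                   default=None,
                   help='Name of the Keystone domain that contains the '
                        'proxy users.'),
    ]

    CONF = cfg.CONF
    CONF.register_opts(proxy_opts)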