diff --git a/sahara/plugins/spark/config_helper.py b/sahara/plugins/spark/config_helper.py
index 6c783bb3..fc4402d5 100644
--- a/sahara/plugins/spark/config_helper.py
+++ b/sahara/plugins/spark/config_helper.py
@@ -437,7 +437,8 @@ def generate_hadoop_setup_script(storage_paths, env_configs):
 
     for path in storage_paths:
         script_lines.append("chown -R hadoop:hadoop %s" % path)
-        script_lines.append("chmod -R 755 %s" % path)
+        script_lines.append("chmod -f -R 755 %s ||"
+                            "echo 'Permissions unchanged'" % path)
 
     return "\n".join(script_lines)
 
diff --git a/sahara/service/edp/binary_retrievers/dispatch.py b/sahara/service/edp/binary_retrievers/dispatch.py
index d8c147c7..cd95e849 100644
--- a/sahara/service/edp/binary_retrievers/dispatch.py
+++ b/sahara/service/edp/binary_retrievers/dispatch.py
@@ -15,11 +15,14 @@
 
 from sahara import context
 from sahara.service.edp.binary_retrievers import internal_swift as i_swift
+from sahara.service.edp.binary_retrievers import manila_share as manila
 from sahara.service.edp.binary_retrievers import sahara_db as db
 from sahara.swift import utils as su
+from sahara.utils.openstack import manila as m
 
 
-def get_raw_binary(job_binary, proxy_configs=None, with_context=False):
+def get_raw_binary(job_binary, proxy_configs=None,
+                   with_context=False, remote=None):
     '''Get the raw data for a job binary
 
     This will retrieve the raw data for a job binary from it's source. In the
@@ -31,6 +34,7 @@ def get_raw_binary(job_binary, proxy_configs=None, with_context=False):
     :param job_binary: The job binary to retrieve
     :param proxy_configs: Proxy user configuration to use as credentials
     :param with_context: Use the current context as credentials
+    :param remote: The remote contains node group and cluster information
     :returns: The raw data from a job binary
 
     '''
@@ -44,4 +48,7 @@ def get_raw_binary(job_binary, proxy_configs=None, with_context=False):
         else:
             res = i_swift.get_raw_data(job_binary, proxy_configs)
 
+    if url.startswith(m.MANILA_PREFIX):
+        res = manila.get_file_info(job_binary, remote)
+
     return res
diff --git a/sahara/service/edp/binary_retrievers/manila_share.py b/sahara/service/edp/binary_retrievers/manila_share.py
new file mode 100644
index 00000000..29238131
--- /dev/null
+++ b/sahara/service/edp/binary_retrievers/manila_share.py
@@ -0,0 +1,61 @@
+# Copyright (c) 2015 Red Hat Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import six
+
+from sahara import conductor as c
+from sahara import context
+from sahara.service import shares as shares_service
+
+conductor = c.API
+
+
+def get_file_info(job_binary, remote):
+    shares = []
+    if remote.instance.node_group.cluster.shares:
+        shares.extend(remote.instance.node_group.cluster.shares)
+    if remote.instance.node_group.shares:
+        shares.extend(remote.instance.node_group.shares)
+    # url example: 'manila://ManilaShare-uuid/path_to_file'
+    url = six.moves.urllib.parse.urlparse(job_binary.url)
+    share_id = url.netloc
+    if not any(s['id'] == share_id for s in shares):
+        # Automount this share to the cluster
+        cluster = remote.instance.node_group.cluster
+        if cluster.shares:
+            cluster_shares = [dict(s) for s in cluster.shares]
+        else:
+            cluster_shares = []
+        needed_share = {
+            'id': share_id,
+            'path': '/mnt/{0}'.format(share_id),
+            'access_level': 'rw'
+        }
+
+        cluster_shares.append(needed_share)
+        cluster = conductor.cluster_update(
+            context.ctx(), cluster, {'shares': cluster_shares})
+        shares_service.mount_shares(cluster)
+        shares = cluster.shares
+
+    # using list() as a python2/3 workaround
+    share = list(filter(lambda s: s['id'] == share_id, shares))[0]
+    mount_point = share.get('path', "/mnt/%s" % share_id)
+
+    res = {
+        'type': 'path',
+        'path': "{0}{1}".format(mount_point, url.path)
+    }
+    return res
diff --git a/sahara/service/edp/oozie/engine.py b/sahara/service/edp/oozie/engine.py
index 4a03d6be..0bc2087f 100644
--- a/sahara/service/edp/oozie/engine.py
+++ b/sahara/service/edp/oozie/engine.py
@@ -247,15 +247,27 @@ class OozieJobEngine(base_engine.JobEngine):
 
         with remote.get_remote(where) as r:
             for main in mains:
-                raw_data = dispatch.get_raw_binary(main, proxy_configs)
-                h.put_file_to_hdfs(r, raw_data, main.name, job_dir, hdfs_user)
+                raw_data = dispatch.get_raw_binary(
+                    main, proxy_configs=proxy_configs, remote=r)
+                if isinstance(raw_data, dict) and raw_data["type"] == "path":
+                    h.copy_from_local(r, raw_data['path'],
+                                      job_dir, hdfs_user)
+                else:
+                    h.put_file_to_hdfs(r, raw_data, main.name,
+                                       job_dir, hdfs_user)
                 uploaded_paths.append(job_dir + '/' + main.name)
             if len(libs) and job_dir_suffix:
                 # HDFS 2.2.0 fails to put file if the lib dir does not exist
                 self.create_hdfs_dir(r, lib_dir)
             for lib in libs:
-                raw_data = dispatch.get_raw_binary(lib, proxy_configs)
-                h.put_file_to_hdfs(r, raw_data, lib.name, lib_dir, hdfs_user)
+                raw_data = dispatch.get_raw_binary(
+                    lib, proxy_configs=proxy_configs, remote=remote)
+                if isinstance(raw_data, dict) and raw_data["type"] == "path":
+                    h.copy_from_local(r, raw_data['path'],
+                                      lib_dir, hdfs_user)
+                else:
+                    h.put_file_to_hdfs(r, raw_data, lib.name,
+                                       lib_dir, hdfs_user)
                 uploaded_paths.append(lib_dir + '/' + lib.name)
             for lib in builtin_libs:
                 h.put_file_to_hdfs(r, lib['raw'], lib['name'], lib_dir,
diff --git a/sahara/service/edp/spark/engine.py b/sahara/service/edp/spark/engine.py
index 6a090e11..610431e5 100644
--- a/sahara/service/edp/spark/engine.py
+++ b/sahara/service/edp/spark/engine.py
@@ -137,8 +137,12 @@ class SparkJobEngine(base_engine.JobEngine):
 
         def upload(r, dir, job_file, proxy_configs):
             dst = os.path.join(dir, job_file.name)
-            raw_data = dispatch.get_raw_binary(job_file, proxy_configs)
-            r.write_file_to(dst, raw_data)
+            raw_data = dispatch.get_raw_binary(
+                job_file, proxy_configs=proxy_configs, remote=r)
+            if isinstance(raw_data, dict) and raw_data["type"] == "path":
+                dst = raw_data['path']
+            else:
+                r.write_file_to(dst, raw_data)
             return dst
 
         def upload_builtin(r, dir, builtin):
@@ -217,7 +221,8 @@ class SparkJobEngine(base_engine.JobEngine):
             self.master, wf_dir, job, updated_job_configs)
 
         # We can shorten the paths in this case since we'll run out of wf_dir
-        paths = [os.path.basename(p) for p in paths]
+        paths = [os.path.basename(p) if p.startswith(wf_dir) else p
+                 for p in paths]
         builtin_paths = [os.path.basename(p) for p in builtin_paths]
 
         # TODO(tmckay): for now, paths[0] is always assumed to be the app
@@ -304,6 +309,7 @@ class SparkJobEngine(base_engine.JobEngine):
         if ret == 0:
             # Success, we'll add the wf_dir in job_execution.extra and store
             # pid@instance_id as the job id
+            # We know the job is running so return "RUNNING"
             return (stdout.strip() + "@" + self.master.id,
                     edp.JOB_STATUS_RUNNING,
diff --git a/sahara/service/edp/storm/engine.py b/sahara/service/edp/storm/engine.py
index fe3652d2..478b2655 100644
--- a/sahara/service/edp/storm/engine.py
+++ b/sahara/service/edp/storm/engine.py
@@ -110,8 +110,13 @@ class StormJobEngine(base_engine.JobEngine):
 
         def upload(r, dir, job_file, proxy_configs):
             dst = os.path.join(dir, job_file.name)
-            raw_data = dispatch.get_raw_binary(job_file, proxy_configs)
-            r.write_file_to(dst, raw_data)
+            raw_data = dispatch.get_raw_binary(job_file,
+                                               proxy_configs=proxy_configs,
+                                               remote=remote)
+            if isinstance(raw_data, dict) and raw_data["type"] == "path":
+                dst = raw_data['path']
+            else:
+                r.write_file_to(dst, raw_data)
             return dst
 
         uploaded_paths = []
diff --git a/sahara/tests/unit/service/edp/binary_retrievers/test_dispatch.py b/sahara/tests/unit/service/edp/binary_retrievers/test_dispatch.py
index 741c6e96..ba25384a 100644
--- a/sahara/tests/unit/service/edp/binary_retrievers/test_dispatch.py
+++ b/sahara/tests/unit/service/edp/binary_retrievers/test_dispatch.py
@@ -23,6 +23,8 @@ class TestDispatch(base.SaharaTestCase):
     def setUp(self):
         super(TestDispatch, self).setUp()
 
+    @mock.patch('sahara.service.edp.binary_retrievers.'
+                'manila_share.get_file_info')
     @mock.patch(
         'sahara.service.edp.binary_retrievers.internal_swift.'
         'get_raw_data_with_context')
@@ -31,7 +33,7 @@ class TestDispatch(base.SaharaTestCase):
     @mock.patch('sahara.service.edp.binary_retrievers.sahara_db.get_raw_data')
     @mock.patch('sahara.context.ctx')
     def test_get_raw_binary(self, ctx, db_get_raw_data, i_s_get_raw_data,
-                            i_s_get_raw_data_with_context):
+                            i_s_get_raw_data_with_context, m_s_get_file_info):
         ctx.return_value = mock.Mock()
 
         job_binary = mock.Mock()
@@ -49,3 +51,10 @@ class TestDispatch(base.SaharaTestCase):
         dispatch.get_raw_binary(job_binary, with_context=True)
         self.assertEqual(1, i_s_get_raw_data.call_count)
         self.assertEqual(2, i_s_get_raw_data_with_context.call_count)
+
+        job_binary.url = 'manila://the_share_id/the_path'
+        remote = mock.Mock()
+        remote.instance.node_group.cluster.shares = []
+        remote.instance.node_group.shares = []
+        dispatch.get_raw_binary(job_binary, remote=remote)
+        self.assertEqual(1, m_s_get_file_info.call_count)
diff --git a/sahara/tests/unit/service/edp/spark/base.py b/sahara/tests/unit/service/edp/spark/base.py
index 8f979162..e0125fb6 100644
--- a/sahara/tests/unit/service/edp/spark/base.py
+++ b/sahara/tests/unit/service/edp/spark/base.py
@@ -347,6 +347,8 @@ class TestSpark(base.SaharaTestCase):
 
         # This is to mock "with remote.get_remote(instance) as r"
         remote_instance = mock.Mock()
+        remote_instance.instance.node_group.cluster.shares = []
+        remote_instance.instance.node_group.shares = []
         get_remote.return_value.__enter__ = mock.Mock(
             return_value=remote_instance)
 
diff --git a/sahara/tests/unit/service/edp/storm/test_storm.py b/sahara/tests/unit/service/edp/storm/test_storm.py
index f1b1da7b..d7482621 100644
--- a/sahara/tests/unit/service/edp/storm/test_storm.py
+++ b/sahara/tests/unit/service/edp/storm/test_storm.py
@@ -233,6 +233,8 @@ class TestStorm(base.SaharaTestCase):
 
         remote_instance = mock.Mock()
         get_remote.return_value.__enter__ = mock.Mock(
             return_value=remote_instance)
+        remote_instance.instance.node_group.cluster.shares = []
+        remote_instance.instance.node_group.shares = []
 
         get_raw_binary.return_value = "data"
diff --git a/sahara/utils/api_validator.py b/sahara/utils/api_validator.py
index c278754a..1bfdc000 100644
--- a/sahara/utils/api_validator.py
+++ b/sahara/utils/api_validator.py
@@ -18,8 +18,10 @@ import re
 import jsonschema
 from oslo_utils import uuidutils
 import six
+import six.moves.urllib.parse as urlparse
 
 from sahara.swift import utils as su
+from sahara.utils.openstack import manila as m
 
 
 @jsonschema.FormatChecker.cls_checks('valid_name_hostname')
@@ -67,6 +69,12 @@ def validate_job_location_format(entry):
     if entry.startswith(su.SWIFT_INTERNAL_PREFIX):
         # TODO(nprivalova):add hostname validation
         return True
+
+    if entry.startswith(m.MANILA_PREFIX):
+        url = urlparse.urlparse(entry)
+        if (uuidutils.is_uuid_like(url.netloc) and
+                len(url.path) > 1):
+            return True
 
     return False
 
diff --git a/sahara/utils/openstack/manila.py b/sahara/utils/openstack/manila.py
index e5b944b0..dce0010a 100644
--- a/sahara/utils/openstack/manila.py
+++ b/sahara/utils/openstack/manila.py
@@ -41,6 +41,8 @@ CONF = cfg.CONF
 CONF.register_group(manila_group)
 CONF.register_opts(opts, group=manila_group)
 
+MANILA_PREFIX = "manila://"
+
 
 def client():
     ctx = context.ctx()