Support manila shares as binary store
Changes to support manila shares as a binary store. Oozie, Spark and Storm jobs can now run with job binaries stored in manila shares. Change-Id: I2f5fbe3d36ef4b87e5cadd337854e95ed95ebaa0 Implements: bp manila-as-binary-store
This commit is contained in:
parent
39f964e2e9
commit
6761a01b09
@ -437,7 +437,8 @@ def generate_hadoop_setup_script(storage_paths, env_configs):
|
||||
|
||||
for path in storage_paths:
|
||||
script_lines.append("chown -R hadoop:hadoop %s" % path)
|
||||
script_lines.append("chmod -R 755 %s" % path)
|
||||
script_lines.append("chmod -f -R 755 %s ||"
|
||||
"echo 'Permissions unchanged'" % path)
|
||||
return "\n".join(script_lines)
|
||||
|
||||
|
||||
|
@ -15,11 +15,14 @@
|
||||
|
||||
from sahara import context
|
||||
from sahara.service.edp.binary_retrievers import internal_swift as i_swift
|
||||
from sahara.service.edp.binary_retrievers import manila_share as manila
|
||||
from sahara.service.edp.binary_retrievers import sahara_db as db
|
||||
from sahara.swift import utils as su
|
||||
from sahara.utils.openstack import manila as m
|
||||
|
||||
|
||||
def get_raw_binary(job_binary, proxy_configs=None, with_context=False):
|
||||
def get_raw_binary(job_binary, proxy_configs=None,
|
||||
with_context=False, remote=None):
|
||||
'''Get the raw data for a job binary
|
||||
|
||||
This will retrieve the raw data for a job binary from it's source. In the
|
||||
@ -31,6 +34,7 @@ def get_raw_binary(job_binary, proxy_configs=None, with_context=False):
|
||||
:param job_binary: The job binary to retrieve
|
||||
:param proxy_configs: Proxy user configuration to use as credentials
|
||||
:param with_context: Use the current context as credentials
|
||||
:param remote: The remote contains node group and cluster information
|
||||
:returns: The raw data from a job binary
|
||||
|
||||
'''
|
||||
@ -44,4 +48,7 @@ def get_raw_binary(job_binary, proxy_configs=None, with_context=False):
|
||||
else:
|
||||
res = i_swift.get_raw_data(job_binary, proxy_configs)
|
||||
|
||||
if url.startswith(m.MANILA_PREFIX):
|
||||
res = manila.get_file_info(job_binary, remote)
|
||||
|
||||
return res
|
||||
|
61
sahara/service/edp/binary_retrievers/manila_share.py
Normal file
61
sahara/service/edp/binary_retrievers/manila_share.py
Normal file
@ -0,0 +1,61 @@
|
||||
# Copyright (c) 2015 Red Hat Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import six
|
||||
|
||||
from sahara import conductor as c
|
||||
from sahara import context
|
||||
from sahara.service import shares as shares_service
|
||||
|
||||
conductor = c.API
|
||||
|
||||
|
||||
def get_file_info(job_binary, remote):
    """Resolve a manila-backed job binary to a local filesystem path.

    Looks up the manila share referenced by the binary's ``manila://`` URL
    among the shares already mounted on the cluster (cluster-wide shares
    plus the instance's node-group shares). If the share is not mounted,
    it is appended to the cluster's share list with default settings and
    mounted before resolving the path.

    :param job_binary: job binary whose ``url`` is
                       ``manila://<share-uuid>/<path_to_file>``
    :param remote: remote object carrying the instance / node group /
                   cluster the binary will be read from
    :returns: dict with ``type`` set to ``'path'`` and ``path`` set to the
              file's location under the share's mount point
    """
    node_group = remote.instance.node_group
    cluster = node_group.cluster

    mounted = []
    if cluster.shares:
        mounted.extend(cluster.shares)
    if node_group.shares:
        mounted.extend(node_group.shares)

    # url example: 'manila://ManilaShare-uuid/path_to_file'
    parsed = six.moves.urllib.parse.urlparse(job_binary.url)
    share_id = parsed.netloc

    if all(s['id'] != share_id for s in mounted):
        # The share holding this binary is not mounted on the cluster yet:
        # record it on the cluster and automount it.
        if cluster.shares:
            updated_shares = [dict(s) for s in cluster.shares]
        else:
            updated_shares = []
        updated_shares.append({
            'id': share_id,
            'path': '/mnt/{0}'.format(share_id),
            'access_level': 'rw'
        })

        cluster = conductor.cluster_update(
            context.ctx(), cluster, {'shares': updated_shares})
        shares_service.mount_shares(cluster)
        mounted = cluster.shares

    # list comprehension (not bare filter) as a python2/3 workaround
    share = [s for s in mounted if s['id'] == share_id][0]
    mount_point = share.get('path', "/mnt/%s" % share_id)

    return {
        'type': 'path',
        'path': "{0}{1}".format(mount_point, parsed.path)
    }
|
@ -247,15 +247,27 @@ class OozieJobEngine(base_engine.JobEngine):
|
||||
|
||||
with remote.get_remote(where) as r:
|
||||
for main in mains:
|
||||
raw_data = dispatch.get_raw_binary(main, proxy_configs)
|
||||
h.put_file_to_hdfs(r, raw_data, main.name, job_dir, hdfs_user)
|
||||
raw_data = dispatch.get_raw_binary(
|
||||
main, proxy_configs=proxy_configs, remote=r)
|
||||
if isinstance(raw_data, dict) and raw_data["type"] == "path":
|
||||
h.copy_from_local(r, raw_data['path'],
|
||||
job_dir, hdfs_user)
|
||||
else:
|
||||
h.put_file_to_hdfs(r, raw_data, main.name,
|
||||
job_dir, hdfs_user)
|
||||
uploaded_paths.append(job_dir + '/' + main.name)
|
||||
if len(libs) and job_dir_suffix:
|
||||
# HDFS 2.2.0 fails to put file if the lib dir does not exist
|
||||
self.create_hdfs_dir(r, lib_dir)
|
||||
for lib in libs:
|
||||
raw_data = dispatch.get_raw_binary(lib, proxy_configs)
|
||||
h.put_file_to_hdfs(r, raw_data, lib.name, lib_dir, hdfs_user)
|
||||
raw_data = dispatch.get_raw_binary(
|
||||
lib, proxy_configs=proxy_configs, remote=remote)
|
||||
if isinstance(raw_data, dict) and raw_data["type"] == "path":
|
||||
h.copy_from_local(r, raw_data['path'],
|
||||
lib_dir, hdfs_user)
|
||||
else:
|
||||
h.put_file_to_hdfs(r, raw_data, lib.name,
|
||||
lib_dir, hdfs_user)
|
||||
uploaded_paths.append(lib_dir + '/' + lib.name)
|
||||
for lib in builtin_libs:
|
||||
h.put_file_to_hdfs(r, lib['raw'], lib['name'], lib_dir,
|
||||
|
@ -137,8 +137,12 @@ class SparkJobEngine(base_engine.JobEngine):
|
||||
|
||||
def upload(r, dir, job_file, proxy_configs):
|
||||
dst = os.path.join(dir, job_file.name)
|
||||
raw_data = dispatch.get_raw_binary(job_file, proxy_configs)
|
||||
r.write_file_to(dst, raw_data)
|
||||
raw_data = dispatch.get_raw_binary(
|
||||
job_file, proxy_configs=proxy_configs, remote=r)
|
||||
if isinstance(raw_data, dict) and raw_data["type"] == "path":
|
||||
dst = raw_data['path']
|
||||
else:
|
||||
r.write_file_to(dst, raw_data)
|
||||
return dst
|
||||
|
||||
def upload_builtin(r, dir, builtin):
|
||||
@ -217,7 +221,8 @@ class SparkJobEngine(base_engine.JobEngine):
|
||||
self.master, wf_dir, job, updated_job_configs)
|
||||
|
||||
# We can shorten the paths in this case since we'll run out of wf_dir
|
||||
paths = [os.path.basename(p) for p in paths]
|
||||
paths = [os.path.basename(p) if p.startswith(wf_dir) else p
|
||||
for p in paths]
|
||||
builtin_paths = [os.path.basename(p) for p in builtin_paths]
|
||||
|
||||
# TODO(tmckay): for now, paths[0] is always assumed to be the app
|
||||
@ -304,6 +309,7 @@ class SparkJobEngine(base_engine.JobEngine):
|
||||
if ret == 0:
|
||||
# Success, we'll add the wf_dir in job_execution.extra and store
|
||||
# pid@instance_id as the job id
|
||||
|
||||
# We know the job is running so return "RUNNING"
|
||||
return (stdout.strip() + "@" + self.master.id,
|
||||
edp.JOB_STATUS_RUNNING,
|
||||
|
@ -110,8 +110,13 @@ class StormJobEngine(base_engine.JobEngine):
|
||||
|
||||
def upload(r, dir, job_file, proxy_configs):
|
||||
dst = os.path.join(dir, job_file.name)
|
||||
raw_data = dispatch.get_raw_binary(job_file, proxy_configs)
|
||||
r.write_file_to(dst, raw_data)
|
||||
raw_data = dispatch.get_raw_binary(job_file,
|
||||
proxy_configs=proxy_configs,
|
||||
remote=remote)
|
||||
if isinstance(raw_data, dict) and raw_data["type"] == "path":
|
||||
dst = raw_data['path']
|
||||
else:
|
||||
r.write_file_to(dst, raw_data)
|
||||
return dst
|
||||
|
||||
uploaded_paths = []
|
||||
|
@ -23,6 +23,8 @@ class TestDispatch(base.SaharaTestCase):
|
||||
def setUp(self):
|
||||
super(TestDispatch, self).setUp()
|
||||
|
||||
@mock.patch('sahara.service.edp.binary_retrievers.'
|
||||
'manila_share.get_file_info')
|
||||
@mock.patch(
|
||||
'sahara.service.edp.binary_retrievers.internal_swift.'
|
||||
'get_raw_data_with_context')
|
||||
@ -31,7 +33,7 @@ class TestDispatch(base.SaharaTestCase):
|
||||
@mock.patch('sahara.service.edp.binary_retrievers.sahara_db.get_raw_data')
|
||||
@mock.patch('sahara.context.ctx')
|
||||
def test_get_raw_binary(self, ctx, db_get_raw_data, i_s_get_raw_data,
|
||||
i_s_get_raw_data_with_context):
|
||||
i_s_get_raw_data_with_context, m_s_get_file_info):
|
||||
ctx.return_value = mock.Mock()
|
||||
|
||||
job_binary = mock.Mock()
|
||||
@ -49,3 +51,10 @@ class TestDispatch(base.SaharaTestCase):
|
||||
dispatch.get_raw_binary(job_binary, with_context=True)
|
||||
self.assertEqual(1, i_s_get_raw_data.call_count)
|
||||
self.assertEqual(2, i_s_get_raw_data_with_context.call_count)
|
||||
|
||||
job_binary.url = 'manila://the_share_id/the_path'
|
||||
remote = mock.Mock()
|
||||
remote.instance.node_group.cluster.shares = []
|
||||
remote.instance.node_group.shares = []
|
||||
dispatch.get_raw_binary(job_binary, remote=remote)
|
||||
self.assertEqual(1, m_s_get_file_info.call_count)
|
||||
|
@ -347,6 +347,8 @@ class TestSpark(base.SaharaTestCase):
|
||||
|
||||
# This is to mock "with remote.get_remote(instance) as r"
|
||||
remote_instance = mock.Mock()
|
||||
remote_instance.instance.node_group.cluster.shares = []
|
||||
remote_instance.instance.node_group.shares = []
|
||||
get_remote.return_value.__enter__ = mock.Mock(
|
||||
return_value=remote_instance)
|
||||
|
||||
|
@ -233,6 +233,8 @@ class TestStorm(base.SaharaTestCase):
|
||||
remote_instance = mock.Mock()
|
||||
get_remote.return_value.__enter__ = mock.Mock(
|
||||
return_value=remote_instance)
|
||||
remote_instance.instance.node_group.cluster.shares = []
|
||||
remote_instance.instance.node_group.shares = []
|
||||
|
||||
get_raw_binary.return_value = "data"
|
||||
|
||||
|
@ -18,8 +18,10 @@ import re
|
||||
import jsonschema
|
||||
from oslo_utils import uuidutils
|
||||
import six
|
||||
import six.moves.urllib.parse as urlparse
|
||||
|
||||
from sahara.swift import utils as su
|
||||
from sahara.utils.openstack import manila as m
|
||||
|
||||
|
||||
@jsonschema.FormatChecker.cls_checks('valid_name_hostname')
|
||||
@ -67,6 +69,12 @@ def validate_job_location_format(entry):
|
||||
if entry.startswith(su.SWIFT_INTERNAL_PREFIX):
|
||||
# TODO(nprivalova):add hostname validation
|
||||
return True
|
||||
|
||||
if entry.startswith(m.MANILA_PREFIX):
|
||||
url = urlparse.urlparse(entry)
|
||||
if (uuidutils.is_uuid_like(url.netloc) and
|
||||
len(url.path) > 1):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
|
@ -41,6 +41,8 @@ CONF = cfg.CONF
|
||||
CONF.register_group(manila_group)
|
||||
CONF.register_opts(opts, group=manila_group)
|
||||
|
||||
MANILA_PREFIX = "manila://"
|
||||
|
||||
|
||||
def client():
|
||||
ctx = context.ctx()
|
||||
|
Loading…
Reference in New Issue
Block a user