Merge "Support manila shares as binary store"
This commit is contained in:
@@ -15,11 +15,14 @@
|
||||
|
||||
from sahara import context
|
||||
from sahara.service.edp.binary_retrievers import internal_swift as i_swift
|
||||
from sahara.service.edp.binary_retrievers import manila_share as manila
|
||||
from sahara.service.edp.binary_retrievers import sahara_db as db
|
||||
from sahara.swift import utils as su
|
||||
from sahara.utils.openstack import manila as m
|
||||
|
||||
|
||||
def get_raw_binary(job_binary, proxy_configs=None, with_context=False):
|
||||
def get_raw_binary(job_binary, proxy_configs=None,
|
||||
with_context=False, remote=None):
|
||||
'''Get the raw data for a job binary
|
||||
|
||||
This will retrieve the raw data for a job binary from it's source. In the
|
||||
@@ -31,6 +34,7 @@ def get_raw_binary(job_binary, proxy_configs=None, with_context=False):
|
||||
:param job_binary: The job binary to retrieve
|
||||
:param proxy_configs: Proxy user configuration to use as credentials
|
||||
:param with_context: Use the current context as credentials
|
||||
:param remote: The remote contains node group and cluster information
|
||||
:returns: The raw data from a job binary
|
||||
|
||||
'''
|
||||
@@ -44,4 +48,7 @@ def get_raw_binary(job_binary, proxy_configs=None, with_context=False):
|
||||
else:
|
||||
res = i_swift.get_raw_data(job_binary, proxy_configs)
|
||||
|
||||
if url.startswith(m.MANILA_PREFIX):
|
||||
res = manila.get_file_info(job_binary, remote)
|
||||
|
||||
return res
|
||||
|
||||
61
sahara/service/edp/binary_retrievers/manila_share.py
Normal file
@@ -0,0 +1,61 @@
|
||||
# Copyright (c) 2015 Red Hat Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import six
|
||||
|
||||
from sahara import conductor as c
|
||||
from sahara import context
|
||||
from sahara.service import shares as shares_service
|
||||
|
||||
conductor = c.API
|
||||
|
||||
|
||||
def get_file_info(job_binary, remote):
    """Resolve a manila-backed job binary to a local mounted path.

    Gathers the shares already attached to the cluster and to the
    instance's node group. If the share referenced by the binary URL is
    not among them, the share is appended to the cluster definition and
    mounted before the file path is computed.

    :param job_binary: job binary whose url has the form
                       'manila://ManilaShare-uuid/path_to_file'
    :param remote: remote carrying instance / node group / cluster info
    :returns: dict of the form {'type': 'path', 'path': <mounted path>}
    """
    node_group = remote.instance.node_group
    attached = []
    if node_group.cluster.shares:
        attached.extend(node_group.cluster.shares)
    if node_group.shares:
        attached.extend(node_group.shares)

    # url example: 'manila://ManilaShare-uuid/path_to_file'
    parsed = six.moves.urllib.parse.urlparse(job_binary.url)
    share_id = parsed.netloc

    if not any(s['id'] == share_id for s in attached):
        # Share is unknown to the cluster: register it and mount it.
        cluster = node_group.cluster
        if cluster.shares:
            cluster_shares = [dict(s) for s in cluster.shares]
        else:
            cluster_shares = []

        cluster_shares.append({
            'id': share_id,
            'path': '/mnt/{0}'.format(share_id),
            'access_level': 'rw'
        })
        cluster = conductor.cluster_update(
            context.ctx(), cluster, {'shares': cluster_shares})
        shares_service.mount_shares(cluster)
        attached = cluster.shares

    # Materialize the match eagerly so this works on python 2 and 3 alike.
    share = [s for s in attached if s['id'] == share_id][0]
    mount_point = share.get('path', "/mnt/%s" % share_id)

    return {
        'type': 'path',
        'path': "{0}{1}".format(mount_point, parsed.path)
    }
|
||||
@@ -247,15 +247,27 @@ class OozieJobEngine(base_engine.JobEngine):
|
||||
|
||||
with remote.get_remote(where) as r:
|
||||
for main in mains:
|
||||
raw_data = dispatch.get_raw_binary(main, proxy_configs)
|
||||
h.put_file_to_hdfs(r, raw_data, main.name, job_dir, hdfs_user)
|
||||
raw_data = dispatch.get_raw_binary(
|
||||
main, proxy_configs=proxy_configs, remote=r)
|
||||
if isinstance(raw_data, dict) and raw_data["type"] == "path":
|
||||
h.copy_from_local(r, raw_data['path'],
|
||||
job_dir, hdfs_user)
|
||||
else:
|
||||
h.put_file_to_hdfs(r, raw_data, main.name,
|
||||
job_dir, hdfs_user)
|
||||
uploaded_paths.append(job_dir + '/' + main.name)
|
||||
if len(libs) and job_dir_suffix:
|
||||
# HDFS 2.2.0 fails to put file if the lib dir does not exist
|
||||
self.create_hdfs_dir(r, lib_dir)
|
||||
for lib in libs:
|
||||
raw_data = dispatch.get_raw_binary(lib, proxy_configs)
|
||||
h.put_file_to_hdfs(r, raw_data, lib.name, lib_dir, hdfs_user)
|
||||
raw_data = dispatch.get_raw_binary(
|
||||
lib, proxy_configs=proxy_configs, remote=remote)
|
||||
if isinstance(raw_data, dict) and raw_data["type"] == "path":
|
||||
h.copy_from_local(r, raw_data['path'],
|
||||
lib_dir, hdfs_user)
|
||||
else:
|
||||
h.put_file_to_hdfs(r, raw_data, lib.name,
|
||||
lib_dir, hdfs_user)
|
||||
uploaded_paths.append(lib_dir + '/' + lib.name)
|
||||
for lib in builtin_libs:
|
||||
h.put_file_to_hdfs(r, lib['raw'], lib['name'], lib_dir,
|
||||
|
||||
@@ -137,8 +137,12 @@ class SparkJobEngine(base_engine.JobEngine):
|
||||
|
||||
def upload(r, dir, job_file, proxy_configs):
|
||||
dst = os.path.join(dir, job_file.name)
|
||||
raw_data = dispatch.get_raw_binary(job_file, proxy_configs)
|
||||
r.write_file_to(dst, raw_data)
|
||||
raw_data = dispatch.get_raw_binary(
|
||||
job_file, proxy_configs=proxy_configs, remote=r)
|
||||
if isinstance(raw_data, dict) and raw_data["type"] == "path":
|
||||
dst = raw_data['path']
|
||||
else:
|
||||
r.write_file_to(dst, raw_data)
|
||||
return dst
|
||||
|
||||
def upload_builtin(r, dir, builtin):
|
||||
@@ -217,7 +221,8 @@ class SparkJobEngine(base_engine.JobEngine):
|
||||
self.master, wf_dir, job, updated_job_configs)
|
||||
|
||||
# We can shorten the paths in this case since we'll run out of wf_dir
|
||||
paths = [os.path.basename(p) for p in paths]
|
||||
paths = [os.path.basename(p) if p.startswith(wf_dir) else p
|
||||
for p in paths]
|
||||
builtin_paths = [os.path.basename(p) for p in builtin_paths]
|
||||
|
||||
# TODO(tmckay): for now, paths[0] is always assumed to be the app
|
||||
@@ -304,6 +309,7 @@ class SparkJobEngine(base_engine.JobEngine):
|
||||
if ret == 0:
|
||||
# Success, we'll add the wf_dir in job_execution.extra and store
|
||||
# pid@instance_id as the job id
|
||||
|
||||
# We know the job is running so return "RUNNING"
|
||||
return (stdout.strip() + "@" + self.master.id,
|
||||
edp.JOB_STATUS_RUNNING,
|
||||
|
||||
@@ -110,8 +110,13 @@ class StormJobEngine(base_engine.JobEngine):
|
||||
|
||||
def upload(r, dir, job_file, proxy_configs):
|
||||
dst = os.path.join(dir, job_file.name)
|
||||
raw_data = dispatch.get_raw_binary(job_file, proxy_configs)
|
||||
r.write_file_to(dst, raw_data)
|
||||
raw_data = dispatch.get_raw_binary(job_file,
|
||||
proxy_configs=proxy_configs,
|
||||
remote=remote)
|
||||
if isinstance(raw_data, dict) and raw_data["type"] == "path":
|
||||
dst = raw_data['path']
|
||||
else:
|
||||
r.write_file_to(dst, raw_data)
|
||||
return dst
|
||||
|
||||
uploaded_paths = []
|
||||
|
||||
Reference in New Issue
Block a user