Merge "Add manila nfs data sources"

@@ -13,14 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-import six
-
-from sahara import conductor as c
-from sahara import context
+from sahara.service.edp import job_utils
 from sahara.service import shares as shares_service
-
-conductor = c.API


 def get_file_info(job_binary, remote):
     shares = []

@@ -28,34 +23,10 @@ def get_file_info(job_binary, remote):
         shares.extend(remote.instance.node_group.cluster.shares)
     if remote.instance.node_group.shares:
         shares.extend(remote.instance.node_group.shares)
-    # url example: 'manila://ManilaShare-uuid/path_to_file'
-    url = six.moves.urllib.parse.urlparse(job_binary.url)
-    share_id = url.netloc
-    if not any(s['id'] == share_id for s in shares):
-        # Automount this share to the cluster
-        cluster = remote.instance.node_group.cluster
-        if cluster.shares:
-            cluster_shares = [dict(s) for s in cluster.shares]
-        else:
-            cluster_shares = []
-        needed_share = {
-            'id': share_id,
-            'path': '/mnt/{0}'.format(share_id),
-            'access_level': 'rw'
-        }
-
-        cluster_shares.append(needed_share)
-        cluster = conductor.cluster_update(
-            context.ctx(), cluster, {'shares': cluster_shares})
-        shares_service.mount_shares(cluster)
-        shares = cluster.shares
-
-    # using list() as a python2/3 workaround
-    share = list(filter(lambda s: s['id'] == share_id, shares))[0]
-    mount_point = share.get('path', "/mnt/%s" % share_id)
-
-    res = {
-        'type': 'path',
-        'path': "{0}{1}".format(mount_point, url.path)
-    }
-    return res
+    path = shares_service.get_share_path(job_binary.url, shares)
+    if path is None:
+        path = job_utils.mount_share_at_default_path(
+            job_binary.url,
+            remote.instance.node_group.cluster)
+    return {'type': 'path',
+            'path': path}
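
The rewritten get_file_info defers all path resolution to shares_service.get_share_path and only mounts the share when that lookup misses. A self-contained sketch of the lookup-then-fallback flow in plain Python (illustrative ids and a stub mounter, not sahara's actual modules):

    from urllib.parse import urlparse  # the real code uses six.moves for py2/3

    def get_share_path(url, shares):
        # mirrors the lookup: None means "share not mounted on this cluster yet"
        parsed = urlparse(url)
        match = [s for s in shares if s['id'] == parsed.netloc]
        if not match:
            return None
        mount_point = match[0].get('path', '/mnt/%s' % match[0]['id'])
        return '{0}{1}'.format(mount_point, parsed.path)

    def get_file_info(url, shares, mount_at_default):
        path = get_share_path(url, shares)
        if path is None:
            # fall back: mount the share at its default path, then resolve
            path = mount_at_default(url)
        return {'type': 'path', 'path': path}

    shares = [{'id': 'share-uuid', 'path': '/mnt/data', 'access_level': 'rw'}]
    info = get_file_info('manila://share-uuid/job.jar', shares, lambda u: None)
    assert info == {'type': 'path', 'path': '/mnt/data/job.jar'}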

@@ -25,7 +25,9 @@ import six
 from sahara import conductor as c
 from sahara import context
 from sahara.plugins import base as plugin_base
+from sahara.service import shares as shares_service
 from sahara.swift import swift_helper as sw
+from sahara.utils.openstack import manila as m
 from sahara.utils import remote



@@ -67,13 +69,13 @@ def create_workflow_dir(where, path, job, use_uuid=None, chmod=""):
     return constructed_dir


-def get_data_sources(job_execution, job, data_source_urls):
+def get_data_sources(job_execution, job, data_source_urls, cluster=None):

     def _construct(ctx, ds_id):
         source = conductor.data_source_get(ctx, ds_id)
         if source and source.id not in data_source_urls:
             url = _construct_data_source_url(source.url, job_execution.id)
-            runtime_url = _runtime_url(url)
+            runtime_url = _runtime_url(url, cluster)
             data_source_urls[source.id] = (url, runtime_url)
         return source

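
get_data_sources now threads the cluster down so that a runtime URL can be computed for each source; every entry in data_source_urls pairs the submit-time URL with the URL the job will see at runtime. A minimal sketch of that bookkeeping, together with to_url_dict from a later hunk (ids and paths made up):

    # mirrors data_source_urls[source.id] = (url, runtime_url)
    data_source_urls = {
        'ds-uuid-1': ('manila://share-uuid/input',      # as stored in the data source
                      'file:///mnt/share-uuid/input'),  # as seen on cluster nodes
    }

    def to_url_dict(data_source_urls, runtime=False):
        idx = 1 if runtime else 0
        return {id_: urls[idx] for id_, urls in data_source_urls.items()}

    print(to_url_dict(data_source_urls))                # submit-time urls
    print(to_url_dict(data_source_urls, runtime=True))  # runtime file:// urls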

@@ -178,7 +180,10 @@ def _add_credentials_for_data_sources(ds_list, configs):
            configs[sw.HADOOP_SWIFT_PASSWORD] = password


-def resolve_data_source_references(job_configs, job_exec_id, data_source_urls):
+def resolve_data_source_references(job_configs,
+                                   job_exec_id,
+                                   data_source_urls,
+                                   cluster=None):
     """Resolve possible data_source references in job_configs.

     Look for any string values in the 'args', 'configs', and 'params'

@@ -233,7 +238,7 @@ def resolve_data_source_references(job_configs, job_exec_id, data_source_urls):
                ds_seen[ds.id] = ds
                if ds.id not in data_source_urls:
                    url = _construct_data_source_url(ds.url, job_exec_id)
-                   runtime_url = _runtime_url(url)
+                   runtime_url = _runtime_url(url, cluster)
                    data_source_urls[ds.id] = (url, runtime_url)

                return data_source_urls[ds.id][1]

@@ -290,10 +295,40 @@ def _construct_data_source_url(url, job_exec_id):
     return url


-def _runtime_url(url):
+def _runtime_url(url, cluster):
+    if url.startswith(m.MANILA_PREFIX) and cluster:
+        path = shares_service.get_share_path(url, cluster.shares or [])
+        if path is None:
+            path = mount_share_at_default_path(url, cluster)
+        # This gets us the mount point, but we need a file:// scheme to
+        # indicate a local filesystem path
+        return "file://{path}".format(path=path)
     return url


 def to_url_dict(data_source_urls, runtime=False):
     idx = 1 if runtime else 0
     return {id: urls[idx] for id, urls in six.iteritems(data_source_urls)}
+
+
+def mount_share_at_default_path(url, cluster):
+    # Automount this share to the cluster with default path
+    # url example: 'manila://ManilaShare-uuid/path_to_file'
+    share_id = six.moves.urllib.parse.urlparse(url).netloc
+    if cluster.shares:
+        cluster_shares = [dict(s) for s in cluster.shares]
+    else:
+        cluster_shares = []
+
+    needed_share = {
+        'id': share_id,
+        'path': shares_service.default_mount(share_id),
+        'access_level': 'rw'
+    }
+
+    cluster_shares.append(needed_share)
+    cluster = conductor.cluster_update(
+        context.ctx(), cluster, {'shares': cluster_shares})
+    shares_service.mount_shares(cluster)
+
+    return shares_service.get_share_path(url, cluster.shares)
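
The net effect of the new _runtime_url branch: a manila:// URL is rewritten to a file:// URL under the share's mount point on the cluster, mounting the share first if it is not already attached. A standalone sketch of the translation (made-up share id and mount table; m.MANILA_PREFIX is assumed to be the 'manila://' scheme prefix):

    from urllib.parse import urlparse

    MANILA_PREFIX = 'manila://'

    def runtime_url(url, mounted):
        # mounted: share id -> local mount point, as cluster.shares records it
        if url.startswith(MANILA_PREFIX):
            parsed = urlparse(url)
            mount = mounted.get(parsed.netloc, '/mnt/%s' % parsed.netloc)
            # a file:// scheme indicates a local filesystem path
            return 'file://{0}{1}'.format(mount, parsed.path)
        return url

    print(runtime_url('manila://share-uuid/out/part-0', {'share-uuid': '/mnt/data'}))
    # -> file:///mnt/data/out/part-0
    print(runtime_url('swift://container/obj', {}))  # non-manila URLs pass through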

@@ -108,7 +108,7 @@ class OozieJobEngine(base_engine.JobEngine):

         job = conductor.job_get(ctx, job_execution.job_id)
         input_source, output_source = job_utils.get_data_sources(
-            job_execution, job, data_source_urls)
+            job_execution, job, data_source_urls, self.cluster)

         # Updated_job_configs will be a copy of job_execution.job_configs with
         # any name or uuid references to data_sources resolved to paths

@@ -117,8 +117,10 @@ class OozieJobEngine(base_engine.JobEngine):
         # just be a reference to job_execution.job_configs to avoid a copy.
         # Additional_sources will be a list of any data_sources found.
         additional_sources, updated_job_configs = (
-            job_utils.resolve_data_source_references(
-                job_execution.job_configs, job_execution.id, data_source_urls)
+            job_utils.resolve_data_source_references(job_execution.job_configs,
+                                                     job_execution.id,
+                                                     data_source_urls,
+                                                     self.cluster)
         )

         job_execution = conductor.job_execution_update(

@@ -200,8 +200,10 @@ class SparkJobEngine(base_engine.JobEngine):
         # keyed by data_source id
         data_source_urls = {}
         additional_sources, updated_job_configs = (
-            job_utils.resolve_data_source_references(
-                job_execution.job_configs, job_execution.id, data_source_urls)
+            job_utils.resolve_data_source_references(job_execution.job_configs,
+                                                     job_execution.id,
+                                                     data_source_urls,
+                                                     self.cluster)
         )

         job_execution = conductor.job_execution_update(

@@ -174,8 +174,10 @@ class StormJobEngine(base_engine.JobEngine):
         data_source_urls = {}

         additional_sources, updated_job_configs = (
-            job_utils.resolve_data_source_references(
-                job_execution.job_configs, job_execution.id, data_source_urls)
+            job_utils.resolve_data_source_references(job_execution.job_configs,
+                                                     job_execution.id,
+                                                     data_source_urls,
+                                                     self.cluster)
         )

         job_execution = conductor.job_execution_update(

@@ -180,9 +180,11 @@ class _ShareHandler(object):
     def _get_access_level(self, share_config):
         return share_config.get('access_level', 'rw')

-    @abc.abstractmethod
+    def _default_mount(self):
+        return '/mnt/{0}'.format(self.share.id)
+
     def _get_path(self, share_info):
-        pass
+        return share_info.get('path', self._default_mount())


 class _NFSMounter(_ShareHandler):
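
_get_path is no longer abstract: the base handler now answers with the share's configured path, falling back to a per-handler default mount point. A stripped-down illustration of the pattern (class name and ids are illustrative, not sahara's):

    class ShareHandler(object):
        def __init__(self, share_id):
            self.share_id = share_id

        def _default_mount(self):
            # subclasses may override to choose a different default location
            return '/mnt/{0}'.format(self.share_id)

        def _get_path(self, share_info):
            return share_info.get('path', self._default_mount())

    handler = ShareHandler('share-uuid')
    assert handler._get_path({}) == '/mnt/share-uuid'
    assert handler._get_path({'path': '/mnt/data'}) == '/mnt/data'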

@@ -230,9 +232,40 @@ class _NFSMounter(_ShareHandler):
                          "access_arg": access_arg}
         remote.execute_command(mount_command, run_as_root=True)

-    def _get_path(self, share_info):
-        return share_info.get('path', '/mnt/%s' % self.share.id)


 _share_types = {"NFS": _NFSMounter}
 SUPPORTED_SHARE_TYPES = _share_types.keys()


+def _make_share_path(mount_point, path):
+    return "{0}{1}".format(mount_point, path)
+
+
+def default_mount(share_id):
+    client = manila.client()
+    return _ShareHandler.create_from_id(share_id, client)._default_mount()
+
+
+def get_share_path(url, shares):
+    # url example: 'manila://ManilaShare-uuid/path_to_file'
+    url = six.moves.urllib.parse.urlparse(url)
+    # using list() as a python2/3 workaround
+    share_list = list(filter(lambda s: s['id'] == url.netloc, shares))
+    if not share_list:
+        # Share id is not in the share list, let the caller
+        # determine a default path if possible
+        path = None
+    else:
+        # We will always select the first one. Let the
+        # caller determine whether duplicates are okay
+        mount_point = share_list[0].get('path', None)
+
+        # Do this in two steps instead of passing the default
+        # expression to get(), because it's a big side effect
+        if mount_point is None:
+            # The situation here is that the user specified a
+            # share without a path, so the default mnt was used
+            # during cluster provisioning.
+            mount_point = default_mount(share_list[0]['id'])
+        path = _make_share_path(mount_point, url.path)
+    return path
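
Worth noting: urlparse keeps the leading slash on url.path, so _make_share_path is plain concatenation of mount point and path. A runnable illustration (uuid made up):

    from urllib.parse import urlparse

    def _make_share_path(mount_point, path):
        return '{0}{1}'.format(mount_point, path)

    url = urlparse('manila://6e22eb56-64f6-4b4e-a7f8-86c0de4b70e3/x/y')
    # explicit mount point from the share config
    assert _make_share_path('/mnt/data', url.path) == '/mnt/data/x/y'
    # default mount convention: /mnt/<share-id>
    assert _make_share_path('/mnt/%s' % url.netloc, url.path) == (
        '/mnt/6e22eb56-64f6-4b4e-a7f8-86c0de4b70e3/x/y')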

@@ -24,7 +24,7 @@ conductor = c.API

 data_source_type = {
     "type": "string",
-    "enum": ["swift", "hdfs", "maprfs"]
+    "enum": ["swift", "hdfs", "maprfs", "manila"]
 }

 job_configs = {
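
With "manila" added to the enum, a data source create request shaped like the following now passes the type check (values illustrative; the URL format itself is enforced by the manila validation below):

    data = {
        "name": "nfs-input",
        "type": "manila",  # newly accepted alongside swift, hdfs and maprfs
        "url": "manila://6e22eb56-64f6-4b4e-a7f8-86c0de4b70e3/input",
        "description": "job input on a manila NFS share"
    }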

@@ -14,6 +14,7 @@
 # limitations under the License.

 from oslo_config import cfg
+from oslo_utils import uuidutils
 import six.moves.urllib.parse as urlparse

 from sahara import conductor as c

@@ -32,12 +33,15 @@ def check_data_source_create(data, **kwargs):
     if "swift" == data["type"]:
         _check_swift_data_source_create(data)

-    if "hdfs" == data["type"]:
+    elif "hdfs" == data["type"]:
         _check_hdfs_data_source_create(data)

-    if "maprfs" == data["type"]:
+    elif "maprfs" == data["type"]:
         _check_maprfs_data_source_create(data)

+    elif "manila" == data["type"]:
+        _check_manila_data_source_create(data)
+

 def _check_swift_data_source_create(data):
     if len(data['url']) == 0:

@@ -89,6 +93,18 @@ def _check_maprfs_data_source_create(data):
         raise ex.InvalidDataException(_("URL scheme must be 'maprfs'"))


+def _check_manila_data_source_create(data):
+    if len(data['url']) == 0:
+        raise ex.InvalidDataException(_("Manila url must not be empty"))
+    url = urlparse.urlparse(data['url'])
+    if url.scheme != "manila":
+        raise ex.InvalidDataException(_("Manila url scheme must be 'manila'"))
+    if not uuidutils.is_uuid_like(url.netloc):
+        raise ex.InvalidDataException(_("Manila url netloc must be a uuid"))
+    if not url.path:
+        raise ex.InvalidDataException(_("Manila url path must not be empty"))
+
+
 def check_data_source_update(data, **kwargs):
     ctx = context.ctx()
     jobs = c.API.job_execution_get_all(ctx)
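
A standalone sketch of what the new check accepts and rejects (uuid made up; sahara raises InvalidDataException, ValueError stands in here, and uuidutils.is_uuid_like is approximated with uuid.UUID):

    import uuid
    from urllib.parse import urlparse

    def check_manila_url(candidate):
        if len(candidate) == 0:
            raise ValueError("Manila url must not be empty")
        url = urlparse(candidate)
        if url.scheme != "manila":
            raise ValueError("Manila url scheme must be 'manila'")
        try:
            uuid.UUID(url.netloc)
        except ValueError:
            raise ValueError("Manila url netloc must be a uuid")
        if not url.path:
            raise ValueError("Manila url path must not be empty")

    check_manila_url("manila://6e22eb56-64f6-4b4e-a7f8-86c0de4b70e3/out")  # ok
    # "manila://my-share/out"  -> rejected: netloc is not a uuid
    # "manila://6e22eb56-64f6-4b4e-a7f8-86c0de4b70e3"  -> rejected: empty path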